

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Entwickeln Sie benutzerdefinierte Verbraucher mit gemeinsamem Durchsatz
<a name="shared-throughput-consumers"></a>

**Wichtig**  
Die Versionen 1.x und 2.x der Amazon Kinesis Client Library (KCL) sind veraltet. KCL 1.x wird end-of-support am 30. Januar 2026 verfügbar sein. Wir **empfehlen dringend**, dass Sie Ihre KCL-Anwendungen, die Version 1.x verwenden, vor dem 30. Januar 2026 auf die neueste KCL-Version migrieren. Die neueste KCL-Version finden Sie auf der [Seite Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) unter. GitHub Informationen zu den neuesten KCL-Versionen finden Sie unter. [Verwenden Sie die Kinesis-Clientbibliothek](kcl.md) Informationen zur Migration von KCL 1.x zu KCL 3.x finden Sie unter. [Migrieren von KCL 1.x zu KCL 3.x](kcl-migration-1-3.md)

Wenn Sie keinen spezifischen Durchsatz beim Empfangen von Daten von Kinesis Data Streams und keine Verbreitungswerte für Leseoperationen von unter 200 ms benötigen, können Sie Konsumentenanwendungen wie in den folgenden Themen beschrieben erstellen. Sie können die Kinesis Client Library (KCL) oder die verwenden. AWS SDK für Java

**Topics**
+ [Entwickeln Sie mithilfe von KCL benutzerdefinierte Verbraucher mit gemeinsamem Durchsatz](custom-kcl-consumers.md)

Für Informationen zum Erstellen von Verbrauchern, die Datensätze aus Kinesis-Datenströmen mit dediziertem Durchsatz empfangen können, siehe [Entwickeln Sie verbesserte Fan-Out-Verbraucher mit dediziertem Durchsatz](enhanced-consumers.md).

# Entwickeln Sie mithilfe von KCL benutzerdefinierte Verbraucher mit gemeinsamem Durchsatz
<a name="custom-kcl-consumers"></a>

**Wichtig**  
Die Versionen 1.x und 2.x der Amazon Kinesis Client Library (KCL) sind veraltet. KCL 1.x wird end-of-support am 30. Januar 2026 verfügbar sein. Wir **empfehlen dringend**, dass Sie Ihre KCL-Anwendungen, die Version 1.x verwenden, vor dem 30. Januar 2026 auf die neueste KCL-Version migrieren. Die neueste KCL-Version finden Sie auf der [Seite Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) unter. GitHub Informationen zu den neuesten KCL-Versionen finden Sie unter. [Verwenden Sie die Kinesis-Clientbibliothek](kcl.md) Informationen zur Migration von KCL 1.x zu KCL 3.x finden Sie unter. [Migrieren von KCL 1.x zu KCL 3.x](kcl-migration-1-3.md)

Eine der Methoden zur Entwicklung einer benutzerdefinierten Verbraucheranwendung mit gemeinsamem Durchsatz ist die Verwendung der Kinesis Client Library (KCL). 

Wählen Sie eines der folgenden Themen für die KCL-Version, die Sie verwenden.

**Topics**
+ [Entwickeln Sie KCL 1.x-Verbraucher](developing-consumers-with-kcl.md)
+ [Entwickeln Sie KCL 2.x-Verbraucher](developing-consumers-with-kcl-v2.md)

# Entwickeln Sie KCL 1.x-Verbraucher
<a name="developing-consumers-with-kcl"></a>

**Wichtig**  
Die Versionen 1.x und 2.x der Amazon Kinesis Client Library (KCL) sind veraltet. KCL 1.x wird am 30. Januar 2026 end-of-support verfügbar sein. Wir **empfehlen dringend**, dass Sie Ihre KCL-Anwendungen, die Version 1.x verwenden, vor dem 30. Januar 2026 auf die neueste KCL-Version migrieren. Die neueste KCL-Version finden Sie auf der [Seite Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) unter. GitHub Informationen zu den neuesten KCL-Versionen finden Sie unter. [Verwenden Sie die Kinesis-Clientbibliothek](kcl.md) Informationen zur Migration von KCL 1.x zu KCL 3.x finden Sie unter. [Migrieren von KCL 1.x zu KCL 3.x](kcl-migration-1-3.md)

Sie können eine Verbraucheranwendung für Amazon Kinesis Data Streams entwickeln, indem Sie die Kinesis Client Library (KCL) verwenden. 

Weitere Informationen zu KCL finden Sie unter [Über KCL (frühere Versionen)](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-overview).

Wählen Sie je nach der Option, die Sie verwenden möchten, aus den folgenden Themen.

**Topics**
+ [Entwickeln Sie einen Kinesis Client Library-Consumer in Java](kinesis-record-processor-implementation-app-java.md)
+ [Entwickeln Sie einen Kinesis Client Library-Consumer in Node.js](kinesis-record-processor-implementation-app-nodejs.md)
+ [Entwickeln Sie einen Kinesis Client Library-Consumer in .NET](kinesis-record-processor-implementation-app-dotnet.md)
+ [Entwickeln Sie einen Kinesis Client Library-Consumer in Python](kinesis-record-processor-implementation-app-py.md)
+ [Entwickeln Sie einen Kinesis Client Library-Consumer in Ruby](kinesis-record-processor-implementation-app-ruby.md)

# Entwickeln Sie einen Kinesis Client Library-Consumer in Java
<a name="kinesis-record-processor-implementation-app-java"></a>

**Wichtig**  
Die Versionen 1.x und 2.x der Amazon Kinesis Client Library (KCL) sind veraltet. KCL 1.x wird end-of-support am 30. Januar 2026 verfügbar sein. Wir **empfehlen dringend**, dass Sie Ihre KCL-Anwendungen, die Version 1.x verwenden, vor dem 30. Januar 2026 auf die neueste KCL-Version migrieren. Die neueste KCL-Version finden Sie auf der [Seite Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) unter. GitHub Informationen zu den neuesten KCL-Versionen finden Sie unter. [Verwenden Sie die Kinesis-Clientbibliothek](kcl.md) Informationen zur Migration von KCL 1.x zu KCL 3.x finden Sie unter. [Migrieren von KCL 1.x zu KCL 3.x](kcl-migration-1-3.md)

Sie können die Kinesis Client Library (KCL) verwenden, um Anwendungen zu erstellen, die Daten aus Ihren Kinesis-Datenströmen verarbeiten. Die Kinesis Client Library ist in mehreren Sprachen verfügbar. In diesem Thema wird Java behandelt. [Die Javadoc-Referenz finden Sie im Javadoc-Thema für Class.AWS AmazonKinesisClient](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html)

Gehen Sie zur [Kinesis Client Library (Java) GitHub, um die Java-KCL](https://github.com/awslabs/amazon-kinesis-client) von herunterzuladen. Um die Java KCL auf Apache Maven zu finden, navigieren Sie zur Seite für die [KCL-Suchergebnisse](https://search.maven.org/#search|ga|1|amazon-kinesis-client). Um Beispielcode für eine Java-KCL-Verbraucheranwendung herunterzuladen GitHub, gehen Sie auf die Projektseite [KCL for Java-Beispielprojekt unter](https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis). GitHub 

Die Beispielanwendung verwendet [Apache Commons Logging](http://commons.apache.org/proper/commons-logging/guide.html). Sie können die Konfiguration der Protokollierung in der statischen Methode `configure` ändern, die in der Datei `AmazonKinesisApplicationSample.java` definiert ist. *Weitere Informationen zur Verwendung von Apache Commons Logging mit Log4j und AWS Java-Anwendungen finden Sie unter [Logging with Log4j](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html) im Developer Guide.AWS SDK für Java *

Sie müssen die folgenden Aufgaben durchführen, wenn Sie eine KCL-Konsumentenanwendung in Java implementieren:

**Topics**
+ [Implementieren Sie die Methoden des Prozessors IRecord](#kinesis-record-processor-implementation-interface-java)
+ [Implementieren Sie eine Klassenfabrik für die IRecord Prozessorschnittstelle](#kinesis-record-processor-implementation-factory-java)
+ [Erstellen Sie einen Worker](#kcl-java-worker)
+ [Ändern Sie die Konfigurationseigenschaften](#kinesis-record-processor-initialization-java)
+ [Migrieren Sie zu Version 2 der Record Processor-Schnittstelle](#kcl-java-v2-migration)

## Implementieren Sie die Methoden des Prozessors IRecord
<a name="kinesis-record-processor-implementation-interface-java"></a>

Die KCL unterstützt zurzeit zwei Versionen der `IRecordProcessor`-Schnittstelle: die ursprüngliche Schnittstelle, die mit der ersten Version der KCL verfügbar war, und Version 2, die ab KCL Version 1.5.0 verfügbar ist. Beide Schnittstellen werden vollständig unterstützt. Ihre Wahl hängt von den speziellen Anforderungen Ihres Anwendungsfalls ab. Um mehr über Unterschiede zu erfahren, betrachten Sie die lokal entwickelten Javadocs oder den Quellcode. In den folgenden Abschnitten wird die Mindestimplementierung für die ersten Schritte beschrieben.

**Topics**
+ [Ursprüngliche Schnittstelle (Version 1)](#kcl-java-interface-original)
+ [Aktualisierte Schnittstelle (Version 2)](#kcl-java-interface-v2)

### Ursprüngliche Schnittstelle (Version 1)
<a name="kcl-java-interface-original"></a>

Die ursprüngliche `IRecordProcessor` Schnittstelle (`package com.amazonaws.services.kinesis.clientlibrary.interfaces`) stellt die folgenden Datensatzverarbeitermethoden bereit, die Ihr Konsument implementieren muss. Das Beispiel stellt Implementierungen bereit, die Sie als Ausgangspunkt verwenden können (siehe `AmazonKinesisApplicationSampleRecordProcessor.java`).

```
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

**initialize**  
Die KCL ruft die Methode `initialize` auf, wenn der Datensatzverarbeiter instanziiert wird, und übergibt eine spezifische Shard-ID als Parameter. Dieser Datensatzverarbeiter verarbeitet nur diese Shard und in der Regel ist dies auch umgekehrt der Fall (diese Shard wird nur durch diesen Datensatverarbeiter verarbeitet). Ihr Konsument sollte jedoch die Möglichkeit berücksichtigen, dass ein Datensatz mehr als einmal verarbeitet werden könnte. Kinesis Data Streams besitzt eine Semantik nach dem Grundsatz *mindestens einmal*. Das bedeutet, dass jeder Datensatz aus einer Shard mindestens einmal von einem Worker in Ihrem Konsumenten verarbeitet wird. Weitere Informationen zu Fällen, in denen eine bestimmte Shard möglicherweise durch mehr als einen Auftragnehmer verarbeitet wird, finden Sie unter [Verwenden Sie Resharding, Skalierung und Parallelverarbeitung, um die Anzahl der Shards zu ändern](kinesis-record-processor-scaling.md).

```
public void initialize(String shardId)
```

**processRecords**  
Die KCL ruft die Methode `processRecords` auf und übergibt eine Liste der Datensätze aus der Shard, die von der Methode `initialize(shardId)` angegeben wird. Der Datensatzverarbeiter verarbeitet die Daten in diesen Datensätzen entsprechend der Semantik des Konsumenten. Beispielsweise kann der Auftragnehmer eine Transformation für die Daten ausführen und das Ergebnis dann in einem Amazon Simple Storage Service (Amazon S3)-Bucket speichern.

```
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 
```

Zusätzlich zu den Daten selbst enthält der Datensatz auch eine Sequenznummer und einen Partitionsschlüssel. Der Auftragnehmer kann diese Werte beim Verarbeiten der Daten verwenden. Beispielsweise könnte der Auftragnehmer basierend auf dem Wert des Partitionsschlüssels den S3-Bucket wählen, in dem die Daten gespeichert werden sollen. Die Klasse `Record` stellt die folgenden Methoden bereit, die Zugriff auf die Daten des Datensatzes, die Sequenznummer und den Partitionsschlüssel bieten. 

```
record.getData()  
record.getSequenceNumber() 
record.getPartitionKey()
```

In diesem Beispiel weist die private Methode `processRecordsWithRetries` Code auf, der zeigt, wie ein Auftragnehmer auf die Daten des Datensatzes, die Sequenznummer und den Partitionsschlüssel zugreifen kann.

Kinesis Data Streams erfordert, dass der Datensatzverarbeiter die Datensätze nachverfolgt, die bereits in einer Shard verarbeitet wurden. Die KCL übernimmt diese Nachverfolgung für Sie, indem ein Checkpointer (`IRecordProcessorCheckpointer`) an `processRecords` übergeben wird. Der Datensatzverarbeiter ruft die Methode `checkpoint` auf dieser Schnittstelle auf, um die KCL über die Fortschritte zu informieren, die sie beim Verarbeiten der Datensätze in der Shard gemacht hat. Wenn der Auftragnehmer fehlschlägt, verwendet die KCL diese Informationen, um die Verarbeitung der Shard mit dem letzten bekannten Datensatz neu zu starten.

Im Fall einer Teilungs- oder Zusammenführungsoperation beginnt die KCL erst dann mit der Verarbeitung der neuen Shards, wenn die Verarbeiter für die ursprünglichen Shards `checkpoint` aufgerufen haben, um zu signalisieren, dass die Verarbeitung der ursprünglichen Shards vollständig abgeschlossen ist.

Wenn Sie keinen Parameter übergeben, nimmt die KCL an, dass der Aufruf von `checkpoint` bedeutet, dass alle Datensätze bis zum letzten Datensatz, der an den Datensatzverarbeiter übergeben wurde, verarbeitet wurden. Daher sollte der Datensatzverarbeiter die Methode `checkpoint` erst aufrufen, wenn er alle Datensätze in der Liste, die ihm übergeben wurden, verarbeitet hat. Datensatzverarbeiter müssen `checkpoint` nicht bei jedem Aufruf von `processRecords` aufrufen. Ein Prozessor könnte beispielsweise `checkpoint` bei jedem dritten Aufruf von `processRecords` aufrufen. Sie können optional die exakte Sequenznummer eines Datensatzes als Parameter für `checkpoint` angeben. In diesem Fall nimmt die KCL an, dass alle Datensätze nur bis zu diesem Datensatz verarbeitet wurden.

Im Beispiel zeigt die private Methode `checkpoint`, wie `IRecordProcessorCheckpointer.checkpoint` mithilfe der entsprechenden Ausnahmebehandlung und Wiederholungslogik aufgerufen wird.

Die KCL ist bei der Behandlung von Ausnahmen, die während der Verarbeitung der Datensätze auftreten, von `processRecords` abhängig. Wenn `processRecords` eine Ausnahme aufwirft, überspringt die KCL die Datensätze, die vor der Ausnahme übergeben wurden. Das heißt, diese Datensätze werden nicht erneut an den Datensatzprozessor gesendet, der die Ausnahme ausgelöst hat, oder an einen anderen Datensatzprozessor im Verbraucher.

**shutdown**  
Die KCL ruft die Methode `shutdown` entweder auf, wenn die Verarbeitung beendet wird (Grund für das Herunterfahren ist `TERMINATE`) oder wenn der Auftragnehmer nicht mehr reagiert (Grund für das Herunterfahren ist `ZOMBIE`).

```
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

Die Verarbeitung endet, wenn der Datensatzverarbeiter keine weiteren Datensätze aus der Shard erhält, entweder weil die Shard geteilt oder zusammengeführt wurde oder weil der Stream gelöscht wurde.

Die KCL übergibt auch eine `IRecordProcessorCheckpointer`-Schnittstelle an `shutdown`. Wenn der Grund für das Herunterfahren `TERMINATE` ist, sollte der Datensatzverarbeiter alle Datensätze fertigstellen und dann die Methode `checkpoint` in seiner Schnittstelle aufrufen.

### Aktualisierte Schnittstelle (Version 2)
<a name="kcl-java-interface-v2"></a>

Die aktualisierte `IRecordProcessor` Schnittstelle (`package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`) stellt die folgenden Datensatzverarbeitermethoden bereit, die Ihr Konsument implementieren muss: 

```
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
```

Sie können auf alle Argumente aus der ursprünglichen Version der Schnittstelle über Get-Methoden für die Container-Objekte zugreifen. Um die Liste der Datensätze in `processRecords()` abzurufen, können Sie `processRecordsInput.getRecords()` verwenden.

Ab Version 2 dieser Schnittstelle (KCL 1.5.0 und höher) sind zusätzlich zu den Eingaben durch die ursprüngliche Schnittstelle die folgenden neuen Eingaben verfügbar:

Startsequenznummer  
Im `InitializationInput`-Objekt, das an die Operation `initialize()` übergeben wird, die Startsequenznummer, aus der Datensätze für die Datenverarbeiter-Instance bereitgestellt würden. Dies ist die Sequenznummer, die zuletzt durch die Datensatzverarbeiter-Instance überprüft wurde, die dieselbe Shard zuvor verarbeitet hat. Sie wird für den Fall angegeben, dass Ihre Anwendung diese Informationen benötigt. 

Ausstehende Checkpoint-Sequenznummer  
Im `InitializationInput`-Objekt, das an die Operation `initialize()` übergeben wird, die ausstehende Checkpoint-Sequenznummer (wenn vorhanden), die nicht übergeben werden konnte, bevor die vorherige Datensatzverarbeiter-Instance angehalten wurde.

## Implementieren Sie eine Klassenfabrik für die IRecord Prozessorschnittstelle
<a name="kinesis-record-processor-implementation-factory-java"></a>

Sie müssen darüber hinaus eine Factory für die Klasse implementieren, die die Datensatzverarbeitermethoden implementiert. Wenn der Konsument den Auftragnehmer instanziiert, übergibt er dieser Factory eine Referenz.

Im Beispiel wird die Factory-Klasse in der Datei `AmazonKinesisApplicationSampleRecordProcessorFactory.java` mithilfe der ursprünglichen Datensatzverarbeiter-Schnittstelle implementiert. Wenn Sie möchten, dass die Class Factory Datensatzverarbeiter mit Version 2 erstellt, verwenden Sie den Paketnamen `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`.

```
  public class SampleRecordProcessorFactory implements IRecordProcessorFactory { 
      /**
      * Constructor.
      */
      public SampleRecordProcessorFactory() {
          super();
      }
      /**
      * {@inheritDoc}
      */
      @Override
      public IRecordProcessor createProcessor() {
          return new SampleRecordProcessor();
      }
  }
```

## Erstellen Sie einen Worker
<a name="kcl-java-worker"></a>

Wie in [Implementieren Sie die Methoden des Prozessors IRecord](#kinesis-record-processor-implementation-interface-java) beschrieben, gibt es zwei Versionen der KCL-Datensatzverarbeiterschnittstelle zur Auswahl. Die Version hat Auswirkungen auf die Art, wie Sie einen Worker erstellen. Die ursprüngliche Datensatzverarbeiterschnittstelle verwendet die folgende Codestruktur, um einen Auftragnehmer zu erstellen:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker(recordProcessorFactory, config);
```

Mit Version 2 der Datensatzverarbeiterschnittstelle können Sie `Worker.Builder` verwenden, um einen Auftragnehmer zu erstellen, ohne sich Gedanken über den Konstruktor und die Reihenfolge der Argumente zu machen. Die aktualisierte Datensatzverarbeiterschnittstelle verwendet die folgende Codestruktur, um einen Auftragnehmer zu erstellen:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

## Ändern Sie die Konfigurationseigenschaften
<a name="kinesis-record-processor-initialization-java"></a>

Das Beispiel zeigt Standardwerte für Konfigurationseigenschaften. Diese Konfigurationsdaten für den Auftragnehmer werden anschließend in einem `KinesisClientLibConfiguration`-Objekt konsolidiert. Dieses Objekt und eine Referenz auf die Class Factory für `IRecordProcessor` werden an den Aufruf übergeben, der den Auftragnehmer instanziiert. Sie können diese Eigenschaften mithilfe einer Java-Eigenschaftendatei (siehe `AmazonKinesisApplicationSample.java`) durch eigene Werte überschreiben.

### Anwendungsname
<a name="configuration-property-application-name"></a>

Die KCL erfordert einen Anwendungsnamen, der unter Ihren Anwendungen sowie den Amazon-DynamoDB-Tabellen in derselben Region eindeutig ist. Sie verwendet den Wert der Anwendungsnamenkonfiguration auf folgende Arten:
+ Für mit diesem Anwendungsnamen verknüpfte Auftragnehmer wird angenommen, dass sie gemeinsam im gleichen Stream arbeiten. Diese Auftragnehmer können auf mehrere Instances verteilt sein. Wenn Sie eine zusätzliche Instance desselben Anwendungscodes ausführen, jedoch mit einem anderen Anwendungsnamen, behandelt die KCL die zweite Instance als eine völlig getrennte Anwendung, die ebenfalls im selben Stream arbeitet.
+ Die KCL erstellt eine DynamoDB-Tabelle mit dem Namen der Anwendung und verwendet die Tabelle für die Verwaltung von Statusinformationen für die Anwendung (wie Checkpoints und Auftragnehmer-Shard-Zuweisungen). Jede Anwendung verfügt über eine eigene DynamoDB-Tabelle. Weitere Informationen finden Sie unter [Verwenden Sie eine Leasetabelle, um nachzuverfolgen, welche Shards von der KCL-Consumer-Anwendung verarbeitet wurden](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Richten Sie Anmeldeinformationen ein
<a name="kinesis-record-processor-cred-java"></a>

Sie müssen Ihre AWS Anmeldeinformationen einem der Anmeldeinformationsanbieter in der Kette der Standardanmeldedienstanbieter zur Verfügung stellen. Wenn Sie beispielsweise Ihren Konsumenten auf einer EC2-Instance ausführen, empfehlen wir, die Instance mit einer IAM-Rolle zu starten. AWS -Anmeldeinformationen, die die mit dieser IAM-Rolle verknüpften Berechtigungen widerspiegeln, werden den Anwendungen auf der Instance über deren Instance-Metadaten zur Verfügung gestellt. Dies ist die sicherste Art, Anmeldeinformationen für einen Konsumenten zu verwalten, der auf einer EC2-Instance ausgeführt wird.

Die Beispielanwendung versucht zunächst, IAM-Anmeldeinformationen aus den Instance-Metadaten abzurufen: 

```
credentialsProvider = new InstanceProfileCredentialsProvider(); 
```

Wenn die Beispielanwendung keine Anmeldeinformationen aus den Instance-Metadaten abrufen kann, versucht sie, Anmeldeinformationen aus einer Eigenschaftendatei abzurufen:

```
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
```

Weitere Informationen zu Instance-Metadaten finden Sie unter [Instance-Metadaten](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html) im *Amazon EC2 EC2-Benutzerhandbuch*.

### Verwenden Sie die Worker-ID für mehrere Instances
<a name="kinesis-record-processor-workerid-java"></a>

Derselbe Initialisierungscode erstellt unter Verwendung des Namens des lokalen Computers und Anfügung eines global eindeutigen Bezeichners eine ID für den Auftragnehmer, `workerId`, wie im folgenden Codeauszug gezeigt. Dieser Ansatz unterstützt das Szenario mit mehreren Instances der Konsumentenanwendung, die auf einem einzigen Computer ausgeführt werden.

```
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
```

## Migrieren Sie zu Version 2 der Record Processor-Schnittstelle
<a name="kcl-java-v2-migration"></a>

Wenn Sie Code migrieren möchten, der die ursprüngliche Schnittstelle verwendet, sind zusätzlich zu den zuvor beschriebenen Schritten die folgenden Schritte erforderlich:

1. Ändern der Datensatzverarbeiterklasse, um Version 2 der Datensatzverarbeiterschnittstelle zu importieren:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   ```

1. Ändern der Referenzen zu Eingaben, um `get`-Methoden für die Container-Objekte zu verwenden. In der Operation `shutdown()` ändern Sie beispielsweise „`checkpointer`“ in „`shutdownInput.getCheckpointer()`“.

1. Ändern der Datensatzverarbeiter-Factory, um Version 2 der Datensatzverarbeiter-Factory zu importieren:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   ```

1. Ändern der Konstruktion für den Auftragnehmer, um `Worker.Builder` zu verwenden. Beispiel:

   ```
   final Worker worker = new Worker.Builder()
       .recordProcessorFactory(recordProcessorFactory)
       .config(config)
       .build();
   ```

# Entwickeln Sie einen Kinesis Client Library-Consumer in Node.js
<a name="kinesis-record-processor-implementation-app-nodejs"></a>

**Wichtig**  
Die Versionen 1.x und 2.x der Amazon Kinesis Client Library (KCL) sind veraltet. KCL 1.x wird end-of-support am 30. Januar 2026 verfügbar sein. Wir **empfehlen dringend**, dass Sie Ihre KCL-Anwendungen, die Version 1.x verwenden, vor dem 30. Januar 2026 auf die neueste KCL-Version migrieren. Die neueste KCL-Version finden Sie auf der [Seite Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) unter. GitHub Informationen zu den neuesten KCL-Versionen finden Sie unter. [Verwenden Sie die Kinesis-Clientbibliothek](kcl.md) Informationen zur Migration von KCL 1.x zu KCL 3.x finden Sie unter. [Migrieren von KCL 1.x zu KCL 3.x](kcl-migration-1-3.md)

Sie können die Kinesis Client Library (KCL) verwenden, um Anwendungen zu erstellen, die Daten aus Ihren Kinesis-Datenströmen verarbeiten. Die Kinesis Client Library ist in mehreren Sprachen verfügbar. In diesem Thema wird Node.js behandelt.

Die KCL ist eine Java-Bibliothek. Unterstützung für andere Sprachen als Java wird über eine mehrsprachige Schnittstelle bereitgestellt, die als. *MultiLangDaemon* Dieser Daemon basiert auf Java und wird im Hintergrund ausgeführt, wenn Sie eine andere KCL-Sprache als Java verwenden. Wenn Sie also die KCL für Node.js installieren und Ihre App für Privatanwender vollständig in Node.js schreiben, müssen Sie trotzdem Java auf Ihrem System installieren, und zwar aufgrund der. MultiLangDaemon Darüber hinaus MultiLangDaemon verfügt es über einige Standardeinstellungen, die Sie möglicherweise an Ihren Anwendungsfall anpassen müssen, z. B. die AWS Region, mit der eine Verbindung hergestellt wird. Weitere Informationen dazu finden Sie MultiLangDaemon auf GitHub der [ MultiLangDaemon KCL-Projektseite](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Um die Node.js KCL von herunterzuladen GitHub, gehen Sie zur [Kinesis Client Library (Node.js)](https://github.com/awslabs/amazon-kinesis-client-nodejs).

**Downloads von Beispiel-Code**

Es gibt zwei Code-Beispiele für die KCL in Node.js:
+ [basic-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/basic_sample)

  Wird in den folgenden Abschnitten verwendet, um die Grundlagen zum Erstellen einer KCL-Konsumentenanwendung in Node.js zu zeigen.
+ [click-stream-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/click_stream_sample)

   Etwas komplexere und verwendet ein reales Szenario, nachdem Sie sich mit dem grundlegenden Beispiel-Code vertraut gemacht haben. Dieses Beispiel wird hier nicht behandelt. Es gibt jedoch eine Readme-Datei mit weiteren Informationen.

Sie müssen die folgenden Aufgaben durchführen, wenn Sie eine KCL-Konsumentenanwendung in Node.js implementieren:

**Topics**
+ [Implementieren Sie den Record Processor](#kinesis-record-processor-implementation-interface-nodejs)
+ [Ändern Sie die Konfigurationseigenschaften](#kinesis-record-processor-initialization-nodejs)

## Implementieren Sie den Record Processor
<a name="kinesis-record-processor-implementation-interface-nodejs"></a>

Der einfachste Konsument, der die KCL für Node.js verwenden kann, muss die Funktion `recordProcessor` implementieren. Diese enthält wiederum die Funktionen `initialize`, `processRecords`, und `shutdown`. Das Beispiel zeigt eine Implementierung, die Sie als Ausgangspunkt verwenden können (siehe `sample_kcl_app.js`).

```
function recordProcessor() {
  // return an object that implements initialize, processRecords and shutdown functions.}
```

**initialize**  
Die KCL ruft die Funktion `initialize` auf, wenn der Datensatzverarbeiter gestartet wird. Dieser Datensatzverarbeiter verarbeitet nur die Shard-ID, die als `initializeInput.shardId` übergeben wird. In der Regel ist dies auch umgekehrt der Fall (diese Shard wird nur durch diesen Datensatverarbeiter verarbeitet). Ihr Konsument sollte jedoch die Möglichkeit berücksichtigen, dass ein Datensatz mehr als einmal verarbeitet werden könnte. Das liegt daran, dass Kinesis Data Streams eine Semantik nach dem Grundsatz *mindestens einmal* hat. Das bedeutet, dass jeder Datensatz aus einer Shard mindestens einmal von einem Worker in Ihrem Konsumenten verarbeitet wird. Weitere Informationen zu Fällen, in denen eine bestimmte Shard möglicherweise durch mehr als einen Auftragnehmer verarbeitet wird, finden Sie unter [Verwenden Sie Resharding, Skalierung und Parallelverarbeitung, um die Anzahl der Shards zu ändern](kinesis-record-processor-scaling.md).

```
initialize: function(initializeInput, completeCallback)
```

**processRecords**  
 Die KCL ruft diese Funktion mit einer Eingabe auf, die eine Liste von Datensätzen aus der für die Funktion `initialize` angegebenen Shard enthält. Der von Ihnen implementierte Datensatzverarbeiter verarbeitet die Daten in diesen Datensätzen entsprechend der Semantik Ihres Konsumenten. Beispielsweise kann der Auftragnehmer eine Transformation für die Daten ausführen und das Ergebnis dann in einem Amazon Simple Storage Service (Amazon S3)-Bucket speichern. 

```
processRecords: function(processRecordsInput, completeCallback)
```

Zusätzlich zu den Daten enthält der Datensatz auch eine Sequenznummer und einen Partitionsschlüssel, die der Auftragnehmer beim Verarbeiten der Daten verwenden kann. Beispielsweise könnte der Auftragnehmer basierend auf dem Wert des Partitionsschlüssels den S3-Bucket wählen, in dem die Daten gespeichert werden sollen. Das `record`-Anmeldeverzeichnis stellt die folgenden Schlüssel-Wert-Paare für den Zugriff auf die Daten des Datensatzes, die Sequenznummer und den Partitionsschlüssel bereit:

```
record.data
record.sequenceNumber
record.partitionKey
```

Beachten Sie, dass die Daten Base64-kodiert sind.

Im einfachen Beispiel weist die Funktion `processRecords` Code auf, der zeigt, wie ein Auftragnehmer auf die Daten des Datensatzes, die Sequenznummer und den Partitionsschlüssel zugreifen kann.

Kinesis Data Streams erfordert, dass der Datensatzverarbeiter die Datensätze nachverfolgt, die bereits in einer Shard verarbeitet wurden. Die KCL übernimmt die Nachverfolgung durch ein `checkpointer`-Objekt, das als `processRecordsInput.checkpointer` übergeben wird. Der Datensatzverarbeiter ruft die Funktion `checkpointer.checkpoint` auf, um die KCL über die Fortschritte zu informieren, die er beim Verarbeiten der Datensätze in der Shard gemacht hat. Wenn der Auftragnehmer fehlschlägt, verwendet die KCL diese Informationen, wenn Sie die Verarbeitung der Shard erneut starten, damit sie den Vorgang ab dem letzten bekannten Datensatz fortsetzt.

Im Fall einer Teilungs- oder Zusammenführungsoperation beginnt die KCL erst dann mit der Verarbeitung der neuen Shards, wenn die Verarbeiter für die ursprünglichen Shards `checkpoint` aufgerufen haben, um zu signalisieren, dass die Verarbeitung der ursprünglichen Shards vollständig abgeschlossen ist.

Wenn Sie die Sequenznummer nicht an die `checkpoint`-Funktion übergeben, nimmt die KCL an, dass der Aufruf von `checkpoint` bedeutet, dass alle Datensätze bis zum letzten Datensatz, der an den Datensatzverarbeiter übergeben wurde, verarbeitet wurden. Daher sollte der Datensatzverarbeiter die Methode `checkpoint` **erst** aufrufen, wenn er alle Datensätze in der Liste, die ihm übergeben wurden, verarbeitet hat. Datensatzverarbeiter müssen `checkpoint` nicht bei jedem Aufruf von `processRecords` aufrufen. Ein Prozessor könnte beispielsweise bei jedem dritten Anruf oder `checkpoint` bei einem Ereignis außerhalb Ihres Aufzeichnungsprozessors, z. B. bei einem benutzerdefinierten verification/validation Dienst, den Sie implementiert haben, aufrufen. 

Sie können optional die exakte Sequenznummer eines Datensatzes als Parameter für `checkpoint` angeben. In diesem Fall nimmt die KCL an, dass alle Datensätze nur bis zu diesem Datensatz verarbeitet wurden.

Die einfache Beispielanwendung zeigt den einfachsten möglichen Aufruf der Funktion `checkpointer.checkpoint`. Sie können weitere Checkpoint-Logik hinzufügen, die Sie an diesem Punkt in der Funktion für Ihren Konsumenten benötigen.

**shutdown**  
Die KCL ruft die Funktion `shutdown` entweder auf, wenn die Verarbeitung beendet wird (`shutdownInput.reason` ist `TERMINATE`) oder wenn der Auftragnehmer nicht mehr reagiert (`shutdownInput.reason` ist `ZOMBIE`).

```
shutdown: function(shutdownInput, completeCallback)
```

Die Verarbeitung endet, wenn der Datensatzverarbeiter keine weiteren Datensätze aus der Shard erhält, entweder weil die Shard geteilt oder zusammengeführt wurde oder weil der Stream gelöscht wurde.

Die KCL übergibt auch ein `shutdownInput.checkpointer`-Objekt an `shutdown`. Wenn der Grund für das Herunterfahren `TERMINATE` ist, sollten Sie sicherstellen, dass der Datensatzverarbeiter die Verarbeitung aller Datensätze fertiggestellt hat, und dann die Funktion `checkpoint` in seiner Schnittstelle aufrufen.

## Ändern Sie die Konfigurationseigenschaften
<a name="kinesis-record-processor-initialization-nodejs"></a>

Das Beispiel zeigt Standardwerte für die Konfigurationseigenschaften. Sie können diese Eigenschaften mit eigenen Werten überschreiben (siehe `sample.properties` im einfachen Beispiel).

### Anwendungsname
<a name="kinesis-record-processor-application-name-nodejs"></a>

Die KCL erfordert eine Anwendung, die unter Ihren Anwendungen sowie den Amazon-DynamoDB-Tabellen in derselben Region eindeutig ist. Sie verwendet den Wert der Anwendungsnamenkonfiguration auf folgende Arten:
+ Für mit diesem Anwendungsnamen verknüpfte Auftragnehmer wird angenommen, dass sie gemeinsam im gleichen Stream arbeiten. Diese Auftragnehmer können auf mehrere Instances verteilt sein. Wenn Sie eine zusätzliche Instance desselben Anwendungscodes ausführen, jedoch mit einem anderen Anwendungsnamen, behandelt die KCL die zweite Instance als eine völlig getrennte Anwendung, die ebenfalls im selben Stream arbeitet.
+ Die KCL erstellt eine DynamoDB-Tabelle mit dem Namen der Anwendung und verwendet die Tabelle für die Verwaltung von Statusinformationen für die Anwendung (wie Checkpoints und Auftragnehmer-Shard-Zuweisungen). Jede Anwendung verfügt über eine eigene DynamoDB-Tabelle. Weitere Informationen finden Sie unter [Verwenden Sie eine Leasetabelle, um nachzuverfolgen, welche Shards von der KCL-Consumer-Anwendung verarbeitet wurden](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Richten Sie Anmeldeinformationen ein
<a name="kinesis-record-processor-credentials-nodejs"></a>

Sie müssen Ihre AWS Anmeldeinformationen einem der Anmeldeinformationsanbieter in der Kette der Standardanmeldedienstanbieter zur Verfügung stellen. Sie können die Eigenschaft `AWSCredentialsProvider` verwenden, um einen Anmeldeinformationsanbieter einzurichten. Die `sample.properties`-Datei muss Anmeldeinformationen einem der Anmeldeinformationsanbieter in der [Anmeldeinformationsanbieter-Standardkette](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html) bereitstellen. Wenn Sie Ihren Consumer auf einer Amazon EC2 EC2-Instance ausführen, empfehlen wir Ihnen, die Instance mit einer IAM-Rolle zu konfigurieren. AWS Anmeldeinformationen, die die mit dieser IAM-Rolle verknüpften Berechtigungen widerspiegeln, werden Anwendungen auf der Instance über deren Instance-Metadaten zur Verfügung gestellt. Dies ist die sicherste Art, Anmeldeinformationen für eine Konsumentenanwendung zu verwalten, die auf einer EC2-Instance ausgeführt wird.

Im folgenden Beispiel wird eine KCL konfiguriert, um einen Kinesis-Datenstrom namens `kclnodejssample` mittels des Datensatzverarbeiters zu verarbeiten, der in `sample_kcl_app.js` bereitgestellt wird:

```
# The Node.js executable script
executableName = node sample_kcl_app.js
# The name of an Amazon Kinesis stream to process
streamName = kclnodejssample
# Unique KCL application name
applicationName = kclnodejssample
# Use default AWS credentials provider chain
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Read from the beginning of the stream
initialPositionInStream = TRIM_HORIZON
```

# Entwickeln Sie einen Kinesis Client Library-Consumer in .NET
<a name="kinesis-record-processor-implementation-app-dotnet"></a>

**Wichtig**  
Die Versionen 1.x und 2.x der Amazon Kinesis Client Library (KCL) sind veraltet. KCL 1.x wird end-of-support am 30. Januar 2026 verfügbar sein. Wir **empfehlen dringend**, dass Sie Ihre KCL-Anwendungen, die Version 1.x verwenden, vor dem 30. Januar 2026 auf die neueste KCL-Version migrieren. Die neueste KCL-Version finden Sie auf der [Seite Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) unter. GitHub Informationen zu den neuesten KCL-Versionen finden Sie unter. [Verwenden Sie die Kinesis-Clientbibliothek](kcl.md) Informationen zur Migration von KCL 1.x zu KCL 3.x finden Sie unter. [Migrieren von KCL 1.x zu KCL 3.x](kcl-migration-1-3.md)

Sie können die Kinesis Client Library (KCL) verwenden, um Anwendungen zu erstellen, die Daten aus Ihren Kinesis-Datenströmen verarbeiten. Die Kinesis Client Library ist in mehreren Sprachen verfügbar. In diesem Thema wird .NET behandelt.

Die KCL ist eine Java-Bibliothek. Unterstützung für andere Sprachen als Java wird über eine mehrsprachige Schnittstelle bereitgestellt, die als. *MultiLangDaemon* Dieser Daemon basiert auf Java und wird im Hintergrund ausgeführt, wenn Sie eine andere KCL-Sprache als Java verwenden. Wenn Sie also die KCL für .NET installieren und Ihre Consumer-App vollständig in .NET schreiben, müssen Sie trotzdem Java auf Ihrem System installieren, da MultiLangDaemon Darüber hinaus MultiLangDaemon verfügt es über einige Standardeinstellungen, die Sie möglicherweise an Ihren Anwendungsfall anpassen müssen, z. B. die AWS Region, mit der eine Verbindung hergestellt wird. Weitere Informationen dazu finden Sie MultiLangDaemon auf GitHub der [ MultiLangDaemon KCL-Projektseite](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Gehen Sie zur [Kinesis Client Library (.NET) GitHub, um die.NET-KCL](https://github.com/awslabs/amazon-kinesis-client-net) von herunterzuladen. Um Beispielcode für eine.NET-KCL-Consumer-Anwendung herunterzuladen, rufen Sie die Projektseite [KCL for .NET-Beispielanwendung für Privatanwender](https://github.com/awslabs/amazon-kinesis-client-net/tree/master/SampleConsumer) auf. GitHub

Sie müssen die folgenden Aufgaben durchführen, wenn Sie eine KCL-Konsumentenanwendung in .NET implementieren:

**Topics**
+ [Implementieren Sie die Methoden der IRecord Prozessorklasse](#kinesis-record-processor-implementation-interface-dotnet)
+ [Ändern Sie die Konfigurationseigenschaften](#kinesis-record-processor-initialization-dotnet)

## Implementieren Sie die Methoden der IRecord Prozessorklasse
<a name="kinesis-record-processor-implementation-interface-dotnet"></a>

Der Konsument muss die folgenden Methoden für `IRecordProcessor` implementieren. Der Konsument im Beispiel stellt Implementierungen bereit, die Sie als Ausgangspunkt verwenden können (siehe die `SampleRecordProcessor`-Klasse in `SampleConsumer/AmazonKinesisSampleConsumer.cs`).

```
public void Initialize(InitializationInput input)
public void ProcessRecords(ProcessRecordsInput input)
public void Shutdown(ShutdownInput input)
```

**Initialisieren**  
Die KCL ruft diese Methode auf, wenn der Datensatzverarbeiter instanziiert wird, und übergibt eine spezifische Shard-ID an den `input`-Parameter (`input.ShardId`). Dieser Datensatzverarbeiter verarbeitet nur diese Shard und in der Regel ist dies auch umgekehrt der Fall (diese Shard wird nur durch diesen Datensatverarbeiter verarbeitet). Ihr Konsument sollte jedoch die Möglichkeit berücksichtigen, dass ein Datensatz mehr als einmal verarbeitet werden könnte. Das liegt daran, dass Kinesis Data Streams eine Semantik nach dem Grundsatz *mindestens einmal* hat. Das bedeutet, dass jeder Datensatz aus einer Shard mindestens einmal von einem Worker in Ihrem Konsumenten verarbeitet wird. Weitere Informationen zu Fällen, in denen eine bestimmte Shard möglicherweise durch mehr als einen Auftragnehmer verarbeitet wird, finden Sie unter [Verwenden Sie Resharding, Skalierung und Parallelverarbeitung, um die Anzahl der Shards zu ändern](kinesis-record-processor-scaling.md).

```
public void Initialize(InitializationInput input)
```

**ProcessRecords**  
Die KCL ruft diese Methode auf und übergibt eine Liste der Datensätze an den `input`-Parameter (`input.Records`) aus der Shard, die von der Methode `Initialize` angegeben wird. Der von Ihnen implementierte Datensatzverarbeiter verarbeitet die Daten in diesen Datensätzen entsprechend der Semantik Ihres Konsumenten. Beispielsweise kann der Auftragnehmer eine Transformation für die Daten ausführen und das Ergebnis dann in einem Amazon Simple Storage Service (Amazon S3)-Bucket speichern.

```
public void ProcessRecords(ProcessRecordsInput input)
```

Zusätzlich zu den Daten selbst enthält der Datensatz auch eine Sequenznummer und einen Partitionsschlüssel. Der Auftragnehmer kann diese Werte beim Verarbeiten der Daten verwenden. Beispielsweise könnte der Auftragnehmer basierend auf dem Wert des Partitionsschlüssels den S3-Bucket wählen, in dem die Daten gespeichert werden sollen. Die Klasse `Record` stellt die folgenden Methoden bereit, die Zugriff auf die Daten des Datensatzes, die Sequenznummer und den Partitionsschlüssel bieten:

```
byte[] Record.Data 
string Record.SequenceNumber
string Record.PartitionKey
```

Im Beispiel weist die Methode `ProcessRecordsWithRetries` Code auf, der zeigt, wie ein Auftragnehmer auf die Daten des Datensatzes, die Sequenznummer und den Partitionsschlüssel zugreifen kann.

Kinesis Data Streams erfordert, dass der Datensatzverarbeiter die Datensätze nachverfolgt, die bereits in einer Shard verarbeitet wurden. Die KCL übernimmt diese Nachverfolgung für Sie, indem ein `Checkpointer`-Objekt an `ProcessRecords` (`input.Checkpointer`) übergeben wird. Der Datensatzverarbeiter ruft die Methode `Checkpointer.Checkpoint` auf, um die KCL über die Fortschritte zu informieren, die sie beim Verarbeiten der Datensätze in der Shard gemacht hat. Wenn der Auftragnehmer fehlschlägt, verwendet die KCL diese Informationen, um die Verarbeitung der Shard mit dem letzten bekannten Datensatz neu zu starten.

Im Fall einer Teilungs- oder Zusammenführungsoperation beginnt die KCL erst dann mit der Verarbeitung der neuen Shards, wenn die Verarbeiter für die ursprünglichen Shards `Checkpointer.Checkpoint` aufgerufen haben, um zu signalisieren, dass die Verarbeitung der ursprünglichen Shards vollständig abgeschlossen ist.

Wenn Sie keinen Parameter übergeben, nimmt die KCL an, dass der Aufruf von `Checkpointer.Checkpoint` bedeutet, dass alle Datensätze bis zum letzten Datensatz, der an den Datensatzverarbeiter übergeben wurde, verarbeitet wurden. Daher sollte der Datensatzverarbeiter die Methode `Checkpointer.Checkpoint` erst aufrufen, wenn er alle Datensätze in der Liste, die ihm übergeben wurden, verarbeitet hat. Datensatzverarbeiter müssen `Checkpointer.Checkpoint` nicht bei jedem Aufruf von `ProcessRecords` aufrufen. Ein Prozessor könnte beispielsweise `Checkpointer.Checkpoint` bei jedem dritten oder vierten Aufruf aufrufen. Sie können optional die exakte Sequenznummer eines Datensatzes als Parameter für `Checkpointer.Checkpoint` angeben. In diesem Fall nimmt die KCL an, dass die Datensätze nur bis zu diesem Datensatz verarbeitet wurden.

Im Beispiel zeigt die private Methode `Checkpoint(Checkpointer checkpointer)`, wie die `Checkpointer.Checkpoint`-Methode mithilfe der entsprechenden Ausnahmebehandlung und Wiederholungslogik aufgerufen wird.

Die KCL für .NET verarbeitet Ausnahmen anders als andere KCL-Sprachbibliotheken, da sie keine Ausnahmen verarbeitet, die aus der Verarbeitung der Datensätze entstanden sind. Alle nicht abgefangenen Ausnahmen vom Benutzer-Code bringen das Programm zum Absturz.

**Herunterfahren**  
Die KPL ruft die Methode `Shutdown` entweder auf, wenn die Verarbeitung beendet wird (Grund für das Herunterfahren ist `TERMINATE`) oder wenn der Auftragnehmer nicht mehr reagiert (der `input.Reason`-Wert für das Herunterfahren ist `ZOMBIE`).

```
public void Shutdown(ShutdownInput input)
```

Die Verarbeitung endet, wenn der Datensatzverarbeiter keine weiteren Datensätze aus der Shard erhält, weil die Shard geteilt oder zusammengeführt wurde oder der Stream gelöscht wurde.

Die KCL übergibt auch ein `Checkpointer`-Objekt an `shutdown`. Wenn der Grund für das Herunterfahren `TERMINATE` ist, sollte der Datensatzverarbeiter alle Datensätze fertigstellen und dann die Methode `checkpoint` in seiner Schnittstelle aufrufen.

## Ändern Sie die Konfigurationseigenschaften
<a name="kinesis-record-processor-initialization-dotnet"></a>

Der Beispielkonsument zeigt Standardwerte für die Konfigurationseigenschaften. Sie können diese Eigenschaften mit eigenen Werten überschreiben (siehe `SampleConsumer/kcl.properties`).

### Anwendungsname
<a name="modify-kinesis-record-processor-application-name"></a>

Die KCL erfordert eine Anwendung, die unter Ihren Anwendungen sowie den Amazon-DynamoDB-Tabellen in derselben Region eindeutig ist. Sie verwendet den Wert der Anwendungsnamenkonfiguration auf folgende Arten:
+ Für mit diesem Anwendungsnamen verknüpfte Auftragnehmer wird angenommen, dass sie gemeinsam im gleichen Stream arbeiten. Diese Auftragnehmer können auf mehrere Instances verteilt sein. Wenn Sie eine zusätzliche Instance desselben Anwendungscodes ausführen, jedoch mit einem anderen Anwendungsnamen, behandelt die KCL die zweite Instance als eine völlig getrennte Anwendung, die ebenfalls im selben Stream arbeitet.
+ Die KCL erstellt eine DynamoDB-Tabelle mit dem Namen der Anwendung und verwendet die Tabelle für die Verwaltung von Statusinformationen für die Anwendung (wie Checkpoints und Auftragnehmer-Shard-Zuweisungen). Jede Anwendung verfügt über eine eigene DynamoDB-Tabelle. Weitere Informationen finden Sie unter [Verwenden Sie eine Leasetabelle, um nachzuverfolgen, welche Shards von der KCL-Consumer-Anwendung verarbeitet wurden](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Richten Sie Anmeldeinformationen ein
<a name="kinesis-record-processor-creds-dotnet"></a>

Sie müssen Ihre AWS Anmeldeinformationen einem der Anmeldeinformationsanbieter in der Kette der Standardanmeldedienstanbieter zur Verfügung stellen. Sie können die Eigenschaft `AWSCredentialsProvider` verwenden, um einen Anmeldeinformationsanbieter einzurichten. Die [sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) muss Ihre Anmeldeinformationen einem der Anmeldeinformationsanbieter in der [Anmeldeinformationsanbieter-Standardkette](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html) bereitstellen. Wenn Sie Ihre Konsumentenanwendung auf einer EC2-Instance ausführen, empfehlen wir, die Instance mit einer IAM-Rolle zu konfigurieren. AWS -Anmeldeinformationen, die die mit dieser IAM-Rolle verknüpften Berechtigungen widerspiegeln, werden den Anwendungen auf der Instance über deren Instance-Metadaten zur Verfügung gestellt. Dies ist die sicherste Art, Anmeldeinformationen für einen Konsumenten zu verwalten, der auf einer EC2-Instance ausgeführt wird.

Die Eigenschaftendatei des Beispiels konfiguriert KCL, um einen Kinesis-Datenstrom namens „words“ mittels des Datensatzverarbeiters zu verarbeiten, der in `AmazonKinesisSampleConsumer.cs` bereitgestellt wird. 

# Entwickeln Sie einen Kinesis Client Library-Consumer in Python
<a name="kinesis-record-processor-implementation-app-py"></a>

**Wichtig**  
Die Versionen 1.x und 2.x der Amazon Kinesis Client Library (KCL) sind veraltet. KCL 1.x wird end-of-support am 30. Januar 2026 verfügbar sein. Wir **empfehlen dringend**, dass Sie Ihre KCL-Anwendungen, die Version 1.x verwenden, vor dem 30. Januar 2026 auf die neueste KCL-Version migrieren. Die neueste KCL-Version finden Sie auf der [Seite Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) unter. GitHub Informationen zu den neuesten KCL-Versionen finden Sie unter. [Verwenden Sie die Kinesis-Clientbibliothek](kcl.md) Informationen zur Migration von KCL 1.x zu KCL 3.x finden Sie unter. [Migrieren von KCL 1.x zu KCL 3.x](kcl-migration-1-3.md)

Sie können die Kinesis Client Library (KCL) verwenden, um Anwendungen zu erstellen, die Daten aus Ihren Kinesis-Datenströmen verarbeiten. Die Kinesis Client Library ist in mehreren Sprachen verfügbar. In diesem Thema wird Python behandelt.

Die KCL ist eine Java-Bibliothek. Unterstützung für andere Sprachen als Java wird über eine mehrsprachige Schnittstelle bereitgestellt, die als. *MultiLangDaemon* Dieser Daemon basiert auf Java und wird im Hintergrund ausgeführt, wenn Sie eine andere KCL-Sprache als Java verwenden. Wenn Sie also die KCL für Python installieren und Ihre Consumer-App vollständig in Python schreiben, muss Java aufgrund der MultiLangDaemon trotzdem auf Ihrem System installiert sein. Darüber hinaus MultiLangDaemon verfügt es über einige Standardeinstellungen, die Sie möglicherweise an Ihren Anwendungsfall anpassen müssen, z. B. die AWS Region, mit der eine Verbindung hergestellt wird. Weitere Informationen dazu finden Sie MultiLangDaemon auf GitHub der [ MultiLangDaemon KCL-Projektseite](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Um die Python-KCL von herunterzuladen GitHub, gehen Sie zur [Kinesis Client Library (Python)](https://github.com/awslabs/amazon-kinesis-client-python). Um Beispielcode für eine Python-KCL-Consumer-Anwendung herunterzuladen, gehen Sie zur [KCL for Python-Beispielprojektseite unter](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples). GitHub

Sie müssen die folgenden Aufgaben durchführen, wenn Sie eine KCL-Konsumentenanwendung in Python implementieren:

**Topics**
+ [Implementieren Sie die Klassenmethoden RecordProcessor](#kinesis-record-processor-implementation-interface-py)
+ [Ändern Sie die Konfigurationseigenschaften](#kinesis-record-processor-initialization-py)

## Implementieren Sie die Klassenmethoden RecordProcessor
<a name="kinesis-record-processor-implementation-interface-py"></a>

Die `RecordProcess`-Klasse muss die `RecordProcessorBase` erweitern, um die folgenden Methoden zu implementieren. Das Beispiel stellt Implementierungen bereit, die Sie als Ausgangspunkt verwenden können (siehe `sample_kclpy_app.py`).

```
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
```

**initialize**  
Die KCL ruft die Methode `initialize` auf, wenn der Datensatzverarbeiter instanziiert wird, und übergibt eine spezifische Shard-ID als Parameter. Dieser Datensatzverarbeiter verarbeitet nur diese Shard und in der Regel ist dies auch umgekehrt der Fall (diese Shard wird nur durch diesen Datensatverarbeiter verarbeitet). Ihr Konsument sollte jedoch die Möglichkeit berücksichtigen, dass ein Datensatz mehr als einmal verarbeitet werden könnte. Das liegt daran, dass Kinesis Data Streams eine Semantik nach dem Grundsatz *mindestens einmal* hat. Das bedeutet, dass jeder Datensatz aus einer Shard mindestens einmal von einem Worker in Ihrem Konsumenten verarbeitet wird. Weitere Informationen zu Fällen, in denen eine bestimmte Shard möglicherweise durch mehr als einen Auftragnehmer verarbeitet wird, finden Sie unter [Verwenden Sie Resharding, Skalierung und Parallelverarbeitung, um die Anzahl der Shards zu ändern](kinesis-record-processor-scaling.md).

```
def initialize(self, shard_id)
```

**process\$1records**  
 Die KCL ruft diese Methode auf und übergibt eine Liste der Datensätze aus der Shard, die von der Methode `initialize` angegeben wird. Der von Ihnen implementierte Datensatzverarbeiter verarbeitet die Daten in diesen Datensätzen entsprechend der Semantik Ihres Konsumenten. Beispielsweise kann der Auftragnehmer eine Transformation für die Daten ausführen und das Ergebnis dann in einem Amazon Simple Storage Service (Amazon S3)-Bucket speichern.

```
def process_records(self, records, checkpointer) 
```

Zusätzlich zu den Daten selbst enthält der Datensatz auch eine Sequenznummer und einen Partitionsschlüssel. Der Auftragnehmer kann diese Werte beim Verarbeiten der Daten verwenden. Beispielsweise könnte der Auftragnehmer basierend auf dem Wert des Partitionsschlüssels den S3-Bucket wählen, in dem die Daten gespeichert werden sollen. Das `record`-Anmeldeverzeichnis stellt die folgenden Schlüssel-Wert-Paare für den Zugriff auf die Daten des Datensatzes, die Sequenznummer und den Partitionsschlüssel bereit:

```
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
```

Beachten Sie, dass die Daten Base64-kodiert sind.

Im Beispiel weist die Methode `process_records` Code auf, der zeigt, wie ein Auftragnehmer auf die Daten des Datensatzes, die Sequenznummer und den Partitionsschlüssel zugreifen kann.

Kinesis Data Streams erfordert, dass der Datensatzverarbeiter die Datensätze nachverfolgt, die bereits in einer Shard verarbeitet wurden. Die KCL übernimmt diese Nachverfolgung für Sie, indem ein `Checkpointer`-Objekt an `process_records` übergeben wird. Der Datensatzverarbeiter ruft die Methode `checkpoint` auf diesem Objekt auf, um die KCL über die Fortschritte zu informieren, die er beim Verarbeiten der Datensätze in der Shard gemacht hat. Wenn der Auftragnehmer fehlschlägt, verwendet die KCL diese Informationen, um die Verarbeitung der Shard mit dem letzten bekannten Datensatz neu zu starten.

Im Fall einer Teilungs- oder Zusammenführungsoperation beginnt die KCL erst dann mit der Verarbeitung der neuen Shards, wenn die Verarbeiter für die ursprünglichen Shards `checkpoint` aufgerufen haben, um zu signalisieren, dass die Verarbeitung der ursprünglichen Shards vollständig abgeschlossen ist.

Wenn Sie keinen Parameter übergeben, nimmt die KCL an, dass der Aufruf von `checkpoint` bedeutet, dass alle Datensätze bis zum letzten Datensatz, der an den Datensatzverarbeiter übergeben wurde, verarbeitet wurden. Daher sollte der Datensatzverarbeiter die Methode `checkpoint` erst aufrufen, wenn er alle Datensätze in der Liste, die ihm übergeben wurden, verarbeitet hat. Datensatzverarbeiter müssen `checkpoint` nicht bei jedem Aufruf von `process_records` aufrufen. Ein Prozessor könnte beispielsweise `checkpoint` bei jedem dritten Aufruf aufrufen. Sie können optional die exakte Sequenznummer eines Datensatzes als Parameter für `checkpoint` angeben. In diesem Fall nimmt die KCL an, dass alle Datensätze nur bis zu diesem Datensatz verarbeitet wurden.

Im Beispiel zeigt die private Methode `checkpoint`, wie die `Checkpointer.checkpoint`-Methode mithilfe der entsprechenden Ausnahmebehandlung und Wiederholungslogik aufgerufen wird.

Die KCL ist bei der Behandlung von Ausnahmen, die während der Verarbeitung der Datensätze auftreten, von `process_records` abhängig. Wenn `process_records` eine Ausnahme auslöst, überspringt die KCL die Datensätze, die vor der Ausnahme an `process_records` übergeben wurden. Das heißt, diese Datensätze werden nicht erneut an den Datensatzprozessor gesendet, der die Ausnahme ausgelöst hat, oder an einen anderen Datensatzprozessor im Verbraucher.

**shutdown**  
 Die KCL ruft die Methode `shutdown` entweder auf, wenn die Verarbeitung beendet wird (Grund für das Herunterfahren ist `TERMINATE`) oder wenn der Auftragnehmer nicht mehr reagiert (das Herunterfahren `reason` ist `ZOMBIE`).

```
def shutdown(self, checkpointer, reason)
```

Die Verarbeitung endet, wenn der Datensatzverarbeiter keine weiteren Datensätze aus der Shard erhält, entweder weil die Shard geteilt oder zusammengeführt wurde oder weil der Stream gelöscht wurde.

 Die KCL übergibt auch ein `Checkpointer`-Objekt an `shutdown`. Wenn der `reason` für das Herunterfahren `TERMINATE` ist, sollte der Datensatzverarbeiter alle Datensätze fertigstellen und dann die Methode `checkpoint` in seiner Schnittstelle aufrufen.

## Ändern Sie die Konfigurationseigenschaften
<a name="kinesis-record-processor-initialization-py"></a>

Das Beispiel zeigt Standardwerte für die Konfigurationseigenschaften. Sie können diese Eigenschaften mit eigenen Werten überschreiben (siehe `sample.properties`).

### Anwendungsname
<a name="kinesis-record-processor-application-name-py"></a>

Die KCL erfordert einen Anwendungsnamen, der unter Ihren Anwendungen sowie den Amazon-DynamoDB-Tabellen in derselben Region eindeutig ist. Sie verwendet den Wert der Anwendungsnamenkonfiguration auf folgende Arten:
+ Für mit diesem Anwendungsnamen verknüpfte Auftragnehmer wird angenommen, dass sie gemeinsam im gleichen Stream arbeiten. Diese Auftragnehmer können auf mehrere Instances verteilt sein. Wenn Sie eine zusätzliche Instance desselben Anwendungscodes ausführen, jedoch mit einem anderen Anwendungsnamen, behandelt die KCL die zweite Instance als eine völlig getrennte Anwendung, die ebenfalls im selben Stream arbeitet.
+ Die KCL erstellt eine DynamoDB-Tabelle mit dem Namen der Anwendung und verwendet die Tabelle für die Verwaltung von Statusinformationen für die Anwendung (wie Checkpoints und Auftragnehmer-Shard-Zuweisungen). Jede Anwendung verfügt über eine eigene DynamoDB-Tabelle. Weitere Informationen finden Sie unter [Verwenden Sie eine Leasetabelle, um nachzuverfolgen, welche Shards von der KCL-Consumer-Anwendung verarbeitet wurden](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Richten Sie Anmeldeinformationen ein
<a name="kinesis-record-processor-creds-py"></a>

Sie müssen Ihre AWS Anmeldeinformationen einem der Anmeldeinformationsanbieter in der Kette der Standardanmeldedienstanbieter zur Verfügung stellen. Sie können die Eigenschaft `AWSCredentialsProvider` verwenden, um einen Anmeldeinformationsanbieter einzurichten. Die [sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) muss Ihre Anmeldeinformationen einem der Anmeldeinformationsanbieter in der [Anmeldeinformationsanbieter-Standardkette](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html) bereitstellen. Wenn Sie Ihre Konsumentenanwendung auf einer Amazon-EC2-Instance ausführen, empfehlen wir, die Instance mit einer IAM-Rolle zu konfigurieren. AWS -Anmeldeinformationen, die die mit dieser IAM-Rolle verknüpften Berechtigungen widerspiegeln, werden den Anwendungen auf der Instance über deren Instance-Metadaten zur Verfügung gestellt. Dies ist die sicherste Art, Anmeldeinformationen für eine Konsumentenanwendung zu verwalten, die auf einer EC2-Instance ausgeführt wird.

Die Eigenschaftendatei des Beispiels konfiguriert KCL, um einen Kinesis-Datenstrom namens „words“ mittels des Datensatzverarbeiters zu verarbeiten, der in `sample_kclpy_app.py` bereitgestellt wird. 

# Entwickeln Sie einen Kinesis Client Library-Consumer in Ruby
<a name="kinesis-record-processor-implementation-app-ruby"></a>

**Wichtig**  
Die Versionen 1.x und 2.x der Amazon Kinesis Client Library (KCL) sind veraltet. KCL 1.x wird end-of-support am 30. Januar 2026 verfügbar sein. Wir **empfehlen dringend**, dass Sie Ihre KCL-Anwendungen, die Version 1.x verwenden, vor dem 30. Januar 2026 auf die neueste KCL-Version migrieren. Die neueste KCL-Version finden Sie auf der [Seite Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) unter. GitHub Informationen zu den neuesten KCL-Versionen finden Sie unter. [Verwenden Sie die Kinesis-Clientbibliothek](kcl.md) Informationen zur Migration von KCL 1.x zu KCL 3.x finden Sie unter. [Migrieren von KCL 1.x zu KCL 3.x](kcl-migration-1-3.md)

Sie können die Kinesis Client Library (KCL) verwenden, um Anwendungen zu erstellen, die Daten aus Ihren Kinesis-Datenströmen verarbeiten. Die Kinesis Client Library ist in mehreren Sprachen verfügbar. In diesem Thema wird Ruby behandelt.

Die KCL ist eine Java-Bibliothek. Unterstützung für andere Sprachen als Java wird über eine mehrsprachige Schnittstelle bereitgestellt, die als. *MultiLangDaemon* Dieser Daemon basiert auf Java und wird im Hintergrund ausgeführt, wenn Sie eine andere KCL-Sprache als Java verwenden. Wenn Sie also die KCL für Ruby installieren und Ihre Consumer-App vollständig in Ruby schreiben, müssen Sie trotzdem Java auf Ihrem System installieren, da MultiLangDaemon Darüber hinaus MultiLangDaemon verfügt es über einige Standardeinstellungen, die Sie möglicherweise an Ihren Anwendungsfall anpassen müssen, z. B. die AWS Region, mit der eine Verbindung hergestellt wird. Weitere Informationen dazu finden Sie MultiLangDaemon auf GitHub der [ MultiLangDaemon KCL-Projektseite](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Um die Ruby KCL von herunterzuladen GitHub, gehen Sie zur [Kinesis Client Library (Ruby)](https://github.com/awslabs/amazon-kinesis-client-ruby). Den Beispielcode für eine Ruby-KCL-Anwendung für Privatanwender können Sie auf der Projektseite [KCL for Ruby unter](https://github.com/awslabs/amazon-kinesis-client-ruby/tree/master/samples) herunterladen. GitHub

Weitere Informationen über die KCL-Bibliothek für die Unterstützung von Ruby finden Sie unter [Dokumentation zu KCL Ruby Gems](http://www.rubydoc.info/gems/aws-kclrb).

# Entwickeln Sie KCL 2.x-Verbraucher
<a name="developing-consumers-with-kcl-v2"></a>

**Wichtig**  
Die Versionen 1.x und 2.x der Amazon Kinesis Client Library (KCL) sind veraltet. KCL 1.x wird am 30. Januar 2026 end-of-support verfügbar sein. Wir **empfehlen dringend**, dass Sie Ihre KCL-Anwendungen, die Version 1.x verwenden, vor dem 30. Januar 2026 auf die neueste KCL-Version migrieren. Die neueste KCL-Version finden Sie auf der [Seite Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) unter. GitHub Informationen zu den neuesten KCL-Versionen finden Sie unter. [Verwenden Sie die Kinesis-Clientbibliothek](kcl.md) Informationen zur Migration von KCL 1.x zu KCL 3.x finden Sie unter. [Migrieren von KCL 1.x zu KCL 3.x](kcl-migration-1-3.md)

In diesem Thema erhalten Sie Informationen zur Nutzung der Version 2.0 der Kinesis Client Library (KCL). 

Weitere Informationen über die KCL finden Sie in der Übersicht in [Entwickeln von Verbrauchern mit der Kinesis Client Library 1.x](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html).

Wählen Sie je nach der Option, die Sie verwenden möchten, aus den folgenden Themen.

**Topics**
+ [Entwickeln Sie einen Kinesis Client Library-Consumer in Java](kcl2-standard-consumer-java-example.md)
+ [Entwickeln Sie einen Kinesis Client Library-Consumer in Python](kcl2-standard-consumer-python-example.md)
+ [Entwickeln Sie mit KCL 2.x erweiterte Fan-Out-Nutzer](building-enhanced-consumers-kcl-retired.md)

# Entwickeln Sie einen Kinesis Client Library-Consumer in Java
<a name="kcl2-standard-consumer-java-example"></a>

**Wichtig**  
Die Versionen 1.x und 2.x der Amazon Kinesis Client Library (KCL) sind veraltet. KCL 1.x wird end-of-support am 30. Januar 2026 verfügbar sein. Wir **empfehlen dringend**, dass Sie Ihre KCL-Anwendungen, die Version 1.x verwenden, vor dem 30. Januar 2026 auf die neueste KCL-Version migrieren. Die neueste KCL-Version finden Sie auf der [Seite Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) unter. GitHub Informationen zu den neuesten KCL-Versionen finden Sie unter. [Verwenden Sie die Kinesis-Clientbibliothek](kcl.md) Informationen zur Migration von KCL 1.x zu KCL 3.x finden Sie unter. [Migrieren von KCL 1.x zu KCL 3.x](kcl-migration-1-3.md)

Der folgende Code zeigt eine Beispielimplementierung in Java für `ProcessorFactory` und `RecordProcessor`. Weitere Informationen zur Nutzung der Vorteile der erweiterten Rundsendefunktion finden Sie unter [Verwenden von Verbrauchern mit erweitertem Rundsenden ](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html).

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License.
 */


/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

/**
 * This class will run a simple app that uses the KCL to read data and uses the AWS SDK to publish data.
 * Before running this program you must first create a Kinesis stream through the AWS console or AWS SDK.
 */
public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    /**
     * Invoke the main method with 2 args: the stream name and (optionally) the region.
     * Verifies valid inputs and then starts running the app.
     */
    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    /**
     * Constructor sets streamName and region. It also creates a KinesisClient object to send data to Kinesis.
     * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used
     * indirectly by the KCL to handle the consumption of the data.
     */
    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {

        /**
         * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL
         */
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        /**
         * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
         * ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private
         * class below.
         */
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        /**
         * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
         * instance is configured with defaults provided by the ConfigsBuilder.
         */
        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );

        /**
         * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
         * until an exit is triggered.
         */
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        /**
         * Allows termination of app by pressing Enter.
         */
        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        /**
         * Stops sending dummy data.
         */
        log.info("Cancelling producer and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        /**
         * Stops consuming data. Finishes processing the current batch of data already received from Kinesis
         * before shutting down.
         */
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown.  Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    /**
     * Sends a single record of dummy data to Kinesis.
     */
    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }

    /**
     * The implementation of the ShardRecordProcessor interface is where the heart of the record processing logic lives.
     * In this example all we do to 'process' is log info about the records.
     */
    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        /**
         * Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
         * processRecords). In this example we do nothing except some logging.
         *
         * @param initializationInput Provides information related to initialization.
         */
        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
         * data records to the application. In this example we simply log our records.
         *
         * @param processRecordsInput Provides the records to be processed as well as information and capabilities
         *                            related to them (e.g. checkpointing).
         */
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /** Called when the lease tied to this record processor has been lost. Once the lease has been lost,
         * the record processor can no longer checkpoint.
         *
         * @param leaseLostInput Provides access to functions and data related to the loss of the lease.
         */
        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Called when all data on this shard has been processed. Checkpointing must occur in the method for record
         * processing to be considered complete; an exception will be thrown otherwise.
         *
         * @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
         */
        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
         * Enter). Checkpoints and logs the data a final time.
         *
         * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
         *                               before the shutdown is completed.
         */
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# Entwickeln Sie einen Kinesis Client Library-Consumer in Python
<a name="kcl2-standard-consumer-python-example"></a>

**Wichtig**  
Die Versionen 1.x und 2.x der Amazon Kinesis Client Library (KCL) sind veraltet. KCL 1.x wird end-of-support am 30. Januar 2026 verfügbar sein. Wir **empfehlen dringend**, dass Sie Ihre KCL-Anwendungen, die Version 1.x verwenden, vor dem 30. Januar 2026 auf die neueste KCL-Version migrieren. Die neueste KCL-Version finden Sie auf der [Seite Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) unter. GitHub Informationen zu den neuesten KCL-Versionen finden Sie unter. [Verwenden Sie die Kinesis-Clientbibliothek](kcl.md) Informationen zur Migration von KCL 1.x zu KCL 3.x finden Sie unter. [Migrieren von KCL 1.x zu KCL 3.x](kcl-migration-1-3.md)

Sie können die Kinesis Client Library (KCL) verwenden, um Anwendungen zu erstellen, die Daten aus Ihren Kinesis-Datenströmen verarbeiten. Die Kinesis Client Library ist in mehreren Sprachen verfügbar. In diesem Thema wird Python behandelt.

Die KCL ist eine Java-Bibliothek. Unterstützung für andere Sprachen als Java wird über eine mehrsprachige Schnittstelle bereitgestellt, die als. *MultiLangDaemon* Dieser Daemon basiert auf Java und wird im Hintergrund ausgeführt, wenn Sie eine andere KCL-Sprache als Java verwenden. Wenn Sie also die KCL für Python installieren und Ihre Consumer-App vollständig in Python schreiben, muss Java aufgrund der MultiLangDaemon trotzdem auf Ihrem System installiert sein. Darüber hinaus MultiLangDaemon verfügt es über einige Standardeinstellungen, die Sie möglicherweise an Ihren Anwendungsfall anpassen müssen, z. B. die AWS Region, mit der eine Verbindung hergestellt wird. Weitere Informationen dazu finden Sie MultiLangDaemon auf GitHub der [ MultiLangDaemon KCL-Projektseite](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Um die Python-KCL von herunterzuladen GitHub, gehen Sie zur [Kinesis Client Library (Python)](https://github.com/awslabs/amazon-kinesis-client-python). Um Beispielcode für eine Python-KCL-Consumer-Anwendung herunterzuladen, gehen Sie zur [KCL for Python-Beispielprojektseite unter](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples). GitHub

Sie müssen die folgenden Aufgaben durchführen, wenn Sie eine KCL-Konsumentenanwendung in Python implementieren:

**Topics**
+ [Implementieren Sie die Klassenmethoden RecordProcessor](#kinesis-record-processor-implementation-interface-py)
+ [Ändern Sie die Konfigurationseigenschaften](#kinesis-record-processor-initialization-py)

## Implementieren Sie die Klassenmethoden RecordProcessor
<a name="kinesis-record-processor-implementation-interface-py"></a>

Die `RecordProcess`-Klasse muss die `RecordProcessorBase`-Klasse erweitern, um die folgenden Methoden zu implementieren:

```
initialize
process_records
shutdown_requested
```

Dieses Beispiel stellt Implementierungen bereit, die Sie als Ausgangspunkt verwenden können.

```
#!/usr/bin/env python

# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

from __future__ import print_function

import sys
import time

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:

    * initialize will be called once
    * process_records will be called zero or more times
    * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
        a scaling change.
    """
    def __init__(self):
        self._SLEEP_SECONDS = 5
        self._CHECKPOINT_RETRIES = 5
        self._CHECKPOINT_FREQ_SECONDS = 60
        self._largest_seq = (None, None)
        self._largest_sub_seq = None
        self._last_checkpoint_time = None

    def log(self, message):
        sys.stderr.write(message)

    def initialize(self, initialize_input):
        """
        Called once by a KCLProcess before any calls to process_records

        :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
            processor has been assigned.
        """
        self._largest_seq = (None, None)
        self._last_checkpoint_time = time.time()

    def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
        """
        Checkpoints with retries on retryable exceptions.

        :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
            or shutdown
        :param str or None sequence_number: the sequence number to checkpoint at.
        :param int or None sub_sequence_number: the sub sequence number to checkpoint at.
        """
        for n in range(0, self._CHECKPOINT_RETRIES):
            try:
                checkpointer.checkpoint(sequence_number, sub_sequence_number)
                return
            except kcl.CheckpointError as e:
                if 'ShutdownException' == e.value:
                    #
                    # A ShutdownException indicates that this record processor should be shutdown. This is due to
                    # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
                    #
                    print('Encountered shutdown exception, skipping checkpoint')
                    return
                elif 'ThrottlingException' == e.value:
                    #
                    # A ThrottlingException indicates that one of our dependencies is is over burdened, e.g. too many
                    # dynamo writes. We will sleep temporarily to let it recover.
                    #
                    if self._CHECKPOINT_RETRIES - 1 == n:
                        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
                        return
                    else:
                        print('Was throttled while checkpointing, will attempt again in {s} seconds'
                              .format(s=self._SLEEP_SECONDS))
                elif 'InvalidStateException' == e.value:
                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
                else:  # Some other error
                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
            time.sleep(self._SLEEP_SECONDS)

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this recod.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
                 .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))

    def should_update_sequence(self, sequence_number, sub_sequence_number):
        """
        Determines whether a new larger sequence number is available

        :param int sequence_number: the sequence number from the current record
        :param int sub_sequence_number: the sub sequence number from the current record
        :return boolean: true if the largest sequence should be updated, false otherwise
        """
        return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \
            (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1])

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self.process_record(data, key, seq, sub_seq)
                if self.should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            #
            # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
            #
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
                self._last_checkpoint_time = time.time()

        except Exception as e:
            self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

    def lease_lost(self, lease_lost_input):
        self.log("Lease has been lost")

    def shard_ended(self, shard_ended_input):
        self.log("Shard has ended checkpointing")
        shard_ended_input.checkpointer.checkpoint()

    def shutdown_requested(self, shutdown_requested_input):
        self.log("Shutdown has been requested, checkpointing.")
        shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
    kcl_process = kcl.KCLProcess(RecordProcessor())
    kcl_process.run()
```

## Ändern Sie die Konfigurationseigenschaften
<a name="kinesis-record-processor-initialization-py"></a>

Das Beispiel zeigt Standardwerte für die Konfigurationseigenschaften, wie in dem folgenden Skript gezeigt. Sie können diese Eigenschaften mit eigenen Werten überschreiben.

```
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py

# The name of an Amazon Kinesis stream to process.
streamName = words

# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = PythonKCLSample

# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/2.7

# Valid options at TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON

# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.

# The KCL defaults to us-east-1
#regionName = us-east-1

# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
# will be regarded as having problems and it's shards will be assigned to other workers.
# For applications that have a large number of shards, this msy be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
#failoverTimeMillis = 10000

# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId = 

# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000

# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000

# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000

# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
#callProcessRecordsEvenForEmptyRecordList = false

# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000

# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true

# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500

# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000

# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000

# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true

# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
```

### Anwendungsname
<a name="kinesis-record-processor-application-name-py"></a>

Die KCL erfordert einen Anwendungsnamen, der unter Ihren Anwendungen sowie den Amazon-DynamoDB-Tabellen in derselben Region eindeutig ist. Sie verwendet den Wert der Anwendungsnamenkonfiguration auf folgende Arten:
+ Für mit diesem Anwendungsnamen verknüpfte Auftragnehmer wird angenommen, dass sie gemeinsam im gleichen Stream arbeiten. Diese Worker können auf mehrere Instances verteilt sein. Wenn Sie eine zusätzliche Instance desselben Anwendungscodes ausführen, jedoch mit einem anderen Anwendungsnamen, behandelt die KCL die zweite Instance als eine völlig getrennte Anwendung, die ebenfalls im selben Stream arbeitet.
+ Die KCL erstellt eine DynamoDB-Tabelle mit dem Namen der Anwendung und verwendet die Tabelle für die Verwaltung von Statusinformationen für die Anwendung (wie Checkpoints und Auftragnehmer-Shard-Zuweisungen). Jede Anwendung verfügt über eine eigene DynamoDB-Tabelle. Weitere Informationen finden Sie unter [Verwenden Sie eine Leasetabelle, um nachzuverfolgen, welche Shards von der KCL-Consumer-Anwendung verarbeitet wurden](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Anmeldeinformationen
<a name="kinesis-record-processor-creds-py"></a>

Sie müssen Ihre AWS Anmeldeinformationen einem der Anmeldeinformationsanbieter in der Kette der [Standardanmeldedienstanbieter](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html) zur Verfügung stellen. Sie können die Eigenschaft `AWSCredentialsProvider` verwenden, um einen Anmeldeinformationsanbieter einzurichten. Wenn Sie Ihre Consumer-Anwendung auf einer Amazon EC2 EC2-Instance ausführen, empfehlen wir Ihnen, die Instance mit einer IAM-Rolle zu konfigurieren. AWS Anmeldeinformationen, die die mit dieser IAM-Rolle verknüpften Berechtigungen widerspiegeln, werden den Anwendungen auf der Instance über deren Instance-Metadaten zur Verfügung gestellt. Dies ist die sicherste Art, Anmeldeinformationen für eine Konsumentenanwendung zu verwalten, die auf einer EC2-Instance ausgeführt wird.

# Entwickeln Sie mit KCL 2.x erweiterte Fan-Out-Nutzer
<a name="building-enhanced-consumers-kcl-retired"></a>

**Wichtig**  
Die Versionen 1.x und 2.x der Amazon Kinesis Client Library (KCL) sind veraltet. KCL 1.x wird am 30. Januar 2026 end-of-support verfügbar sein. Wir **empfehlen dringend**, dass Sie Ihre KCL-Anwendungen, die Version 1.x verwenden, vor dem 30. Januar 2026 auf die neueste KCL-Version migrieren. Die neueste KCL-Version finden Sie auf der [Seite Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) unter. GitHub Informationen zu den neuesten KCL-Versionen finden Sie unter. [Verwenden Sie die Kinesis-Clientbibliothek](kcl.md) Informationen zur Migration von KCL 1.x zu KCL 3.x finden Sie unter. [Migrieren von KCL 1.x zu KCL 3.x](kcl-migration-1-3.md)

Verbraucher, die ein *erweitertes Rundsenden* in Amazon Kinesis Data Streams verwenden, können Datensätze aus einem Datenstrom mit einem dedizierten Durchsatz von bis zu 2 MB Daten pro Sekunde pro Shard empfangen. Diese Art Verbraucher muss nicht mit anderen Verbrauchern konkurrieren, die Daten aus dem Stream empfangen. Weitere Informationen finden Sie unter [Entwickeln Sie verbesserte Fan-Out-Verbraucher mit dediziertem Durchsatz](enhanced-consumers.md).

Sie können die Version 2.0 oder höher der Kinesis Client Library (KCL) verwenden, um Anwendungen zu entwickeln, die erweitertes Rundsenden verwenden, um Daten aus Streams zu empfangen. Die KCL abonniert Ihre Anwendung automatisch für alle Shards eines Streams und stellt sicher, dass Ihre Privatanwenderanwendung mit einem Durchsatzwert von 2 pro Shard lesen kann. MB/sec Weitere Informationen zur Verwendung der KCL ohne Aktivierung von erweitertem Rundsenden finden Sie unter [Entwickeln von Verbrauchern mit der Kinesis Client Library 2.0](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl-v2.html).

**Topics**
+ [Entwickeln Sie erweiterte Fan-Out-Consumer mithilfe von KCL 2.x in Java](building-enhanced-consumers-kcl-java.md)

# Entwickeln Sie erweiterte Fan-Out-Consumer mithilfe von KCL 2.x in Java
<a name="building-enhanced-consumers-kcl-java"></a>

**Wichtig**  
Die Versionen 1.x und 2.x der Amazon Kinesis Client Library (KCL) sind veraltet. KCL 1.x wird am 30. Januar 2026 end-of-support verfügbar sein. Wir **empfehlen dringend**, dass Sie Ihre KCL-Anwendungen, die Version 1.x verwenden, vor dem 30. Januar 2026 auf die neueste KCL-Version migrieren. Die neueste KCL-Version finden Sie auf der [Seite Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) unter. GitHub Informationen zu den neuesten KCL-Versionen finden Sie unter. [Verwenden Sie die Kinesis-Clientbibliothek](kcl.md) Informationen zur Migration von KCL 1.x zu KCL 3.x finden Sie unter. [Migrieren von KCL 1.x zu KCL 3.x](kcl-migration-1-3.md)

Sie können die Version 2.0 oder höher der Kinesis Client Library (KCL) verwenden, um Anwendungen in Amazon Kinesis Data Streams zu entwickeln, die Daten aus Streams mit erweitertem Rundsenden empfangen. Der folgende Code zeigt eine Beispielimplementierung in Java für `ProcessorFactory` und `RecordProcessor`.

Es wird empfohlen, dass Sie `KinesisClientUtil` zum Erstellen von `KinesisAsyncClient` und zum Konfigurieren von `maxConcurrency` in `KinesisAsyncClient` verwenden.

**Wichtig**  
Für den Amazon Kinesis Client kann sich die Latenz möglicherweise signifikant erhöhen, sofern Sie `KinesisAsyncClient` nicht für einen `maxConcurrency`-Wert konfigurieren, der hoch genug ist, um alle Leases plus zusätzliche Verwendungen von `KinesisAsyncClient` zu ermöglichen.

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License. 
 */

/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        log.info("Cancelling producer, and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }


    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```