

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Entwickeln Sie mit KCL 2.x erweiterte Fan-Out-Nutzer
<a name="building-enhanced-consumers-kcl-retired"></a>

**Wichtig**  
Die Versionen 1.x und 2.x der Amazon Kinesis Client Library (KCL) sind veraltet. KCL 1.x wird am 30. Januar 2026 end-of-support verfügbar sein. Wir **empfehlen dringend**, dass Sie Ihre KCL-Anwendungen, die Version 1.x verwenden, vor dem 30. Januar 2026 auf die neueste KCL-Version migrieren. Die neueste KCL-Version finden Sie auf der [Seite Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) unter. GitHub Informationen zu den neuesten KCL-Versionen finden Sie unter. [Verwenden Sie die Kinesis-Clientbibliothek](kcl.md) Informationen zur Migration von KCL 1.x zu KCL 3.x finden Sie unter. [Migrieren von KCL 1.x zu KCL 3.x](kcl-migration-1-3.md)

Verbraucher, die ein *erweitertes Rundsenden* in Amazon Kinesis Data Streams verwenden, können Datensätze aus einem Datenstrom mit einem dedizierten Durchsatz von bis zu 2 MB Daten pro Sekunde pro Shard empfangen. Diese Art Verbraucher muss nicht mit anderen Verbrauchern konkurrieren, die Daten aus dem Stream empfangen. Weitere Informationen finden Sie unter [Entwickeln Sie verbesserte Fan-Out-Verbraucher mit dediziertem Durchsatz](enhanced-consumers.md).

Sie können die Version 2.0 oder höher der Kinesis Client Library (KCL) verwenden, um Anwendungen zu entwickeln, die erweitertes Rundsenden verwenden, um Daten aus Streams zu empfangen. Die KCL abonniert Ihre Anwendung automatisch für alle Shards eines Streams und stellt sicher, dass Ihre Privatanwenderanwendung mit einem Durchsatzwert von 2 pro Shard lesen kann. MB/sec Weitere Informationen zur Verwendung der KCL ohne Aktivierung von erweitertem Rundsenden finden Sie unter [Entwickeln von Verbrauchern mit der Kinesis Client Library 2.0](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl-v2.html).

**Topics**
+ [Entwickeln Sie erweiterte Fan-Out-Consumer mithilfe von KCL 2.x in Java](building-enhanced-consumers-kcl-java.md)

# Entwickeln Sie erweiterte Fan-Out-Consumer mithilfe von KCL 2.x in Java
<a name="building-enhanced-consumers-kcl-java"></a>

**Wichtig**  
Die Versionen 1.x und 2.x der Amazon Kinesis Client Library (KCL) sind veraltet. KCL 1.x wird am 30. Januar 2026 end-of-support verfügbar sein. Wir **empfehlen dringend**, dass Sie Ihre KCL-Anwendungen, die Version 1.x verwenden, vor dem 30. Januar 2026 auf die neueste KCL-Version migrieren. Die neueste KCL-Version finden Sie auf der [Seite Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) unter. GitHub Informationen zu den neuesten KCL-Versionen finden Sie unter. [Verwenden Sie die Kinesis-Clientbibliothek](kcl.md) Informationen zur Migration von KCL 1.x zu KCL 3.x finden Sie unter. [Migrieren von KCL 1.x zu KCL 3.x](kcl-migration-1-3.md)

Sie können die Version 2.0 oder höher der Kinesis Client Library (KCL) verwenden, um Anwendungen in Amazon Kinesis Data Streams zu entwickeln, die Daten aus Streams mit erweitertem Rundsenden empfangen. Der folgende Code zeigt eine Beispielimplementierung in Java für `ProcessorFactory` und `RecordProcessor`.

Es wird empfohlen, dass Sie `KinesisClientUtil` zum Erstellen von `KinesisAsyncClient` und zum Konfigurieren von `maxConcurrency` in `KinesisAsyncClient` verwenden.

**Wichtig**  
Für den Amazon Kinesis Client kann sich die Latenz möglicherweise signifikant erhöhen, sofern Sie `KinesisAsyncClient` nicht für einen `maxConcurrency`-Wert konfigurieren, der hoch genug ist, um alle Leases plus zusätzliche Verwendungen von `KinesisAsyncClient` zu ermöglichen.

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License. 
 */

/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        log.info("Cancelling producer, and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }


    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```