

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# 공유 처리량으로 사용자 지정 소비자 개발
<a name="shared-throughput-consumers"></a>

**중요**  
Amazon Kinesis Client Library(KCL) 버전 1.x 및 2.x는 오래되었습니다. KCL 1.x는 2026년 1월 30일에 지원이 종료됩니다. 버전 1.x를 사용하는 KCL 애플리케이션은 2026년 1월 30일 이전에 최신 KCL 버전으로 마이그레이션할 것을 **적극 권장**합니다. 최신 KCL 버전을 찾으려면 [GitHub의 Amazon Kinesis Client Library 페이지](https://github.com/awslabs/amazon-kinesis-client)를 참조하세요. 최신 KCL 버전에 대한 자세한 내용은 [Kinesis Client Library 사용](kcl.md) 섹션을 참조하세요. KCL 1.x에서 KCL 3.x로 마이그레이션하는 방법에 대한 자세한 내용은 [KCL 1.x에서 KCL 3.x로 마이그레이션](kcl-migration-1-3.md) 섹션을 참조하세요.

Kinesis Data Streams로부터 데이터를 수신할 때 전용 처리량이 필요하지 않은 경우 그리고 200ms 미만의 읽기 전파 지연이 필요하지 않은 경우에는 다음 섹션에서 설명한 대로 소비자 애플리케이션을 구축하면 됩니다. Kinesis Client Library(KCL) 또는 AWS SDK for Java를 사용할 수 있습니다.

**Topics**
+ [KCL를 사용하여 공유 처리량으로 사용자 지정 소비자 개발](custom-kcl-consumers.md)

전용 처리량으로 Kinesis 데이터 스트림에서 레코드를 수신할 수 있는 소비자를 구축하는 방법에 대한 자세한 내용은 [전용 처리량으로 향상된 팬아웃 소비자 개발](enhanced-consumers.md) 섹션을 참조하세요.

# KCL를 사용하여 공유 처리량으로 사용자 지정 소비자 개발
<a name="custom-kcl-consumers"></a>

**중요**  
Amazon Kinesis Client Library(KCL) 버전 1.x 및 2.x는 오래되었습니다. KCL 1.x는 2026년 1월 30일에 지원이 종료됩니다. 버전 1.x를 사용하는 KCL 애플리케이션은 2026년 1월 30일 이전에 최신 KCL 버전으로 마이그레이션할 것을 **적극 권장**합니다. 최신 KCL 버전을 찾으려면 [GitHub의 Amazon Kinesis Client Library 페이지](https://github.com/awslabs/amazon-kinesis-client)를 참조하세요. 최신 KCL 버전에 대한 자세한 내용은 [Kinesis Client Library 사용](kcl.md) 섹션을 참조하세요. KCL 1.x에서 KCL 3.x로 마이그레이션하는 방법에 대한 자세한 내용은 [KCL 1.x에서 KCL 3.x로 마이그레이션](kcl-migration-1-3.md) 섹션을 참조하세요.

공유 처리량으로 사용자 지정 소비자 애플리케이션을 개발하는 방법 중 하나는 Kinesis Client Library(KCL)를 사용하는 것입니다.

사용 중인 KCL 버전에 대한 다음 주제 중에서 선택하세요.

**Topics**
+ [KCL 1.x 소비자 개발](developing-consumers-with-kcl.md)
+ [KCL 2.x 소비자 개발](developing-consumers-with-kcl-v2.md)

# KCL 1.x 소비자 개발
<a name="developing-consumers-with-kcl"></a>

**중요**  
Amazon Kinesis Client Library(KCL) 버전 1.x 및 2.x는 오래되었습니다. KCL 1.x는 2026년 1월 30일에 지원이 종료됩니다. 버전 1.x를 사용하는 KCL 애플리케이션은 2026년 1월 30일 이전에 최신 KCL 버전으로 마이그레이션할 것을 **적극 권장**합니다. 최신 KCL 버전을 찾으려면 [GitHub의 Amazon Kinesis Client Library 페이지](https://github.com/awslabs/amazon-kinesis-client)를 참조하세요. 최신 KCL 버전에 대한 자세한 내용은 [Kinesis Client Library 사용](kcl.md) 섹션을 참조하세요. KCL 1.x에서 KCL 3.x로 마이그레이션하는 방법에 대한 자세한 내용은 [KCL 1.x에서 KCL 3.x로 마이그레이션](kcl-migration-1-3.md) 섹션을 참조하세요.

Kinesis Client Library(KCL)를 사용하여 Amazon Kinesis Data Streams의 소비자 애플리케이션을 개발할 수 있습니다.

KCL에 대한 자세한 내용은 [KCL 정보(이전 버전)](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-overview) 단원을 참조하십시오.

사용하려는 옵션에 따라 다음 주제 중에서 선택하세요.

**Topics**
+ [Java로 Kinesis Client Library 소비자 개발](kinesis-record-processor-implementation-app-java.md)
+ [Node.js로 Kinesis Client Library 소비자 개발](kinesis-record-processor-implementation-app-nodejs.md)
+ [.NET으로 Kinesis Client Library 소비자 개발](kinesis-record-processor-implementation-app-dotnet.md)
+ [Python으로 Kinesis Client Library 소비자 개발](kinesis-record-processor-implementation-app-py.md)
+ [Ruby로 Kinesis Client Library 소비자 개발](kinesis-record-processor-implementation-app-ruby.md)

# Java로 Kinesis Client Library 소비자 개발
<a name="kinesis-record-processor-implementation-app-java"></a>

**중요**  
Amazon Kinesis Client Library(KCL) 버전 1.x 및 2.x는 오래되었습니다. KCL 1.x는 2026년 1월 30일에 지원이 종료됩니다. 버전 1.x를 사용하는 KCL 애플리케이션은 2026년 1월 30일 이전에 최신 KCL 버전으로 마이그레이션할 것을 **적극 권장**합니다. 최신 KCL 버전을 찾으려면 [GitHub의 Amazon Kinesis Client Library 페이지](https://github.com/awslabs/amazon-kinesis-client)를 참조하세요. 최신 KCL 버전에 대한 자세한 내용은 [Kinesis Client Library 사용](kcl.md) 섹션을 참조하세요. KCL 1.x에서 KCL 3.x로 마이그레이션하는 방법에 대한 자세한 내용은 [KCL 1.x에서 KCL 3.x로 마이그레이션](kcl-migration-1-3.md) 섹션을 참조하세요.

Kinesis Client Library(KCL)를 사용하여 Kinesis 데이터 스트림의 데이터를 처리하는 애플리케이션을 빌드합니다. Kinesis Client Library는 여러 언어로 제공됩니다. 이 주제에서는 Java에 대해 설명합니다. Javadoc 참조를 보려면 [Class AmazonKinesisClient의AWS Javadoc 항목](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html)으로 이동하세요.

GitHub에서 Java KCL을 다운로드하려면 [Kinesis Client Library(Java)](https://github.com/awslabs/amazon-kinesis-client)로 이동하세요. Apache Maven에서 Java KCL을 찾으려면 [KCL 검색 결과](https://search.maven.org/#search|ga|1|amazon-kinesis-client) 페이지로 이동하세요. GitHub에서 Java KCL 소비자 애플리케이션용 샘플 코드를 다운로드하려면 GitHub의 [KCL for Java 샘플 프로젝트](https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis) 페이지로 이동하세요.

샘플 애플리케이션에 [Apache Commons Logging](http://commons.apache.org/proper/commons-logging/guide.html)이 사용됩니다. `configure` 파일에 정의된 정적 `AmazonKinesisApplicationSample.java` 메서드에서 로깅 구성을 변경할 수 있습니다. Log4j 및 AWS Java 애플리케이션에서 Apache Commons Logging을 사용하는 방법에 대한 자세한 내용은 *AWS SDK for Java 개발자 안내서*의 [Logging with Log4j](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html)를 참조하세요.

Java로 KCL 소비자 애플리케이션을 구현할 때 다음 작업을 완료해야 합니다.

**Topics**
+ [IRecordProcessor 메서드 구현](#kinesis-record-processor-implementation-interface-java)
+ [IRecordProcessor 인터페이스를 위한 클래스 팩토리 구현](#kinesis-record-processor-implementation-factory-java)
+ [작업자 생성](#kcl-java-worker)
+ [구성 속성 수정](#kinesis-record-processor-initialization-java)
+ [레코드 프로세서 인터페이스 버전 2로 마이그레이션](#kcl-java-v2-migration)

## IRecordProcessor 메서드 구현
<a name="kinesis-record-processor-implementation-interface-java"></a>

KCL은 현재 두 버전의 `IRecordProcessor` 인터페이스를 지원합니다. KCL의 첫 번째 버전에 제공되는 원래 인터페이스와 KCL 버전 1.5.0부터 사용할 수 있는 버전 2입니다. 두 인터페이스 모두 완벽하게 지원되며, 특정 시나리오 요구 사항에 따라 선택할 수 있습니다. 모든 차이점을 보려면 로컬로 빌드된 Javadoc 또는 소스 코드를 참조하십시오. 다음 단원에서는 시작에 필요한 최소한의 구현을 설명합니다.

**Topics**
+ [원래의 인터페이스(버전 1)](#kcl-java-interface-original)
+ [업데이트된 인터페이스(버전 2)](#kcl-java-interface-v2)

### 원래의 인터페이스(버전 1)
<a name="kcl-java-interface-original"></a>

원래의 `IRecordProcessor` 인터페이스(`package com.amazonaws.services.kinesis.clientlibrary.interfaces`)는 소비자가 구현할 다음과 같은 레코드 프로세서 메서드를 노출합니다. 이 샘플에서는 시작점으로 사용할 수 있는 구현을 제공합니다(`AmazonKinesisApplicationSampleRecordProcessor.java` 참조).

```
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

**초기화**  
KCL은 레코드 프로세서가 인스턴스화될 때 특정 샤드 ID를 파라미터로 전달하여 `initialize` 메서드를 직접적으로 호출합니다. 이 레코드 프로세서는 해당 샤드만 처리하고 일반적으로 반대의 경우도 마찬가지입니다. 이 샤드는 해당 레코드 프로세서로만 처리됩니다. 하지만 소비자는 데이터 레코드가 두 번 이상 처리될 가능성을 고려해야 합니다. Kinesis Data Streams에서는 소비자의 워커가 샤드의 모든 데이터 레코드를 적어도 한 번은 처리한다는 *적어도 한 번* 의미론이 통용됩니다. 둘 이상의 작업자가 특정 샤드를 처리할 수 있는 경우에 대한 자세한 내용은 [리샤딩, 규모 조정 및 병렬 처리를 사용하여 샤드 수 변경](kinesis-record-processor-scaling.md)를 참조하십시오.

```
public void initialize(String shardId)
```

**processRecords**  
KCL은 `processRecords` 메서드에 지정된 샤드의 데이터 레코드 목록을 전달하여 `initialize(shardId)` 메서드를 직접적으로 호출합니다. 레코드 프로세서는 소비자의 의미론에 따라 이 레코드의 데이터를 처리합니다. 예를 들어, 워커가 데이터를 전환한 후 그 결과를 Amazon Simple Storage Service(S3) 버킷에 저장할 수 있습니다.

```
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 
```

데이터 자체뿐 아니라 시퀀스 번호와 파티션 키도 데이터 레코드에 포함됩니다. 작업자가 데이터를 처리할 때 이 값을 사용할 수 있습니다. 예를 들어, 작업자는 파티션 키의 값을 기반으로 데이터를 저장할 S3 버킷을 선택할 수 있습니다. `Record` 클래스는 레코드의 데이터, 시퀀스 번호 및 파티션 키에 대한 액세스를 제공하는 다음 메서드를 노출합니다.

```
record.getData()  
record.getSequenceNumber() 
record.getPartitionKey()
```

이 샘플의 프라이빗 메서드 `processRecordsWithRetries`에는 작업자가 레코드의 데이터, 시퀀스 번호 및 파티션 키에 액세스하는 방법을 보여주는 코드가 있습니다.

Kinesis Data Streams는 샤드에서 이미 처리된 레코드를 추적하도록 레코드 프로세서에 요구합니다. KCL은 체크포인터(`IRecordProcessorCheckpointer`)를 `processRecords`에 전달하여 이 추적을 처리합니다. 레코드 프로세서는 해당 인터페이스에서 `checkpoint` 메서드를 직접적으로 호출하여 샤드의 레코드 처리가 얼마나 진행되었는지 KCL에 알려줍니다. 워커가 실패할 경우 KCL은 이 정보를 사용하여 마지막으로 처리된 레코드에서 샤드 처리를 다시 시작합니다.

분할 또는 병합 작업의 경우 소스 샤드의 프로세서가 `checkpoint`를 직접적으로 호출하여 소스 샤드의 모든 처리가 완료되었다고 표시할 때까지 KCL은 새 샤드의 처리를 시작하지 않습니다.

파라미터를 전달하지 않으면 KCL은 `checkpoint`에 대한 호출이 레코드 프로세서에 전달된 마지막 레코드까지 모두 처리되었다는 의미로 간주합니다. 따라서 레코드 프로세서는 전달된 목록에 있는 모든 레코드를 반드시 처리한 후에 `checkpoint`를 호출해야 합니다. 레코드 프로세서는 `checkpoint`를 호출할 때마다 `processRecords`를 호출할 필요가 없습니다. 예를 들어, 프로세서는 `checkpoint`를 세 번째 호출할 때마다 `processRecords`를 호출할 수 있습니다. 선택적으로 레코드의 정확한 시퀀스 번호를 `checkpoint`의 파라미터로 지정할 수도 있습니다. 이 경우 KCL은 모든 레코드가 해당 레코드까지만 처리되었다고 간주합니다.

이 샘플에서는 프라이빗 메서드 `checkpoint`가 적절한 예외 처리 및 재시도 로직을 사용하여 `IRecordProcessorCheckpointer.checkpoint`를 호출하는 방법을 보여줍니다.

KCL은 `processRecords`를 사용하여 데이터 레코드를 처리할 때 발생하는 모든 예외를 처리합니다. `processRecords`에서 예외가 발생하면 KCL은 예외 이전에 전달된 데이터 레코드를 건너뜁니다. 이러한 레코드는 예외가 발생한 프로세서 또는 소비자의 다른 레코드 프로세서로 다시 전송되지 않습니다.

**종료**  
처리가 종료될 때(종료 이유가 `TERMINATE`) 또는 워커가 더 이상 응답하지 않을 때(종료 이유가 `ZOMBIE`) KCL은 `shutdown` 메서드를 직접적으로 호출합니다.

```
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

샤드 분할이나 병합 또는 스트림 삭제로 인해 레코드 프로세서가 샤드에서 추가 레코드를 수신하지 않으면 처리가 종료됩니다.

또한 KCL은 `IRecordProcessorCheckpointer` 인터페이스를 `shutdown`에 전달합니다. 종료 이유가 `TERMINATE`이면 레코드 프로세서가 데이터 레코드 처리를 완료하고 이 인터페이스의 `checkpoint` 메서드를 호출해야 합니다.

### 업데이트된 인터페이스(버전 2)
<a name="kcl-java-interface-v2"></a>

업데이트된 `IRecordProcessor` 인터페이스(`package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`)는 소비자가 구현할 다음과 같은 레코드 프로세서 메서드를 노출합니다.

```
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
```

원래 인터페이스 버전의 모든 인수는 컨테이너 객체에서 get 메서드를 통해 액세스할 수 있습니다. 예를 들어, `processRecords()`를 사용하여 `processRecordsInput.getRecords()`의 레코드 목록을 검색할 수 있습니다.

이 인터페이스 버전 2에서(KCL 1.5.0 이상) 원래의 인터페이스가 제공하는 입력 외에도 다음과 같은 새 입력을 사용할 수 있습니다.

시작 시퀀스 번호  
`InitializationInput` 작업에 전달된 `initialize()` 객체에서 레코드가 레코드 프로세서 인스턴스에 제공될 시작 시퀀스 번호이며, 전에 같은 샤드를 처리하는 레코드 프로세서가 마지막으로 검사한 시퀀스 번호입니다. 애플리케이션에 이 정보가 필요할 경우 제공됩니다.

보류 중인 체크포인트 시퀀스 번호  
`initialize()` 작업에 전달된 `InitializationInput` 객체에서 이전 레코드 프로세서 인스턴스가 중단되기 전에 커밋되지 못한 보류 중인 체크포인트 시퀀스 번호(있는 경우)입니다.

## IRecordProcessor 인터페이스를 위한 클래스 팩토리 구현
<a name="kinesis-record-processor-implementation-factory-java"></a>

레코드 프로세서 메서드를 구현하는 클래스 팩토리도 구현해야 합니다. 소비자가 작업자를 인스턴스화할 때 이 팩토리에 참조를 전달합니다.

이 샘플은 원래의 레코드 프로세서 인터페이스를 사용하여 `AmazonKinesisApplicationSampleRecordProcessorFactory.java` 파일에서 팩토리 클래스를 구현합니다. 클래스 팩토리에서 레코드 프로세서 버전 2를 만들려면 패키지 이름 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`를 사용하십시오.

```
  public class SampleRecordProcessorFactory implements IRecordProcessorFactory { 
      /**
      * Constructor.
      */
      public SampleRecordProcessorFactory() {
          super();
      }
      /**
      * {@inheritDoc}
      */
      @Override
      public IRecordProcessor createProcessor() {
          return new SampleRecordProcessor();
      }
  }
```

## 작업자 생성
<a name="kcl-java-worker"></a>

[IRecordProcessor 메서드 구현](#kinesis-record-processor-implementation-interface-java)에서 설명한 대로 두 가지 KCL 레코드 프로세서 인터페이스 버전 중에서 선택할 수 있으며 이 선택은 워커 생성 방법에 영향을 줍니다. 원래의 레코드 프로세서 인터페이스는 다음 코드 구조를 사용하여 작업자를 생성합니다.

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker(recordProcessorFactory, config);
```

레코드 프로세서 인터페이스 버전 2를 통해 인수의 순서와 사용할 생성자를 고민할 필요 없이 `Worker.Builder`를 사용하여 작업자를 만들 수 있습니다. 업데이트된 레코드 프로세서 인터페이스는 다음 코드 구조를 사용하여 작업자를 생성합니다.

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

## 구성 속성 수정
<a name="kinesis-record-processor-initialization-java"></a>

이 샘플은 구성 속성의 기본값을 제공합니다. 그러면 작업자의 이 구성 데이터가 `KinesisClientLibConfiguration` 객체에 통합됩니다. `IRecordProcessor`의 클래스 팩토리에 대한 참조와 이 객체는 작업자를 인스턴스화하는 호출에서 전달됩니다. Java 속성 파일을 사용하여 이 속성을 사용자의 값으로 재정의할 수 있습니다(`AmazonKinesisApplicationSample.java` 참조).

### 애플리케이션 이름
<a name="configuration-property-application-name"></a>

KCL에는 애플리케이션 및 같은 리전의 Amazon DynamoDB 테이블에서 고유한 애플리케이션 이름이 필요합니다. 다음과 같이 애플리케이션 이름 구성 값이 사용됩니다.
+ 이 애플리케이션 이름과 관련된 모든 작업자는 동일한 스트림에서 함께 작업한다고 간주됩니다. 이 작업자는 여러 인스턴스에 분산되어 있을 수 있습니다. 동일한 애플리케이션 코드의 추가 인스턴스를 다른 애플리케이션 이름으로 실행하는 경우 KCL은 두 번째 인스턴스를 동일한 스트림에서 작동하는 완전히 별개의 애플리케이션으로 취급합니다.
+ KCL은 애플리케이션 이름이 있는 DynamoDB 테이블을 생성하고 테이블을 사용하여 애플리케이션의 상태 정보(예: 체크포인트 및 워커와 샤드의 매핑)를 보관합니다. 각각의 애플리케이션에는 자체 DynamoDB 테이블이 있습니다. 자세한 내용은 [리스 테이블을 사용하여 KCL 소비자 애플리케이션에서 처리한 샤드 추적](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable) 단원을 참조하십시오.

### 보안 인증 설정
<a name="kinesis-record-processor-cred-java"></a>

기본 AWS 자격 증명 공급자 체인의 자격 증명 공급자 중 하나가 자격 증명을 사용할 수 있도록 해야 합니다. 예를 들어, EC2 인스턴스에서 소비자를 실행하는 경우 IAM 역할로 인스턴스를 시작하는 것이 좋습니다. 이 IAM 역할과 연결된 권한을 반영하는 AWS 보안 인증은 인스턴스 메타데이터를 통해 인스턴스의 애플리케이션에 제공됩니다. 이것이 EC2 인스턴스에서 실행되는 소비자의 자격 증명을 관리하는 가장 안전한 방법입니다.

먼저 샘플 애플리케이션이 인스턴스 메타데이터에서 IAM 보안 인증 검색을 시도합니다.

```
credentialsProvider = new InstanceProfileCredentialsProvider(); 
```

샘플 애플리케이션이 인스턴스 메타데이터에서 자격 증명을 가져오지 못하면 속성 파일에서 자격 증명 검색을 시도합니다.

```
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
```

인스턴스 메타데이터에 대한 자세한 내용은 **Amazon EC2 사용 설명서의 [인스턴스 메타데이터](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html)를 참조하세요.

### 여러 인스턴스에 작업자 ID 사용
<a name="kinesis-record-processor-workerid-java"></a>

샘플 초기화 코드는 로컬 컴퓨터 이름을 사용하고 다음 코드 조각과 같이 전역적으로 고유한 식별자를 추가하여 작업자의 ID인 `workerId`를 만듭니다. 이 방법은 단일 컴퓨터에서 소비자 애플리케이션의 여러 인스턴스가 실행되는 시나리오를 지원합니다.

```
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
```

## 레코드 프로세서 인터페이스 버전 2로 마이그레이션
<a name="kcl-java-v2-migration"></a>

위에서 설명한 단계 외에도 원래의 인터페이스를 사용하는 코드를 마이그레이션하려면 다음 단계가 필요합니다.

1. 버전 2 레코드 프로세서 인터페이스를 가져오도록 레코드 프로세서 클래스를 변경합니다.

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   ```

1. 컨테이너 객체에서 `get` 메서드를 사용하도록 참조를 변경합니다. 예를 들어, `shutdown()` 작업에서 "`checkpointer`"를 "`shutdownInput.getCheckpointer()`"로 변경합니다.

1. 버전 2 레코드 프로세서 팩토리 인터페이스를 가져오도록 레코드 프로세서 팩토리 클래스를 변경합니다.

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   ```

1. `Worker.Builder`를 사용하도록 작업자의 구성을 변경합니다. 예제:

   ```
   final Worker worker = new Worker.Builder()
       .recordProcessorFactory(recordProcessorFactory)
       .config(config)
       .build();
   ```

# Node.js로 Kinesis Client Library 소비자 개발
<a name="kinesis-record-processor-implementation-app-nodejs"></a>

**중요**  
Amazon Kinesis Client Library(KCL) 버전 1.x 및 2.x는 오래되었습니다. KCL 1.x는 2026년 1월 30일에 지원이 종료됩니다. 버전 1.x를 사용하는 KCL 애플리케이션은 2026년 1월 30일 이전에 최신 KCL 버전으로 마이그레이션할 것을 **적극 권장**합니다. 최신 KCL 버전을 찾으려면 [GitHub의 Amazon Kinesis Client Library 페이지](https://github.com/awslabs/amazon-kinesis-client)를 참조하세요. 최신 KCL 버전에 대한 자세한 내용은 [Kinesis Client Library 사용](kcl.md) 섹션을 참조하세요. KCL 1.x에서 KCL 3.x로 마이그레이션하는 방법에 대한 자세한 내용은 [KCL 1.x에서 KCL 3.x로 마이그레이션](kcl-migration-1-3.md) 섹션을 참조하세요.

Kinesis Client Library(KCL)를 사용하여 Kinesis 데이터 스트림의 데이터를 처리하는 애플리케이션을 빌드합니다. Kinesis Client Library는 여러 언어로 제공됩니다. 이 주제에서는 Node.js에 대해 설명합니다.

KCL은 Java 라이브러리이며, *MultiLangDaemon*이라는 다중 언어 인터페이스를 통해 Java 이외의 언어에 대한 지원이 제공됩니다. 이 데몬은 Java 기반이며, Java 이외의 KCL 언어를 사용하는 경우 배경에서 실행됩니다. 따라서 Node.js용 KCL을 설치하고 Node.js로만 소비자 앱을 작성한 경우에도 MultiLangDaemon 때문에 시스템에 Java를 설치해야 합니다. 또한 MultiLangDaemon에는 연결한 AWS 리전과 같이 사용 사례에 맞게 사용자 지정해야 할 몇 가지 기본 설정이 있습니다. GitHub의 MultiLangDaemon에 대한 자세한 내용은 [KCL MultiLangDaemon 프로젝트](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang) 페이지를 참조하세요.

GitHub에서 Node.js KCL을 다운로드하려면 [Kinesis Client Library(Node.js)](https://github.com/awslabs/amazon-kinesis-client-nodejs)로 이동하세요.

**샘플 코드 다운로드**

Node.js KCL에 두 가지 코드 샘플을 사용할 수 있습니다.
+ [basic-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/basic_sample)

  Node.js로 KCL 소비자 애플리케이션을 빌드하기 위한 기초를 설명하기 위해 다음 섹션에서 사용됩니다.
+ [click-stream-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/click_stream_sample)

   기본 샘플 코드에 익숙해진 후에 실제 시나리오를 사용할 수 있는 약간 높은 수준의 코드입니다. 여기서는 이 샘플을 설명하지 않으며 자세한 내용은 README 파일에 있습니다.

Node.js로 KCL 소비자 애플리케이션을 구현할 때 다음 작업을 완료해야 합니다.

**Topics**
+ [레코드 프로세서 구현](#kinesis-record-processor-implementation-interface-nodejs)
+ [구성 속성 수정](#kinesis-record-processor-initialization-nodejs)

## 레코드 프로세서 구현
<a name="kinesis-record-processor-implementation-interface-nodejs"></a>

Node.js용 KCL을 사용하는 가능한 한 가장 단순한 소비자는 `initialize`, `processRecords` 및 `shutdown` 함수를 차례로 포함하는 `recordProcessor` 함수를 구현해야 합니다. 이 샘플에서는 시작점으로 사용할 수 있는 구현을 제공합니다(`sample_kcl_app.js` 참조).

```
function recordProcessor() {
  // return an object that implements initialize, processRecords and shutdown functions.}
```

**초기화**  
레코드 프로세서가 시작되면 KCL은 `initialize` 함수를 직접적으로 호출합니다. 이 레코드 프로세서는 `initializeInput.shardId`로 전달된 샤드 ID만 처리하고 일반적으로 반대의 경우도 마찬가지입니다. 이 샤드는 해당 레코드 프로세서로만 처리됩니다. 하지만 소비자는 데이터 레코드가 두 번 이상 처리될 가능성을 고려해야 합니다. Kinesis Data Streams에서는 소비자의 워커가 샤드의 모든 데이터 레코드를 적어도 한 번은 처리한다는 *적어도 한 번* 의미론이 통용되기 때문입니다. 둘 이상의 작업자가 특정 샤드를 처리할 수 있는 경우에 대한 자세한 내용은 [리샤딩, 규모 조정 및 병렬 처리를 사용하여 샤드 수 변경](kinesis-record-processor-scaling.md)를 참조하십시오.

```
initialize: function(initializeInput, completeCallback)
```

**processRecords**  
 KCL은 지정된 샤드에서 `initialize` 함수까지 데이터 레코드 목록을 포함하는 입력으로 이 함수를 직접적으로 호출합니다. 구현하는 레코드 프로세서가 소비자의 의미론에 따라 이 레코드의 데이터를 처리합니다. 예를 들어, 워커가 데이터를 전환한 후 그 결과를 Amazon Simple Storage Service(S3) 버킷에 저장할 수 있습니다.

```
processRecords: function(processRecordsInput, completeCallback)
```

데이터 자체뿐 아니라 작업자가 데이터를 처리할 때 사용할 수 있는 시퀀스 번호와 파티션 키도 데이터 레코드에 포함됩니다. 예를 들어, 작업자는 파티션 키의 값을 기반으로 데이터를 저장할 S3 버킷을 선택할 수 있습니다. `record` 딕셔너리가 레코드의 데이터, 시퀀스 번호 및 파티션 키에 액세스하기 위해 다음 키-값 페어를 노출합니다.

```
record.data
record.sequenceNumber
record.partitionKey
```

데이터가 Base64로 인코딩됩니다.

기본 샘플에서 `processRecords` 함수에는 작업자가 레코드의 데이터, 시퀀스 번호 및 파티션 키에 액세스하는 방법을 보여주는 코드가 있습니다.

Kinesis Data Streams는 샤드에서 이미 처리된 레코드를 추적하도록 레코드 프로세서에 요구합니다. KCL은 `processRecordsInput.checkpointer`로 전달된 `checkpointer` 객체를 통해 이 추적을 처리합니다. 레코드 프로세서는 `checkpointer.checkpoint` 함수를 직접적으로 호출하여 샤드의 레코드 처리가 얼마나 진행되었는지 KCL에 알려줍니다. 워커가 실패할 경우 KCL은 샤드 처리를 다시 시작할 때 마지막으로 처리된 레코드에서 계속되도록 이 정보를 사용합니다.

분할 또는 병합 작업의 경우 소스 샤드의 프로세서가 `checkpoint`를 직접적으로 호출하여 소스 샤드의 모든 처리가 완료되었다고 표시할 때까지 KCL은 새 샤드의 처리를 시작하지 않습니다.

`checkpoint` 함수에 시퀀스 번호를 전달하지 않으면 KCL은 `checkpoint`에 대한 호출이 레코드 프로세서에 전달된 마지막 레코드까지 모두 처리되었다는 의미로 간주합니다. 따라서 레코드 프로세서는 전달된 목록의 모든 레코드를 처리한 후에**만** `checkpoint`를 직접적으로 호출해야 합니다. 레코드 프로세서는 `checkpoint`를 호출할 때마다 `processRecords`를 호출할 필요가 없습니다. 예를 들어, 프로세서는 세 번째 호출마다 `checkpoint` 또는 구현된 사용자 지정 검증/확인 서비스와 같은 레코드 프로세서 외부의 이벤트를 호출할 수 있습니다.

선택적으로 레코드의 정확한 시퀀스 번호를 `checkpoint`의 파라미터로 지정할 수도 있습니다. 이 경우 KCL은 모든 레코드가 해당 레코드까지만 처리되었다고 간주합니다.

기본 샘플 애플리케이션은 `checkpointer.checkpoint` 함수의 가장 단순하고 가능한 호출을 보여줍니다. 소비자에 필요한 다른 검사 로직을 함수의 이 지점에 추가할 수 있습니다.

**종료**  
처리가 종료될 때(`shutdownInput.reason`이 `TERMINATE`) 또는 워커가 더 이상 응답하지 않을 때(`shutdownInput.reason`이 `ZOMBIE`) KCL은 `shutdown` 함수를 직접적으로 호출합니다.

```
shutdown: function(shutdownInput, completeCallback)
```

샤드 분할이나 병합 또는 스트림 삭제로 인해 레코드 프로세서가 샤드에서 추가 레코드를 수신하지 않으면 처리가 종료됩니다.

또한 KCL은 `shutdownInput.checkpointer` 객체를 `shutdown`에 전달합니다. 종료 이유가 `TERMINATE`이면 레코드 프로세서가 모든 데이터 레코드 처리를 완료했는지 확인하고 이 인터페이스의 `checkpoint` 함수를 호출해야 합니다.

## 구성 속성 수정
<a name="kinesis-record-processor-initialization-nodejs"></a>

이 샘플은 구성 속성의 기본값을 제공합니다. 이 속성을 사용자의 값으로 재정의할 수 있습니다(기본 샘플의 `sample.properties` 참조).

### 애플리케이션 이름
<a name="kinesis-record-processor-application-name-nodejs"></a>

KCL에는 애플리케이션 및 같은 리전의 Amazon DynamoDB 테이블에서 고유한 애플리케이션이 필요합니다. 다음과 같이 애플리케이션 이름 구성 값이 사용됩니다.
+ 이 애플리케이션 이름과 관련된 모든 작업자는 동일한 스트림에서 함께 작업한다고 간주됩니다. 이 작업자는 여러 인스턴스에 분산되어 있을 수 있습니다. 동일한 애플리케이션 코드의 추가 인스턴스를 다른 애플리케이션 이름으로 실행하는 경우 KCL은 두 번째 인스턴스를 동일한 스트림에서 작동하는 완전히 별개의 애플리케이션으로 취급합니다.
+ KCL은 애플리케이션 이름이 있는 DynamoDB 테이블을 생성하고 테이블을 사용하여 애플리케이션의 상태 정보(예: 체크포인트 및 워커와 샤드의 매핑)를 보관합니다. 각각의 애플리케이션에는 자체 DynamoDB 테이블이 있습니다. 자세한 내용은 [리스 테이블을 사용하여 KCL 소비자 애플리케이션에서 처리한 샤드 추적](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable) 단원을 참조하십시오.

### 보안 인증 설정
<a name="kinesis-record-processor-credentials-nodejs"></a>

기본 AWS 자격 증명 공급자 체인의 자격 증명 공급자 중 하나가 자격 증명을 사용할 수 있도록 해야 합니다. `AWSCredentialsProvider` 속성을 사용하여 자격 증명 공급자를 설정할 수 있습니다. `sample.properties` 파일에서 [기본 자격 증명 공급자 체인](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)의 자격 증명 공급자 중 하나에 자격 증명을 사용할 수 있도록 해야 합니다. Amazon EC2 인스턴스에서 소비자를 실행하는 경우 IAM 역할로 인스턴스를 구성하는 것이 좋습니다.이 IAM 역할과 연결된 권한을 반영하는 AWS 자격 증명은 인스턴스 메타데이터를 통해 인스턴스의 애플리케이션에 제공됩니다. 이것이 EC2 인스턴스에서 실행되는 소비자 애플리케이션의 자격 증명을 관리하는 가장 안전한 방법입니다.

다음 예제는 `sample_kcl_app.js`에 제공된 레코드 프로세서를 사용하여 `kclnodejssample`이라는 Kinesis 데이터 스트림을 처리하도록 KCL을 구성합니다.

```
# The Node.js executable script
executableName = node sample_kcl_app.js
# The name of an Amazon Kinesis stream to process
streamName = kclnodejssample
# Unique KCL application name
applicationName = kclnodejssample
# Use default AWS credentials provider chain
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Read from the beginning of the stream
initialPositionInStream = TRIM_HORIZON
```

# .NET으로 Kinesis Client Library 소비자 개발
<a name="kinesis-record-processor-implementation-app-dotnet"></a>

**중요**  
Amazon Kinesis Client Library(KCL) 버전 1.x 및 2.x는 오래되었습니다. KCL 1.x는 2026년 1월 30일에 지원이 종료됩니다. 버전 1.x를 사용하는 KCL 애플리케이션은 2026년 1월 30일 이전에 최신 KCL 버전으로 마이그레이션할 것을 **적극 권장**합니다. 최신 KCL 버전을 찾으려면 [GitHub의 Amazon Kinesis Client Library 페이지](https://github.com/awslabs/amazon-kinesis-client)를 참조하세요. 최신 KCL 버전에 대한 자세한 내용은 [Kinesis Client Library 사용](kcl.md) 섹션을 참조하세요. KCL 1.x에서 KCL 3.x로 마이그레이션하는 방법에 대한 자세한 내용은 [KCL 1.x에서 KCL 3.x로 마이그레이션](kcl-migration-1-3.md) 섹션을 참조하세요.

Kinesis Client Library(KCL)를 사용하여 Kinesis 데이터 스트림의 데이터를 처리하는 애플리케이션을 빌드합니다. Kinesis Client Library는 여러 언어로 제공됩니다. 이 주제에서는 .NET에 대해 설명합니다.

KCL은 Java 라이브러리이며, *MultiLangDaemon*이라는 다중 언어 인터페이스를 통해 Java 이외의 언어에 대한 지원이 제공됩니다. 이 데몬은 Java 기반이며, Java 이외의 KCL 언어를 사용하는 경우 배경에서 실행됩니다. 따라서 .NET용 KCL을 설치하고 .NET으로만 소비자 앱을 작성한 경우에도 MultiLangDaemon 때문에 시스템에 Java를 설치해야 합니다. 또한 MultiLangDaemon에는 연결한 AWS 리전과 같이 사용 사례에 맞게 사용자 지정해야 할 몇 가지 기본 설정이 있습니다. GitHub의 MultiLangDaemon에 대한 자세한 내용은 [KCL MultiLangDaemon 프로젝트](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang) 페이지를 참조하세요.

GitHub에서 .NET KCL을 다운로드하려면 [Kinesis Client Library(.NET)](https://github.com/awslabs/amazon-kinesis-client-net)로 이동하세요. .NET KCL 소비자 애플리케이션용 샘플 코드를 다운로드하려면 GitHub의 [.NET용 KCL 샘플 소비자 프로젝트](https://github.com/awslabs/amazon-kinesis-client-net/tree/master/SampleConsumer) 페이지로 이동하세요.

.NET으로 KCL 소비자 애플리케이션을 구현할 때 다음 작업을 완료해야 합니다.

**Topics**
+ [IRecordProcessor 클래스 메서드 구현](#kinesis-record-processor-implementation-interface-dotnet)
+ [구성 속성 수정](#kinesis-record-processor-initialization-dotnet)

## IRecordProcessor 클래스 메서드 구현
<a name="kinesis-record-processor-implementation-interface-dotnet"></a>

소비자는 `IRecordProcessor`를 위해 다음의 메서드를 구현해야 합니다. 이 샘플 소비자는 시작점으로 사용할 수 있는 구현을 제공합니다(`SampleRecordProcessor`의 `SampleConsumer/AmazonKinesisSampleConsumer.cs` 클래스 참조).

```
public void Initialize(InitializationInput input)
public void ProcessRecords(ProcessRecordsInput input)
public void Shutdown(ShutdownInput input)
```

**초기화**  
KCL은 레코드 프로세서가 인스턴스화될 때 `input` 파라미터(`input.ShardId`)에서 특정 샤드 ID를 전달하여 이 메서드를 직접적으로 호출합니다. 이 레코드 프로세서는 해당 샤드만 처리하고 일반적으로 반대의 경우도 마찬가지입니다. 이 샤드는 해당 레코드 프로세서로만 처리됩니다. 하지만 소비자는 데이터 레코드가 두 번 이상 처리될 가능성을 고려해야 합니다. Kinesis Data Streams에서는 소비자의 워커가 샤드의 모든 데이터 레코드를 적어도 한 번은 처리한다는 *적어도 한 번* 의미론이 통용되기 때문입니다. 둘 이상의 작업자가 특정 샤드를 처리할 수 있는 경우에 대한 자세한 내용은 [리샤딩, 규모 조정 및 병렬 처리를 사용하여 샤드 수 변경](kinesis-record-processor-scaling.md)를 참조하십시오.

```
public void Initialize(InitializationInput input)
```

**ProcessRecords**  
KCL은 `Initialize` 메서드를 통해 지정된 샤드에서 `input` 파라미터(`input.Records`)의 데이터 레코드 목록을 전달하여 이 메서드를 직접적으로 호출합니다. 구현하는 레코드 프로세서가 소비자의 의미론에 따라 이 레코드의 데이터를 처리합니다. 예를 들어, 워커가 데이터를 전환한 후 그 결과를 Amazon Simple Storage Service(S3) 버킷에 저장할 수 있습니다.

```
public void ProcessRecords(ProcessRecordsInput input)
```

데이터 자체뿐 아니라 시퀀스 번호와 파티션 키도 데이터 레코드에 포함됩니다. 작업자가 데이터를 처리할 때 이 값을 사용할 수 있습니다. 예를 들어, 작업자는 파티션 키의 값을 기반으로 데이터를 저장할 S3 버킷을 선택할 수 있습니다. `Record` 클래스는 레코드의 데이터, 시퀀스 번호 및 파티션 키에 액세스하기 위해 다음 항목을 노출합니다.

```
byte[] Record.Data 
string Record.SequenceNumber
string Record.PartitionKey
```

이 샘플의 메서드 `ProcessRecordsWithRetries`에는 작업자가 레코드의 데이터, 시퀀스 번호 및 파티션 키에 액세스하는 방법을 보여주는 코드가 있습니다.

Kinesis Data Streams는 샤드에서 이미 처리된 레코드를 추적하도록 레코드 프로세서에 요구합니다. KCL은 `Checkpointer` 객체를 `ProcessRecords`(`input.Checkpointer`)에 전달하여 이 추적을 처리합니다. 레코드 프로세서는 `Checkpointer.Checkpoint` 메서드를 직접적으로 호출하여 샤드의 레코드 처리가 얼마나 진행되었는지를 KCL에 알려줍니다. 워커가 실패할 경우 KCL은 이 정보를 사용하여 마지막으로 처리된 레코드에서 샤드 처리를 다시 시작합니다.

분할 또는 병합 작업의 경우 소스 샤드의 프로세서가 `Checkpointer.Checkpoint`를 직접적으로 호출하여 소스 샤드의 모든 처리가 완료되었다고 표시할 때까지 KCL은 새 샤드의 처리를 시작하지 않습니다.

파라미터를 전달하지 않으면 KCL은 `Checkpointer.Checkpoint`에 대한 호출이 레코드 프로세서에 전달된 마지막 레코드까지 모두 처리되었다는 의미로 간주합니다. 따라서 레코드 프로세서는 전달된 목록에 있는 모든 레코드를 반드시 처리한 후에 `Checkpointer.Checkpoint`를 호출해야 합니다. 레코드 프로세서는 `Checkpointer.Checkpoint`를 호출할 때마다 `ProcessRecords`를 호출할 필요가 없습니다. 예를 들어, 프로세서는 세 번째 또는 네 번째 호출마다 `Checkpointer.Checkpoint`를 호출할 수 있습니다. 선택적으로 레코드의 정확한 시퀀스 번호를 `Checkpointer.Checkpoint`의 파라미터로 지정할 수도 있습니다. 이 경우 KCL은 레코드가 해당 레코드까지만 처리되었다고 간주합니다.

이 샘플에서는 프라이빗 메서드 `Checkpoint(Checkpointer checkpointer)`가 적절한 예외 처리 및 재시도 로직을 사영하여 `Checkpointer.Checkpoint` 메서드를 호출하는 방법을 보여줍니다.

.NET용 KCL은 데이터 레코드를 처리할 때 발생하는 예외를 처리하지 않는다는 점에서 다른 KCL 언어 라이브러리와는 다르게 예외를 처리합니다. 사용자 코드에서 확인할 수 없는 예외가 발생하면 프로그램이 중단됩니다.

**Shutdown**  
처리가 종료될 때(종료 이유가 `TERMINATE`) 또는 워커가 더 이상 응답하지 않을 때(종료 `input.Reason` 값이 `ZOMBIE`) KCL은 `Shutdown` 메서드를 직접적으로 호출합니다.

```
public void Shutdown(ShutdownInput input)
```

샤드 분할이나 병합 또는 스트림 삭제로 인해 레코드 프로세서가 샤드에서 추가 레코드를 수신하지 않으면 처리가 종료됩니다.

또한 KCL은 `Checkpointer` 객체를 `shutdown`에 전달합니다. 종료 이유가 `TERMINATE`이면 레코드 프로세서가 데이터 레코드 처리를 완료하고 이 인터페이스의 `checkpoint` 메서드를 호출해야 합니다.

## 구성 속성 수정
<a name="kinesis-record-processor-initialization-dotnet"></a>

이 샘플 소비자는 구성 속성의 기본값을 제공합니다. 속성을 사용자의 값으로 재정의할 수 있습니다(`SampleConsumer/kcl.properties` 참조).

### 애플리케이션 이름
<a name="modify-kinesis-record-processor-application-name"></a>

KCL에는 애플리케이션 및 같은 리전의 Amazon DynamoDB 테이블에서 고유한 애플리케이션이 필요합니다. 다음과 같이 애플리케이션 이름 구성 값이 사용됩니다.
+ 이 애플리케이션 이름과 관련된 모든 작업자는 동일한 스트림에서 함께 작업한다고 간주됩니다. 이 작업자는 여러 인스턴스에 분산되어 있을 수 있습니다. 동일한 애플리케이션 코드의 추가 인스턴스를 다른 애플리케이션 이름으로 실행하는 경우 KCL은 두 번째 인스턴스를 동일한 스트림에서 작동하는 완전히 별개의 애플리케이션으로 취급합니다.
+ KCL은 애플리케이션 이름이 있는 DynamoDB 테이블을 생성하고 테이블을 사용하여 애플리케이션의 상태 정보(예: 체크포인트 및 워커와 샤드의 매핑)를 보관합니다. 각각의 애플리케이션에는 자체 DynamoDB 테이블이 있습니다. 자세한 내용은 [리스 테이블을 사용하여 KCL 소비자 애플리케이션에서 처리한 샤드 추적](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable) 단원을 참조하십시오.

### 보안 인증 설정
<a name="kinesis-record-processor-creds-dotnet"></a>

기본 AWS 자격 증명 공급자 체인의 자격 증명 공급자 중 하나가 자격 증명을 사용할 수 있도록 해야 합니다. `AWSCredentialsProvider` 속성을 사용하여 자격 증명 공급자를 설정할 수 있습니다. [sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties)에서 [기본 자격 증명 공급자 체인](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)의 자격 증명 공급자 중 하나에 대해 자격 증명을 사용할 수 있도록 해야 합니다. EC2 인스턴스에서 소비자 애플리케이션을 실행하는 경우 IAM 역할로 인스턴스를 구성하는 것이 좋습니다. 이 IAM 역할과 연결된 권한을 반영하는 AWS 보안 인증은 인스턴스 메타데이터를 통해 인스턴스의 애플리케이션에 제공됩니다. 이것이 EC2 인스턴스에서 실행되는 소비자의 자격 증명을 관리하는 가장 안전한 방법입니다.

샘플의 속성 파일에서는 `AmazonKinesisSampleConsumer.cs`에 제공된 레코드 프로세서를 사용하여 'words'라는 Kinesis 데이터 스트림을 처리하도록 KCL을 구성합니다.

# Python으로 Kinesis Client Library 소비자 개발
<a name="kinesis-record-processor-implementation-app-py"></a>

**중요**  
Amazon Kinesis Client Library(KCL) 버전 1.x 및 2.x는 오래되었습니다. KCL 1.x는 2026년 1월 30일에 지원이 종료됩니다. 버전 1.x를 사용하는 KCL 애플리케이션은 2026년 1월 30일 이전에 최신 KCL 버전으로 마이그레이션할 것을 **적극 권장**합니다. 최신 KCL 버전을 찾으려면 [GitHub의 Amazon Kinesis Client Library 페이지](https://github.com/awslabs/amazon-kinesis-client)를 참조하세요. 최신 KCL 버전에 대한 자세한 내용은 [Kinesis Client Library 사용](kcl.md) 섹션을 참조하세요. KCL 1.x에서 KCL 3.x로 마이그레이션하는 방법에 대한 자세한 내용은 [KCL 1.x에서 KCL 3.x로 마이그레이션](kcl-migration-1-3.md) 섹션을 참조하세요.

Kinesis Client Library(KCL)를 사용하여 Kinesis 데이터 스트림의 데이터를 처리하는 애플리케이션을 빌드합니다. Kinesis Client Library는 여러 언어로 제공됩니다. 이 주제에서는 Python에 대해 설명합니다.

KCL은 Java 라이브러리이며, *MultiLangDaemon*이라는 다중 언어 인터페이스를 통해 Java 이외의 언어에 대한 지원이 제공됩니다. 이 데몬은 Java 기반이며, Java 이외의 KCL 언어를 사용하는 경우 배경에서 실행됩니다. 따라서 Python용 KCL을 설치하고 Python으로만 소비자 앱을 작성한 경우에도 MultiLangDaemon 때문에 시스템에 Java를 설치해야 합니다. 또한 MultiLangDaemon에는 연결한 AWS 리전과 같이 사용 사례에 맞게 사용자 지정해야 할 몇 가지 기본 설정이 있습니다. GitHub의 MultiLangDaemon에 대한 자세한 내용은 [KCL MultiLangDaemon 프로젝트](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang) 페이지를 참조하세요.

GitHub에서 Python KCL을 다운로드하려면 [Kinesis Client Library(Python)](https://github.com/awslabs/amazon-kinesis-client-python)로 이동하세요. Python KCL 소비자 애플리케이션용 샘플 코드를 다운로드하려면 GitHub의 [Python용 KCL 샘플 프로젝트](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples) 페이지로 이동하세요.

Python으로 KCL 소비자 애플리케이션을 구현할 때 다음 작업을 완료해야 합니다.

**Topics**
+ [RecordProcessor 클래스 메서드 구현](#kinesis-record-processor-implementation-interface-py)
+ [구성 속성 수정](#kinesis-record-processor-initialization-py)

## RecordProcessor 클래스 메서드 구현
<a name="kinesis-record-processor-implementation-interface-py"></a>

`RecordProcess` 클래스가 `RecordProcessorBase`를 확장하여 다음 메서드를 구현해야 합니다. 이 샘플에서는 시작점으로 사용할 수 있는 구현을 제공합니다(`sample_kclpy_app.py` 참조).

```
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
```

**초기화**  
KCL은 레코드 프로세서가 인스턴스화될 때 특정 샤드 ID를 파라미터로 전달하여 `initialize` 메서드를 직접적으로 호출합니다. 이 레코드 프로세서는 해당 샤드만 처리하고 일반적으로 반대의 경우도 마찬가지입니다. 이 샤드는 해당 레코드 프로세서로만 처리됩니다. 하지만 소비자는 데이터 레코드가 두 번 이상 처리될 가능성을 고려해야 합니다. Kinesis Data Streams에서는 소비자의 워커가 샤드의 모든 데이터 레코드를 적어도 한 번은 처리한다는 *적어도 한 번* 의미론이 통용되기 때문입니다. 둘 이상의 작업자가 특정 샤드를 처리할 수 있는 경우에 대한 자세한 내용은 [리샤딩, 규모 조정 및 병렬 처리를 사용하여 샤드 수 변경](kinesis-record-processor-scaling.md)를 참조하십시오.

```
def initialize(self, shard_id)
```

**process\$1records**  
 KCL은 `initialize` 메서드에 지정된 샤드의 데이터 레코드 목록을 전달하여 이 메서드를 직접적으로 호출합니다. 구현하는 레코드 프로세서가 소비자의 의미론에 따라 이 레코드의 데이터를 처리합니다. 예를 들어, 워커가 데이터를 전환한 후 그 결과를 Amazon Simple Storage Service(S3) 버킷에 저장할 수 있습니다.

```
def process_records(self, records, checkpointer) 
```

데이터 자체뿐 아니라 시퀀스 번호와 파티션 키도 데이터 레코드에 포함됩니다. 작업자가 데이터를 처리할 때 이 값을 사용할 수 있습니다. 예를 들어, 작업자는 파티션 키의 값을 기반으로 데이터를 저장할 S3 버킷을 선택할 수 있습니다. `record` 딕셔너리가 레코드의 데이터, 시퀀스 번호 및 파티션 키에 액세스하기 위해 다음 키-값 페어를 노출합니다.

```
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
```

데이터가 Base64로 인코딩됩니다.

이 샘플의 메서드 `process_records`에는 작업자가 레코드의 데이터, 시퀀스 번호 및 파티션 키에 액세스하는 방법을 보여주는 코드가 있습니다.

Kinesis Data Streams는 샤드에서 이미 처리된 레코드를 추적하도록 레코드 프로세서에 요구합니다. KCL은 `Checkpointer` 객체를 `process_records`에 전달하여 이 추적을 처리합니다. 레코드 프로세서는 해당 객체에서 `checkpoint` 메서드를 직접적으로 호출하여 샤드의 레코드 처리가 얼마나 진행되었는지를 KCL에 알려줍니다. 워커가 실패할 경우 KCL은 이 정보를 사용하여 마지막으로 처리된 레코드에서 샤드 처리를 다시 시작합니다.

분할 또는 병합 작업의 경우 소스 샤드의 프로세서가 `checkpoint`를 직접적으로 호출하여 소스 샤드의 모든 처리가 완료되었다고 표시할 때까지 KCL은 새 샤드의 처리를 시작하지 않습니다.

파라미터를 전달하지 않으면 KCL은 `checkpoint`에 대한 호출이 레코드 프로세서에 전달된 마지막 레코드까지 모두 처리되었다는 의미로 간주합니다. 따라서 레코드 프로세서는 전달된 목록에 있는 모든 레코드를 반드시 처리한 후에 `checkpoint`를 호출해야 합니다. 레코드 프로세서는 `checkpoint`를 호출할 때마다 `process_records`를 호출할 필요가 없습니다. 예를 들어, 프로세서는 세 번째 호출할 때마다 `checkpoint`를 호출할 수 있습니다. 선택적으로 레코드의 정확한 시퀀스 번호를 `checkpoint`의 파라미터로 지정할 수도 있습니다. 이 경우 KCL은 모든 레코드가 해당 레코드까지만 처리되었다고 간주합니다.

이 샘플에서는 프라이빗 메서드 `checkpoint`가 적절한 예외 처리 및 재시도 로직을 사영하여 `Checkpointer.checkpoint` 메서드를 호출하는 방법을 보여줍니다.

KCL은 `process_records`를 사용하여 데이터 레코드를 처리할 때 발생하는 모든 예외를 처리합니다. `process_records`에서 예외가 발생하면 KCL은 예외 이전에 `process_records`에 전달된 데이터 레코드를 건너뜁니다. 이러한 레코드는 예외가 발생한 프로세서 또는 소비자의 다른 레코드 프로세서로 다시 전송되지 않습니다.

**종료**  
 처리가 종료될 때(종료 이유가 `TERMINATE`) 또는 워커가 더 이상 응답하지 않을 때(종료 `reason`이 `ZOMBIE`) KCL은 `shutdown` 메서드를 직접적으로 호출합니다.

```
def shutdown(self, checkpointer, reason)
```

샤드 분할이나 병합 또는 스트림 삭제로 인해 레코드 프로세서가 샤드에서 추가 레코드를 수신하지 않으면 처리가 종료됩니다.

 또한 KCL은 `Checkpointer` 객체를 `shutdown`에 전달합니다. 종료 `reason`이 `TERMINATE`이면 레코드 프로세서가 데이터 레코드 처리를 완료하고 이 인터페이스의 `checkpoint` 메서드를 호출해야 합니다.

## 구성 속성 수정
<a name="kinesis-record-processor-initialization-py"></a>

이 샘플은 구성 속성의 기본값을 제공합니다. 속성을 사용자의 값으로 재정의할 수 있습니다(`sample.properties` 참조).

### 애플리케이션 이름
<a name="kinesis-record-processor-application-name-py"></a>

에는 다른 애플리케이션과 다르고 동일한 리전의 Amazon DynamoDB 테이블에서도 고유한 애플리케이션 이름이 필요합니다. 다음과 같이 애플리케이션 이름 구성 값이 사용됩니다.
+ 이 애플리케이션 이름과 관련된 모든 작업자는 동일한 스트림에서 함께 작업한다고 간주됩니다. 이 작업자는 여러 인스턴스에 분산되어 있을 수 있습니다. 동일한 애플리케이션 코드의 추가 인스턴스를 다른 애플리케이션 이름으로 실행하는 경우 KCL은 두 번째 인스턴스를 동일한 스트림에서 작동하는 완전히 별개의 애플리케이션으로 취급합니다.
+ KCL은 애플리케이션 이름이 있는 DynamoDB 테이블을 생성하고 테이블을 사용하여 애플리케이션의 상태 정보(예: 체크포인트 및 워커와 샤드의 매핑)를 보관합니다. 각각의 애플리케이션에는 자체 DynamoDB 테이블이 있습니다. 자세한 내용은 [리스 테이블을 사용하여 KCL 소비자 애플리케이션에서 처리한 샤드 추적](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable) 단원을 참조하십시오.

### 보안 인증 설정
<a name="kinesis-record-processor-creds-py"></a>

기본 AWS 자격 증명 공급자 체인의 자격 증명 공급자 중 하나가 자격 증명을 사용할 수 있도록 해야 합니다. `AWSCredentialsProvider` 속성을 사용하여 자격 증명 공급자를 설정할 수 있습니다. [sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties)에서 [기본 자격 증명 공급자 체인](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)의 자격 증명 공급자 중 하나에 대해 자격 증명을 사용할 수 있도록 해야 합니다. Amazon EC2 인스턴스에서 소비자 애플리케이션을 실행하는 경우 IAM 역할로 인스턴스를 구성하는 것이 좋습니다. 이 IAM 역할과 연결된 권한을 반영하는 AWS 보안 인증은 인스턴스 메타데이터를 통해 인스턴스의 애플리케이션에 제공됩니다. 이것이 EC2 인스턴스에서 실행되는 소비자 애플리케이션의 자격 증명을 관리하는 가장 안전한 방법입니다.

샘플의 속성 파일에서는 `sample_kclpy_app.py`에 제공된 레코드 프로세서를 사용하여 'words'라는 Kinesis 데이터 스트림을 처리하도록 KCL을 구성합니다.

# Ruby로 Kinesis Client Library 소비자 개발
<a name="kinesis-record-processor-implementation-app-ruby"></a>

**중요**  
Amazon Kinesis Client Library(KCL) 버전 1.x 및 2.x는 오래되었습니다. KCL 1.x는 2026년 1월 30일에 지원이 종료됩니다. 버전 1.x를 사용하는 KCL 애플리케이션은 2026년 1월 30일 이전에 최신 KCL 버전으로 마이그레이션할 것을 **적극 권장**합니다. 최신 KCL 버전을 찾으려면 [GitHub의 Amazon Kinesis Client Library 페이지](https://github.com/awslabs/amazon-kinesis-client)를 참조하세요. 최신 KCL 버전에 대한 자세한 내용은 [Kinesis Client Library 사용](kcl.md) 섹션을 참조하세요. KCL 1.x에서 KCL 3.x로 마이그레이션하는 방법에 대한 자세한 내용은 [KCL 1.x에서 KCL 3.x로 마이그레이션](kcl-migration-1-3.md) 섹션을 참조하세요.

Kinesis Client Library(KCL)를 사용하여 Kinesis 데이터 스트림의 데이터를 처리하는 애플리케이션을 빌드합니다. Kinesis Client Library는 여러 언어로 제공됩니다. 이 주제에서는 Ruby에 대해 설명합니다.

KCL은 Java 라이브러리이며, *MultiLangDaemon*이라는 다중 언어 인터페이스를 통해 Java 이외의 언어에 대한 지원이 제공됩니다. 이 데몬은 Java 기반이며, Java 이외의 KCL 언어를 사용하는 경우 배경에서 실행됩니다. 따라서 Ruby용 KCL을 설치하고 Ruby로만 소비자 앱을 작성한 경우에도 MultiLangDaemon 때문에 시스템에 Java를 설치해야 합니다. 또한 MultiLangDaemon에는 연결한 AWS 리전과 같이 사용 사례에 맞게 사용자 지정해야 할 몇 가지 기본 설정이 있습니다. GitHub의 MultiLangDaemon에 대한 자세한 내용은 [KCL MultiLangDaemon 프로젝트](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang) 페이지를 참조하세요.

GitHub에서 Ruby KCL을 다운로드하려면 [Kinesis Client Library(Ruby)](https://github.com/awslabs/amazon-kinesis-client-ruby)로 이동하세요. Ruby KCL 소비자 애플리케이션용 샘플 코드를 다운로드하려면 GitHub의 [Ruby용 KCL 샘플 프로젝트](https://github.com/awslabs/amazon-kinesis-client-ruby/tree/master/samples) 페이지로 이동하세요.

KCL Ruby 지원 라이브러리에 대한 자세한 내용은 [KCL Ruby Gems 설명서](http://www.rubydoc.info/gems/aws-kclrb)를 참조하세요.

# KCL 2.x 소비자 개발
<a name="developing-consumers-with-kcl-v2"></a>

**중요**  
Amazon Kinesis Client Library(KCL) 버전 1.x 및 2.x는 오래되었습니다. KCL 1.x는 2026년 1월 30일에 지원이 종료됩니다. 버전 1.x를 사용하는 KCL 애플리케이션은 2026년 1월 30일 이전에 최신 KCL 버전으로 마이그레이션할 것을 **적극 권장**합니다. 최신 KCL 버전을 찾으려면 [GitHub의 Amazon Kinesis Client Library 페이지](https://github.com/awslabs/amazon-kinesis-client)를 참조하세요. 최신 KCL 버전에 대한 자세한 내용은 [Kinesis Client Library 사용](kcl.md) 섹션을 참조하세요. KCL 1.x에서 KCL 3.x로 마이그레이션하는 방법에 대한 자세한 내용은 [KCL 1.x에서 KCL 3.x로 마이그레이션](kcl-migration-1-3.md) 섹션을 참조하세요.

이 주제에서는 Kinesis Client Library(KCL) 버전 2.0을 사용하는 방법을 설명합니다.

KCL에 대한 자세한 내용은 [Kinesis Client Library 1.x를 사용하여 소비자 개발](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html)에 나와 있는 개요를 참조하세요.

사용하려는 옵션에 따라 다음 주제 중에서 선택하세요.

**Topics**
+ [Java로 Kinesis Client Library 소비자 개발](kcl2-standard-consumer-java-example.md)
+ [Python으로 Kinesis Client Library 소비자 개발](kcl2-standard-consumer-python-example.md)
+ [KCL 2.x를 사용하여 향상된 팬아웃 소비자 개발](building-enhanced-consumers-kcl-retired.md)

# Java로 Kinesis Client Library 소비자 개발
<a name="kcl2-standard-consumer-java-example"></a>

**중요**  
Amazon Kinesis Client Library(KCL) 버전 1.x 및 2.x는 오래되었습니다. KCL 1.x는 2026년 1월 30일에 지원이 종료됩니다. 버전 1.x를 사용하는 KCL 애플리케이션은 2026년 1월 30일 이전에 최신 KCL 버전으로 마이그레이션할 것을 **적극 권장**합니다. 최신 KCL 버전을 찾으려면 [GitHub의 Amazon Kinesis Client Library 페이지](https://github.com/awslabs/amazon-kinesis-client)를 참조하세요. 최신 KCL 버전에 대한 자세한 내용은 [Kinesis Client Library 사용](kcl.md) 섹션을 참조하세요. KCL 1.x에서 KCL 3.x로 마이그레이션하는 방법에 대한 자세한 내용은 [KCL 1.x에서 KCL 3.x로 마이그레이션](kcl-migration-1-3.md) 섹션을 참조하세요.

다음 코드는 Java에서 `ProcessorFactory` 및 `RecordProcessor`의 구현 예를 보여 줍니다. 향상된 팬아웃 기능을 활용하고 싶다면 [향상된 팬아웃을 사용하는 소비자 만들기](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html)를 참조하십시오.

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License.
 */


/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

/**
 * This class will run a simple app that uses the KCL to read data and uses the AWS SDK to publish data.
 * Before running this program you must first create a Kinesis stream through the AWS console or AWS SDK.
 */
public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    /**
     * Invoke the main method with 2 args: the stream name and (optionally) the region.
     * Verifies valid inputs and then starts running the app.
     */
    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    /**
     * Constructor sets streamName and region. It also creates a KinesisClient object to send data to Kinesis.
     * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used
     * indirectly by the KCL to handle the consumption of the data.
     */
    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {

        /**
         * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL
         */
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        /**
         * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
         * ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private
         * class below.
         */
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        /**
         * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
         * instance is configured with defaults provided by the ConfigsBuilder.
         */
        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );

        /**
         * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
         * until an exit is triggered.
         */
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        /**
         * Allows termination of app by pressing Enter.
         */
        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        /**
         * Stops sending dummy data.
         */
        log.info("Cancelling producer and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        /**
         * Stops consuming data. Finishes processing the current batch of data already received from Kinesis
         * before shutting down.
         */
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown.  Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    /**
     * Sends a single record of dummy data to Kinesis.
     */
    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }

    /**
     * The implementation of the ShardRecordProcessor interface is where the heart of the record processing logic lives.
     * In this example all we do to 'process' is log info about the records.
     */
    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        /**
         * Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
         * processRecords). In this example we do nothing except some logging.
         *
         * @param initializationInput Provides information related to initialization.
         */
        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
         * data records to the application. In this example we simply log our records.
         *
         * @param processRecordsInput Provides the records to be processed as well as information and capabilities
         *                            related to them (e.g. checkpointing).
         */
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /** Called when the lease tied to this record processor has been lost. Once the lease has been lost,
         * the record processor can no longer checkpoint.
         *
         * @param leaseLostInput Provides access to functions and data related to the loss of the lease.
         */
        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Called when all data on this shard has been processed. Checkpointing must occur in the method for record
         * processing to be considered complete; an exception will be thrown otherwise.
         *
         * @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
         */
        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
         * Enter). Checkpoints and logs the data a final time.
         *
         * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
         *                               before the shutdown is completed.
         */
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# Python으로 Kinesis Client Library 소비자 개발
<a name="kcl2-standard-consumer-python-example"></a>

**중요**  
Amazon Kinesis Client Library(KCL) 버전 1.x 및 2.x는 오래되었습니다. KCL 1.x는 2026년 1월 30일에 지원이 종료됩니다. 버전 1.x를 사용하는 KCL 애플리케이션은 2026년 1월 30일 이전에 최신 KCL 버전으로 마이그레이션할 것을 **적극 권장**합니다. 최신 KCL 버전을 찾으려면 [GitHub의 Amazon Kinesis Client Library 페이지](https://github.com/awslabs/amazon-kinesis-client)를 참조하세요. 최신 KCL 버전에 대한 자세한 내용은 [Kinesis Client Library 사용](kcl.md) 섹션을 참조하세요. KCL 1.x에서 KCL 3.x로 마이그레이션하는 방법에 대한 자세한 내용은 [KCL 1.x에서 KCL 3.x로 마이그레이션](kcl-migration-1-3.md) 섹션을 참조하세요.

Kinesis Client Library(KCL)를 사용하여 Kinesis 데이터 스트림의 데이터를 처리하는 애플리케이션을 빌드합니다. Kinesis Client Library는 여러 언어로 제공됩니다. 이 주제에서는 Python에 대해 설명합니다.

KCL은 Java 라이브러리이며, *MultiLangDaemon*이라는 다중 언어 인터페이스를 통해 Java 이외의 언어에 대한 지원이 제공됩니다. 이 데몬은 Java 기반이며, Java 이외의 KCL 언어를 사용하는 경우 배경에서 실행됩니다. 따라서 Python용 KCL을 설치하고 Python으로만 소비자 앱을 작성한 경우에도 MultiLangDaemon 때문에 시스템에 Java를 설치해야 합니다. 또한 MultiLangDaemon에는 연결한 AWS 리전과 같이 사용 사례에 맞게 사용자 지정해야 할 몇 가지 기본 설정이 있습니다. GitHub의 MultiLangDaemon에 대한 자세한 내용은 [KCL MultiLangDaemon 프로젝트](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang) 페이지를 참조하세요.

GitHub에서 Python KCL을 다운로드하려면 [Kinesis Client Library(Python)](https://github.com/awslabs/amazon-kinesis-client-python)로 이동하세요. Python KCL 소비자 애플리케이션용 샘플 코드를 다운로드하려면 GitHub의 [Python용 KCL 샘플 프로젝트](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples) 페이지로 이동하세요.

Python으로 KCL 소비자 애플리케이션을 구현할 때 다음 작업을 완료해야 합니다.

**Topics**
+ [RecordProcessor 클래스 메서드 구현](#kinesis-record-processor-implementation-interface-py)
+ [구성 속성 수정](#kinesis-record-processor-initialization-py)

## RecordProcessor 클래스 메서드 구현
<a name="kinesis-record-processor-implementation-interface-py"></a>

`RecordProcess` 클래스가 `RecordProcessorBase` 클래스를 확장하여 다음 메서드를 구현해야 합니다.

```
initialize
process_records
shutdown_requested
```

이 샘플의 구현을 시작점으로 이용할 수 있습니다.

```
#!/usr/bin/env python

# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

from __future__ import print_function

import sys
import time

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:

    * initialize will be called once
    * process_records will be called zero or more times
    * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
        a scaling change.
    """
    def __init__(self):
        self._SLEEP_SECONDS = 5
        self._CHECKPOINT_RETRIES = 5
        self._CHECKPOINT_FREQ_SECONDS = 60
        self._largest_seq = (None, None)
        self._largest_sub_seq = None
        self._last_checkpoint_time = None

    def log(self, message):
        sys.stderr.write(message)

    def initialize(self, initialize_input):
        """
        Called once by a KCLProcess before any calls to process_records

        :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
            processor has been assigned.
        """
        self._largest_seq = (None, None)
        self._last_checkpoint_time = time.time()

    def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
        """
        Checkpoints with retries on retryable exceptions.

        :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
            or shutdown
        :param str or None sequence_number: the sequence number to checkpoint at.
        :param int or None sub_sequence_number: the sub sequence number to checkpoint at.
        """
        for n in range(0, self._CHECKPOINT_RETRIES):
            try:
                checkpointer.checkpoint(sequence_number, sub_sequence_number)
                return
            except kcl.CheckpointError as e:
                if 'ShutdownException' == e.value:
                    #
                    # A ShutdownException indicates that this record processor should be shutdown. This is due to
                    # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
                    #
                    print('Encountered shutdown exception, skipping checkpoint')
                    return
                elif 'ThrottlingException' == e.value:
                    #
                    # A ThrottlingException indicates that one of our dependencies is is over burdened, e.g. too many
                    # dynamo writes. We will sleep temporarily to let it recover.
                    #
                    if self._CHECKPOINT_RETRIES - 1 == n:
                        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
                        return
                    else:
                        print('Was throttled while checkpointing, will attempt again in {s} seconds'
                              .format(s=self._SLEEP_SECONDS))
                elif 'InvalidStateException' == e.value:
                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
                else:  # Some other error
                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
            time.sleep(self._SLEEP_SECONDS)

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this recod.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
                 .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))

    def should_update_sequence(self, sequence_number, sub_sequence_number):
        """
        Determines whether a new larger sequence number is available

        :param int sequence_number: the sequence number from the current record
        :param int sub_sequence_number: the sub sequence number from the current record
        :return boolean: true if the largest sequence should be updated, false otherwise
        """
        return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \
            (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1])

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self.process_record(data, key, seq, sub_seq)
                if self.should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            #
            # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
            #
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
                self._last_checkpoint_time = time.time()

        except Exception as e:
            self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

    def lease_lost(self, lease_lost_input):
        self.log("Lease has been lost")

    def shard_ended(self, shard_ended_input):
        self.log("Shard has ended checkpointing")
        shard_ended_input.checkpointer.checkpoint()

    def shutdown_requested(self, shutdown_requested_input):
        self.log("Shutdown has been requested, checkpointing.")
        shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
    kcl_process = kcl.KCLProcess(RecordProcessor())
    kcl_process.run()
```

## 구성 속성 수정
<a name="kinesis-record-processor-initialization-py"></a>

이 샘플은 아래 스크립트에 나와 있는 구성 속성의 기본값을 제공합니다. 속성을 원하는 값으로 재정의할 수 있습니다.

```
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py

# The name of an Amazon Kinesis stream to process.
streamName = words

# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = PythonKCLSample

# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/2.7

# Valid options at TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON

# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.

# The KCL defaults to us-east-1
#regionName = us-east-1

# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
# will be regarded as having problems and it's shards will be assigned to other workers.
# For applications that have a large number of shards, this msy be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
#failoverTimeMillis = 10000

# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId = 

# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000

# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000

# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000

# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
#callProcessRecordsEvenForEmptyRecordList = false

# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000

# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true

# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500

# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000

# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000

# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true

# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
```

### 애플리케이션 이름
<a name="kinesis-record-processor-application-name-py"></a>

KCL에는 다른 애플리케이션과 다르고 동일한 리전의 Amazon DynamoDB 테이블에서도 고유한 애플리케이션 이름이 필요합니다. 다음과 같이 애플리케이션 이름 구성 값이 사용됩니다.
+ 이 애플리케이션 이름과 관련된 모든 작업자는 동일한 스트림에서 함께 작업한다고 간주됩니다. 작업자는 여러 인스턴스에 분산되어 있을 수 있습니다. 동일한 애플리케이션 코드의 추가 인스턴스를 다른 애플리케이션 이름으로 실행하는 경우 KCL은 두 번째 인스턴스를 동일한 스트림에서 작동하는 완전히 별개의 애플리케이션으로 취급합니다.
+ KCL은 애플리케이션 이름이 있는 DynamoDB 테이블을 생성하고 테이블을 사용하여 애플리케이션의 상태 정보(예: 체크포인트 및 워커와 샤드의 매핑)를 보관합니다. 각각의 애플리케이션에는 자체 DynamoDB 테이블이 있습니다. 자세한 내용은 [리스 테이블을 사용하여 KCL 소비자 애플리케이션에서 처리한 샤드 추적](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable) 단원을 참조하십시오.

### 자격 증명
<a name="kinesis-record-processor-creds-py"></a>

기본 AWS 자격 증명 공급자 [체인의 자격 증명 공급자 중 하나가 자격 증명을](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html) 사용할 수 있도록 해야 합니다. `AWSCredentialsProvider` 속성을 사용하여 자격 증명 공급자를 설정할 수 있습니다. Amazon EC2 인스턴스에서 소비자 애플리케이션을 실행하는 경우 IAM 역할로 인스턴스를 구성하는 것이 좋습니다.이 IAM 역할과 연결된 권한을 반영하는 AWS 자격 증명은 인스턴스 메타데이터를 통해 인스턴스의 애플리케이션에 제공됩니다. 이것이 EC2 인스턴스에서 실행되는 소비자 애플리케이션의 자격 증명을 관리하는 가장 안전한 방법입니다.

# KCL 2.x를 사용하여 향상된 팬아웃 소비자 개발
<a name="building-enhanced-consumers-kcl-retired"></a>

**중요**  
Amazon Kinesis Client Library(KCL) 버전 1.x 및 2.x는 오래되었습니다. KCL 1.x는 2026년 1월 30일에 지원이 종료됩니다. 버전 1.x를 사용하는 KCL 애플리케이션은 2026년 1월 30일 이전에 최신 KCL 버전으로 마이그레이션할 것을 **적극 권장**합니다. 최신 KCL 버전을 찾으려면 [GitHub의 Amazon Kinesis Client Library 페이지](https://github.com/awslabs/amazon-kinesis-client)를 참조하세요. 최신 KCL 버전에 대한 자세한 내용은 [Kinesis Client Library 사용](kcl.md) 섹션을 참조하세요. KCL 1.x에서 KCL 3.x로 마이그레이션하는 방법에 대한 자세한 내용은 [KCL 1.x에서 KCL 3.x로 마이그레이션](kcl-migration-1-3.md) 섹션을 참조하세요.

Amazon Kinesis Data Streams에서 **향상된 팬아웃을 사용하는 소비자는 샤드당 초당 최대 데이터 2MB의 전용 처리량으로 데이터 스트림에서 레코드를 수신할 수 있습니다. 이 유형의 소비자는 스트림으로부터 데이터를 수신하는 다른 소비자와 경쟁할 필요가 없습니다. 자세한 내용은 [전용 처리량으로 향상된 팬아웃 소비자 개발](enhanced-consumers.md) 단원을 참조하십시오.

Kinesis Client Library(KCL) 버전 2.0 이상을 사용하면 향상된 팬아웃을 사용하여 스트림으로부터 데이터를 수신하는 애플리케이션을 개발할 수 있습니다. KCL은 애플리케이션을 스트림의 모든 샤드에 자동으로 등록하므로 소비자 애플리케이션이 샤드당 2MB/sec의 처리 속도로 읽을 수 있습니다. 향상된 팬아웃을 켜지 않고 KCL을 사용하고 싶다면 [Developing Consumers Using the Kinesis Client Library 2.0](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl-v2.html)을 참조하세요.

**Topics**
+ [Java에서 KCL 2.x를 사용하여 향상된 팬아웃 소비자 개발](building-enhanced-consumers-kcl-java.md)

# Java에서 KCL 2.x를 사용하여 향상된 팬아웃 소비자 개발
<a name="building-enhanced-consumers-kcl-java"></a>

**중요**  
Amazon Kinesis Client Library(KCL) 버전 1.x 및 2.x는 오래되었습니다. KCL 1.x는 2026년 1월 30일에 지원이 종료됩니다. 버전 1.x를 사용하는 KCL 애플리케이션은 2026년 1월 30일 이전에 최신 KCL 버전으로 마이그레이션할 것을 **적극 권장**합니다. 최신 KCL 버전을 찾으려면 [GitHub의 Amazon Kinesis Client Library 페이지](https://github.com/awslabs/amazon-kinesis-client)를 참조하세요. 최신 KCL 버전에 대한 자세한 내용은 [Kinesis Client Library 사용](kcl.md) 섹션을 참조하세요. KCL 1.x에서 KCL 3.x로 마이그레이션하는 방법에 대한 자세한 내용은 [KCL 1.x에서 KCL 3.x로 마이그레이션](kcl-migration-1-3.md) 섹션을 참조하세요.

Kinesis Client Library(KCL) 버전 2.0 이상을 사용하면 Amazon Kinesis Data Streams에서 향상된 팬아웃을 통해 스트림에서 데이터를 수신하는 애플리케이션을 개발할 수 있습니다. 다음 코드는 Java에서 `ProcessorFactory` 및 `RecordProcessor`의 구현 예를 보여 줍니다.

`KinesisClientUtil`을 사용하여 `KinesisAsyncClient`를 만들고 `KinesisAsyncClient`에 `maxConcurrency`를 구성하는 것이 좋습니다.

**중요**  
Amazon Kinesis Client는 `KinesisAsyncClient`를 구성하여 `KinesisAsyncClient`의 전체 임대 수에 더해 추가 사용량까지 허용할 수 있을 만큼 `maxConcurrency`를 충분히 높이지 않을 경우 지연 시간이 크게 증가할 수 있습니다.

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License. 
 */

/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        log.info("Cancelling producer, and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }


    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```