

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# 자습서: KPL 및 KCL 2.x를 사용하여 실시간 주식 데이터 처리


이 자습서의 시나리오에서는 스트림에 주식 거래를 가져와 데이터 스트림에 대한 계산을 수행하는 기본 Amazon Kinesis Data Streams 애플리케이션을 작성합니다. 레코드의 스트림을 Kinesis Data Streams에 전송하고, 거의 실시간으로 레코드를 사용하고 처리하는 애플리케이션을 구현하는 방법에 대해 알아봅니다.

**중요**  
스트림을 생성한 후에는 Kinesis Data Streams가 AWS 프리 티어에 적합하지 않기 때문에 계정에 Kinesis Data Streams 사용에 대한 일반 요금이 발생합니다. 소비자 애플리케이션이 시작된 후 Amazon DynamoDB 사용량에 대해 일반 요금도 발생합니다. 소비자 애플리케이션은 DynamoDB를 사용하여 처리 상태를 추적합니다. 이 애플리케이션을 완료하면 AWS 리소스를 삭제하여 요금 발생을 중지하세요. 자세한 내용은 [리소스 정리](tutorial-stock-data-kplkcl2-finish.md) 단원을 참조하십시오.

이 코드는 실제 주식 시장 데이터에는 액세스하지 않지만, 대신 주식 거래의 스트림을 시뮬레이션합니다. 2015년 2월 현재 시가 총액 상위 25개 주식에 대한 실제 시장 데이터의 시작점이 있는 임의의 주식 거래 생성기를 사용하여 이를 수행합니다. 주식 거래의 실시간 스트림에 액세스할 수 있는 경우 스트림에서 유용하고 시기 적절한 통계를 추출하고 싶을 때도 있습니다. 예를 들어, 마지막 5분 이내에 구매한 가장 인기 있는 주식을 결정하는 슬라이딩 윈도우 분석을 수행하려고 할 수 있습니다. 또는 너무 많은 판매 주문(즉, 너무 많은 공유)이 있을 때마다 알림을 원할 수도 있습니다. 이 시리즈의 코드를 확장하여 이러한 기능을 제공할 수 있습니다.

데스크톱 또는 랩톱 컴퓨터에서 이 자습서의 단계를 완료하고, 동일한 머신에서 또는 정의된 요구 사항을 지원하는 플랫폼에서 생산자 코드와 소비자 코드를 모두 실행할 수 있습니다.

표시된 예제는 미국 서부(오레곤) 리전을 사용하지만 이 예제는 [Kinesis Data Streams를 지원하는 모든AWS 리전](https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region)에 적용됩니다.

**Topics**
+ [

# 사전 조건 완료
](tutorial-stock-data-kplkcl2-begin.md)
+ [

# 데이터 스트림 생성
](tutorial-stock-data-kplkcl2-create-stream.md)
+ [

# IAM 정책 및 사용자 생성
](tutorial-stock-data-kplkcl2-iam.md)
+ [

# 코드 다운로드 및 빌드
](tutorial-stock-data-kplkcl2-download.md)
+ [

# 생산자 구현
](tutorial-stock-data-kplkcl2-producer.md)
+ [

# 소비자 구현
](tutorial-stock-data-kplkcl2-consumer.md)
+ [

# (선택 사항) 소비자 확장
](tutorial-stock-data-kplkcl2-consumer-extension.md)
+ [

# 리소스 정리
](tutorial-stock-data-kplkcl2-finish.md)

# 사전 조건 완료


이 자습서를 완료하려면 다음 요구 사항을 충족해야 합니다.

## Amazon Web Services 계정 생성 및 사용


시작하기 전에 [Amazon Kinesis Data Streams 용어 및 개념](key-concepts.md)에서 설명하는 개념을 잘 알고 있어야 합니다. 특히 스트림, 샤드, 생산자 및 소비자 개념을 잘 알아 두세요. 또한 [자습서: Kinesis Data Streams AWS CLI 용 설치 및 구성](kinesis-tutorial-cli-installation.md) 가이드의 단계를 완료하면 도움이 됩니다.

에 액세스하려면 AWS 계정과 웹 브라우저가 있어야 합니다 AWS Management Console.

콘솔 액세스의 경우 IAM 사용자 이름과 암호를 사용하여 IAM 로그인 페이지에서 [AWS Management Console](https://console.aws.amazon.com/console/home)에 로그인합니다. 프로그래밍 방식 액세스 및 장기 자격 증명의 대안을 포함한 AWS 보안 자격 증명에 대한 자세한 내용은 *IAM 사용 설명서*의 [AWS 보안 자격 증명을](https://docs.aws.amazon.com/IAM/latest/UserGuide/security-creds.html) 참조하세요. 에 로그인하는 방법에 대한 자세한 내용은 *AWS 로그인 사용 설명서*의에 로그인하는 방법을 AWS 계정참조하세요. [AWS](https://docs.aws.amazon.com/signin/latest/userguide/how-to-sign-in.html) 

IAM 및 보안 키 설정 지침에 대한 자세한 내용은 [IAM 사용자 생성](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/get-set-up-for-amazon-ec2.html#create-an-iam-user)을 참조하세요.

## 시스템 소프트웨어 요구 사항 충족


애플리케이션을 실행하는 데 사용하는 시스템에는 Java 7 이상이 설치되어 있어야 합니다. 최신 JDK(Java Development Kit)를 다운로드하고 설치하려면 [Oracle의 Java SE 설치 사이트](http://www.oracle.com/technetwork/java/javase/downloads/index.html)로 이동하십시오.

최신 [AWS SDK for Java](https://aws.amazon.com/sdk-for-java/) 버전이 필요합니다.

소비자 애플리케이션에는 Kinesis Client Library(KCL) 버전 2.2.9 이상이 필요하며, GitHub 사이트 [https://github.com/awslabs/amazon-kinesis-client/tree/master](https://github.com/awslabs/amazon-kinesis-client/tree/master)에서 얻을 수 있습니다.

## 다음 단계


[데이터 스트림 생성](tutorial-stock-data-kplkcl2-create-stream.md)

# 데이터 스트림 생성


먼저, 이 자습서의 후속 단계에서 사용할 데이터 스트림을 만들어야 합니다.

**스트림을 만들려면**

1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) Kinesis 콘솔을 엽니다.

1. 탐색 창에서 **Data Streams(데이터 스트림)**를 선택합니다.

1. 탐색 모음에서 리전 선택기를 확장하고 리전을 선택합니다.

1. **Create Kinesis stream(Kinesis 스트림 생성)**을 선택합니다.

1. 데이터 스트림의 이름을 입력합니다(예: **StockTradeStream**).

1. 샤드 수에는 **1**을 입력하고, **필요한 샤드 수 추정**은 축소된 상태로 둡니다.

1. **Create Kinesis stream(Kinesis 스트림 생성)**을 선택합니다.

**(Kinesis 스트림** 목록 페이지에서 스트림 상태는 스트림이 생성되는 동안 `CREATING`으로 표시됩니다. 스트림을 사용할 준비가 되면 상태가 `ACTIVE`(활성)로 변경됩니다.

스트림 이름을 선택하면 다음에 나타나는 페이지의 **세부 정보** 탭에 데이터 스트림 구성의 요약이 표시됩니다. **Monitoring(모니터링)** 섹션에는 스트림에 대한 모니터링 정보가 표시됩니다.

## 다음 단계


[IAM 정책 및 사용자 생성](tutorial-stock-data-kplkcl2-iam.md)

# IAM 정책 및 사용자 생성


보안 모범 사례에 AWS 따라 세분화된 권한을 사용하여 다양한 리소스에 대한 액세스를 제어해야 합니다. AWS Identity and Access Management (IAM)을 사용하면에서 사용자 및 사용자 권한을 관리할 수 있습니다 AWS. [IAM 정책](https://docs.aws.amazon.com/IAM/latest/UserGuide/PoliciesOverview.html)에는 허용된 작업과 작업이 적용되는 리소스가 명시적으로 나열됩니다.

다음은 Kinesis Data Streams 생산자 및 소비자에 대해 일반적으로 필요한 최소 권한입니다.


**생산자**  

| 작업 | Resource | 용도 | 
| --- | --- | --- | 
| DescribeStream, DescribeStreamSummary, DescribeStreamConsumer | Kinesis 데이터 스트림 | 레코드를 읽으려고 하기 전에 소비자는 데이터 스트림이 존재하는지, 데이터 스트림이 활성 상태인지, 샤드가 데이터 스트림에 포함되어 있는지를 확인합니다. | 
| SubscribeToShard, RegisterStreamConsumer | Kinesis 데이터 스트림 | 소비자를 구독하고 샤드에 등록합니다. | 
| PutRecord, PutRecords | Kinesis 데이터 스트림 | Kinesis Data Streams에 레코드를 씁니다. | 


**소비자**  

| **작업** | **리소스** | **용도** | 
| --- | --- | --- | 
| DescribeStream | Kinesis 데이터 스트림 | 레코드를 읽으려고 하기 전에 소비자는 데이터 스트림이 존재하는지, 데이터 스트림이 활성 상태인지, 샤드가 데이터 스트림에 포함되어 있는지를 확인합니다. | 
| GetRecords, GetShardIterator  | Kinesis 데이터 스트림 | 샤드에서 레코드를 읽습니다. | 
| CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem | Amazon DynamoDB 테이블 | Kinesis Client Library(KCL)(버전 1.x 또는 2.x)를 사용하여 소비자를 개발하는 경우, 애플리케이션의 처리 상태를 추적하려면 DynamoDB 테이블에 대한 권한이 필요합니다. | 
| DeleteItem | Amazon DynamoDB 테이블 | 소비자가 Kinesis Data Streams 샤드에서 분할/병합 작업을 수행하는 경우에 사용됩니다. | 
| PutMetricData | Amazon CloudWatch 로그 | 또한 KCL은 애플리케이션을 모니터링하는 데 유용한 지표를 CloudWatch에 업로드합니다. | 

이 자습서에서는 위의 모든 권한을 부여하는 단일 IAM 정책을 생성합니다.구현에 사용되므로 프로덕션 환경에서는 생산자와 소비자에 대해 각각 하나씩 정책을 두 개 만들 수 있습니다.

**IAM 정책을 만들려면**

1. 위 단계에서 생성한 새 데이터 스트림의 Amazon 리소스 이름(ARN)을 찾습니다. **세부 정보** 탭 상단에 **스트림 ARN**으로 나열된 이 ARN을 찾을 수 있습니다. ARN 형식은 다음과 같습니다.

   ```
   arn:aws:kinesis:region:account:stream/name
   ```  
*리전*  
 AWS 리전 코드. 예: `us-west-2`. 자세한 내용은 [리전 및 가용 영역 개념](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions-availability-zones)을 참조하십시오.  
*계정*  
 AWS 계정 [설정에](https://console.aws.amazon.com/billing/home?#/account) 표시된 계정 ID입니다.  
*이름*  
위 단계에서 생성한 데이터 스트림의 이름입니다(`StockTradeStream`).

1. 소비자가 사용할 DynamoDB 테이블의 ARN을 결정합니다(첫 번째 소비자 인스턴스에서 생성됨). 형식은 다음과 같아야 합니다.

   ```
   arn:aws:dynamodb:region:account:table/name
   ```

   리전 및 계정 ID는 이 자습서에서 사용하는 데이터 스트림의 ARN에 있는 값과 동일하지만, **이름은 소비자 애플리케이션에서 생성하고 사용하는 DynamoDB 테이블의 이름입니다. KCL은 애플리케이션 이름을 테이블 이름으로 사용합니다. 이 단계에서는 DynamoDB 테이블 이름에 `StockTradesProcessor`를 사용합니다. 이 이름은 이 자습서의 이후 단계에서 사용되는 애플리케이션 이름이기 때문입니다.

1. IAM 콘솔의 **정책**([https://console.aws.amazon.com/iam/home\$1policies](https://console.aws.amazon.com/iam/home#policies))에서 **정책 생성**을 선택합니다. IAM 정책을 사용한 첫 번째 작업인 경우 **시작하기**, **정책 생성**을 선택합니다.

1. **정책 생성기** 옆의 **선택**을 선택합니다.

1. **Amazon Kinesis**를 AWS 서비스로 선택합니다.

1. `DescribeStream`, `GetShardIterator`, `GetRecords`, `PutRecord` 및 `PutRecords`를 허용된 작업으로 선택합니다.

1. 이 자습서에서 사용하는 데이터 스트림의 ARN을 입력합니다.

1. 다음의 각각에 대해 **Add Statement(문 추가)**를 사용합니다.    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ko_kr/streams/latest/dev/tutorial-stock-data-kplkcl2-iam.html)

   별표(`*`)는 ARN이 필요하지 않다고 지정할 때 사용됩니다. 이 경우에는 `PutMetricData` 작업이 간접적으로 호출된 CloudWatch에서 특정 리소스가 없기 때문입니다.

1. **다음 단계**를 선택합니다.

1. **Policy Name(정책 이름)**을 `StockTradeStreamPolicy`로 변경하고, 코드를 검토한 다음 **Create Policy(정책 생성)**를 선택합니다.

결과 정책 문서는 다음과 같아야 합니다.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream"
            ]
        },
        {
            "Sid": "Stmt234",
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream/*"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-west-2:111122223333:table/StockTradesProcessor"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

------

**IAM 사용자 생성**

1. IAM 콘솔([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/))을 엽니다.

1. **사용자** 페이지에서 **사용자 추가**를 선택합니다.

1. **사용자 이름**에 `StockTradeStreamUser`을 입력합니다.

1. **Access type(액세스 유형)**에서 **Programmatic access(프로그래밍 방식 액세스)**를 선택한 다음 **Next: Permissions(다음: 권한)**를 선택합니다.

1. **기존 정책 직접 첨부**를 선택합니다.

1. 위의 절차에서 만든 정책을 이름으로 검색합니다(`StockTradeStreamPolicy`). 정책 이름 왼쪽에 있는 확인란을 선택하고 **Next: Review(다음: 검코)**를 선택합니다.

1. 세부 정보와 요약을 검토하고 **Create user(사용자 생성)**를 선택합니다.

1. **Access key ID(액세스 키 ID)**를 복사하고 비공개로 저장합니다. **Secret access key(보안 액세스 키)**에서 **Show(표시)**를 선택하고 키도 비공개로 저장합니다.

1. 액세스 및 보안 키를 사용자만 액세스할 수 있는 안전한 위치에 있는 로컬 파일에 붙여넣습니다. 이 애플리케이션의 경우 ` ~/.aws/credentials`라는 파일 이름을 생성합니다(엄격한 권한 포함). 파일은 다음 형식이어야 합니다.

   ```
   [default]
   aws_access_key_id=access key
   aws_secret_access_key=secret access key
   ```

**사용자에게 IAM 정책 연결**

1. IAM 콘솔에서 [정책](https://console.aws.amazon.com/iam/home?#policies)을 열고 **정책 작업**을 선택합니다.

1. `StockTradeStreamPolicy` 및 **Attach(연결)**를 선택합니다.

1. `StockTradeStreamUser` 및 **Attach Policy(정책 연결)**를 선택합니다.

## 다음 단계


[코드 다운로드 및 빌드](tutorial-stock-data-kplkcl2-download.md)

# 코드 다운로드 및 빌드


이 주제에서는 데이터 스트림으로 샘플 주식 거래 수집(*생산자*) 및 이 데이터 처리(*소비자*)에 대한 샘플 구현 코드를 제공합니다.

**코드를 다운로드하고 빌드하려면**

1. GitHub 저장소 [https://github.com/aws-samples/amazon-kinesis-learning](https://github.com/aws-samples/amazon-kinesis-learning)에서 컴퓨터로 소스 코드를 다운로드합니다.

1. 제공된 디렉터리 구조를 따라 소스 코드를 사용하여 IDE에서 프로젝트를 생성합니다.

1. 프로젝트에 다음 라이브러리를 추가합니다.
   + Amazon Kinesis Client Library(KCL)
   + AWS SDK
   + Apache HttpCore
   + Apache HttpClient
   + Apache Commons Lang
   + Apache Commons Logging
   + Guava(Google Core Libraries For Java)
   + Jackson Annotations
   + Jackson Core
   + Jackson Databind
   + Jackson Dataformat: CBOR
   + Joda Time

1. IDE에 따라 프로젝트가 자동으로 빌드될 수 있습니다. 그렇지 않으면 IDE에 적합한 단계를 사용하여 프로젝트를 빌드하십시오.

이러한 단계를 성공적으로 완료한 경우 이제 다음 단원([생산자 구현](tutorial-stock-data-kplkcl2-producer.md))으로 이동할 준비가 되었습니다.

## 다음 단계


[[생산자 구현](tutorial-stock-data-kplkcl2-producer.md)생산자 구현](tutorial-stock-data-kplkcl2-producer.md)

# 생산자 구현


이 자습서에서는 주식 시장 거래 모니터링의 실제 시나리오를 사용합니다. 다음 원칙은 이 시나리오가 생산자와 생산자의 지원 코드 구조에 매핑되는 방법을 간략하게 설명합니다.

[소스 코드](https://github.com/aws-samples/amazon-kinesis-learning )를 참조하여 다음 정보를 검토하십시오.

**StockTrade 클래스**  
개별 주식 거래는 StockTrade 클래스의 인스턴스로 표시됩니다. 이 인스턴스에는 티커 기호, 가격, 공유 수, 거래 유형(구매 또는 판매), 거래를 고유하게 식별하는 ID 등의 속성이 포함됩니다. 이 클래스가 사용자를 위해 구현됩니다.

**스트림 레코드**  
스트림은 레코드의 시퀀스입니다. 레코드는 JSON 형식으로 된 `StockTrade` 인스턴스의 직렬화입니다. 예제:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator 클래스**  
StockTradeGenerator에는 호출될 때마다 임의로 생성된 새 주식 거래를 반환하는 `getRandomTrade()`라는 메서드가 있습니다. 이 클래스가 사용자를 위해 구현됩니다.

**StockTradesWriter 클래스**  
생산자의 `main` 메서드인 StockTradesWriter는 계속적으로 임의의 거래를 검색하고 다음 작업을 수행하여 Kinesis Data Streams에 전송합니다.  

1. 데이터 스트림 이름과 리전 이름을 입력으로 읽습니다.

1. `KinesisAsyncClientBuilder`를 사용하여 리전, 자격 증명 및 클라이언트 구성을 설정합니다.

1. 스트림의 존재 여부와 활성 상태 여부를 확인합니다. 그렇지 않은 경우 오류로 종료됩니다.

1. 연속 루프에서 `StockTradeGenerator.getRandomTrade()` 메서드를 호출하고 `sendStockTrade` 메서드를 호출하여 100밀리초마다 거래를 스트림으로 전송합니다.
`sendStockTrade` 클래스의 `StockTradesWriter` 메서드에는 다음 코드가 있습니다.  

```
private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient,
            String streamName) {
        byte[] bytes = trade.toJsonAsBytes();
        // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
        if (bytes == null) {
            LOG.warn("Could not get JSON bytes for stock trade");
            return;
        }

        LOG.info("Putting trade: " + trade.toString());
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(bytes))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            LOG.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }
```

다음 코드 세부 분석을 참조하십시오.
+ `PutRecord` API에는 바이트 어레이가 필요하며, 거래를 JSON 형식으로 변환해야 합니다. 이 한 줄의 코드는 다음 작업을 수행합니다.

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ 거래를 전송하기 전에 새 `PutRecordRequest` 인스턴스(이 경우 요청이라고 함)를 생성합니다. 각 `request`에는 스트림 이름, 파티션 키 및 데이터 BLOB가 필요합니다.

  ```
  PutPutRecordRequest request = PutRecordRequest.builder()
      .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
      .streamName(streamName)
      .data(SdkBytes.fromByteArray(bytes))
      .build();
  ```

  이 예제는 특정 샤드에 레코드를 매핑하는 주식 티커를 파티션 키로 사용합니다. 실제로 레코드가 스트림에 대해 균등하게 분산되도록 샤드당 수백 개 또는 수천 개의 파티션 키가 있어야 합니다. 스트림에 데이터를 추가하는 방법에 대한 자세한 내용은 [Amazon Kinesis Data Streams에 데이터 쓰기](building-producers.md) 단원을 참조하십시오.

  이제 `request`를 클라이언트에 전송할 준비가 되었습니다(put 작업).

  ```
     kinesisClient.putRecord(request).get();
  ```
+ 오류 확인과 로깅 기능은 항상 유용한 추가 기능입니다. 이 코드는 오류 조건을 기록합니다.

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  `put`넣기 작업에 try/catch 블록을 추가합니다.

  ```
  try {
   	kinesisClient.putRecord(request).get();
  } catch (InterruptedException e) {
              LOG.info("Interrupted, assuming shutdown.");
  } catch (ExecutionException e) {
              LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
  }
  ```

  이렇게 하는 이유는 네트워크 오류로 인해 또는 처리량 제한에 도달하여 병목 현상이 발생한 데이터 스트림으로 인해 Kinesis Data Streams put 작업이 실패할 수 있기 때문입니다. 데이터 손실을 방지하기 위해 재시도 사용과 같은 `put` 작업에 대한 재시도 정책을 신중히 고려하는 것이 좋습니다.
+ 상태 로깅은 유용하지만 선택 사항입니다.

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
여기에 표시된 생산자는 Kinesis Data Streams API 단일 레코드 기능인 `PutRecord`를 사용합니다. 실제로 개별 생산자가 많은 레코드를 생성하는 경우 `PutRecords`의 여러 레코드 기능을 사용하고 레코드의 배치를 한 번에 전송하는 것이 더 효율적인 경우가 많습니다. 자세한 내용은 [Amazon Kinesis Data Streams에 데이터 쓰기](building-producers.md) 단원을 참조하십시오.

**생산자를 실행하려면**

1. [IAM 정책 및 사용자 생성](tutorial-stock-data-kplkcl2-iam.md)에서 검색한 액세스 키 및 보안 키 페어가 파일 `~/.aws/credentials`에 저장되었는지 확인합니다.

1. 다음과 같은 인수를 사용하여 `StockTradeWriter` 클래스를 실행합니다.

   ```
   StockTradeStream us-west-2
   ```

   `us-west-2` 이외의 리전에서 스트림을 생성한 경우 해당 리전을 여기에 대신 지정해야 합니다.

다음과 유사한 출력 화면이 표시되어야 합니다.

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

이제 주식 거래가 Kinesis Data Streams에서 수집됩니다.

## 다음 단계


[소비자 구현](tutorial-stock-data-kplkcl2-consumer.md)

# 소비자 구현


이 자습서의 소비자 애플리케이션은 데이터 스트림에서 주식 거래를 지속적으로 처리합니다. 그런 다음 1분마다 매매된 가장 인기 있는 주식들을 출력합니다. 애플리케이션은 소비자 앱에 공통적인 과중한 업무를 많이 수행하는 Kinesis Client Library(KCL)를 기반으로 하여 빌드됩니다. 자세한 내용은 [KCL 1.x 및 2.x 정보](shared-throughput-kcl-consumers.md) 단원을 참조하십시오.

소스 코드를 참조하여 다음 정보를 검토하십시오.

**StockTradesProcessor 클래스**  
다음 작업을 수행하는 소비자의 기본 클래스가 제공됩니다.  
+ 인수로 전달된 애플리케이션, 데이터 스트림 및 리전 이름을 읽습니다.
+ 리전 이름을 사용하여 `KinesisAsyncClient` 인스턴스를 생성합니다.
+ `StockTradeRecordProcessorFactory` 인스턴스에 의해 구현된 `ShardRecordProcessor`의 서버 인스턴스를 제공하는 `StockTradeRecordProcessor` 인스턴스를 생성합니다.
+ `KinesisAsyncClient`, `StreamName`, `ApplicationName` 및 `StockTradeRecordProcessorFactory` 인스턴스를 사용하여 `ConfigsBuilder` 인스턴스를 생성합니다. 이 기능은 기본값을 사용하여 모든 구성을 생성하는 경우에 유용합니다.
+ `ConfigsBuilder` 인스턴스를 사용하여 KCL 스케줄러(이전에 KCL 버전 1.x에서는 KCL 작업자라고 함)를 생성합니다.
+ 스케줄러는 각 샤드(이 소비자 인스턴스에 할당된 샤드)에 대해 새 스레드를 생성합니다. 이 스레드는 데이터 스트림에서 계속 반복적으로 레코드를 읽습니다. 그런 다음 `StockTradeRecordProcessor` 인스턴스를 호출하여 수신한 각 일괄 레코드를 처리합니다.

**StockTradeRecordProcessor 클래스**  
`StockTradeRecordProcessor` 인스턴스의 구현입니다. 이 클래스는 다시 `initialize`, `processRecords`, `leaseLost`, `shardEnded` 및 `shutdownRequested`라는 다섯 가지 필수 메서드를 구현합니다.  
`initialize` 및 `shutdownRequested` 메서드는 KCL에서 레코드 수신을 시작할 준비가 될 때 및 레코드 수신을 중지해야 할 때 애플리케이션별 설정 및 종료 작업을 수행할 수 있도록 레코드 프로세서에 알리기 위해 사용됩니다. `leaseLost` 및 `shardEnded`는 리스가 손실되거나 처리가 샤드의 끝에 도달할 때 수행할 작업에 대한 로직을 구현하는 데 사용됩니다. 이 예에서는 이러한 이벤트를 나타내는 메시지만 기록합니다.  
이러한 메서드에 대한 코드가 제공됩니다. `processRecords` 메서드에서 기본 처리가 발생하며, 각 레코드에 대해 `processRecord`를 사용합니다. 이 후자의 메서드는 다음 단계에서 구현할 수 있도록 대부분 비어 있는 스켈레톤 코드로 사용자에게 제공됩니다. 이 코드에 대해서는 다음 단계에서 더 자세히 설명합니다.  
또한 `processRecord`: `reportStats` 및 `resetStats`에 대한 지원 메서드의 구현도 중요합니다. 이러한 메서드는 원래 소스 코드에서 비어 있습니다.  
`processRecords` 메서드가 구현되며 다음 단계를 수행합니다.  
+ 전달된 각 레코드에 대해 `processRecord`를 호출합니다.
+ 마지막 보고 이후 1분 이상이 경과된 경우 최신 통계를 인쇄하는 `reportStats()`를 호출한 후 통계를 지우는 `resetStats()`를 호출하여 다음 간격에 새 레코드만 포함되도록 합니다.
+ 다음 보고 시간을 설정합니다.
+ 마지막 체크포인트 이후 1분 이상이 경과된 경우 `checkpoint()`를 호출합니다.
+ 다음 검사 시간을 설정합니다.
이 메서드는 보고 및 검사 속도에 대해 60초 간격을 사용합니다. 체크포인트 수행에 대한 자세한 내용은 [Using the Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html)를 참조하세요.

**StockStats 클래스**  
이 클래스는 시간에 따른 가장 인기 있는 주식에 대한 통계 추적 및 데이터 보존을 제공합니다. 다음 메서드가 포함된 이 코드가 제공됩니다.  
+ `addStockTrade(StockTrade)`: 지정된 `StockTrade`를 실행 중인 통계에 주입합니다.
+ `toString()`: 형식이 지정된 문자열로 통계를 반환합니다.
이 클래스는 각 주식에 대한 총 거래 수의 실행 개수와 최대 개수를 유지하여 가장 인기 있는 주식을 추적합니다. 그리고 주식 거래가 발생할 때마다 이러한 계수가 업데이트됩니다.

다음 단계에 표시된 대로 `StockTradeRecordProcessor` 클래스의 메서드에 코드를 추가합니다.

**소비자를 구현하려면**

1. 정확한 크기의 `processRecord` 객체를 인스턴스화하고, 해당 객체에 레코드 데이터를 추가하고, 문제가 있는 경우 경고를 기록하여 `StockTrade` 메서드를 구현합니다.

   ```
   byte[] arr = new byte[record.data().remaining()];
   record.data().get(arr);
   StockTrade trade = StockTrade.fromJsonAsBytes(arr);
       if (trade == null) {
           log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey());
           return;
           }
   stockStats.addStockTrade(trade);
   ```

1. `reportStats` 메서드를 구현합니다. 기본 설정에 적합하게 출력 형식을 수정합니다.

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
   stockStats + "\n" +
   "****************************************************************\n");
   ```

1. `resetStats` 메서드를 구현합니다. 이 메서드는 새 `stockStats` 인스턴스를 생성합니다.

   ```
   stockStats = new StockStats();
   ```

1. `ShardRecordProcessor` 인터페이스에 필요한 다음과 같은 메서드를 구현합니다.

   ```
   @Override
   public void leaseLost(LeaseLostInput leaseLostInput) {
       log.info("Lost lease, so terminating.");
   }
   
   @Override
   public void shardEnded(ShardEndedInput shardEndedInput) {
       try {
           log.info("Reached shard end checkpointing.");
           shardEndedInput.checkpointer().checkpoint();
       } catch (ShutdownException | InvalidStateException e) {
           log.error("Exception while checkpointing at shard end. Giving up.", e);
       }
   }
   
   @Override
   public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
       log.info("Scheduler is shutting down, checkpointing.");
       checkpoint(shutdownRequestedInput.checkpointer());
   }
   
   private void checkpoint(RecordProcessorCheckpointer checkpointer) {
       log.info("Checkpointing shard " + kinesisShardId);
       try {
           checkpointer.checkpoint();
       } catch (ShutdownException se) {
           // Ignore checkpoint if the processor instance has been shutdown (fail over).
           log.info("Caught shutdown exception, skipping checkpoint.", se);
       } catch (ThrottlingException e) {
           // Skip checkpoint when throttled. In practice, consider a backoff and retry policy.
           log.error("Caught throttling exception, skipping checkpoint.", e);
       } catch (InvalidStateException e) {
           // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
           log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
       }
   }
   ```

**소비자를 실행하려면**

1. [[생산자 구현](tutorial-stock-data-kplkcl2-producer.md)생산자 구현](tutorial-stock-data-kplkcl2-producer.md)에서 작성한 생산자를 실행하여 시뮬레이션된 주식 거래 레코드를 스트림에 첨가합니다.

1. 앞에서(IAM 사용자를 생성할 때) 검색한 액세스 키 및 보안 키 페어가 `~/.aws/credentials` 파일에 저장되었는지 확인합니다.

1. 다음과 같은 인수를 사용하여 `StockTradesProcessor` 클래스를 실행합니다.

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   `us-west-2` 이외의 리전에 스트림을 생성한 경우 여기에 해당 리전을 대신 지정해야 합니다.

1분 후 다음과 같은 출력이 표시되어야 하며, 그 이후로 매분마다 새로 고침됩니다.

```
  
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## 다음 단계


[(선택 사항) 소비자 확장](tutorial-stock-data-kplkcl2-consumer-extension.md)

# (선택 사항) 소비자 확장


이 단원은 선택 사항이며 더 자세한 시나리오를 위해 소비자 코드를 확장할 수 있는 방법을 보여줍니다.

1분마다 가장 큰 판매 주문을 파악하려는 경우, 세 위치에서 `StockStats` 클래스를 수정하여 이 새로운 우선순위를 수용할 수 있습니다.

**소비자를 확장하려면**

1. 새 인스턴스 변수를 추가합니다.

   ```
    // Ticker symbol of the stock that had the largest quantity of shares sold 
    private String largestSellOrderStock;
    // Quantity of shares for the largest sell order trade
    private long largestSellOrderQuantity;
   ```

1. 다음 코드를 `addStockTrade`에 추가합니다.

   ```
   if (type == TradeType.SELL) {
        if (largestSellOrderStock == null || trade.getQuantity() > largestSellOrderQuantity) {
            largestSellOrderStock = trade.getTickerSymbol();
            largestSellOrderQuantity = trade.getQuantity();
        }
    }
   ```

1. `toString` 메서드를 수정하여 추가 정보를 인쇄합니다.

   ```
    
   public String toString() {
       return String.format(
           "Most popular stock being bought: %s, %d buys.%n" +
           "Most popular stock being sold: %s, %d sells.%n" +
           "Largest sell order: %d shares of %s.",
           getMostPopularStock(TradeType.BUY), getMostPopularStockCount(TradeType.BUY),
           getMostPopularStock(TradeType.SELL), getMostPopularStockCount(TradeType.SELL),
           largestSellOrderQuantity, largestSellOrderStock);
   }
   ```

이제 소비자를 실행하면(생산자도 실행함을 주의) 다음과 비슷한 출력 화면이 표시되어야 합니다.

```
 
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  Largest sell order: 996 shares of BUD.
  ****************************************************************
```

## 다음 단계


[리소스 정리](tutorial-stock-data-kplkcl2-finish.md)

# 리소스 정리


Kinesis 데이터 스트림 사용 요금을 결제하고 있으므로 작업이 완료되면 이 스트림과 해당 Amazon DynamoDB 테이블을 삭제해야 합니다. 레코드를 전송하거나 가져오지 않는 경우에도 활성 스트림에 대해 일반 요금이 부과됩니다. 이는 활성 스트림이 레코드를 가져오라는 요청과 들어오는 레코드를 지속적으로 "수신"하여 리소스를 사용하고 있기 때문입니다.

**스트림과 테이블을 삭제하려면**

1. 계속 실행 중일 수 있는 생산자와 소비자를 종료합니다.

1. [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)에서 Kinesis 콘솔을 엽니다.

1. 이 애플리케이션에 대해 생성한 스트림을 선택합니다(`StockTradeStream`).

1. **Delete Stream(스트림 삭제)**을 선택합니다.

1. [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)에서 DynamoDB 콘솔을 엽니다.

1. `StockTradesProcessor` 테이블을 삭제합니다.

## 요약


대량의 데이터를 거의 실시간으로 처리하는 데에는 복잡한 코드를 작성하거나 거대한 인프라를 개발할 필요가 없습니다. 기본 로직을 작성하여 소량의 데이터를 처리할 수 있지만(예: `processRecord(Record)` 작성), Kinesis Data Streams를 사용하여 확장할 수 있으므로 대량의 스트리밍 데이터에도 효과적입니다. Kinesis Data Streams에서 자동으로 처리되므로 처리를 확장하는 방법에 대해 걱정할 필요가 없습니다. 사용자는 스트리밍 레코드를 Kinesis Data Streams에 전송하고 수신된 각 새 레코드를 처리하는 로직을 작성하면 됩니다.

이 애플리케이션에 대한 몇 가지 잠재적 개선 사항이 있습니다.

**모든 샤드에 대한 집계**  
현재는 단일 샤드에서 단일 작업자가 수신한 데이터 레코드의 집계로 인해 생성된 통계를 가져옵니다. (단일 애플리케이션에서 동시에 둘 이상의 작업자가 하나의 샤드를 처리할 수 없음) 물론, 샤드를 확장하여 샤드가 두 개 이상인 경우 모든 샤드에 대해 집계할 수 있습니다. 각 작업자의 출력이 단일 샤드가 있는 다른 스트림으로 공급되어 첫 단계의 출력을 집계하는 작업자가 처리하는 파이프라인 아키텍처를 구축하여 이를 수행할 수 있습니다. 첫 단계의 데이터가 제한(샤드마다 분당 샘플 하나)되므로 샤드 하나로 쉽게 처리할 수 있습니다.

**처리 확장**  
스트림이 여러 샤드가 있도록 확장되면(여러 생산자가 데이터를 전송하기 때문) 더 많은 작업자를 추가하는 방식으로 처리가 확장됩니다. Amazon EC2 인스턴스에서 워커를 실행하고 오토 스케일링을 사용할 수 있습니다.

**Amazon S3/DynamoDB/Amazon Redshift/Storm에 대한 커넥터 사용**  
스트림이 지속적으로 처리되면 출력을 다른 대상으로 전송할 수 있습니다.는 Kinesis Data Streams를 다른 AWS 서비스 및 타사 도구와 통합하기 위한 [커넥터를](https://github.com/awslabs/amazon-kinesis-connectors) AWS 제공합니다.