

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# 생산자 구현
<a name="tutorial-stock-data-kplkcl2-producer"></a>

이 자습서에서는 주식 시장 거래 모니터링의 실제 시나리오를 사용합니다. 다음 원칙은 이 시나리오가 생산자와 생산자의 지원 코드 구조에 매핑되는 방법을 간략하게 설명합니다.

[소스 코드](https://github.com/aws-samples/amazon-kinesis-learning )를 참조하여 다음 정보를 검토하십시오.

**StockTrade 클래스**  
개별 주식 거래는 StockTrade 클래스의 인스턴스로 표시됩니다. 이 인스턴스에는 티커 기호, 가격, 공유 수, 거래 유형(구매 또는 판매), 거래를 고유하게 식별하는 ID 등의 속성이 포함됩니다. 이 클래스가 사용자를 위해 구현됩니다.

**스트림 레코드**  
스트림은 레코드의 시퀀스입니다. 레코드는 JSON 형식으로 된 `StockTrade` 인스턴스의 직렬화입니다. 예제:   

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator 클래스**  
StockTradeGenerator에는 호출될 때마다 임의로 생성된 새 주식 거래를 반환하는 `getRandomTrade()`라는 메서드가 있습니다. 이 클래스가 사용자를 위해 구현됩니다.

**StockTradesWriter 클래스**  
생산자의 `main` 메서드인 StockTradesWriter는 계속적으로 임의의 거래를 검색하고 다음 작업을 수행하여 Kinesis Data Streams에 전송합니다.  

1. 데이터 스트림 이름과 리전 이름을 입력으로 읽습니다.

1. `KinesisAsyncClientBuilder`를 사용하여 리전, 자격 증명 및 클라이언트 구성을 설정합니다.

1. 스트림의 존재 여부와 활성 상태 여부를 확인합니다. 그렇지 않은 경우 오류로 종료됩니다.

1. 연속 루프에서 `StockTradeGenerator.getRandomTrade()` 메서드를 호출하고 `sendStockTrade` 메서드를 호출하여 100밀리초마다 거래를 스트림으로 전송합니다.
`sendStockTrade` 클래스의 `StockTradesWriter` 메서드에는 다음 코드가 있습니다.  

```
private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient,
            String streamName) {
        byte[] bytes = trade.toJsonAsBytes();
        // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
        if (bytes == null) {
            LOG.warn("Could not get JSON bytes for stock trade");
            return;
        }

        LOG.info("Putting trade: " + trade.toString());
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(bytes))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            LOG.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }
```

다음 코드 세부 분석을 참조하십시오.
+ `PutRecord` API에는 바이트 어레이가 필요하며, 거래를 JSON 형식으로 변환해야 합니다. 이 한 줄의 코드는 다음 작업을 수행합니다.

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ 거래를 전송하기 전에 새 `PutRecordRequest` 인스턴스(이 경우 요청이라고 함)를 생성합니다. 각 `request`에는 스트림 이름, 파티션 키 및 데이터 BLOB가 필요합니다.

  ```
  PutPutRecordRequest request = PutRecordRequest.builder()
      .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
      .streamName(streamName)
      .data(SdkBytes.fromByteArray(bytes))
      .build();
  ```

  이 예제는 특정 샤드에 레코드를 매핑하는 주식 티커를 파티션 키로 사용합니다. 실제로 레코드가 스트림에 대해 균등하게 분산되도록 샤드당 수백 개 또는 수천 개의 파티션 키가 있어야 합니다. 스트림에 데이터를 추가하는 방법에 대한 자세한 내용은 [Amazon Kinesis Data Streams에 데이터 쓰기](building-producers.md) 단원을 참조하십시오.

  이제 `request`를 클라이언트에 전송할 준비가 되었습니다(put 작업).

  ```
     kinesisClient.putRecord(request).get();
  ```
+ 오류 확인과 로깅 기능은 항상 유용한 추가 기능입니다. 이 코드는 오류 조건을 기록합니다.

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  `put`넣기 작업에 try/catch 블록을 추가합니다.

  ```
  try {
   	kinesisClient.putRecord(request).get();
  } catch (InterruptedException e) {
              LOG.info("Interrupted, assuming shutdown.");
  } catch (ExecutionException e) {
              LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
  }
  ```

  이렇게 하는 이유는 네트워크 오류로 인해 또는 처리량 제한에 도달하여 병목 현상이 발생한 데이터 스트림으로 인해 Kinesis Data Streams put 작업이 실패할 수 있기 때문입니다. 데이터 손실을 방지하기 위해 재시도 사용과 같은 `put` 작업에 대한 재시도 정책을 신중히 고려하는 것이 좋습니다.
+ 상태 로깅은 유용하지만 선택 사항입니다.

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
여기에 표시된 생산자는 Kinesis Data Streams API 단일 레코드 기능인 `PutRecord`를 사용합니다. 실제로 개별 생산자가 많은 레코드를 생성하는 경우 `PutRecords`의 여러 레코드 기능을 사용하고 레코드의 배치를 한 번에 전송하는 것이 더 효율적인 경우가 많습니다. 자세한 내용은 [Amazon Kinesis Data Streams에 데이터 쓰기](building-producers.md) 단원을 참조하십시오.

**생산자를 실행하려면**

1. [IAM 정책 및 사용자 생성](tutorial-stock-data-kplkcl2-iam.md)에서 검색한 액세스 키 및 보안 키 페어가 파일 `~/.aws/credentials`에 저장되었는지 확인합니다.

1. 다음과 같은 인수를 사용하여 `StockTradeWriter` 클래스를 실행합니다.

   ```
   StockTradeStream us-west-2
   ```

   `us-west-2` 이외의 리전에서 스트림을 생성한 경우 해당 리전을 여기에 대신 지정해야 합니다.

다음과 유사한 출력 화면이 표시되어야 합니다.

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

이제 주식 거래가 Kinesis Data Streams에서 수집됩니다.

## 다음 단계
<a name="tutorial-stock-data-kplkcl2-producer-next"></a>

[소비자 구현](tutorial-stock-data-kplkcl2-consumer.md)