

# Kinesis 연결
<a name="aws-glue-programming-etl-connect-kinesis-home"></a>

Data Catalog 테이블에 저장된 정보를 사용하거나 데이터 스트림에 직접 액세스할 수 있는 정보를 제공하여 Kinesis 연결을 통해 Amazon Kinesis 데이터 스트림에서 읽고 쓸 수 있습니다. Kinesis의 정보를 Spark DataFrame으로 읽은 다음 AWS Glue DynamicFrame으로 변환할 수 있습니다. DynamicFrame을 JSON 형식으로 Kinesis에 쓸 수 있습니다. 데이터 스트림에 직접 액세스하는 경우 이러한 옵션을 사용하여 데이터 스트림에 액세스하는 방법에 대한 정보를 제공합니다.

`getCatalogSource` 또는 `create_data_frame_from_catalog`를 통해 Kinesis 스트리밍 소스의 레코드를 사용하는 경우 작업에 데이터 카탈로그 데이터베이스 및 테이블 이름 정보가 있으며, 해당 정보를 사용하여 Kinesis 스트리밍 소스에서 읽기 위한 몇 가지 기본 파라미터를 얻을 수 있습니다. `getSource`, `getSourceWithFormat`, `createDataFrameFromOptions` 또는 `create_data_frame_from_options`를 사용하는 경우 여기에 설명된 연결 옵션을 통해 이러한 기본 파라미터를 지정해야 합니다.

`GlueContext` 클래스에 지정된 메서드에 대해 다음 인수를 사용하여 Kinesis에 대한 연결 옵션을 지정할 수 있습니다.
+ Scala
  + `connectionOptions`: `getSource`, `createDataFrameFromOptions`, `getSink`와(과) 함께 사용 
  + `additionalOptions`: `getCatalogSource`, `getCatalogSink`와 함께 사용
  + `options`: `getSourceWithFormat`, `getSinkWithFormat`와 함께 사용
+ Python
  + `connection_options`: `create_data_frame_from_options`, `write_dynamic_frame_from_options`와 함께 사용
  + `additional_options`: `create_data_frame_from_catalog`, `write_dynamic_frame_from_catalog`와 함께 사용
  + `options`: `getSource`, `getSink`와 함께 사용

스트리밍 ETL 작업에 대한 참고 및 제한 사항은 [스트리밍 ETL 참고 사항 및 제한 사항](add-job-streaming.md#create-job-streaming-restrictions) 섹션을 참조하세요.

## Kinesis 구성
<a name="aws-glue-programming-etl-connect-kinesis-configure"></a>

AWS Glue Spark 작업에서 Kinesis 데이터 스트림을 연결하려면 몇 가지 필수 조건이 필요합니다.
+ 읽는 경우 AWS Glue 작업에는 Kinesis 데이터 스트림에 대한 읽기 액세스 수준의 IAM 권한이 있어야 합니다.
+ 쓰는 경우 AWS Glue 작업에는 Kinesis 데이터 스트림에 대한 쓰기 액세스 수준의 IAM 권한이 있어야 합니다.

경우에 따라 추가 필수 조건을 구성해야 합니다.
+ AWS Glue 작업이 **추가 네트워크 연결**(일반적으로 다른 데이터 세트에 연결)로 구성되었고 이러한 연결 중 하나에서 Amazon VPC **네트워크 옵션**을 제공하는 경우 작업에 Amazon VPC를 통해 통신하도록 지시합니다. 이 경우 Amazon VPC를 통해 통신하도록 Kinesis 데이터 스트림도 구성해야 합니다. Amazon VPC와 Kinesis 데이터 스트림 사이에서 인터페이스 VPC 엔드포인트를 생성하면 됩니다. 자세한 내용은 [Using Kinesis Data Streams with Interface VPC Endpoints](https://docs.aws.amazon.com//streams/latest/dev/vpc.html)를 참조하세요.
+ 다른 계정에서 Amazon Kinesis Data Streams를 지정할 때 크로스 계정 액세스를 허용하도록 역할과 정책을 설정해야 합니다. 자세한 내용은 [예: 다른 계정의 Kinesis 스트림에서 읽기](https://docs.aws.amazon.com/kinesisanalytics/latest/java/examples-cross.html)를 참조하세요.

스트리밍 ETL 작업 필수 조건에 대한 자세한 내용은 [AWS Glue에서 스트리밍 ETL 작업](add-job-streaming.md) 섹션을 참조하세요.

## 예제: Kinesis 스트림에서 읽기
<a name="aws-glue-programming-etl-connect-kinesis-read"></a>

### 예제: Kinesis 스트림에서 읽기
<a name="section-etl-connect-kinesis-read"></a>

[forEachBatch](aws-glue-api-crawler-pyspark-extensions-glue-context.md#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch)과(와) 함께 사용합니다.

Amazon Kinesis 스트리밍 소스의 예:

```
kinesis_options =
   { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
     "startingPosition": "TRIM_HORIZON", 
     "inferSchema": "true", 
     "classification": "json" 
   }
data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
```

## 예: Kinesis 스트림에 쓰기
<a name="aws-glue-programming-etl-connect-kinesis-write"></a>

### 예제: Kinesis 스트림에서 읽기
<a name="section-etl-connect-kinesis-read"></a>

[forEachBatch](aws-glue-api-crawler-pyspark-extensions-glue-context.md#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch)과(와) 함께 사용합니다.

Amazon Kinesis 스트리밍 소스의 예:

```
kinesis_options =
   { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
     "startingPosition": "TRIM_HORIZON", 
     "inferSchema": "true", 
     "classification": "json" 
   }
data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
```

## Kinesis 연결 옵션 참조
<a name="aws-glue-programming-etl-connect-kinesis"></a>

Amazon Kinesis Data Streams에 대한 연결 옵션을 지정합니다.

Kinesis 스트리밍 데이터 원본에 대한 다음 연결 옵션을 사용합니다.
+ `"streamARN"` (필수) 읽기/쓰기에 사용됩니다. Kinesis 데이터 스트림의 ARN.
+ `"classification"` (읽기 필수) 읽기에 사용됩니다. 레코드의 데이터에서 사용하는 파일 형식. 데이터 카탈로그를 통해 제공되지 않는 한 필수입니다.
+ `"streamName"` – (선택 사항) 읽기에 사용됩니다. 읽을 Kinesis 데이터 스트림의 이름. `endpointUrl`와(과) 함께 사용됩니다.
+ `"endpointUrl"` – (선택 사항) 읽기에 사용됩니다. 기본값: "https://kinesis.us-east-1.amazonaws.com". Kinesis 스트림의 AWS 엔드포인트. 특정 지역에 연결하는 경우가 아니면 변경하지 않아도 됩니다.
+ `"partitionKey"` – (선택 사항) 쓰기에 사용됩니다. 레코드를 생성할 때 사용되는 Kinesis 파티션 키.
+ `"delimiter"` (선택 사항) 읽기에 사용됩니다. `classification`이(가) CSV일 때 사용되는 값 구분 기호입니다. 기본값은 '`,`'입니다.
+ `"startingPosition"`: (선택 사항) 읽기에 사용됩니다. 데이터를 읽을 Kinesis 데이터 스트림의 시작 위치입니다. 가능한 값은 `yyyy-mm-ddTHH:MM:SSZ` 패턴에서 UTC 형식의 타임스탬프 문자열이나 `"latest"`, `"trim_horizon"` 또는 `"earliest"`입니다(여기서, `Z`는 UTC 시간대 오프셋(\$1/-)임, 예: '2023-04-04T08:00:00-04:00'). 기본값은 `"latest"`입니다. 참고: `"startingPosition"`에 대한 UTC 형식의 타임스탬프 문자열은 AWS Glue 버전 4.0 이상에서만 지원됩니다.
+ `"failOnDataLoss"`: (선택 사항) 활성 샤드가 누락되거나 만료된 경우 작업이 실패합니다. 기본값은 `"false"`입니다.
+ `"awsSTSRoleARN"`: (선택 사항) 읽기/쓰기에 사용됩니다. AWS Security Token Service(AWS STS)을(를) 사용하여 맡을 역할의 Amazon 리소스 이름(ARN). 이 역할에는 Kinesis 데이터 스트림에 대한 레코드 작업을 설명하거나 읽을 수 있는 권한이 있어야 합니다. 다른 계정의 데이터 스트림에 액세스할 때 이 파라미터를 사용해야 합니다. `"awsSTSSessionName"`과(와) 함께 사용합니다.
+ `"awsSTSSessionName"`: (선택 사항) 읽기/쓰기에 사용됩니다. AWS STS을(를) 사용하여 역할을 맡는 세션의 식별자입니다. 다른 계정의 데이터 스트림에 액세스할 때 이 파라미터를 사용해야 합니다. `"awsSTSRoleARN"`과(와) 함께 사용합니다.
+ `"awsSTSEndpoint"`: (선택 사항) 위임 받은 역할을 사용하여 Kinesis에 연결할 때 사용할 AWS STS 엔드포인트. 이를 통해 VPC에서 리전의 AWS STS 엔드포인트를 사용할 수 있지만, 기본 글로벌 엔드포인트로는 불가능합니다.
+ `"maxFetchTimeInMs"`: (선택 사항) 읽기에 사용됩니다. 작업 실행기가 Kinesis 데이터 스트림에서 현재 배치에 대한 레코드를 읽는 데 걸리는 최대 시간(밀리초(ms) 단위로 지정)입니다. 이 시간 내에 여러 개의 `GetRecords` API 호출을 할 수 있습니다. 기본값은 `1000`입니다.
+ `"maxFetchRecordsPerShard"`: (선택 사항) 읽기에 사용됩니다. 마이크로 배치에 따라 Kinesis 데이터 스트림에서 샤드당 가져올 최대 레코드 수입니다. 참고: 스트리밍 작업이 이미 Kinesis의 동일한 get-records 호출에서 추가 레코드를 읽은 경우 클라이언트가 이 제한을 초과할 수 있습니다. `maxFetchRecordsPerShard`가 엄격해야 한다면 `maxRecordPerRead`의 배수여야 합니다. 기본값은 `100000`입니다.
+ `"maxRecordPerRead"`: (선택 사항) 읽기에 사용됩니다. 각 `getRecords` 작업에서 Kinesis 데이터 스트림에서 가져올 최대 레코드 수. 기본값은 `10000`입니다.
+ `"addIdleTimeBetweenReads"`: (선택 사항) 읽기에 사용됩니다. 연속 두 `getRecords` 작업 사이에 시간 지연을 추가합니다. 기본값은 `"False"`입니다. 이 옵션은 Glue 버전 2.0 이상에서만 구성할 수 있습니다.
+ `"idleTimeBetweenReadsInMs"`: (선택 사항) 읽기에 사용됩니다. 연속 두 `getRecords` 작업 사이의 최소 시간 지연으로, ms 단위로 지정됩니다. 기본값은 `1000`입니다. 이 옵션은 Glue 버전 2.0 이상에서만 구성할 수 있습니다.
+ `"describeShardInterval"`: (선택 사항) 읽기에 사용됩니다. 스크립트가 리샤딩을 고려하기 위한 두 `ListShards` API 호출 사이의 최소 시간 간격. 자세한 내용은 *Amazon Kinesis Data Streams Developer Guide*의 [Strategies for Resharding](https://docs.aws.amazon.com//streams/latest/dev/kinesis-using-sdk-java-resharding-strategies.html)을 참조하세요. 기본값은 `1s`입니다.
+ `"numRetries"`: (선택 사항) 읽기에 사용됩니다. Kinesis Data Streams API 요청의 최대 재시도 횟수입니다. 기본값은 `3`입니다.
+ `"retryIntervalMs"`: (선택 사항) 읽기에 사용됩니다. Kinesis Data Streams API 호출을 재시도하기 전의 휴지 기간(ms 단위로 지정)입니다. 기본값은 `1000`입니다.
+ `"maxRetryIntervalMs"`: (선택 사항) 읽기에 사용됩니다. Kinesis Data Streams API 호출을 두 번 재시도하는 사이의 최대 휴지 시간(ms 단위로 지정)입니다. 기본값은 `10000`입니다.
+ `"avoidEmptyBatches"`: (선택 사항) 읽기에 사용됩니다. 배치가 시작되기 전에 Kinesis 데이터 스트림에서 읽지 않은 데이터를 확인하여 빈 마이크로 배치 작업 생성을 방지합니다. 기본값은 `"False"`입니다.
+ `"schema"`: (inferSchema가 거짓으로 설정된 경우 필수 사항) 읽기에 사용됩니다. 페이로드를 처리하는 데 사용하는 스키마. 분류가 `avro`인 경우 제공된 스키마는 Avro 스키마 형식이어야 합니다. 분류가 `avro`가 아닌 경우 제공된 스키마는 DDL 스키마 형식이어야 합니다.

  다음은 스키마의 예입니다.

------
#### [ Example in DDL schema format ]

  ```
  `column1` INT, `column2` STRING , `column3` FLOAT
  ```

------
#### [ Example in Avro schema format ]

  ```
  {
    "type":"array",
    "items":
    {
      "type":"record",
      "name":"test",
      "fields":
      [
        {
          "name":"_id",
          "type":"string"
        },
        {
          "name":"index",
          "type":
          [
            "int",
            "string",
            "float"
          ]
        }
      ]
    }
  }
  ```

------
+ `"inferSchema"`: (선택 사항) 읽기에 사용됩니다. 기본값은 'false'입니다. 'true'로 설정하면 스키마가 런타임 시 `foreachbatch` 내의 페이로드에서 감지됩니다.
+ `"avroSchema"`: (더 이상 사용되지 않음) 읽기에 사용됩니다. Avro 형식을 사용할 때 Avro 데이터의 스키마를 지정하는 데 사용되는 파라미터입니다. 이 파라미터는 이제 사용 중단되었습니다. `schema` 파라미터를 사용합니다.
+ `"addRecordTimestamp"`: (선택 사항) 읽기에 사용됩니다. 이 옵션이 'true'로 설정되면 데이터 출력에는 이름이 '\$1\$1src\$1timestamp'라는 추가 열이 포함됩니다. 이 열은 스트림에서 해당 레코드를 수신한 시간을 나타냅니다. 기본값은 'false'입니다. 이 옵션은 AWS Glue 버전 4.0 이상에서 지원됩니다.
+ `"emitConsumerLagMetrics"`: (선택 사항) 읽기에 사용됩니다. 옵션을 '참'으로 설정하면 각 배치에 대해 스트림에서 수신한 가장 오래된 레코드와 AWS Glue에 도착한 시간 사이의 지표를 CloudWatch로 내보냅니다. 지표의 이름은 'glue.driver.streaming.maxConsumerLagInMs'입니다. 기본값은 'false'입니다. 이 옵션은 AWS Glue 버전 4.0 이상에서 지원됩니다.
+ `"fanoutConsumerARN"`: (선택 사항) 읽기에 사용됩니다. `streamARN`에서 지정된 스트림에 대한 Kinesis 스트림 소비자의 ARN. Kinesis 연결에 대해 향상된 팬아웃 모드를 활성화하는 데 사용됩니다. 향상된 팬아웃과 함께 Kinesis 스트림을 사용하는 방법에 대한 자세한 내용은 [Kinesis 스트리밍 작업에서 향상된 팬아웃 사용](aws-glue-programming-etl-connect-kinesis-efo.md) 섹션을 참조하세요.
+ `"recordMaxBufferedTime"` – (선택 사항) 쓰기에 사용됩니다. 기본값: 1000(ms). 레코드 쓰기를 대기하는 중 레코드가 버퍼링되는 최대 시간.
+ `"aggregationEnabled"` – (선택 사항) 쓰기에 사용됩니다. 기본값: true. Kinesis로 보내기 전에 레코드를 집계해야 하는지 여부를 지정합니다.
+ `"aggregationMaxSize"` – (선택 사항) 쓰기에 사용됩니다. 기본값: 51200(바이트). 레코드가 이 한도보다 크면 애그리게이터를 우회합니다. 참고 Kinesis는 레코드 크기를 50KB로 제한합니다. 이 값이 50KB를 초과하도록 설정하면 Kinesis에서 크기가 초과된 레코드를 거부합니다.
+ `"aggregationMaxCount"` – (선택 사항) 쓰기에 사용됩니다. 기본값: 4294967295. 집계된 레코드에 포함할 최대 항목 수.
+ `"producerRateLimit"` – (선택 사항) 쓰기에 사용됩니다. 기본값: 150(%). 단일 생성자(예: 작업)에서 보내는 샤드당 처리량을 백엔드 한도의 백분율로 제한합니다.
+ `"collectionMaxCount"` – (선택 사항) 쓰기에 사용됩니다. 기본값: 500. PutRecords 요청에 포함할 최대 항목 수.
+ `"collectionMaxSize"` – (선택 사항) 쓰기에 사용됩니다. 기본값: 5242880(바이트). PutRecords 요청으로 전송할 수 있는 데이터 최대량.

# Kinesis 스트리밍 작업에서 향상된 팬아웃 사용
<a name="aws-glue-programming-etl-connect-kinesis-efo"></a>

향상된 팬아웃 소비자는 일반 소비자보다 더 높은 전용 처리량으로 Kinesis 스트림에서 레코드를 수신할 수 있습니다. 이는 Kinesis 소비자에게 데이터(예: 작업)를 제공하는 데 사용되는 전송 프로토콜을 최적화하는 방식으로 지원됩니다. Kinesis의 향상된 팬아웃에 대한 자세한 내용은 [Kinesis 설명서](https://docs.aws.amazon.com//streams/latest/dev/enhanced-consumers.html)를 참조하세요.

향상된 팬아웃 모드에서는 `maxRecordPerRead` 및 `idleTimeBetweenReadsInMs` 연결 옵션이 더 이상 적용되지 않습니다. 향상된 팬아웃을 사용할 때 해당 파라미터는 구성할 수 없기 때문입니다. 재시도를 위한 구성 옵션은 설명한 대로 수행됩니다.

다음 절차를 사용하여 스트리밍 작업에 대한 향상된 팬아웃을 활성화 및 비활성화합니다. 스트림에서 데이터를 소비하는 각 작업에 대해 스트림 소비자를 등록해야 합니다.

**작업에서 향상된 팬아웃 소비를 활성화하려면:**

1. Kinesis API를 사용하여 작업에 대한 스트림 소비자를 등록합니다. [Kinesis 설명서](https://docs.aws.amazon.com//streams/latest/dev/building-enhanced-consumers-api)의 *register a consumer with enhanced fan-out using the Kinesis Data Streams API*에 대한 지침을 따릅니다. 첫 번째 단계인 [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)를 직접적으로 호출합니다. 요청은 ARN, *consumerARN*을 반환해야 합니다.

1. 연결 메서드 인수에서 `fanoutConsumerARN` 연결 옵션을 *consumerARN*으로 설정합니다.

1. 작업을 다시 시작합니다.

**작업에서 향상된 팬아웃 소비를 비활성화하려면:**

1. 메서드 직접 호출에서 `fanoutConsumerARN` 연결 옵션을 제거합니다.

1. 작업을 다시 시작합니다.

1. [Kinesis 설명서](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-console.html)의 *deregister a consumer*에 대한 지침을 따릅니다. 이 지침은 콘솔에도 적용되지만 Kinesis API를 통해서도 수행할 수 있습니다. Kinesis API를 통한 스트림 소비자 등록 취소에 대한 자세한 내용은 Kinesis 설명서의 [DeregisterStreamConsumer](https://docs.aws.amazon.com//kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)를 참조하세요.