

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Amazon Kinesis Data Streams와 함께 OpenSearch Ingestion 파이프라인 사용
<a name="configure-client-kinesis"></a>

Amazon Kinesis Data Streams와 함께 OpenSearch Ingestion 파이프라인을 사용하여 여러 스트림의 스트림 레코드 데이터를 Amazon OpenSearch Service 도메인 및 컬렉션으로 수집합니다. OpenSearch Ingestion 파이프라인은 스트리밍 수집 인프라를 통합하여 Kinesis에서 스트림 레코드를 짧은 지연 시간으로, 대규모로, 지속적으로 수집할 수 있도록 지원합니다.

**Topics**
+ [소스로서의 Amazon Kinesis Data Streams](#confluent-cloud-kinesis)
+ [소스로서의 Amazon Kinesis Data Streams 교차 계정](#kinesis-cross-account-source)

## 소스로서의 Amazon Kinesis Data Streams
<a name="confluent-cloud-kinesis"></a>

다음 절차에서는 Amazon Kinesis Data Streams를 데이터 소스로 사용하는 OpenSearch Ingestion 파이프라인을 설정하는 방법을 알아봅니다. 이 섹션에서는 OpenSearch Service 도메인 또는 OpenSearch Serverless 컬렉션을 생성하고, 파이프라인 역할을 구성하고, 파이프라인을 생성하는 단계를 안내하는 등 필요한 사전 조건을 다룹니다.

### 사전 조건
<a name="s3-prereqs"></a>

파이프라인을 설정하려면 활성 Kinesis Data Streams가 하나 이상 필요합니다. 이 스트림은 레코드를 수신하거나 다른 소스에서 레코드를 수신할 준비가 되어 있어야 합니다. 자세한 내용은 [OpenSearch Ingestion 개요](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/osis-getting-started-tutorials.html)를 참조하세요.

**파이프라인을 설정하려면**

1. 

**OpenSearch Service 도메인 또는 OpenSearch Serverless 컬렉션 생성**

   도메인 또는 컬렉션을 생성하려면 [OpenSearch Ingestion 시작하기](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/osis-getting-started-tutorials.html)를 참조하세요.

   컬렉션 또는 도메인에 쓰기 데이터에 액세스할 수 있는 올바른 권한이 있는 IAM 역할을 생성하려면 [리소스 기반 정책](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource)을 참조하세요.

1. 

**권한을 사용하여 파이프라인 역할 구성**

   [파이프라인 구성에 사용할 파이프라인 역할을 설정](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-security-overview.html#pipeline-security-sink)하고 다음 권한을 추가합니다. *자리 표시자*를 자신의 정보로 바꿉니다.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "allowReadFromStream",
               "Effect": "Allow",
               "Action": [
                   "kinesis:DescribeStream",
                   "kinesis:DescribeStreamConsumer",
                   "kinesis:DescribeStreamSummary",
                   "kinesis:GetRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:ListShards",
                   "kinesis:ListStreams",
                   "kinesis:ListStreamConsumers",
                   "kinesis:RegisterStreamConsumer",
                   "kinesis:SubscribeToShard"
               ],
               "Resource": [
                   "arn:aws:kinesis:us-east-1:111122223333:stream/stream-name"
               ]
           }
       ]
   }
   ```

------

   스트림에서 서버 측 암호화가 활성화된 경우 다음 AWS KMS 정책은 레코드를 복호화하도록 허용합니다. *자리 표시자*를 자신의 정보로 바꿉니다.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "allowDecryptionOfCustomManagedKey",
               "Effect": "Allow",
               "Action": [
                   "kms:Decrypt",
                   "kms:GenerateDataKey"
               ],
               "Resource": "arn:aws:kms:us-east-1:111122223333:key/key-id"
           }
       ]
   }
   ```

------

   파이프라인이 도메인에 데이터를 쓰려면 도메인에 **sts\$1role\$1arn** 파이프라인 역할이 도메인에 액세스할 수 있도록 허용하는 [도메인 수준 액세스 정책](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource)이 있어야 합니다.

   다음은 이전 단계에서 생성한 파이프라인 역할(`pipeline-role`)을 사용하여 `ingestion-domain` 도메인에 데이터를 쓰도록 허용하는 도메인 액세스 정책의 예입니다. *자리 표시자*를 자신의 정보로 바꿉니다.

   ```
   {
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::your-account-id:role/pipeline-role"
         },
         "Action": ["es:DescribeDomain", "es:ESHttp*"],
         "Resource": "arn:aws:es:AWS 리전:account-id:domain/domain-name/*"
       }
     ]
   }
   ```

1. 

**파이프라인 생성**

   **Kinesis-data-streams**를 원본으로 지정하는 OpenSearch Ingestion 파이프라인을 구성합니다. OpenSearch Ingestion 콘솔에서 이러한 파이프라인을 생성하는 데 사용 가능한 준비된 블루프린트를 찾을 수 있습니다. (선택 사항)를 사용하여 파이프라인을 생성하려면 "**`AWS-KinesisDataStreamsPipeline`**"라는 블루프린트를 사용할 AWS CLI수 있습니다. *자리 표시자*를 자신의 정보로 바꿉니다.

   ```
   version: "2"
   kinesis-pipeline:
     source:
       kinesis_data_streams:
         acknowledgments: true
         codec:
           # Based on whether kinesis records are aggregated or not, you could choose json, newline or ndjson codec for processing the records.
           # JSON codec supports parsing nested CloudWatch Events into individual log entries that will be written as documents into OpenSearch.
           # json:
             # key_name: "logEvents"
             # These keys contain the metadata sent by CloudWatch Subscription Filters
             # in addition to the individual log events:
             # include_keys: [ 'owner', 'logGroup', 'logStream' ]
           newline:
         streams:
           - stream_name: "stream name"
             # Enable this if ingestion should start from the start of the stream.
             # initial_position: "EARLIEST"
             # checkpoint_interval: "PT5M"
             # Compression will always be gzip for CloudWatch, but will vary for other sources:
             # compression: "gzip"
           - stream_name: "stream name"
             # Enable this if ingestion should start from the start of the stream.
             # initial_position: "EARLIEST"
             # checkpoint_interval: "PT5M"
             # Compression will always be gzip for CloudWatch, but will vary for other sources:
             # compression: "gzip"
   
           # buffer_timeout: "1s"
           # records_to_accumulate: 100
           # Change the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS.
           # consumer_strategy: "polling"
           # if consumer strategy is set to "polling", enable the polling config below.
           # polling:
             # max_polling_records: 100
             # idle_time_between_reads: "250ms"
         aws:
           # Provide the Role ARN with access to Amazon Kinesis Data Streams. This role should have a trust relationship with osis-pipelines.amazonaws.com
           sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
           # Provide the AWS 리전 of the Data Stream.
           region: "us-east-1"
   
     sink:
       - opensearch:
           # Provide an Amazon OpenSearch Serverless domain endpoint
           hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ]
           index: "index_${getMetadata(\"stream_name\")}"
           # Ensure adding unique document id as a combination of the metadata attributes available.
           document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}"
           aws:
             # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
             sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
             # Provide the AWS 리전 of the domain.
             region: "us-east-1"
             # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection
             serverless: false
             # serverless_options:
               # Specify a name here to create or update network policy for the serverless collection
               # network_policy_name: "network-policy-name"
           # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x
           # distribution_version: "es6"
           # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html
           # enable_request_compression: true/false
           # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ.
           dlq:
             s3:
               # Provide an S3 bucket
               bucket: "your-dlq-bucket-name"
               # Provide a key path prefix for the failed requests
               # key_path_prefix: "kinesis-pipeline/logs/dlq"
               # Provide the region of the bucket.
               region: "us-east-1"
               # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
               sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
   ```

**구성 옵션**  
Kinesis 구성 옵션은 *OpenSearch* 설명서의 [구성 옵션](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kinesis/#configuration-options)을 참조하세요.

**사용 가능한 메타데이터 속성**
   + **stream\$1name** - 레코드가 수집된 Kinesis Data Streams의 이름
   + **partition\$1key** - 수집 중인 Kinesis Data Streams 레코드의 파티션 키
   + **sequence\$1number** – 수집 중인 Kinesis Data Streams 레코드의 시퀀스 번호
   + **sub\$1sequence\$1number** – 수집 중인 Kinesis Data Streams 레코드의 하위 시퀀스 번호

1. 

**(선택 사항) Kinesis Data Streams 파이프라인의 권장 컴퓨팅 유닛(OCU) 구성**

   둘 이상의 스트림에서 스트림 레코드를 수집하도록 OpenSearch Kinesis Data Streams 소스 파이프라인을 구성할 수도 있습니다. 수집 속도를 높이려면 추가된 새 스트림당 컴퓨팅 유닛을 추가하는 것이 좋습니다.

### 데이터 일관성
<a name="confluent-cloud-kinesis-private"></a>

OpenSearch Ingestion은 데이터 내구성을 보장하는 엔드 투 엔드 승인을 지원합니다. 파이프라인은 Kinesis에서 스트림 레코드를 읽을 때 스트림과 연결된 샤드를 기반으로 스트림 레코드를 읽는 작업을 동적으로 분산합니다. 파이프라인은 OpenSearch 도메인이나 컬렉션에서 모든 레코드를 수집한 후 승인을 받으면 스트림을 자동으로 체크포인트합니다. 이렇게 하면 스트림 레코드의 중복 처리가 방지됩니다.

스트림 이름을 기반으로 인덱스를 생성하려면 OpenSearch 싱크 섹션의 인덱스를 **"index\$1\$1\$1getMetadata(\$1"stream\$1name\$1")\$1"**로 정의합니다.

## 소스로서의 Amazon Kinesis Data Streams 교차 계정
<a name="kinesis-cross-account-source"></a>

OpenSearch Ingestion 파이프라인이 소스로서 다른 계정의 Kinesis Data Streams에 액세스할 수 있도록 Amazon Kinesis Data Streams가 있는 여러 계정에서 액세스 권한을 부여할 수 있습니다. 교차 계정 액세스를 활성화하려면 다음 단계를 수행합니다.

**교차 계정 액세스 구성**

1. 

**Kinesis 스트림이 있는 계정에서 리소스 정책 설정**

   *자리 표시자*를 자신의 정보로 바꿉니다.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "StreamReadStatementID",
               "Effect": "Allow",
               "Principal": {
                   "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role"
               },
               "Action": [
                   "kinesis:DescribeStreamSummary",
                   "kinesis:GetRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:ListShards"
               ],
               "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name"
           },
           {
               "Sid": "StreamEFOReadStatementID",
               "Effect": "Allow",
               "Principal": {
                   "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role"
               },
               "Action": [
                   "kinesis:DescribeStreamSummary",
                   "kinesis:ListShards"
               ],
               "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name/consumer/consumer-name"
           }
       ]
   }
   ```

------

1. 

**(선택 사항) 소비자 및 소비자 리소스 정책 설정**

   이는 선택적 단계이며 스트림 레코드 읽기에 향상된 팬아웃 소비자 전략을 사용할 계획인 경우에만 필요합니다. 자세한 내용은 [전용 처리량으로 향상된 팬아웃 소비자 개발](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html)을 참조하세요.

   1. 

**소비자 설정**

      기존 소비자를 재사용하려는 경우 이 단계를 건너뛸 수 있습니다. 자세한 내용은 *Amazon Kinesis Data Streams API 참조*에서 [RegisterStreamConsumer](https://docs.aws.amazon.com/dms/latest/APIReference/API_RegisterStreamConsumer.html)를 참조하세요.

      다음 CLI 명령 예에서는 *자리 표시자 값*을 자신의 정보로 바꿉니다.  
**Example CLI 명령 예제:**  

      ```
      aws kinesis register-stream-consumer \
      --stream-arn "arn:aws:kinesis:AWS 리전:account-id:stream/stream-name" \
      --consumer-name consumer-name
      ```

   1. 

**소비자 리소스 정책 설정**

      다음 문에서는 *자리 표시자 값*을 자신의 정보로 바꿉니다.

------
#### [ JSON ]

****  

      ```
      {
          "Version":"2012-10-17",		 	 	 
          "Statement": [
              {
                  "Sid": "ConsumerEFOReadStatementID",
                  "Effect": "Allow",
                  "Principal": {
                      "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role"
                  },
                  "Action": [
                      "kinesis:DescribeStreamConsumer",
                      "kinesis:SubscribeToShard"
                  ],
                  "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-1/consumer/consumer-name"
              }
          ]
      }
      ```

------

1. 

**파이프라인 구성**

   교차 계정 수집의 경우 각 스트림에 대해 `kinesis_data_streams`에 다음 속성을 추가합니다.
   + `stream_arn` - 스트림이 있는 계정에 속한 스트림의 ARN
   + `consumer_arn` - 선택적 속성으로, 기본 향상된 팬아웃 소비자 전략을 선택한 경우 지정해야 합니다. 이 필드에는 실제 소비자 ARN을 지정합니다. *자리 표시자*를 자신의 정보로 바꿉니다.

   ```
   version: "2"
        kinesis-pipeline:
          source:
            kinesis_data_streams:
              acknowledgments: true
              codec:
                newline:
              streams:
                - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name"
                  consumer_arn: "consumer arn"
                  # Enable this if ingestion should start from the start of the stream.
                  # initial_position: "EARLIEST"
                  # checkpoint_interval: "PT5M"
                - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name"
                  consumer_arn: "consumer arn"
                   # initial_position: "EARLIEST"
        
                # buffer_timeout: "1s"
                # records_to_accumulate: 100
                # Enable the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS.
                # consumer_strategy: "polling"
                # if consumer strategy is set to "polling", enable the polling config below.
                # polling:
                  # max_polling_records: 100
                  # idle_time_between_reads: "250ms"
              aws:
                # Provide the Role ARN with access to Kinesis. This role should have a trust relationship with osis-pipelines.amazonaws.com
                sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
                # Provide the AWS 리전 of the domain.
                region: "us-east-1"
        
          sink:
            - opensearch:
                # Provide an OpenSearch Serverless domain endpoint
                hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ]
                index: "index_${getMetadata(\"stream_name\")}"
                # Mapping for documentid based on partition key, shard sequence number and subsequence number metadata attributes
                document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}"
                aws:
                  # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
                  sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
                  # Provide the AWS 리전 of the domain.
                  region: "us-east-1"
                  # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
                  serverless: false
                    # serverless_options:
                    # Specify a name here to create or update network policy for the serverless collection
                  # network_policy_name: network-policy-name
                # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x
                # distribution_version: "es6"
                # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html
                # enable_request_compression: true/false
                # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ.
                dlq:
                  s3:
                    # Provide an Amazon S3 bucket
                    bucket: "your-dlq-bucket-name"
                    # Provide a key path prefix for the failed requests
                    # key_path_prefix: "alb-access-log-pipeline/logs/dlq"
                    # Provide the AWS 리전 of the bucket.
                    region: "us-east-1"
                    # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
                    sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
   ```

1. 

**OSI 파이프라인 역할 Kinesis Data Streams**

   1. 

**IAM 정책**

      다음 정책을 파이프라인 역할에 추가합니다. *자리 표시자*를 자신의 정보로 바꿉니다.

------
#### [ JSON ]

****  

      ```
      {
          "Version":"2012-10-17",		 	 	 
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "kinesis:DescribeStreamConsumer",
                      "kinesis:SubscribeToShard"
                  ],
                  "Resource": [
                  "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream"
                  ]
              },
              {
                  "Sid": "allowReadFromStream",
                  "Effect": "Allow",
                  "Action": [
                      "kinesis:DescribeStream",
                      "kinesis:DescribeStreamSummary",
                      "kinesis:GetRecords",
                      "kinesis:GetShardIterator",
                      "kinesis:ListShards",
                      "kinesis:ListStreams",
                      "kinesis:ListStreamConsumers",
                      "kinesis:RegisterStreamConsumer"
                  ],
                  "Resource": [
                      "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream"
                  ]
              }
          ]
      }
      ```

------

   1. 

**신뢰 정책**

      스트림 계정에서 데이터를 수집하려면 파이프라인 수집 역할과 스트림 계정 간에 신뢰 관계를 설정해야 합니다. 파이프라인 역할에 다음을 추가합니다. *자리 표시자*를 자신의 정보로 바꿉니다.

------
#### [ JSON ]

****  

      ```
      {
        "Version":"2012-10-17",		 	 	 
        "Statement": [{
           "Effect": "Allow",
           "Principal": {
             "AWS": "arn:aws:iam::111122223333:root"
            },
           "Action": "sts:AssumeRole"
        }]
      }
      ```

------