

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon Kinesis Data Streams で OpenSearch Ingestion パイプラインを使用する
<a name="configure-client-kinesis"></a>

Amazon Kinesis Data Streams で OpenSearch Ingestion パイプラインを使用して、複数のストリームから Amazon OpenSearch Service ドメインとコレクションにストリームレコードデータを取り込みます。OpenSearch Ingestion パイプラインには、ストリーミング取り込みインフラストラクチャが組み込まれており、Kinesis からストリームレコードを継続的に取り込むための、高スケールで低レイテンシーな方法を提供します。

**Topics**
+ [Amazon Kinesis Data Streams をソースとする場合](#confluent-cloud-kinesis)
+ [ソースとしての Amazon Kinesis Data Streams クロスアカウント](#kinesis-cross-account-source)

## Amazon Kinesis Data Streams をソースとする場合
<a name="confluent-cloud-kinesis"></a>

次の手順では、Amazon Kinesis Data Streams をデータソースとして使用する OpenSearch Ingestion パイプラインを設定する方法について説明します。このセクションでは、OpenSearch Service ドメインまたは OpenSearch Serverless コレクションの作成、パイプラインロールの設定とパイプラインの作成の手順など、必要な前提条件について説明します。

### 前提条件
<a name="s3-prereqs"></a>

パイプラインを設定するには、1 つ以上のアクティブな Kinesis Data Streams が必要です。これらのストリームは、レコードを受信するか、他のソースからレコードを受信する準備ができている必要があります。詳細については、「[Overview of OpenSearch Ingestion](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/osis-getting-started-tutorials.html)」を参照してください。

**パイプラインを設定するには**

1. 

**OpenSearch Service ドメインまたは OpenSearch Serverless コレクションを作成する**

   ドメインまたはコレクションを作成するには、「[Getting started with OpenSearch Ingestion](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/osis-getting-started-tutorials.html)」を参照してください。

   コレクションまたはドメインへの書き込みデータにアクセスするための正しいアクセス許可を持つ IAM ロールを作成するには、「[Resource-based policies](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource)」を参照してください。

1. 

**アクセス許可を持つパイプラインロールを設定する**

   パイプライン設定で使用する[パイプラインロールを設定し](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-security-overview.html#pipeline-security-sink)、そのロールに次のアクセス許可を追加します。*プレースホルダー値*を、ユーザー自身の情報に置き換えます。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "allowReadFromStream",
               "Effect": "Allow",
               "Action": [
                   "kinesis:DescribeStream",
                   "kinesis:DescribeStreamConsumer",
                   "kinesis:DescribeStreamSummary",
                   "kinesis:GetRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:ListShards",
                   "kinesis:ListStreams",
                   "kinesis:ListStreamConsumers",
                   "kinesis:RegisterStreamConsumer",
                   "kinesis:SubscribeToShard"
               ],
               "Resource": [
                   "arn:aws:kinesis:us-east-1:111122223333:stream/stream-name"
               ]
           }
       ]
   }
   ```

------

   ストリームでサーバー側の暗号化が有効になっている場合、次の AWS KMS ポリシーでは、レコードを復号化できるようにします。*プレースホルダー値*を、ユーザー自身の情報に置き換えます。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "allowDecryptionOfCustomManagedKey",
               "Effect": "Allow",
               "Action": [
                   "kms:Decrypt",
                   "kms:GenerateDataKey"
               ],
               "Resource": "arn:aws:kms:us-east-1:111122223333:key/key-id"
           }
       ]
   }
   ```

------

   パイプラインがドメインにデータを書き込むには、[sts\$1role\$1arn](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource) パイプラインロールにドメインへのアクセスを許可する**ドメインレベルのアクセスポリシー**が、このドメインに必要になります。

   次の例は、前のステップで作成した `pipeline-role` と言う名前のパイプラインロールに、`ingestion-domain` と言う名前のドメインへの、データの書き込みが許可されるドメインアクセスポリシーです。*プレースホルダー値*を、ユーザー自身の情報に置き換えます。

   ```
   {
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::your-account-id:role/pipeline-role"
         },
         "Action": ["es:DescribeDomain", "es:ESHttp*"],
         "Resource": "arn:aws:es:AWS リージョン:account-id:domain/domain-name/*"
       }
     ]
   }
   ```

1. 

**パイプラインの作成**

   **Kinesis-data-streams** をソースとして指定する OpenSearch 取り込みパイプラインを設定します。このようなパイプラインを作成するために、OpenSearch Ingestion Console で用意されたブループリントを見つけることができます。(オプション) を使用してパイプラインを作成するには AWS CLI、「」という名前のブループリントを使用できます**`AWS-KinesisDataStreamsPipeline`**。*プレースホルダー値*を、ユーザー自身の情報に置き換えます。

   ```
   version: "2"
   kinesis-pipeline:
     source:
       kinesis_data_streams:
         acknowledgments: true
         codec:
           # Based on whether kinesis records are aggregated or not, you could choose json, newline or ndjson codec for processing the records.
           # JSON codec supports parsing nested CloudWatch Events into individual log entries that will be written as documents into OpenSearch.
           # json:
             # key_name: "logEvents"
             # These keys contain the metadata sent by CloudWatch Subscription Filters
             # in addition to the individual log events:
             # include_keys: [ 'owner', 'logGroup', 'logStream' ]
           newline:
         streams:
           - stream_name: "stream name"
             # Enable this if ingestion should start from the start of the stream.
             # initial_position: "EARLIEST"
             # checkpoint_interval: "PT5M"
             # Compression will always be gzip for CloudWatch, but will vary for other sources:
             # compression: "gzip"
           - stream_name: "stream name"
             # Enable this if ingestion should start from the start of the stream.
             # initial_position: "EARLIEST"
             # checkpoint_interval: "PT5M"
             # Compression will always be gzip for CloudWatch, but will vary for other sources:
             # compression: "gzip"
   
           # buffer_timeout: "1s"
           # records_to_accumulate: 100
           # Change the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS.
           # consumer_strategy: "polling"
           # if consumer strategy is set to "polling", enable the polling config below.
           # polling:
             # max_polling_records: 100
             # idle_time_between_reads: "250ms"
         aws:
           # Provide the Role ARN with access to Amazon Kinesis Data Streams. This role should have a trust relationship with osis-pipelines.amazonaws.com
           sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
           # Provide the AWS リージョン of the Data Stream.
           region: "us-east-1"
   
     sink:
       - opensearch:
           # Provide an Amazon OpenSearch Serverless domain endpoint
           hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ]
           index: "index_${getMetadata(\"stream_name\")}"
           # Ensure adding unique document id as a combination of the metadata attributes available.
           document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}"
           aws:
             # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
             sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
             # Provide the AWS リージョン of the domain.
             region: "us-east-1"
             # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection
             serverless: false
             # serverless_options:
               # Specify a name here to create or update network policy for the serverless collection
               # network_policy_name: "network-policy-name"
           # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x
           # distribution_version: "es6"
           # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html
           # enable_request_compression: true/false
           # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ.
           dlq:
             s3:
               # Provide an S3 bucket
               bucket: "your-dlq-bucket-name"
               # Provide a key path prefix for the failed requests
               # key_path_prefix: "kinesis-pipeline/logs/dlq"
               # Provide the region of the bucket.
               region: "us-east-1"
               # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
               sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
   ```

**設定オプション**  
Kinesis 設定オプションについては、*OpenSearch* ドキュメントの「[Configuration options](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kinesis/#configuration-options)」を参照してください。

**使用可能なメタデータ属性**
   + **stream\$1name** – レコードが取り込まれた Kinesis Data Streams の名前
   + **partition\$1key** – 取り込まれる Kinesis Data Streams レコードのパーティションキー
   + **sequence\$1number** – 取り込まれる Kinesis Data Streams レコードのシーケンス番号
   + **sub\$1sequence\$1number** – 取り込まれる Kinesis Data Streams レコードのサブシーケンス番号

1. 

**(オプション) Kinesis Data Streams パイプラインの推奨コンピューティングユニット (OCU) を設定する**

   OpenSearch Kinesis Data Streams ソースパイプラインは、複数のストリームからストリームレコードを取り込むように設定することもできます。取り込みを高速化するには、追加された新しいストリームごとにコンピューティングユニットを追加することをお勧めします。

### データ整合性
<a name="confluent-cloud-kinesis-private"></a>

OpenSearch Ingestion は、データの耐久性を確保するためにエンドツーエンドの確認応答をサポートしています。パイプラインが Kinesis からストリームレコードを読み取ると、ストリームに関連付けられたシャードに基づいてストリームレコードを読み取る作業が動的に分散されます。パイプラインは、OpenSearch ドメインまたはコレクション内のすべてのレコードを取り込んだ後に確認応答を受信すると、自動的にストリームのチェックポイントを作成します。これにより、ストリームレコードの重複処理を回避できます。

ストリーム名に基づいてインデックスを作成するには、opensearch sink セクションのインデックスを **"index\$1\$1\$1getMetadata(\$1"stream\$1name\$1")\$1"** として定義します。

## ソースとしての Amazon Kinesis Data Streams クロスアカウント
<a name="kinesis-cross-account-source"></a>

OpenSearch Ingestion パイプラインがソースとして別のアカウントの Kinesis Data Streams にアクセスできるように、Amazon Kinesis Data Streams のアカウント間でアクセスを許可できます。クロスアカウントアクセスを有効にするには、以下の手順に従います。

**クロスアカウントアクセスの設定**

1. 

**Kinesis ストリームがあるアカウントにリソースポリシーを設定する**

   *プレースホルダー値*を、ユーザー自身の情報に置き換えます。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "StreamReadStatementID",
               "Effect": "Allow",
               "Principal": {
                   "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role"
               },
               "Action": [
                   "kinesis:DescribeStreamSummary",
                   "kinesis:GetRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:ListShards"
               ],
               "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name"
           },
           {
               "Sid": "StreamEFOReadStatementID",
               "Effect": "Allow",
               "Principal": {
                   "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role"
               },
               "Action": [
                   "kinesis:DescribeStreamSummary",
                   "kinesis:ListShards"
               ],
               "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name/consumer/consumer-name"
           }
       ]
   }
   ```

------

1. 

**(オプション) コンシューマーおよびコンシューマーリソースポリシーの設定**

   これはオプションのステップであり、ストリームレコードの読み取りに拡張ファンアウトコンシューマー戦略を使用する場合にのみ必要です。詳しくは、「[Develop enhanced fan-out consumers with dedicated throughput](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html)」を参照してください。

   1. 

**コンシューマーのセットアップ**

      既存のコンシューマーを再利用するには、このステップをスキップできます。詳細については、「*Amazon Kinesis Data Streams API リファレンス*」の「[RegisterStreamConsumer](https://docs.aws.amazon.com/dms/latest/APIReference/API_RegisterStreamConsumer.html)」を参照してください。

      次の例に示す CLI コマンドでは、*プレースホルダ値*を独自の情報に置き換えます。  
**Example CLI コマンドの例:**  

      ```
      aws kinesis register-stream-consumer \
      --stream-arn "arn:aws:kinesis:AWS リージョン:account-id:stream/stream-name" \
      --consumer-name consumer-name
      ```

   1. 

**コンシューマーリソースポリシーの設定**

      次のステートメントでは、*プレースホルダー値*を独自の情報に置き換えます。

------
#### [ JSON ]

****  

      ```
      {
          "Version":"2012-10-17",		 	 	 
          "Statement": [
              {
                  "Sid": "ConsumerEFOReadStatementID",
                  "Effect": "Allow",
                  "Principal": {
                      "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role"
                  },
                  "Action": [
                      "kinesis:DescribeStreamConsumer",
                      "kinesis:SubscribeToShard"
                  ],
                  "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-1/consumer/consumer-name"
              }
          ]
      }
      ```

------

1. 

**パイプラインの設定**

   クロスアカウント取り込みの場合は、各ストリームの `kinesis_data_streams` に次の属性を追加します。
   + `stream_arn` – ストリームが存在するアカウントに属するストリームの ARN
   + `consumer_arn` – これはオプションの属性であり、デフォルトの拡張ファンアウトコンシューマー戦略が選択されている場合は指定する必要があります。このフィールドの実際のコンシューマー ARN を指定します。*プレースホルダー値*を、ユーザー自身の情報に置き換えます。

   ```
   version: "2"
        kinesis-pipeline:
          source:
            kinesis_data_streams:
              acknowledgments: true
              codec:
                newline:
              streams:
                - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name"
                  consumer_arn: "consumer arn"
                  # Enable this if ingestion should start from the start of the stream.
                  # initial_position: "EARLIEST"
                  # checkpoint_interval: "PT5M"
                - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name"
                  consumer_arn: "consumer arn"
                   # initial_position: "EARLIEST"
        
                # buffer_timeout: "1s"
                # records_to_accumulate: 100
                # Enable the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS.
                # consumer_strategy: "polling"
                # if consumer strategy is set to "polling", enable the polling config below.
                # polling:
                  # max_polling_records: 100
                  # idle_time_between_reads: "250ms"
              aws:
                # Provide the Role ARN with access to Kinesis. This role should have a trust relationship with osis-pipelines.amazonaws.com
                sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
                # Provide the AWS リージョン of the domain.
                region: "us-east-1"
        
          sink:
            - opensearch:
                # Provide an OpenSearch Serverless domain endpoint
                hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ]
                index: "index_${getMetadata(\"stream_name\")}"
                # Mapping for documentid based on partition key, shard sequence number and subsequence number metadata attributes
                document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}"
                aws:
                  # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
                  sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
                  # Provide the AWS リージョン of the domain.
                  region: "us-east-1"
                  # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
                  serverless: false
                    # serverless_options:
                    # Specify a name here to create or update network policy for the serverless collection
                  # network_policy_name: network-policy-name
                # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x
                # distribution_version: "es6"
                # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html
                # enable_request_compression: true/false
                # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ.
                dlq:
                  s3:
                    # Provide an Amazon S3 bucket
                    bucket: "your-dlq-bucket-name"
                    # Provide a key path prefix for the failed requests
                    # key_path_prefix: "alb-access-log-pipeline/logs/dlq"
                    # Provide the AWS リージョン of the bucket.
                    region: "us-east-1"
                    # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
                    sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
   ```

1. 

**OSI パイプラインロール Kinesis Data Streams**

   1. 

**IAM ポリシー**

      以下のポリシーをパイプラインロールに追加します。*プレースホルダー値*を、ユーザー自身の情報に置き換えます。

------
#### [ JSON ]

****  

      ```
      {
          "Version":"2012-10-17",		 	 	 
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "kinesis:DescribeStreamConsumer",
                      "kinesis:SubscribeToShard"
                  ],
                  "Resource": [
                  "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream"
                  ]
              },
              {
                  "Sid": "allowReadFromStream",
                  "Effect": "Allow",
                  "Action": [
                      "kinesis:DescribeStream",
                      "kinesis:DescribeStreamSummary",
                      "kinesis:GetRecords",
                      "kinesis:GetShardIterator",
                      "kinesis:ListShards",
                      "kinesis:ListStreams",
                      "kinesis:ListStreamConsumers",
                      "kinesis:RegisterStreamConsumer"
                  ],
                  "Resource": [
                      "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream"
                  ]
              }
          ]
      }
      ```

------

   1. 

**信頼ポリシー**

      ストリームアカウントからデータを取り込むには、パイプライン取り込みロールとストリームアカウントの間に信頼関係を確立する必要があります。パイプラインロールに以下を追加します。*プレースホルダー値*を、ユーザー自身の情報に置き換えます。

------
#### [ JSON ]

****  

      ```
      {
        "Version":"2012-10-17",		 	 	 
        "Statement": [{
           "Effect": "Allow",
           "Principal": {
             "AWS": "arn:aws:iam::111122223333:root"
            },
           "Action": "sts:AssumeRole"
        }]
      }
      ```

------