

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用采 OpenSearch 集管道和 Amazon Kinesis Data Streams
<a name="configure-client-kinesis"></a>

使用 OpenSearch 采集管道和 Amazon Kinesis Data Streams，将来自多个流的流记录数据提取到亚马逊 OpenSearch 服务域和集合中。 OpenSearch Ingestion 管道整合了流媒体摄取基础架构，为持续从 Kinesis 摄取直播记录提供了一种高规模、低延迟的方式。

**Topics**
+ [Amazon Kinesis Data Streams 作为源](#confluent-cloud-kinesis)
+ [Amazon Kinesis Data Streams 跨账户作为源](#kinesis-cross-account-source)

## Amazon Kinesis Data Streams 作为源
<a name="confluent-cloud-kinesis"></a>

通过以下过程，您将学习如何设置使用 Amazon K OpenSearch inesis Data Streams 作为数据源的摄取管道。本节介绍必要的先决条件，例如创建 OpenSearch 服务域或 OpenSearch 无服务器集合，并逐步完成配置管道角色和创建管道的步骤。

### 先决条件
<a name="s3-prereqs"></a>

要设置管道，则需要一个或多个活跃的 Kinesis Data Streams。这些流必须正在接收记录，或准备接收来自其他源的记录。有关更多信息，请参阅[ OpenSearch 摄取概述](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/osis-getting-started-tutorials.html)。

**设置管道**

1. 

**创建 OpenSearch 服务域或 OpenSearch 无服务器集合**

   要创建域名或集合，请参阅 [ OpenSearch Ingestion 入门](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/osis-getting-started-tutorials.html)。

   要创建具有向该集合或域写入数据的正确权限的 IAM 角色，请参阅 [Resource-based policies](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource)。

1. 

**配置具有权限的管道角色**

   设置要在管道配置中使用的[管道角色](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-security-overview.html#pipeline-security-sink)，并为其添加以下权限。将 *placeholder values* 替换为您自己的信息。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "allowReadFromStream",
               "Effect": "Allow",
               "Action": [
                   "kinesis:DescribeStream",
                   "kinesis:DescribeStreamConsumer",
                   "kinesis:DescribeStreamSummary",
                   "kinesis:GetRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:ListShards",
                   "kinesis:ListStreams",
                   "kinesis:ListStreamConsumers",
                   "kinesis:RegisterStreamConsumer",
                   "kinesis:SubscribeToShard"
               ],
               "Resource": [
                   "arn:aws:kinesis:us-east-1:111122223333:stream/stream-name"
               ]
           }
       ]
   }
   ```

------

   如果已在流上启用服务器端加密，则以下 AWS KMS 策略允许对记录进行解密。将 *placeholder values* 替换为您自己的信息。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "allowDecryptionOfCustomManagedKey",
               "Effect": "Allow",
               "Action": [
                   "kms:Decrypt",
                   "kms:GenerateDataKey"
               ],
               "Resource": "arn:aws:kms:us-east-1:111122223333:key/key-id"
           }
       ]
   }
   ```

------

   为使管道能够将数据写入域，域必须具有[域级访问策略](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource)，以允许 **sts\$1role\$1arn** 管道角色访问域。

   以下示例是域访问策略，允许在上一步（`pipeline-role`）中创建的管道角色将数据写入 `ingestion-domain` 域。将 *placeholder values* 替换为您自己的信息。

   ```
   {
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::your-account-id:role/pipeline-role"
         },
         "Action": ["es:DescribeDomain", "es:ESHttp*"],
         "Resource": "arn:aws:es:AWS 区域:account-id:domain/domain-name/*"
       }
     ]
   }
   ```

1. 

**创建管道**

   配置一个将 **K** 指定inesis-data-streams为源的 OpenSearch 摄取管道。您可以在 OpenSearch Ingestion 控制台上找到用于创建此类管道的现成蓝图。（可选）要使用创建管道 AWS CLI，您可以使用名为 “**`AWS-KinesisDataStreamsPipeline`**” 的蓝图。将 *placeholder values* 替换为您自己的信息。

   ```
   version: "2"
   kinesis-pipeline:
     source:
       kinesis_data_streams:
         acknowledgments: true
         codec:
           # Based on whether kinesis records are aggregated or not, you could choose json, newline or ndjson codec for processing the records.
           # JSON codec supports parsing nested CloudWatch Events into individual log entries that will be written as documents into OpenSearch.
           # json:
             # key_name: "logEvents"
             # These keys contain the metadata sent by CloudWatch Subscription Filters
             # in addition to the individual log events:
             # include_keys: [ 'owner', 'logGroup', 'logStream' ]
           newline:
         streams:
           - stream_name: "stream name"
             # Enable this if ingestion should start from the start of the stream.
             # initial_position: "EARLIEST"
             # checkpoint_interval: "PT5M"
             # Compression will always be gzip for CloudWatch, but will vary for other sources:
             # compression: "gzip"
           - stream_name: "stream name"
             # Enable this if ingestion should start from the start of the stream.
             # initial_position: "EARLIEST"
             # checkpoint_interval: "PT5M"
             # Compression will always be gzip for CloudWatch, but will vary for other sources:
             # compression: "gzip"
   
           # buffer_timeout: "1s"
           # records_to_accumulate: 100
           # Change the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS.
           # consumer_strategy: "polling"
           # if consumer strategy is set to "polling", enable the polling config below.
           # polling:
             # max_polling_records: 100
             # idle_time_between_reads: "250ms"
         aws:
           # Provide the Role ARN with access to Amazon Kinesis Data Streams. This role should have a trust relationship with osis-pipelines.amazonaws.com
           sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
           # Provide the AWS 区域 of the Data Stream.
           region: "us-east-1"
   
     sink:
       - opensearch:
           # Provide an Amazon OpenSearch Serverless domain endpoint
           hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ]
           index: "index_${getMetadata(\"stream_name\")}"
           # Ensure adding unique document id as a combination of the metadata attributes available.
           document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}"
           aws:
             # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
             sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
             # Provide the AWS 区域 of the domain.
             region: "us-east-1"
             # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection
             serverless: false
             # serverless_options:
               # Specify a name here to create or update network policy for the serverless collection
               # network_policy_name: "network-policy-name"
           # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x
           # distribution_version: "es6"
           # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html
           # enable_request_compression: true/false
           # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ.
           dlq:
             s3:
               # Provide an S3 bucket
               bucket: "your-dlq-bucket-name"
               # Provide a key path prefix for the failed requests
               # key_path_prefix: "kinesis-pipeline/logs/dlq"
               # Provide the region of the bucket.
               region: "us-east-1"
               # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
               sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
   ```

**配置选项**  
有关 Kinesis 配置选项，请参阅[文档中的*OpenSearch*配置选项](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kinesis/#configuration-options)。

**可用的元数据属性**
   + **stream\$1name**：从中摄取记录的 Kinesis Data Streams 的名称
   + **partition\$1key**：正在摄取的 Kinesis Data Streams 记录的分区键
   + **sequence\$1number**：正在摄取的 Kinesis Data Streams 记录的序列号
   + **sub\$1sequence\$1number**：正在摄取的 Kinesis Data Streams 记录的子序列号

1. 

**（可选）为 Kinesis Data Streams 管道配置推荐的计算单位 (OCUs)**

   也可以将 OpenSearch Kinesis Data Streams 源管道配置为从多个流中提取直播记录。为加快数据摄取速度，我们建议您为每个新增流添加一个额外的计算单位。

### 数据一致性
<a name="confluent-cloud-kinesis-private"></a>

OpenSearch Ingestion 支持 end-to-end确认以确保数据的持久性。当管道从 Kinesis 读取流记录时，会根据与流关联的分片动态分配读取流记录的工作。Pipeline 在收录 OpenSearch 域或集合中的所有记录后收到确认后，将自动对流进行检查点。这将避免对流记录进行重复处理。

要根据流名称创建索引，请在 opensearch 接收器部分中将索引定义为 **"index\$1\$1\$1getMetadata(\$1"stream\$1name\$1")\$1"**。

## Amazon Kinesis Data Streams 跨账户作为源
<a name="kinesis-cross-account-source"></a>

您可以通过 Amazon Kinesis Data Streams 跨账户授予访问权限 OpenSearch ，以便摄取管道可以访问另一个账户作为来源的 Kinesis Data Streams。请完成以下步骤以启用跨账户访问。

**配置跨账户访问**

1. 

**在拥有 Kinesis 流的账户中设置资源策略**

   将 *placeholder values* 替换为您自己的信息。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "StreamReadStatementID",
               "Effect": "Allow",
               "Principal": {
                   "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role"
               },
               "Action": [
                   "kinesis:DescribeStreamSummary",
                   "kinesis:GetRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:ListShards"
               ],
               "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name"
           },
           {
               "Sid": "StreamEFOReadStatementID",
               "Effect": "Allow",
               "Principal": {
                   "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role"
               },
               "Action": [
                   "kinesis:DescribeStreamSummary",
                   "kinesis:ListShards"
               ],
               "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name/consumer/consumer-name"
           }
       ]
   }
   ```

------

1. 

**（可选）设置消费端和消费端资源策略**

   此步骤为可选操作，仅当您计划使用增强型扇出消费端策略读取流记录时才需要执行。有关更多信息，请参阅[开发具有专用吞吐量增强型扇出消费端](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html)。

   1. 

**设置消费端**

      要重复使用现有消费端，可跳过此步骤。有关更多信息，请参阅[RegisterStreamConsumer](https://docs.aws.amazon.com/dms/latest/APIReference/API_RegisterStreamConsumer.html)《*亚马逊 Kinesis Data Streams API* 参考》。

      在以下示例 CLI 命令中，*placeholder values*用您自己的信息替换。  
**Example 示例 CLI 命令：**  

      ```
      aws kinesis register-stream-consumer \
      --stream-arn "arn:aws:kinesis:AWS 区域:account-id:stream/stream-name" \
      --consumer-name consumer-name
      ```

   1. 

**设置消费端资源策略**

      在以下语句中，*placeholder values*用您自己的信息替换。

------
#### [ JSON ]

****  

      ```
      {
          "Version":"2012-10-17",		 	 	 
          "Statement": [
              {
                  "Sid": "ConsumerEFOReadStatementID",
                  "Effect": "Allow",
                  "Principal": {
                      "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role"
                  },
                  "Action": [
                      "kinesis:DescribeStreamConsumer",
                      "kinesis:SubscribeToShard"
                  ],
                  "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-1/consumer/consumer-name"
              }
          ]
      }
      ```

------

1. 

**管道配置**

   对于跨账户数据摄取，请为每个流的 `kinesis_data_streams` 下添加以下属性：
   + `stream_arn`：该流所属账户中存在的流的 ARN
   + `consumer_arn`：这是可选属性，如果选择默认的增强型扇出消费端策略，则必须指定该属性。指定此字段的实际消费端 ARN。将 *placeholder values* 替换为您自己的信息。

   ```
   version: "2"
        kinesis-pipeline:
          source:
            kinesis_data_streams:
              acknowledgments: true
              codec:
                newline:
              streams:
                - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name"
                  consumer_arn: "consumer arn"
                  # Enable this if ingestion should start from the start of the stream.
                  # initial_position: "EARLIEST"
                  # checkpoint_interval: "PT5M"
                - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name"
                  consumer_arn: "consumer arn"
                   # initial_position: "EARLIEST"
        
                # buffer_timeout: "1s"
                # records_to_accumulate: 100
                # Enable the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS.
                # consumer_strategy: "polling"
                # if consumer strategy is set to "polling", enable the polling config below.
                # polling:
                  # max_polling_records: 100
                  # idle_time_between_reads: "250ms"
              aws:
                # Provide the Role ARN with access to Kinesis. This role should have a trust relationship with osis-pipelines.amazonaws.com
                sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
                # Provide the AWS 区域 of the domain.
                region: "us-east-1"
        
          sink:
            - opensearch:
                # Provide an OpenSearch Serverless domain endpoint
                hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ]
                index: "index_${getMetadata(\"stream_name\")}"
                # Mapping for documentid based on partition key, shard sequence number and subsequence number metadata attributes
                document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}"
                aws:
                  # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
                  sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
                  # Provide the AWS 区域 of the domain.
                  region: "us-east-1"
                  # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
                  serverless: false
                    # serverless_options:
                    # Specify a name here to create or update network policy for the serverless collection
                  # network_policy_name: network-policy-name
                # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x
                # distribution_version: "es6"
                # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html
                # enable_request_compression: true/false
                # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ.
                dlq:
                  s3:
                    # Provide an Amazon S3 bucket
                    bucket: "your-dlq-bucket-name"
                    # Provide a key path prefix for the failed requests
                    # key_path_prefix: "alb-access-log-pipeline/logs/dlq"
                    # Provide the AWS 区域 of the bucket.
                    region: "us-east-1"
                    # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
                    sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
   ```

1. 

**OSI 管道角色 Kinesis Data Streams**

   1. 

**IAM 策略**

      将以下策略附加到管道角色。将 *placeholder values* 替换为您自己的信息。

------
#### [ JSON ]

****  

      ```
      {
          "Version":"2012-10-17",		 	 	 
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "kinesis:DescribeStreamConsumer",
                      "kinesis:SubscribeToShard"
                  ],
                  "Resource": [
                  "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream"
                  ]
              },
              {
                  "Sid": "allowReadFromStream",
                  "Effect": "Allow",
                  "Action": [
                      "kinesis:DescribeStream",
                      "kinesis:DescribeStreamSummary",
                      "kinesis:GetRecords",
                      "kinesis:GetShardIterator",
                      "kinesis:ListShards",
                      "kinesis:ListStreams",
                      "kinesis:ListStreamConsumers",
                      "kinesis:RegisterStreamConsumer"
                  ],
                  "Resource": [
                      "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream"
                  ]
              }
          ]
      }
      ```

------

   1. 

**信任策略**

      要从流账户摄取数据，您需要在管道摄取角色与流账户之间建立信任关系。将以下内容添加到管道角色。将 *placeholder values* 替换为您自己的信息。

------
#### [ JSON ]

****  

      ```
      {
        "Version":"2012-10-17",		 	 	 
        "Statement": [{
           "Effect": "Allow",
           "Principal": {
             "AWS": "arn:aws:iam::111122223333:root"
            },
           "Action": "sts:AssumeRole"
        }]
      }
      ```

------