

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 将 OpenSearch 摄取管道与 Amazon Managed Streaming for Apache Kafka
<a name="configure-client-msk"></a>

你可以使用 [Kafka 插件](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/)将来自亚马逊 Apache Managed [Streaming for Apache Kafka（亚马逊 MS OpenSearch K）的数据提取到你的摄取管](https://docs.aws.amazon.com/msk/latest/developerguide/)道中。在 Amazon MSK 中，您可以构建并运行使用 Apache Kafka 的应用程序来处理流数据。 OpenSearch Ingestion AWS PrivateLink 用于连接亚马逊 MSK。您可以从 Amazon MSK 和 Amazon MSK 无服务器集群摄取数据。这两个流程之间的唯一区别是在设置管道之前必须执行的先决条件步骤。

**Topics**
+ [预置的 Amazon MSK 先决条件](#msk-prereqs)
+ [Amazon MSK 无服务器先决条件](#msk-serverless-prereqs)
+ [步骤 1：配置管道角色](#msk-pipeline-role)
+ [步骤 2：创建管道](#msk-pipeline)
+ [步骤 3：（可选）使用 AWS Glue 架构注册表](#msk-glue)
+ [步骤 4：（可选）为 Amazon MSK 管道配置推荐的计算单位 (OCUs)](#msk-ocu)

## 预置的 Amazon MSK 先决条件
<a name="msk-prereqs"></a>

在创建 OpenSearch 摄取管道之前，请执行以下步骤：

1. 按照《Amazon Managed Streaming for Apache Kafka 开发人员指南》中 [Creating a cluster](https://docs.aws.amazon.com/msk/latest/developerguide/msk-create-cluster.html#create-cluster-console)** 说明的步骤，创建一个由 Amazon MSK 预置的集群。对于 **Broker 类型**，请选择除`t3`类型之外的任何选项，因为 OpenSearch Ingestion 不支持这些类型。

1. 集群处于**活动**状态后，请按照[开启多 VPC 连接](https://docs.aws.amazon.com/msk/latest/developerguide/aws-access-mult-vpc.html#mvpc-cluster-owner-action-turn-on)中的步骤执行操作。

1. 按照[将集群策略附加到 MSK 集群](https://docs.aws.amazon.com/msk/latest/developerguide/aws-access-mult-vpc.html#mvpc-cluster-owner-action-policy)的步骤附加以下策略之一，具体取决于集群与管道是否位于同一 AWS 账户。此策略允许 OpenSearch Ingestion 创建与您的 Amazon MSK 集群的 AWS PrivateLink 连接并从 Kafka 主题中读取数据。确保使用自身 ARN 更新 `resource`。

   当集群与管道位于同一 AWS 账户时，适用以下策略：

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       },
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis-pipelines.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:GetBootstrapBrokers",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       }
     ]
   }
   ```

------

   如果您的 Amazon MSK 集群与您的管道 AWS 账户 不同，请改为附加以下策略。请注意，只有预置的 Amazon MSK 集群才能进行跨账户访问，Amazon MSK 无服务器集群不支持跨账户访问。的 ARN AWS `principal` 应该是您为工作流配置提供的相同管道角色的 ARN：

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       },
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis-pipelines.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:GetBootstrapBrokers",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       },
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::444455556666:role/pipeline-role"
         },
         "Action": [
           "kafka-cluster:*",
           "kafka:*"
         ],
         "Resource": [
           "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id",
           "arn:aws:kafka:us-east-1:111122223333:topic/cluster-name/cluster-id/*",
           "arn:aws:kafka:us-east-1:111122223333:group/cluster-name/*"
         ]
       }
     ]
   }
   ```

------

1. 按照[创建主题](https://docs.aws.amazon.com/msk/latest/developerguide/create-topic.html)中的步骤创建 Kafka 主题。确保`BootstrapServerString`这是私有终端节点（单 VPC）引导程序 URLs之一。`--replication-factor` 的值应为 `2` 或 `3`，具体取决于 Amazon MSK 集群包含的可用区数量。`--partitions` 的值至少应为 `10`。

1. 按照[生成和使用数据](https://docs.aws.amazon.com/msk/latest/developerguide/produce-consume.html)中的步骤生成和使用数据。再说一遍，请确保`BootstrapServerString`这是您的私有终端节点（单 VPC）引导程序 URLs之一。

## Amazon MSK 无服务器先决条件
<a name="msk-serverless-prereqs"></a>

在创建 OpenSearch 摄取管道之前，请执行以下步骤：

1. 按照《Amazon Managed Streaming for Apache Kafka 开发人员指南》中 [Create an MSK Serverless cluster](https://docs.aws.amazon.com/msk/latest/developerguide/create-serverless-cluster.html#)** 说明的步骤，创建一个 Amazon MSK 无服务器集群。

1. 集群处于**活动**状态后，按照 [Attach a cluster policy to the MSK cluster](https://docs.aws.amazon.com/msk/latest/developerguide/aws-access-mult-vpc.html#mvpc-cluster-owner-action-policy) 中的步骤附加以下策略。确保使用自身 ARN 更新 `resource`。

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       },
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis-pipelines.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:GetBootstrapBrokers",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       }
     ]
   }
   ```

------

   此策略允许 OpenSearch Ingestion 创建与您的 Amazon MSK 无服务器集群的 AWS PrivateLink 连接并从 Kafka 主题中读取数据。当您的集群和管道处于相同状态时，此政策适用 AWS 账户，这必须是正确的，因为 Amazon MSK Serverless 不支持跨账户访问。

1. 按照[创建主题](https://docs.aws.amazon.com/msk/latest/developerguide/msk-serverless-create-topic.html)中的步骤创建 Kafka 主题。确保`BootstrapServerString`这是您的简单身份验证和安全层 (SASL) IAM 引导程序 URLs之一。`--replication-factor` 的值应为 `2` 或 `3`，具体取决于 Amazon MSK 无服务器集群包含的可用区数量。`--partitions` 的值至少应为 `10`。

1. 按照[生成和使用数据](https://docs.aws.amazon.com/msk/latest/developerguide/msk-serverless-produce-consume.html)中的步骤生成和使用数据。再说一遍，请确保`BootstrapServerString`这是您的简单身份验证和安全层 (SASL) IAM 引导程序 URLs之一。

## 步骤 1：配置管道角色
<a name="msk-pipeline-role"></a>

预置好 Amazon MSK 集群或设置好无服务器集群后，在管道角色中添加要在管道配置中使用的以下 Kafka 权限：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster",
                "kafka:DescribeClusterV2",
                "kafka:GetBootstrapBrokers"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:111122223333:topic/cluster-name/cluster-id/topic-name"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:111122223333:group/cluster-name/*"
            ]
        }
    ]
}
```

------

## 步骤 2：创建管道
<a name="msk-pipeline"></a>

然后，你可以配置如下所示的 OpenSearch 摄取管道，将 Kafka 指定为来源：

```
version: "2"
log-pipeline:
  source:
    kafka:
      acknowledgements: true
      topics:
      - name: "topic-name"
        group_id: "grouplambd-id"
      aws:
        msk:
          arn: "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-id"
        region: "us-west-2"
  processor:
  - grok:
      match:
        message:
        - "%{COMMONAPACHELOG}"
  - date:
      destination: "@timestamp"
      from_time_received: true
  sink:
  - opensearch:
      hosts: ["https://search-domain-endpoint.us-east-1es.amazonaws.com"]
      index: "index_name"
      aws_region: "region"
      aws_sigv4: true
```

您可以使用预先配置的 Amazon MSK 蓝图，以创建此管道。有关更多信息，请参阅 [使用蓝图](pipeline-blueprint.md)。

## 步骤 3：（可选）使用 AWS Glue 架构注册表
<a name="msk-glue"></a>

当您将 OpenSearch Ingestion 与 Amazon MSK 配合使用时，可以将 AVRO 数据格式用于架构注册表中托管的架构。 AWS Glue 在 [AWS Glue 架构注册表](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)中，您可以集中发现、控制和演变数据流架构。

要使用此选项，请在管道配置中启用架构 `type`：

```
schema:
  type: "aws_glue"
```

您还必须在您的管道角色中提供 AWS Glue 读取访问权限。您可以使用名为的 AWS 托管策略[AWSGlueSchemaRegistryReadonlyAccess](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSGlueSchemaRegistryReadonlyAccess.html)。此外，您的注册表必须 AWS 账户 与您的 OpenSearch 摄取管道位于同一区域中。

## 步骤 4：（可选）为 Amazon MSK 管道配置推荐的计算单位 (OCUs)
<a name="msk-ocu"></a>

每个计算单位的每个主题有一个使用者。代理在给定主题的使用者之间均衡分配分区。但是，当分区数量大于使用者数量时，Amazon MSK 将要求每个使用者托管多个分区。 OpenSearch Ingestion 具有内置的 auto Scaling，可以根据 CPU 使用率或管道中的待处理记录数量向上或向下扩展。

为实现最佳性能，请将分区分布在多个计算单位中以便并行处理。如果主题有大量分区（例如，超过 96 个，这是 OCUs 每个管道的最大分区），我们建议您将管道配置为 1— OCUs 96。因为它将根据需要自动扩缩。如果主题包含的分区数量较少（例如，少于 96 个），则最大计算单位应与分区数量相同。

当管道包含多个主题时，请选择分区数最多的主题作为参考来配置最大计算单位。通过向同一主题和使用者组添加另一个具有新集的 OCUs 管道，您可以几乎线性地扩展吞吐量。