

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 在 Kafka 中使用 OpenSearch 采集管道
<a name="configure-client-self-managed-kafka"></a>

您可以使用 [Kafka](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/) 插件将数据从自行管理的 Kafka 集群流式传输到亚马逊 OpenSearch 服务域和 OpenSearch 无服务器集合。 OpenSearch Ingestion 支持来自配置有公共或私有 (VPC) 网络的 Kafka 集群的连接。本主题概述设置摄取管道的先决条件和步骤，包括配置网络设置和身份验证方法，例如双向TLS（mTLS）、SASL/SCRAM 或 IAM。

## 从公有 Kafka 集群迁移数据
<a name="self-managaged-kafka-public"></a>

您可以使用 OpenSearch Ingestion 管道从公共自管 Kafka 集群迁移数据，这意味着可以公开解析域 DNS 名称。为此，请设置一个以自管理的 Kafka 为源，将 OpenSearch 服务或 OpenSearch 无服务器作为目标的 OpenSearch 摄取管道。这将处理您从自行管理的源集群到托 AWS管目标域或集合的流式数据。

### 先决条件
<a name="self-managaged-kafka-public-prereqs"></a>

在创建 OpenSearch 摄取管道之前，请执行以下步骤：

1. 使用公有网络配置创建自主管理型 Kafka 集群。集群应包含您要采集到 S OpenSearch ervice 中的数据。

1. 创建要将数据迁移到的 OpenSearch 服务域或 OpenSearch 无服务器集合。有关更多信息，请参阅[创建 OpenSearch 服务域](createupdatedomains.md#createdomains)和[创建集合](serverless-create.md)。

1. 使用在您的自管理集群上设置身份验证。 AWS Secrets Manager按照 [Rotate AWS Secrets Manager secrets](https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets.html) 中的步骤启用密钥轮换。

1. 将[基于资源的策略](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource)附加到域，或将[数据访问策略](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-data-access.html)附加到集合。这些访问策略允许 OpenSearch Ingestion 将数据从您的自管理集群写入您的域或集合。

   以下示例域访问策略允许您在下一步中创建的管道角色将数据写入域。确保使用自身 ARN 更新 `resource`。

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::444455556666:role/pipeline-role"
         },
         "Action": [
           "es:DescribeDomain",
           "es:ESHttp*"
         ],
         "Resource": [
           "arn:aws:es:us-east-1:111122223333:domain/domain-name"
         ]
       }
     ]
   }
   ```

------

   要创建具有向该集合或域写入数据的正确权限的 IAM 角色，请参阅 [在 Amazon OpenSearch Ingestion 中设置角色和用户](pipeline-security-overview.md)。

### 步骤 1：配置管道角色
<a name="self-managed-kafka-public-pipeline-role"></a>

设置 Kafka 管道先决条件后，[配置要在工作流配置中使用的管道角色](pipeline-security-overview.md#pipeline-security-sink)，添加写入 OpenSearch 服务域或 OpenSearch 无服务器集合的权限，以及从 Secrets Manager 读取密钥的权限。

### 步骤 2：创建管道
<a name="self-managed-kafka-public-pipeline"></a>

然后，您可以配置如下所示的 OpenSearch Ingestion 管道，将 Kafka 指定为来源。

您可以将多个 OpenSearch 服务域指定为数据的目的地。此功能允许有条件地路由或将传入数据复制到多个 OpenSearch 服务域。

您还可以将数据从源 Confluent Kafka 集群迁移到无服务器 V OpenSearch PC 集合。务必要在管道配置中提供网络访问策略。您可以使用 Confluent 架构注册表来定义 Confluent 架构。

```
version: "2"
kafka-pipeline:
  source:
    kafka:
      encryption:
        type: "ssl"
      topics:
        - name: "topic-name"
          group_id: "group-id"
      bootstrap_servers:
        - "bootstrap-server.us-east-1.aws.private.confluent.cloud:9092"
      authentication:
        sasl:
          plain:
            username: ${aws_secrets:confluent-kafka-secret:username}
            password: ${aws_secrets:confluent-kafka-secret:password}
      schema:
        type: confluent
        registry_url: https://my-registry.us-east-1.aws.confluent.cloud
        api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}"
        api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}"
        basic_auth_credentials_source: "USER_INFO"
  sink:
  - opensearch:
      hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"]
      aws:
          region: "us-east-1"
      index: "confluent-index"
extension:
  aws:
    secrets:
      confluent-kafka-secret:
        secret_id: "my-kafka-secret"
        region: "us-east-1"
      schema-secret:
        secret_id: "my-self-managed-kafka-schema"
        region: "us-east-1"
```

您可以使用预先配置的蓝图，以创建此管道。有关更多信息，请参阅 [使用蓝图](pipeline-blueprint.md)。

### 从 VPC 中的 Kafka 集群迁移数据
<a name="self-managaged-kafka-private"></a>

您还可以使用 OpenSearch 摄取管道从 VPC 中运行的自管 Kafka 集群迁移数据。为此，请设置一个以自管理的 Kafka 为源，将 OpenSearch 服务或 OpenSearch 无服务器作为目标的 OpenSearch 摄取管道。这将处理您从自行管理的源集群到托 AWS管目标域或集合的流式数据。

#### 先决条件
<a name="self-managaged-kafka-private-prereqs"></a>

在创建 OpenSearch 摄取管道之前，请执行以下步骤：

1. 使用 VPC 网络配置创建自管理 Kafka 集群，其中包含您要提取到服务中的数据。 OpenSearch 

1. 创建要将数据迁移到的 OpenSearch 服务域或 OpenSearch 无服务器集合。有关更多信息，请参阅[创建 OpenSearch 服务域](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/createupdatedomains.html#createdomains)和[创建集合](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-manage.html#serverless-create)。

1. 使用在您的自管理集群上设置身份验证。 AWS Secrets Manager按照 [Rotate AWS Secrets Manager secrets](https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets.html) 中的步骤启用密钥轮换。

1. 获取有权访问自主管理型 Kafka 的 VPC 的 ID。选择 OpenSearch Ingestion 要使用的 VPC CIDR。
**注意**  
如果您使用创建管道，则还必须将您的 OpenSearch 摄取管道连接到您的 VPC 才能使用自我管理的 Kafka。 AWS 管理控制台 要执行此操作，请找到**网络配置**部分，选择**连接到 VPC** 复选框，然后从提供的任意一个默认选项中选择 CIDR，或者选择自己的 CIDR。您可以按照 [RFC 1918 当前最佳实践](https://datatracker.ietf.org/doc/html/rfc1918)中的定义，使用私有地址空间中的任何 CIDR。  
要提供自定义 CIDR，请从下拉菜单中选择**其他**。为避免接入和自行管理之间的 IP 地址冲突 OpenSearch，请确保自管理 VP OpenSearch C CIDR 与用于 OpenSearch 摄取的 CIDR 不同。 OpenSearch 

1. 将[基于资源的策略](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource)附加到域，或将[数据访问策略](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-data-access.html)附加到集合。这些访问策略允许 OpenSearch Ingestion 将数据从您的自管理集群写入您的域或集合。

   以下示例域访问策略允许您在下一步中创建的管道角色将数据写入域。确保使用自身 ARN 更新 `resource`。

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::444455556666:role/pipeline-role"
         },
         "Action": [
           "es:DescribeDomain",
           "es:ESHttp*"
         ],
         "Resource": [
           "arn:aws:es:us-east-1:111122223333:domain/domain-name"
         ]
       }
     ]
   }
   ```

------

   要创建具有向该集合或域写入数据的正确权限的 IAM 角色，请参阅 [在 Amazon OpenSearch Ingestion 中设置角色和用户](pipeline-security-overview.md)。

#### 步骤 1：配置管道角色
<a name="self-managed-kafka-private-pipeline-role"></a>

完成管道先决条件设置后，[配置管道角色](pipeline-security-overview.md#pipeline-security-sink)以在管道配置中使用，并为该角色添加以下权限：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "SecretsManagerReadAccess",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetSecretValue"
            ],
            "Resource": ["arn:aws:secretsmanager:us-east-1:111122223333:secret:secret-name"]
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:AttachNetworkInterface",
                "ec2:CreateNetworkInterface",
                "ec2:CreateNetworkInterfacePermission",
                "ec2:DeleteNetworkInterface",
                "ec2:DeleteNetworkInterfacePermission",
                "ec2:DetachNetworkInterface",
                "ec2:DescribeNetworkInterfaces"
            ],
            "Resource": [
                "arn:aws:ec2:*:*:network-interface/*",
                "arn:aws:ec2:*:*:subnet/*",
                "arn:aws:ec2:*:*:security-group/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:DescribeDhcpOptions",
                "ec2:DescribeRouteTables",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeSubnets",
                "ec2:DescribeVpcs",
                "ec2:Describe*"
            ],
            "Resource": "*"
        },
        { 
            "Effect": "Allow",
            "Action": [ 
                "ec2:CreateTags"
            ],
            "Resource": "arn:aws:ec2:*:*:network-interface/*",
            "Condition": { 
               "StringEquals": 
                    {
                        "aws:RequestTag/OSISManaged": "true"
                    } 
            } 
        }
    ]
}
```

------

您必须为用于创建 OpenSearch 摄取管道的 IAM 角色提供上述 Amazon EC2 权限，因为管道使用这些权限在您的 VPC 中创建和删除网络接口。该管道只能通过此网络接口访问 Kafka 集群。

#### 步骤 2：创建管道
<a name="self-managed-kafka-private-pipeline"></a>

然后，您可以配置如下所示的 OpenSearch Ingestion 管道，将 Kafka 指定为来源。

您可以将多个 OpenSearch 服务域指定为数据的目的地。此功能允许有条件地路由或将传入数据复制到多个 OpenSearch 服务域。

您还可以将数据从源 Confluent Kafka 集群迁移到无服务器 V OpenSearch PC 集合。务必要在管道配置中提供网络访问策略。您可以使用 Confluent 架构注册表来定义 Confluent 架构。

```
 version: "2"
kafka-pipeline:
  source:
    kafka:
      encryption:
        type: "ssl"
      topics:
        - name: "topic-name"
          group_id: "group-id"
      bootstrap_servers:
        - "bootstrap-server.us-east-1.aws.private.confluent.cloud:9092"
      authentication:
        sasl:
          plain:
            username: ${aws_secrets:confluent-kafka-secret:username}
            password: ${aws_secrets:confluent-kafka-secret:password}
      schema:
        type: confluent
        registry_url: https://my-registry.us-east-1.aws.confluent.cloud
        api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}"
        api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}"
        basic_auth_credentials_source: "USER_INFO"
  sink:
  - opensearch:
      hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"]
      aws:
          region: "us-east-1"
      index: "confluent-index"
extension:
  aws:
    secrets:
      confluent-kafka-secret:
        secret_id: "my-kafka-secret"
        region: "us-east-1"
      schema-secret:
        secret_id: "my-self-managed-kafka-schema"
        region: "us-east-1"
```

您可以使用预先配置的蓝图，以创建此管道。有关更多信息，请参阅 [使用蓝图](pipeline-blueprint.md)。