

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 搭配 Confluent Cloud Kafka 使用 OpenSearch 擷取管道
<a name="configure-client-confluent-kafka"></a>

您可以使用 OpenSearch Ingestion 管道，將資料從 Confluent Cloud Kafka 叢集串流至 Amazon OpenSearch Service 網域和 OpenSearch Serverless 集合。OpenSearch Ingestion 支援公有和私有網路組態，可將資料從 Confluent Cloud Kafka 叢集串流至 OpenSearch Service 或 OpenSearch Serverless 管理的網域或集合。

## Confluent Cloud 公有 Kafka 叢集的連線能力
<a name="confluent-cloud-kafka-public"></a>

您可以使用 OpenSearch Ingestion 管道從具有公有組態的 Confluent Cloud Kafka 叢集遷移資料，這表示網域 DNS 名稱可以公開解析。若要這樣做，請使用 Confluent Cloud 公有 Kafka 叢集做為來源，以及 OpenSearch 設定 OpenSearch Service 或 OpenSearch Serverless 做為目的地。這會處理從自我管理來源叢集到受 AWS管目的地網域或集合的串流資料。

### 先決條件
<a name="confluent-cloud-kafka-public-prereqs"></a>

建立 OpenSearch Ingestion 管道之前，請執行下列步驟：

1. 建立充當來源的 Confluent Cloud Kafka 叢集。叢集應包含您要擷取至 OpenSearch Service 的資料。

1. 建立您要將資料遷移至其中的 OpenSearch Service 網域或 OpenSearch Serverless 集合。如需詳細資訊，請參閱[建立 OpenSearch Service 網域](createupdatedomains.md#createdomains)及[建立集合](serverless-create.md)。

1. 使用 在 Confluent Cloud Kafka 叢集上設定身分驗證 AWS Secrets Manager。依照輪換秘密中的步驟啟用[AWS Secrets Manager 秘密輪](https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets.html)換。

1. 將[資源型政策](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource)連接至您的網域，或將[資料存取政策](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-data-access.html)連接至您的集合。這些存取政策允許 OpenSearch Ingestion 將資料從自我管理的叢集寫入您的網域或集合。

   下列範例網域存取政策允許您在下一個步驟中建立的管道角色將資料寫入網域。請務必`resource`使用自己的 ARN 更新 。

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::444455556666:role/pipeline-role"
         },
         "Action": [
           "es:DescribeDomain",
           "es:ESHttp*"
         ],
         "Resource": [
           "arn:aws:es:us-east-1:111122223333:domain/domain-name"
         ]
       }
     ]
   }
   ```

------

   若要建立具有正確許可的 IAM 角色，以存取寫入資料至集合或網域，請參閱 [在 Amazon OpenSearch 擷取中設定角色和使用者](pipeline-security-overview.md)。

### 步驟 1：設定管道角色
<a name="confluent-cloud-kafka-public-pipeline-role"></a>

設定 Confluent Cloud Kafka 叢集管道先決條件之後，[請設定管道組態中要使用的管道角色](pipeline-security-overview.md#pipeline-security-sink)，並新增寫入 OpenSearch Service 網域或 OpenSearch Serverless 集合的許可，以及從 Secrets Manager 讀取秘密的許可。

需要下列許可才能管理網路介面：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ec2:AttachNetworkInterface",
                "ec2:CreateNetworkInterface",
                "ec2:CreateNetworkInterfacePermission",
                "ec2:DeleteNetworkInterface",
                "ec2:DeleteNetworkInterfacePermission",
                "ec2:DetachNetworkInterface",
                "ec2:DescribeNetworkInterfaces"
            ],
            "Resource": [
                "arn:aws:ec2:us-east-1:111122223333:network-interface/*",
                "arn:aws:ec2:us-east-1:111122223333:subnet/*",
                "arn:aws:ec2:us-east-1:111122223333:security-group/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:DescribeDhcpOptions",
                "ec2:DescribeRouteTables",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeSubnets",
                "ec2:DescribeVpcs",
                "ec2:Describe*"
            ],
            "Resource": "arn:aws:ec2:us-east-1:111122223333:subnet/*"
        },
        { 
            "Effect": "Allow",
            "Action": [ "ec2:CreateTags" ],
            "Resource": "arn:aws:ec2:us-east-1:111122223333:network-interface/*",
            "Condition": { 
               "StringEquals": { "aws:RequestTag/OSISManaged": "true" } 
            } 
        }
    ]
}
```

------

以下是從 AWS Secrets Manager 服務讀取秘密所需的許可：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "SecretsManagerReadAccess",
            "Effect": "Allow",
            "Action": ["secretsmanager:GetSecretValue"],
            "Resource": ["arn:aws:secretsmanager:us-east-1:111122223333:secret:,secret-name"]
        }
    ]
}
```

------

需要下列許可才能寫入 Amazon OpenSearch Service 網域：

```
{
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::account-id:role/pipeline-role"
      },
      "Action": ["es:DescribeDomain", "es:ESHttp*"],
      "Resource": "arn:aws:es:region:account-id:domain/domain-name/*"
    }
  ]
}
```

### 步驟 2：建立管道
<a name="confluent-cloud-kafka-public-pipeline"></a>

然後，您可以如下所示設定 OpenSearch Ingestion 管道，將 Confluent Cloud Kafka 指定為來源。

您可以指定多個 OpenSearch Service 網域做為資料的目的地。此功能可將傳入資料的條件式路由或複寫到多個 OpenSearch Service 網域。

您也可以將資料從來源 Confluent Kafka 叢集遷移至 OpenSearch Serverless VPC 集合。請確定您在管道組態中提供網路存取政策。您可以使用 Confluent 結構描述登錄檔來定義 Confluent 結構描述。

```
version: "2"
kafka-pipeline:
  source:
    kafka:
      encryption:
        type: "ssl"
      topics:
        - name: "topic-name"
          group_id: "group-id"
      bootstrap_servers:
        - "bootstrap-server.us-east-1.aws.private.confluent.cloud:9092"
      authentication:
        sasl:
          plain:
            username: ${aws_secrets:confluent-kafka-secret:username}
            password: ${aws_secrets:confluent-kafka-secret:password}
      schema:
        type: confluent
        registry_url: https://my-registry.us-east-1.aws.confluent.cloud
        api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}"
        api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}"
        basic_auth_credentials_source: "USER_INFO"
  sink:
  - opensearch:
      hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"]
      aws:
          region: "us-east-1"
  aws:
    secrets:
      confluent-kafka-secret:
        secret_id: "my-kafka-secret"
        region: "us-east-1"
      schema-secret:
        secret_id: "my-self-managed-kafka-schema"
        region: "us-east-1"
```

您可以使用預先設定的藍圖來建立此管道。如需詳細資訊，請參閱[使用藍圖](pipeline-blueprint.md)。

### 在 VPC 中連線至 Confluent Cloud Kafka 叢集
<a name="confluent-cloud-kafka-private"></a>

您也可以使用 OpenSearch Ingestion 管道，從 VPC 中執行的 Confluent Cloud Kafka 叢集遷移資料。若要這麼做，請設定 OpenSearch Ingestion 管道，並將 Confluent Cloud Kafka 叢集做為來源，並將 OpenSearch Service 或 OpenSearch Serverless 做為目的地。這會處理從 Confluent Cloud Kafka 來源叢集到 AWS受管目的地網域或集合的串流資料。

 OpenSearch Ingestion 支援 Confluent 中所有支援的網路模式中設定的 Confluent Cloud Kafka 叢集。OpenSearch Ingestion 中支援下列網路組態模式做為來源：
+ AWS VPC 對等互連
+  AWS PrivateLink 專用叢集的
+  AWS PrivateLink for Enterprise 叢集
+ AWS Transit Gateway

#### 先決條件
<a name="confluent-cloud-kafka-private-prereqs"></a>

建立 OpenSearch Ingestion 管道之前，請執行下列步驟：

1. 使用包含您要擷取至 OpenSearch Service 之資料的 VPC 網路組態建立 Confluent Cloud Kafka 叢集。

1. 建立您要將資料遷移至其中的 OpenSearch Service 網域或 OpenSearch Serverless 集合。如需詳細資訊，請參閱 和 [建立 OpenSearch Service 網域](createupdatedomains.md#createdomains) [建立集合](serverless-create.md)。

1. 使用 在 Confluent Cloud Kafka 叢集上設定身分驗證 AWS Secrets Manager。依照輪換秘密中的步驟啟用[AWS Secrets Manager 秘密輪](https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets.html)換。

1. 取得可存取 Confluent Cloud Kafka 叢集的 VPC ID。選擇要由 OpenSearch Ingestion 使用的 VPC CIDR。
**注意**  
如果您使用 AWS 管理主控台 建立管道，您還必須將 OpenSearch Ingestion 管道連接至 VPC，才能使用 Confluent Cloud Kafka 叢集。若要這樣做，請尋找**網路組態**區段，選取**連接至 VPC** 核取方塊，然後從其中一個提供的預設選項中選擇 CIDR，或選取您自己的選項。您可以從私有地址空間使用任何 CIDR，如 [RFC 1918 最佳實務](https://datatracker.ietf.org/doc/html/rfc1918)所定義。  
若要提供自訂 CIDR，請從下拉式功能表中選取**其他**。若要避免 OpenSearch Ingestion 與自我管理 OpenSearch 之間的 IP 地址發生衝突，請確定自我管理的 OpenSearch VPC CIDR 與 OpenSearch Ingestion 的 CIDR 不同。

1. 將[資源型政策](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource)連接至您的網域，或將[資料存取政策](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-data-access.html)連接至您的集合。這些存取政策允許 OpenSearch Ingestion 將資料從自我管理的叢集寫入您的網域或集合。
**注意**  
如果您使用 AWS PrivateLink 來連接 Confluent Cloud Kafka，則需要設定 [VPC DHCP 選項](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_DHCP_Options.html)。應啟用 *DNS 主機名稱*和 *DNS 解析*。  
具體而言，請使用下列選項集值：  
**企業叢集：**  

   ```
   domain-name: aws.private.confluent.cloud
   domain-name-servers: AmazonProvidedDNS
   ```
**專用叢集：**  

   ```
   domain-name: aws.confluent.cloud
   domain-name-servers: AmazonProvidedDNS
   ```
此變更可確保 Confluent PrivateLink 端點的 DNS 解析在 VPC 內正常運作。

   下列範例網域存取政策允許您在下一個步驟中建立的管道角色將資料寫入網域。請務必`resource`使用自己的 ARN 更新 。

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::444455556666:role/pipeline-role"
         },
         "Action": [
           "es:DescribeDomain",
           "es:ESHttp*"
         ],
         "Resource": [
           "arn:aws:es:us-east-1:111122223333:domain/domain-name"
         ]
       }
     ]
   }
   ```

------

   若要建立具有正確許可的 IAM 角色，以存取寫入資料至集合或網域，請參閱 [在 Amazon OpenSearch 擷取中設定角色和使用者](pipeline-security-overview.md)。

#### 步驟 1：設定管道角色
<a name="confluent-cloud-kafka-private-pipeline-role"></a>

在您設定管道先決條件之後，[請設定您要在管道組態中使用的管道角色](pipeline-security-overview.md#pipeline-security-sink)，並在角色中新增下列許可：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "SecretsManagerReadAccess",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetSecretValue"
            ],
            "Resource": ["arn:aws:secretsmanager:us-east-1:111122223333:secret:secret-name"]
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:AttachNetworkInterface",
                "ec2:CreateNetworkInterface",
                "ec2:CreateNetworkInterfacePermission",
                "ec2:DeleteNetworkInterface",
                "ec2:DeleteNetworkInterfacePermission",
                "ec2:DetachNetworkInterface",
                "ec2:DescribeNetworkInterfaces"
            ],
            "Resource": [
                "arn:aws:ec2:*:*:network-interface/*",
                "arn:aws:ec2:*:*:subnet/*",
                "arn:aws:ec2:*:*:security-group/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:DescribeDhcpOptions",
                "ec2:DescribeRouteTables",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeSubnets",
                "ec2:DescribeVpcs",
                "ec2:Describe*"
            ],
            "Resource": "*"
        },
        { 
            "Effect": "Allow",
            "Action": [ 
                "ec2:CreateTags"
            ],
            "Resource": "arn:aws:ec2:*:*:network-interface/*",
            "Condition": { 
               "StringEquals": 
                    {
                        "aws:RequestTag/OSISManaged": "true"
                    } 
            } 
        }
    ]
}
```

------

您必須對您用來建立 OpenSearch Ingestion 管道的 IAM 角色提供上述 Amazon EC2 許可，因為管道使用這些許可來建立和刪除 VPC 中的網路介面。管道只能透過此網路界面存取 Kafka 叢集。

#### 步驟 2：建立管道
<a name="self-managed-kafka-private-pipeline"></a>

然後，您可以如下所示設定 OpenSearch Ingestion 管道，指定 Kafka 作為來源。

您可以指定多個 OpenSearch Service 網域做為資料的目的地。此功能可將傳入資料的條件式路由或複寫到多個 OpenSearch Service 網域。

您也可以將資料從來源 Confluent Kafka 叢集遷移至 OpenSearch Serverless VPC 集合。請確定您在管道組態中提供網路存取政策。您可以使用 Confluent 結構描述登錄檔來定義 Confluent 結構描述。

```
 version: "2"
kafka-pipeline:
  source:
    kafka:
      encryption:
        type: "ssl"
      topics:
        - name: "topic-name"
          group_id: "group-id"
      bootstrap_servers:
        - "bootstrap-server.us-east-1.aws.private.confluent.cloud:9092"
      authentication:
        sasl:
          plain:
            username: ${aws_secrets:confluent-kafka-secret:username}
            password: ${aws_secrets:confluent-kafka-secret:password}
      schema:
        type: confluent
        registry_url: https://my-registry.us-east-1.aws.confluent.cloud
        api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}"
        api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}"
        basic_auth_credentials_source: "USER_INFO"
  sink:
  - opensearch:
      hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"]
      aws:
          region: "us-east-1"
      index: "confluent-index"
extension:
  aws:
    secrets:
      confluent-kafka-secret:
        secret_id: "my-kafka-secret"
        region: "us-east-1"
      schema-secret:
        secret_id: "my-self-managed-kafka-schema"
        region: "us-east-2"
```