

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 搭配 使用 OpenSearch 擷取管道 Amazon Managed Streaming for Apache Kafka
<a name="configure-client-msk"></a>

您可以使用 [Kafka 外掛程式](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/)，將資料從 [Amazon Managed Streaming for Apache Kafka](https://docs.aws.amazon.com/msk/latest/developerguide/) (Amazon MSK) 擷取到您的 OpenSearch Ingestion 管道。透過 Amazon MSK，您可以建置和執行使用 Apache Kafka 來處理串流資料的應用程式。OpenSearch Ingestion 使用 AWS PrivateLink 連線到 Amazon MSK。您可以從 Amazon MSK 和 Amazon MSK Serverless 叢集擷取資料。這兩個程序的唯一區別是設定管道之前必須採取的先決條件。

**Topics**
+ [佈建的 Amazon MSK 先決條件](#msk-prereqs)
+ [Amazon MSK Serverless 先決條件](#msk-serverless-prereqs)
+ [步驟 1：設定管道角色](#msk-pipeline-role)
+ [步驟 2：建立管道](#msk-pipeline)
+ [步驟 3：（選用） 使用 AWS Glue 結構描述登錄檔](#msk-glue)
+ [步驟 4：（選用） 為 Amazon MSK 管道設定建議的運算單位 (OCUs)](#msk-ocu)

## 佈建的 Amazon MSK 先決條件
<a name="msk-prereqs"></a>

建立 OpenSearch Ingestion 管道之前，請執行下列步驟：

1. 按照 Amazon Managed Streaming for Apache Kafka 開發人員指南中的[建立叢集](https://docs.aws.amazon.com/msk/latest/developerguide/msk-create-cluster.html#create-cluster-console)中的步驟建立 Amazon MSK 佈建叢集。 **對於**中介裝置類型**，請選擇`t3`類型以外的任何選項，因為 OpenSearch Ingestion 不支援這些選項。

1. 叢集處於**作用中**狀態後，請遵循[開啟多 VPC 連線](https://docs.aws.amazon.com/msk/latest/developerguide/aws-access-mult-vpc.html#mvpc-cluster-owner-action-turn-on)中的步驟。

1. 根據您的[叢集和管道是否位於相同位置，遵循將叢集政策連接至 MSK 叢集](https://docs.aws.amazon.com/msk/latest/developerguide/aws-access-mult-vpc.html#mvpc-cluster-owner-action-policy)中的步驟，以連接下列其中一個政策 AWS 帳戶。此政策允許 OpenSearch Ingestion 建立 Amazon MSK 叢集的 AWS PrivateLink 連線，並從 Kafka 主題讀取資料。請務必`resource`使用自己的 ARN 更新 。

   當您的叢集和管道位於相同的 時，適用下列政策 AWS 帳戶：

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       },
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis-pipelines.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:GetBootstrapBrokers",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       }
     ]
   }
   ```

------

   如果您的 Amazon MSK 叢集與管道 AWS 帳戶 位於不同的 中，請改為連接下列政策。請注意，跨帳戶存取僅適用於佈建的 Amazon MSK 叢集，而不適用於 Amazon MSK Serverless 叢集。的 AWS `principal` ARN 應該是您提供給管道組態的相同管道角色的 ARN：

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       },
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis-pipelines.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:GetBootstrapBrokers",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       },
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::444455556666:role/pipeline-role"
         },
         "Action": [
           "kafka-cluster:*",
           "kafka:*"
         ],
         "Resource": [
           "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id",
           "arn:aws:kafka:us-east-1:111122223333:topic/cluster-name/cluster-id/*",
           "arn:aws:kafka:us-east-1:111122223333:group/cluster-name/*"
         ]
       }
     ]
   }
   ```

------

1. 依照建立主題中的步驟建立 Kafka [主題](https://docs.aws.amazon.com/msk/latest/developerguide/create-topic.html)。確定 `BootstrapServerString`是其中一個私有端點 （單一 VPC) 引導 URLs。根據 Amazon MSK 叢集擁有的區域數量`3`， 的值`--replication-factor`應為 `2`或 。的值`--partitions`應至少為 `10`。

1. 遵循生產和使用資料中的步驟來[生產和使用資料](https://docs.aws.amazon.com/msk/latest/developerguide/produce-consume.html)。同樣地，請確定 `BootstrapServerString`是您的私有端點 （單一 VPC) 引導 URLs之一。

## Amazon MSK Serverless 先決條件
<a name="msk-serverless-prereqs"></a>

建立 OpenSearch Ingestion 管道之前，請執行下列步驟：

1. 按照 Amazon *Managed Streaming for Apache Kafka 開發人員指南中的*[建立 MSK Serverless 叢集中的步驟建立 Amazon MSK Serverless 叢集](https://docs.aws.amazon.com/msk/latest/developerguide/create-serverless-cluster.html#)。

1. 叢集處於**作用中**狀態後，請遵循將[叢集政策連接至 MSK 叢集](https://docs.aws.amazon.com/msk/latest/developerguide/aws-access-mult-vpc.html#mvpc-cluster-owner-action-policy)中的步驟，以連接下列政策。請務必`resource`使用自己的 ARN 更新 。

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       },
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis-pipelines.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:GetBootstrapBrokers",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       }
     ]
   }
   ```

------

   此政策允許 OpenSearch Ingestion 建立與 Amazon MSK Serverless 叢集的 AWS PrivateLink 連線，並從 Kafka 主題讀取資料。當您的叢集和管道位於相同的 時，此政策即適用 AWS 帳戶，因為 Amazon MSK Serverless 不支援跨帳戶存取，因此此政策必須是 true。

1. 依照建立主題中的步驟建立 Kafka [主題](https://docs.aws.amazon.com/msk/latest/developerguide/msk-serverless-create-topic.html)。請確定 `BootstrapServerString`是您的簡易身分驗證和安全層 (SASL) IAM 引導 URLs之一。根據 Amazon MSK Serverless 叢集擁有的區域數量`3`， 的值`--replication-factor`應為 `2`或 。的值`--partitions`應至少為 `10`。

1. 遵循生產和使用資料中的步驟來[生產和使用資料](https://docs.aws.amazon.com/msk/latest/developerguide/msk-serverless-produce-consume.html)。同樣地，請確定 `BootstrapServerString`是您的簡易身分驗證和安全層 (SASL) IAM 引導 URLs之一。

## 步驟 1：設定管道角色
<a name="msk-pipeline-role"></a>

在您設定 Amazon MSK 提供或無伺服器叢集之後，請在管道組態中要使用的管道角色中新增下列 Kafka 許可：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster",
                "kafka:DescribeClusterV2",
                "kafka:GetBootstrapBrokers"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:111122223333:topic/cluster-name/cluster-id/topic-name"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:111122223333:group/cluster-name/*"
            ]
        }
    ]
}
```

------

## 步驟 2：建立管道
<a name="msk-pipeline"></a>

然後，您可以如下所示設定 OpenSearch Ingestion 管道，將 Kafka 指定為來源：

```
version: "2"
log-pipeline:
  source:
    kafka:
      acknowledgements: true
      topics:
      - name: "topic-name"
        group_id: "grouplambd-id"
      aws:
        msk:
          arn: "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-id"
        region: "us-west-2"
  processor:
  - grok:
      match:
        message:
        - "%{COMMONAPACHELOG}"
  - date:
      destination: "@timestamp"
      from_time_received: true
  sink:
  - opensearch:
      hosts: ["https://search-domain-endpoint.us-east-1es.amazonaws.com"]
      index: "index_name"
      aws_region: "region"
      aws_sigv4: true
```

您可以使用預先設定的 Amazon MSK 藍圖來建立此管道。如需詳細資訊，請參閱[使用藍圖](pipeline-blueprint.md)。

## 步驟 3：（選用） 使用 AWS Glue 結構描述登錄檔
<a name="msk-glue"></a>

當您將 OpenSearch Ingestion 與 Amazon MSK 搭配使用時，您可以將 AVRO 資料格式用於 AWS Glue 結構描述登錄檔中託管的結構描述。使用[AWS Glue 結構描述登錄](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)檔，您可以集中探索、控制和發展資料串流結構描述。

若要使用此選項，請在管道組態`type`中啟用結構描述：

```
schema:
  type: "aws_glue"
```

您還必須在管道角色中提供 AWS Glue 讀取存取許可。您可以使用稱為 [AWSGlueSchemaRegistryReadonlyAccess](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSGlueSchemaRegistryReadonlyAccess.html) 的 AWS 受管政策。此外，您的登錄檔必須與 OpenSearch Ingestion 管道位於相同的 AWS 帳戶 和 區域。

## 步驟 4：（選用） 為 Amazon MSK 管道設定建議的運算單位 (OCUs)
<a name="msk-ocu"></a>

每個運算單位每個主題都有一個取用者。中介裝置在指定主題的這些取用者之間平衡分割區。不過，當分割區數量大於取用者數量時，Amazon MSK 會在每個取用者上託管多個分割區。OpenSearch Ingestion 具有內建的自動擴展功能，可根據 CPU 用量或管道中待定記錄的數量來擴展或縮減規模。

為了獲得最佳效能，請將分割區分散到許多運算單位以進行平行處理。如果主題具有大量分割區 （例如，超過 96 個，也就是每個管道的最大 OCUs)，建議您使用 1–96 OCUs 設定管道。這是因為它將視需要自動擴展。如果主題具有少量分割區 （例如，小於 96)，請保持最大運算單位與分割區數量相同。

當管道有多個主題時，請選擇分割區數量最高的主題做為設定最大運算單位的參考。透過將具有一組新 OCUs的另一個管道新增至相同的主題和取用者群組，您可以幾乎線性地擴展輸送量。