

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# での OpenSearch Ingestion パイプラインの使用 Amazon Managed Streaming for Apache Kafka
<a name="configure-client-msk"></a>

[Kafka プラグイン](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/)を使用して [Amazon Managed Streaming for Apache Kafka (Amazon MSK)](https://docs.aws.amazon.com/msk/latest/developerguide/) から OpenSearch Ingestion パイプラインにデータを取り込むことができます。Amazon MSK を使用すると、Apache Kafka をストリーミングデータの処理に使用するアプリケーションを構築および実行できます。OpenSearch Ingestion は AWS PrivateLink を使用して Amazon MSK に接続します。Amazon MSK クラスターと Amazon MSK Serverless クラスターの両方からデータを取り込むことができます。2 つのプロセスの違いは、パイプラインをセットアップする前に実行する必要がある前提条件の手順だけです。

**Topics**
+ [プロビジョニングされた Amazon MSK の前提条件](#msk-prereqs)
+ [Amazon MSK Serverless の前提条件](#msk-serverless-prereqs)
+ [ステップ 1: パイプラインロールを設定する](#msk-pipeline-role)
+ [ステップ 2: パイプラインを作成する](#msk-pipeline)
+ [ステップ 3: (オプション) AWS Glue スキーマレジストリを使用する](#msk-glue)
+ [ステップ 4: (オプション) Amazon MSK パイプラインの推奨コンピューティングユニット (OCU) を設定する](#msk-ocu)

## プロビジョニングされた Amazon MSK の前提条件
<a name="msk-prereqs"></a>

OpenSearch Ingestion パイプラインを作成する前に、次の手順を実行します。

1. *Amazon Managed Streaming for Apache Kafka 開発者ガイド*の「[クラスターの作成](https://docs.aws.amazon.com/msk/latest/developerguide/msk-create-cluster.html#create-cluster-console)」の手順に従って Amazon MSK のプロビジョニングされたクラスターを作成します。**[ブローカータイプ]** では、`t3` タイプは OpenSearch Ingestion ではサポートされていないため、それ以外のオプションを選択します。

1. クラスターのステータスが **Active** になったら、「[マルチ VPC 接続を有効にする](https://docs.aws.amazon.com/msk/latest/developerguide/aws-access-mult-vpc.html#mvpc-cluster-owner-action-turn-on)」の手順に従います。

1. クラスターとパイプラインが同じ AWS アカウントにあるかどうかに応じて、[「クラスターポリシーを MSK クラスターにアタッチする](https://docs.aws.amazon.com/msk/latest/developerguide/aws-access-mult-vpc.html#mvpc-cluster-owner-action-policy)」のステップに従い、以下のポリシーのいずれかをアタッチします。このポリシーでは、OpenSearch Ingestion は Amazon MSK クラスターへの AWS PrivateLink 接続を作成して、Kafka トピックからデータを読み取ることができます。必ず独自の ARN で `resource` を更新してください。

   クラスターとパイプラインが同じ AWS アカウントにある場合は、次のポリシーが適用されます。

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       },
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis-pipelines.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:GetBootstrapBrokers",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       }
     ]
   }
   ```

------

   Amazon MSK クラスターがパイプライン AWS アカウント とは異なる にある場合は、代わりに次のポリシーをアタッチします。クロスアカウントアクセスは、プロビジョニングされた Amazon MSK クラスターでのみ可能であり、Amazon MSK Serverless クラスターではできないことに注意してください。の AWS `principal` ARN は、パイプライン設定に提供するのと同じパイプラインロールの ARN である必要があります。

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       },
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis-pipelines.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:GetBootstrapBrokers",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       },
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::444455556666:role/pipeline-role"
         },
         "Action": [
           "kafka-cluster:*",
           "kafka:*"
         ],
         "Resource": [
           "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id",
           "arn:aws:kafka:us-east-1:111122223333:topic/cluster-name/cluster-id/*",
           "arn:aws:kafka:us-east-1:111122223333:group/cluster-name/*"
         ]
       }
     ]
   }
   ```

------

1. 「[トピックの作成](https://docs.aws.amazon.com/msk/latest/developerguide/create-topic.html)」の手順に従って Kafka トピックを作成します。`BootstrapServerString` がプライベートエンドポイント (単一 VPC) のブートストラップ URL の 1 つであることを確認してください。`--replication-factor` の値は、Amazon MSK クラスターのゾーンの数に応じて `2` または `3` を指定します。`--partitions` の値は少なくとも `10` である必要があります。

1. 「[データの生成と消費](https://docs.aws.amazon.com/msk/latest/developerguide/produce-consume.html)」の手順に従って、データを生成して使用します。`BootstrapServerString` がプライベートエンドポイント (単一 VPC) のブートストラップ URL の 1 つであることを確認してください。

## Amazon MSK Serverless の前提条件
<a name="msk-serverless-prereqs"></a>

OpenSearch Ingestion パイプラインを作成する前に、次の手順を実行します。

1. *Amazon Managed Streaming for Apache Kafka 開発者ガイド*の「[MSK Serverless クラスターの作成](https://docs.aws.amazon.com/msk/latest/developerguide/create-serverless-cluster.html#)」の手順に従って Amazon MSK Serverless クラスターを作成します。

1. クラスターのステータスが **[アクティブ]** になったら、「[クラスターポリシーを MSK クラスターにアタッチする](https://docs.aws.amazon.com/msk/latest/developerguide/aws-access-mult-vpc.html#mvpc-cluster-owner-action-policy)」の手順に従って、次のポリシーをアタッチします。必ず独自の ARN で `resource` を更新してください。

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       },
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis-pipelines.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:GetBootstrapBrokers",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       }
     ]
   }
   ```

------

   このポリシーにより、OpenSearch Ingestion は Amazon MSK Serverless クラスター AWS PrivateLink への接続を作成し、Kafka トピックからデータを読み取ることができます。このポリシーは、クラスターとパイプラインが同じ にある場合に適用されます。これは AWS アカウント、Amazon MSK Serverless がクロスアカウントアクセスをサポートしていないためです。

1. 「[トピックの作成](https://docs.aws.amazon.com/msk/latest/developerguide/msk-serverless-create-topic.html)」の手順に従って Kafka トピックを作成します。`BootstrapServerString` が Simple Authentication and Security Layer (SASL) IAM ブートストラップ URL の 1 つであることを確認します。`--replication-factor` の値は、Amazon MSK Serverless クラスターのゾーンの数に応じて `2` または `3` を指定します。`--partitions` の値は少なくとも `10` である必要があります。

1. 「[データの生成と消費](https://docs.aws.amazon.com/msk/latest/developerguide/msk-serverless-produce-consume.html)」の手順に従って、データを生成して使用します。ここでも、`BootstrapServerString` が Simple Authentication and Security Layer (SASL) IAM ブートストラップ URL の 1 つであることを確認します。

## ステップ 1: パイプラインロールを設定する
<a name="msk-pipeline-role"></a>

Amazon MSK をプロビジョニングし、サーバーレスクラスターを設定したら、パイプライン設定で使用するパイプラインロールに次の Kafka アクセス許可を追加します。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster",
                "kafka:DescribeClusterV2",
                "kafka:GetBootstrapBrokers"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:111122223333:topic/cluster-name/cluster-id/topic-name"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:111122223333:group/cluster-name/*"
            ]
        }
    ]
}
```

------

## ステップ 2: パイプラインを作成する
<a name="msk-pipeline"></a>

そして、ソースとして Kafka を指定する OpenSearch Ingestion パイプラインを次のように設定できます。

```
version: "2"
log-pipeline:
  source:
    kafka:
      acknowledgements: true
      topics:
      - name: "topic-name"
        group_id: "grouplambd-id"
      aws:
        msk:
          arn: "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-id"
        region: "us-west-2"
  processor:
  - grok:
      match:
        message:
        - "%{COMMONAPACHELOG}"
  - date:
      destination: "@timestamp"
      from_time_received: true
  sink:
  - opensearch:
      hosts: ["https://search-domain-endpoint.us-east-1es.amazonaws.com"]
      index: "index_name"
      aws_region: "region"
      aws_sigv4: true
```

事前設定された Amazon MSK ブループリントを使用して、このパイプラインを作成できます。詳細については、「[ブループリントの使用](pipeline-blueprint.md)」を参照してください。

## ステップ 3: (オプション) AWS Glue スキーマレジストリを使用する
<a name="msk-glue"></a>

Amazon MSK で OpenSearch Ingestion を使用する場合、 AWS Glue スキーマレジストリでホストされているスキーマに AVRO データ形式を使用できます。[AWS Glue スキーマレジストリ](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)を使用すると、データストリームスキーマを一元的に検出、制御、および展開できます。

このオプションを使用するには、パイプライン設定で `type` スキーマを有効にします。

```
schema:
  type: "aws_glue"
```

また、パイプラインロールで AWS Glue に読み取りアクセス許可を付与する必要があります。[AWSGlueSchemaRegistryReadonlyAccess](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSGlueSchemaRegistryReadonlyAccess.html) という AWS マネージドポリシーを使用できます。さらに、レジストリは OpenSearch Ingestion パイプラインと同じ AWS アカウント およびリージョンに存在する必要があります。

## ステップ 4: (オプション) Amazon MSK パイプラインの推奨コンピューティングユニット (OCU) を設定する
<a name="msk-ocu"></a>

各コンピューティングユニットには、トピックごとに 1 つのコンシューマーがあります。ブローカーは、特定のトピックについて、これらのコンシューマー間でパーティションのバランスを取ります。ただし、パーティションの数がコンシューマーの数よりも多い場合、Amazon MSK は各コンシューマーで複数のパーティションをホストします。OpenSearch Ingestion には、CPU 使用率またはパイプライン内の保留中のレコード数に基づいてスケールアップまたはスケールダウンする自動スケーリングが組み込まれています。

最適なパフォーマンスを得るには、パーティションを多くのコンピューティングユニットに分散して並列処理を行います。トピックに多くのパーティションがある場合 (パイプラインあたりの最大数である 96 以上の OCU がある場合など)、1 ～ 96 個の OCU でパイプラインを設定することをお勧めします。これは、必要に応じて自動的にスケールするためです。トピックのパーティション数が少ない場合 (96 未満の場合など)、最大コンピューティングユニットをパーティションの数と同じにします。

パイプラインに複数のトピックがある場合は、最大コンピューティングユニットを設定する参照としてパーティション数が最も多いトピックを選択します。新しい OCU セットを含むパイプラインを同じトピックとコンシューマーグループに追加すると、スループットをほぼ直線的にスケールすることができます。