

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# OpenSearch Ingestion パイプラインを Kafka で使用する
<a name="configure-client-self-managed-kafka"></a>

[Kafka](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/) プラグインを使用して、セルフマネージド Kafka クラスターから Amazon OpenSearch Service ドメインおよび OpenSearch Serverless コレクションにデータをストリーミングできます。OpenSearch Ingestion は、パブリックまたはプライベート (VPC) ネットワークで設定された Kafka クラスターからの接続をサポートします。このトピックでは、相互 TLS (mTLS)、SASL/SCRAM、IAM などのネットワーク設定や認証方法の設定など、取り込みパイプラインをセットアップするための前提条件と手順について説明します。

## パブリック Kafka クラスターからのデータの移行
<a name="self-managaged-kafka-public"></a>

OpenSearch Ingestion パイプラインを使用して、パブリックなセルフマネージド Kafka クラスターからデータを移行できます。つまり、ドメイン DNS 名をパブリックに解決できます。これを行うには、ソースとして セルフマネージド Kafka、宛先として OpenSearch Service または OpenSearch Serverless を使用して OpenSearch Ingestion パイプラインを設定します。これにより、セルフマネージド型ソースクラスターから AWSマネージド型送信先ドメインまたはコレクションへのストリーミングデータが処理されます。

### 前提条件
<a name="self-managaged-kafka-public-prereqs"></a>

OpenSearch Ingestion パイプラインを作成する前に、次の手順を実行します。

1. パブリックネットワーク設定でセルフマネージド Kafka クラスターを作成します。クラスターには、OpenSearch Service に取り込むデータが含まれている必要があります。

1. データの移行先となる OpenSearch Service ドメインまたは OpenSearch Serverless コレクションを作成します。詳細については、「[OpenSearch Service ドメインの作成](createupdatedomains.md#createdomains)」および「[コレクションの作成](serverless-create.md)」を参照してください。

1. でセルフマネージドクラスターの認証を設定します AWS Secrets Manager。[AWS Secrets Manager シークレットのローテーション](https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets.html)の手順に従って、シークレットのローテーションを有効にします。

1. [リソースベースのポリシー](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource)をドメインにアタッチするか、[データアクセスポリシー](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-data-access.html)をコレクションにアタッチします。これらのアクセスポリシーにより、OpenSearch Ingestion は自己管理型クラスターからドメインまたはコレクションにデータを書き込むことができます。

   次のドメインアクセスポリシーの例では、次の手順で作成するパイプラインロールに、ドメインへのデータの書き込みが許可されています。必ず独自の ARN で `resource` を更新してください。

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::444455556666:role/pipeline-role"
         },
         "Action": [
           "es:DescribeDomain",
           "es:ESHttp*"
         ],
         "Resource": [
           "arn:aws:es:us-east-1:111122223333:domain/domain-name"
         ]
       }
     ]
   }
   ```

------

   コレクションまたはドメインへの書き込みデータにアクセスするための正しいアクセス許可を持つ IAM ロールを作成するには、「[Amazon OpenSearch Ingestion のロールとユーザーの設定](pipeline-security-overview.md)」を参照してください。

### ステップ 1: パイプラインロールを設定する
<a name="self-managed-kafka-public-pipeline-role"></a>

Kafka パイプラインの前提条件を設定したら、パイプライン設定で使用する[パイプラインロールを設定](pipeline-security-overview.md#pipeline-security-sink)し、OpenSearch Service ドメインまたは OpenSearch Serverless コレクションに書き込むアクセス許可と、Secrets Manager からシークレットを読み取るアクセス許可を追加します。

### ステップ 2: パイプラインを作成する
<a name="self-managed-kafka-public-pipeline"></a>

そして、ソースとして Kafka を指定する OpenSearch Ingestion パイプラインを次のように設定できます。

複数の OpenSearch Service ドメインをデータの宛先として指定できます。この機能を使用すると、条件付きルーティングや、受信データを複数の OpenSearch Service ドメインに複製することができます。

ソース Confluent Kafka クラスターから OpenSearch Serverless VPC コレクションにデータを移行することもできます。パイプライン設定内にネットワークアクセスポリシーを指定していることを確認します。Confluent スキーマレジストリを使用して Confluent スキーマを定義できます。

```
version: "2"
kafka-pipeline:
  source:
    kafka:
      encryption:
        type: "ssl"
      topics:
        - name: "topic-name"
          group_id: "group-id"
      bootstrap_servers:
        - "bootstrap-server.us-east-1.aws.private.confluent.cloud:9092"
      authentication:
        sasl:
          plain:
            username: ${aws_secrets:confluent-kafka-secret:username}
            password: ${aws_secrets:confluent-kafka-secret:password}
      schema:
        type: confluent
        registry_url: https://my-registry.us-east-1.aws.confluent.cloud
        api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}"
        api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}"
        basic_auth_credentials_source: "USER_INFO"
  sink:
  - opensearch:
      hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"]
      aws:
          region: "us-east-1"
      index: "confluent-index"
extension:
  aws:
    secrets:
      confluent-kafka-secret:
        secret_id: "my-kafka-secret"
        region: "us-east-1"
      schema-secret:
        secret_id: "my-self-managed-kafka-schema"
        region: "us-east-1"
```

事前設定されたブループリントを使用して、このパイプラインを作成できます。詳細については、「[ブループリントの使用](pipeline-blueprint.md)」を参照してください。

### VPC 内の Kafka クラスターからのデータの移行
<a name="self-managaged-kafka-private"></a>

OpenSearch Ingestion パイプラインを使用して、VPC で実行されているセルフマネージド Kafka クラスターからデータを移行することもできます。これを行うには、ソースとして セルフマネージド Kafka、宛先として OpenSearch Service または OpenSearch Serverless を使用して OpenSearch Ingestion パイプラインを設定します。これにより、セルフマネージド型ソースクラスターから AWSマネージド型送信先ドメインまたはコレクションへのストリーミングデータが処理されます。

#### 前提条件
<a name="self-managaged-kafka-private-prereqs"></a>

OpenSearch Ingestion パイプラインを作成する前に、次の手順を実行します。

1. OpenSearch Service に取り込むデータを含む VPC ネットワーク設定を使用してセルフマネージド Kafka クラスターを作成します。

1. データの移行先となる OpenSearch Service ドメインまたは OpenSearch Serverless コレクションを作成します。詳細については、「[Creating OpenSearch Service domains](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/createupdatedomains.html#createdomains)」および「[Creating collections](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-manage.html#serverless-create)」を参照してください。

1. でセルフマネージドクラスターの認証を設定します AWS Secrets Manager。[AWS Secrets Manager シークレットのローテーション](https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets.html)の手順に従って、シークレットのローテーションを有効にします。

1. セルフマネージド Kafka にアクセスできる VPC の ID を取得します。OpenSearch Ingestion で使用する VPC CIDR を選択します。
**注記**  
を使用してパイプライン AWS マネジメントコンソール を作成する場合は、セルフマネージド Kafka を使用するにはOpenSearch Ingestion パイプラインも VPC にアタッチする必要があります。これを行うには、**[ネットワーク設定]** セクションを見つけ、**[VPC にアタッチ]** チェックボックスをオンにして、提供されたデフォルトオプションのいずれかから CIDR を選択するか、独自のものを選択します。[RFC 1918 のベストカレントプラクティス](https://datatracker.ietf.org/doc/html/rfc1918)で定義されているように、プライベートアドレス空間から任意の CIDR を使用できます。  
カスタム CIDR を指定するには、ドロップダウンメニューから **[その他]** を選択します。OpenSearch Ingestion とセルフマネージド OpenSearch 間の IP アドレスの衝突を回避するには、セルフマネージド OpenSearch VPC の CIDR が OpenSearch Ingestion の CIDR と異なることを確認してください。

1. [リソースベースのポリシー](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource)をドメインにアタッチするか、[データアクセスポリシー](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-data-access.html)をコレクションにアタッチします。これらのアクセスポリシーにより、OpenSearch Ingestion は自己管理型クラスターからドメインまたはコレクションにデータを書き込むことができます。

   次のドメインアクセスポリシーの例では、次の手順で作成するパイプラインロールに、ドメインへのデータの書き込みが許可されています。必ず独自の ARN で `resource` を更新してください。

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::444455556666:role/pipeline-role"
         },
         "Action": [
           "es:DescribeDomain",
           "es:ESHttp*"
         ],
         "Resource": [
           "arn:aws:es:us-east-1:111122223333:domain/domain-name"
         ]
       }
     ]
   }
   ```

------

   コレクションまたはドメインへの書き込みデータにアクセスするための正しいアクセス許可を持つ IAM ロールを作成するには、「[Amazon OpenSearch Ingestion のロールとユーザーの設定](pipeline-security-overview.md)」を参照してください。

#### ステップ 1: パイプラインロールを設定する
<a name="self-managed-kafka-private-pipeline-role"></a>

パイプラインの前提条件を設定したら、パイプライン設定で使用する[パイプラインロールを設定](pipeline-security-overview.md#pipeline-security-sink)し、そのロールに次の許可を追加します。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "SecretsManagerReadAccess",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetSecretValue"
            ],
            "Resource": ["arn:aws:secretsmanager:us-east-1:111122223333:secret:secret-name"]
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:AttachNetworkInterface",
                "ec2:CreateNetworkInterface",
                "ec2:CreateNetworkInterfacePermission",
                "ec2:DeleteNetworkInterface",
                "ec2:DeleteNetworkInterfacePermission",
                "ec2:DetachNetworkInterface",
                "ec2:DescribeNetworkInterfaces"
            ],
            "Resource": [
                "arn:aws:ec2:*:*:network-interface/*",
                "arn:aws:ec2:*:*:subnet/*",
                "arn:aws:ec2:*:*:security-group/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:DescribeDhcpOptions",
                "ec2:DescribeRouteTables",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeSubnets",
                "ec2:DescribeVpcs",
                "ec2:Describe*"
            ],
            "Resource": "*"
        },
        { 
            "Effect": "Allow",
            "Action": [ 
                "ec2:CreateTags"
            ],
            "Resource": "arn:aws:ec2:*:*:network-interface/*",
            "Condition": { 
               "StringEquals": 
                    {
                        "aws:RequestTag/OSISManaged": "true"
                    } 
            } 
        }
    ]
}
```

------

OpenSearch Ingestion パイプラインの作成に使用する IAM ロールには、上記の Amazon EC2 アクセス許可を指定する必要があります。これは、パイプラインがこれらのアクセス許可を使用して VPC 内のネットワークインターフェイスを作成および削除するためです。パイプラインは、このネットワークインターフェイスを介してのみ Kafka クラスターにアクセスできます。

#### ステップ 2: パイプラインを作成する
<a name="self-managed-kafka-private-pipeline"></a>

そして、ソースとして Kafka を指定する OpenSearch Ingestion パイプラインを次のように設定できます。

複数の OpenSearch Service ドメインをデータの宛先として指定できます。この機能を使用すると、条件付きルーティングや、受信データを複数の OpenSearch Service ドメインに複製することができます。

ソース Confluent Kafka クラスターから OpenSearch Serverless VPC コレクションにデータを移行することもできます。パイプライン設定内にネットワークアクセスポリシーを指定していることを確認します。Confluent スキーマレジストリを使用して Confluent スキーマを定義できます。

```
 version: "2"
kafka-pipeline:
  source:
    kafka:
      encryption:
        type: "ssl"
      topics:
        - name: "topic-name"
          group_id: "group-id"
      bootstrap_servers:
        - "bootstrap-server.us-east-1.aws.private.confluent.cloud:9092"
      authentication:
        sasl:
          plain:
            username: ${aws_secrets:confluent-kafka-secret:username}
            password: ${aws_secrets:confluent-kafka-secret:password}
      schema:
        type: confluent
        registry_url: https://my-registry.us-east-1.aws.confluent.cloud
        api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}"
        api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}"
        basic_auth_credentials_source: "USER_INFO"
  sink:
  - opensearch:
      hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"]
      aws:
          region: "us-east-1"
      index: "confluent-index"
extension:
  aws:
    secrets:
      confluent-kafka-secret:
        secret_id: "my-kafka-secret"
        region: "us-east-1"
      schema-secret:
        secret_id: "my-self-managed-kafka-schema"
        region: "us-east-1"
```

事前設定されたブループリントを使用して、このパイプラインを作成できます。詳細については、「[ブループリントの使用](pipeline-blueprint.md)」を参照してください。