

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon OpenSearch Ingestion のパイプライン機能の概要
<a name="osis-features-overview"></a>

Amazon OpenSearch Ingestion は、ソース、バッファ、0 個以上のプロセッサ、1 個以上のシンクで構成される*パイプライン*をプロビジョニングします。取り込みパイプラインは、データエンジンとして Data Prepper を利用しています。パイプラインのさまざまなコンポーネントの概要については、「[Amazon OpenSearch Ingestion の主な概念](ingestion-process.md)」を参照してください。

次の各セクションでは、Amazon OpenSearch Ingestion で最も一般的に使用されるいくつかの機能の概要を説明します。

**注記**  
これは、パイプラインで利用可能な機能をすべて網羅したリストではありません。使用可能なすべてのパイプライン機能に関する包括的なドキュメントについては、「[Data Prepper のドキュメント](https://opensearch.org/docs/latest/data-prepper/pipelines/pipelines/)」を参照してください。OpenSearch Ingestion では、使用できるプラグインやオプションにいくつかの制約があることに注意してください。詳細については、「[Amazon OpenSearch Ingestion パイプラインでサポートされているプラグインとオプション](pipeline-config-reference.md)」を参照してください。

**Topics**
+ [永続的バッファリング](#persistent-buffering)
+ [分割](#osis-features-splitting)
+ [チェーン](#osis-features-chaining)
+ [デッドレターキュー](#osis-features-dlq)
+ [インデックス管理](#osis-features-index-management)
+ [エンドツーエンドの確認応答](#osis-features-e2e)
+ [ソースバックプレッシャー](#osis-features-backpressure)

## 永続的バッファリング
<a name="persistent-buffering"></a>

永続バッファは、データを複数のアベイラビリティーゾーンにわたるディスクベースのバッファに保存して、データの耐久性を高めます。永続的バッファリングを使用して、スタンドアロンバッファを設定することなく、サポートされているすべてのプッシュベースのソースからデータを取り込むことができます。これらのソースには、ログ、トレース、メトリクスの HTTP と OpenTelemetry が含まれます。永続的バッファリングを有効にするには、パイプラインを作成または更新するときに **[永続的バッファリングを有効にする]** を選択します。詳細については、「[Amazon OpenSearch Ingestion パイプラインの作成](creating-pipeline.md)」を参照してください。

OpenSearch Ingestion は、データソース、ストリーミング変換、およびシンクの送信先を考慮して、永続的なバッファリングに使用する OCU の数を動的に決定します。一部の OCU がバッファリングに割り当てられるため、同じ取り込みスループットを維持するには、最小 OCU と最大 OCUs を増やす必要がある場合があります。パイプラインは、データをバッファに最大 72 時間保持します。

パイプラインの永続的バッファリングを有効にする場合、デフォルトの最大リクエストペイロードサイズは次のとおりです。
+ **HTTP ソース** – 10 MB
+ **OpenTelemetry ソース** – 4 MB

HTTP ソースの場合、ペイロードの最大サイズを 20 MB に増やすことができます。リクエストペイロードサイズには、通常複数のイベントを含む HTTP リクエスト全体が含まれます。各イベントは 3.5 MB を超えることはできません。

永続的バッファが有効になっているパイプラインは、プロセッサタイプに関係なく、1:1 buffer-to-computeの比率を使用して、設定された OCUs (パイプラインユニット) をコンピューティングとバッファの間で均等に分割します。

デフォルトでは、パイプラインは を使用してバッファデータを暗号化 AWS 所有のキー します。これらのパイプラインでは、パイプラインロールに追加の許可は必要ありません。

あるいは、カスタマーマネージドキーを指定して、次の IAM 許可をパイプラインロールに追加することもできます。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "KeyAccess",
            "Effect": "Allow",
            "Action": [
              "kms:Decrypt",
              "kms:GenerateDataKeyWithoutPlaintext"
            ],
            "Resource": "arn:aws:kms:{{us-east-1}}:{{111122223333}}:key/{{ASIAIOSFODNN7EXAMPLE}}"
        }
    ]
}
```

------

詳細については、「[AWS Key Management Service デベロッパーガイド](https://docs.aws.amazon.com/kms/latest/developerguide/concepts.html#customer-cmk)」の「カスタマーマネージドキー」を参照してください。

**注記**  
永続的バッファリングを無効にすると、パイプラインは完全にメモリ内バッファリングで動作を開始します。

## 分割
<a name="osis-features-splitting"></a>

OpenSearch Ingestion パイプラインを設定し、受信イベントをサブパイプラインに*分割する*と、同じ受信イベントに対してさまざまなタイプの処理を実行できます。

次のパイプラインの例では、受信イベントを 2 つのサブパイプラインに分割します。各サブパイプラインは、独自のプロセッサを使用してデータをエンリッチ化および操作し、データを異なる OpenSearch インデックスに送信します。

```
version: "2"
log-pipeline:
  source:
    http:
    ...
  sink:
    - pipeline:
        name: "logs_enriched_one_pipeline"
    - pipeline:
        name: "logs_enriched_two_pipeline"

logs_enriched_one_pipeline:
  source:
    pipeline:
      name: "log-pipeline"
  processor:
   ...
  sink:
    - opensearch:
        # Provide a domain or collection endpoint
        # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
        aws:
          ...
        index: "enriched_one_logs"

logs_enriched_two_pipeline:
  source:
    pipeline:
      name: "log-pipeline"
  processor:
   ...
  sink:
    - opensearch:
        # Provide a domain or collection endpoint
        # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
        aws:
          ...
          index: "enriched_two_logs"
```

## チェーン
<a name="osis-features-chaining"></a>

複数のサブパイプラインを*連結*し、データ処理とエンリッチメントをチャンク単位で実行できます。つまり、1 つのサブパイプライン内で特定の処理機能を用いて受信イベントをエンリッチ化し、それを別のサブパイプラインに送信して別のプロセッサでさらにエンリッチ化して、最後に OpenSearch シンクに送信できます。

次の例では、`log_pipeline` サブパイプラインは受信ログイベントをプロセッサのセットでエンリッチ化し、そのイベントを `enriched_logs` という名前の OpenSearch インデックスに送信します。パイプラインは同じイベントを `log_advanced_pipeline` サブパイプラインに送信し、サブパイプラインはそれを処理して `enriched_advanced_logs` という名前の別の OpenSearch インデックスに送信します。

```
version: "2"
log-pipeline:
  source:
    http:
    ...
  processor:
    ...
  sink:
    - opensearch:
        # Provide a domain or collection endpoint
        # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
        aws:
          ...
          index: "enriched_logs"
    - pipeline:
        name: "log_advanced_pipeline"

log_advanced_pipeline:
  source:
    pipeline:
      name: "log-pipeline"
  processor:
   ...
  sink:
    - opensearch:
        # Provide a domain or collection endpoint
        # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
        aws:
          ...
          index: "enriched_advanced_logs"
```

## デッドレターキュー
<a name="osis-features-dlq"></a>

デッドレターキュー (DLQ) とは、パイプラインがシンクへの書き込みに失敗したイベントの送信先です。OpenSearch Ingestion では、DLQ として使用するために、適切な書き込み許可を持つ Amazon S3 バケットを指定する必要があります。パイプライン内のすべてのシンクに DLQ 設定を追加できます。パイプラインで書き込みエラーが発生すると、設定された S3 バケットに DLQ オブジェクトが作成されます。DLQ オブジェクトは、失敗したイベントの配列として JSON ファイル内に存在します。

次のいずれかの条件が満たされたとき、パイプラインは DLQ にイベントを書き込みます。
+ OpenSearch シンクの**[最大再試行]** 回数が使い果たされました。OpenSearch Ingestion では、この設定での必要最低数は 16 個です。
+ エラー状態のため、イベントがシンクによって拒否されています。

### 設定
<a name="osis-features-dlq-config"></a>

サブパイプラインのデッドレターキューを設定するには、シンクの送信先を設定するときに **[S3 DLQ を有効にする]** を選択します。次に、キューに必要な設定を指定します。詳細については、Data Prepper DLQ ドキュメントの「[Configuration](https://opensearch.org/docs/latest/data-prepper/pipelines/dlq/#configuration)」を参照してください。

この S3 DLQ に書き込まれたファイルには、次の命名パターンが付けられます。

```
dlq-v${version}-${pipelineName}-${pluginId}-${timestampIso8601}-${uniqueId}
```

DLQ が書き込む S3 バケットへのアクセスを許可するようにパイプラインロールを手動で設定する手順については、「[Amazon S3 またはデッドレターキューに書き込むためのアクセス許可](pipeline-security-overview.md#pipeline-security-dlq)」を参照してください。

### 例
<a name="osis-features-dlq-example"></a>

次の DLQ ファイルの例を考えてみます。

```
dlq-v2-apache-log-pipeline-opensearch-2023-04-05T15:26:19.152938Z-e7eb675a-f558-4048-8566-dac15a4f8343
```

次は、シンクへの書き込みに失敗したデータの例です。これは、さらなる分析のために DLQ S3 バケットに送信されます。

```
Record_0	
pluginId            "opensearch"
pluginName          "opensearch"
pipelineName        "apache-log-pipeline"
failedData	
index		  "logs"
indexId		 null
status		  0
message		"Number of retries reached the limit of max retries (configured value 15)"
document	
log		    "sample log"
timestamp	    "2023-04-14T10:36:01.070Z"

Record_1	
pluginId            "opensearch"
pluginName          "opensearch"
pipelineName        "apache-log-pipeline"
failedData	
index               "logs"
indexId		 null
status		  0
message		"Number of retries reached the limit of max retries (configured value 15)"
document	
log                 "another sample log"
timestamp           "2023-04-14T10:36:01.071Z"
```

## インデックス管理
<a name="osis-features-index-management"></a>

Amazon OpenSearch Ingestion には、次の機能など多くのインデックス管理機能があります。

### インデックスの作成
<a name="osis-features-index-management-create"></a>

パイプラインのシンクでインデックス名を指定すると、OpenSearch Ingestion でパイプラインをプロビジョニングするときに、そのインデックスが作成されます。インデックスが既に存在する場合、パイプラインはそれを使用して受信イベントのインデックスを作成します。インデックスがまだ存在しない場合、パイプラインを停止して再起動する、または YAML 設定を更新すると、パイプラインは新しいインデックスの作成を試みます。パイプラインではインデックスを一切削除できません。

次のシンクのサンプルでは、パイプラインがプロビジョニングされるときに 2 つのインデックスを作成します。

```
sink:
  - opensearch:
      index: apache_logs
  - opensearch:
      index: nginx_logs
```

### インデックス名とパターンの生成
<a name="osis-features-index-management-patterns"></a>

受信イベントのフィールドにある変数を使用すると、動的なインデックス名を生成できます。シンク設定では、形式 `string${}` を使用して文字列補間を示し、JSON ポインタを使用してイベントからフィールドを抽出します。`index_type` のオプションは `custom` または `management_disabled` です。`index_type` は OpenSearch ドメインでは `custom` に、OpenSearch Serverless コレクションでは `management_disabled` にデフォルトで設定されるため、未設定のままにすることができます。

例えば、次のパイプラインは、受信イベントから `metadataType` フィールドを選択してインデックス名を生成します。

```
pipeline:
  ...
  sink:
    opensearch:
      index: "metadata-${metadataType}"
```

次の設定では、1 日または 1 時間ごとに新しいインデックスを生成し続けます。

```
pipeline:
  ...
  sink:
    opensearch:
      index: "metadata-${metadataType}-%{yyyy.MM.dd}"

pipeline:
  ...
  sink:
    opensearch:
      index: "metadata-${metadataType}-%{yyyy.MM.dd.HH}"
```

インデックス名は、`my-index-%{yyyy.MM.dd}` のように、サフィックスとして日付/時刻パターンを持つプレーン文字列にすることもできます。シンクでは、データを OpenSearch に送信するときに日時パターンを UTC 時間に置き換え、`my-index-2022.01.25` のような新しいインデックスを 1 日ごとに作成します。詳細については、「[DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html)」クラスを参照してください。

このインデックス名は、`my-${index}-name` のように形式化された文字列にすることもできます (日付/時刻パターンのサフィックスの有無にかかわらず)。シンクでは、データを OpenSearch に送信するときに、`"${index}"` の部分を処理中のイベント内の値に置き換えます。形式が `"${index1/index2/index3}"` の場合、フィールド `index1/index2/index3` をイベント内の値に置き換えます。

### ドキュメント ID の生成
<a name="osis-features-index-management-ids"></a>

OpenSearch にドキュメントのインデックスを作成するとき、パイプラインはドキュメント ID を生成できます。また、それらのドキュメント ID を受信イベント内のフィールドから推測することも可能です。

次の例では、受信イベントの `uuid` フィールドを使用してドキュメント ID を生成します。

```
pipeline:
  ...
  sink:
    opensearch:
      index_type: custom
      index: "metadata-${metadataType}-%{yyyy.MM.dd}" 
      "document_id": "uuid"
```

次の例では、[Add entries](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/processors/add-entries/) プロセッサを使用し、受信イベントから `uuid` フィールドと `other_field` フィールドをマージしてドキュメント ID を生成します。

`create` アクションは、同じ ID のドキュメントが上書きされないようにします。パイプラインは再試行や DLQ イベントを必要とせずに、重複したドキュメントを削除します。ここでの目的は、既存ドキュメントの更新を避けることなので、このアクションを使用するパイプライン作成者にとっては当然想定されるものです。

```
pipeline:
  ...
  processor:
   - add_entries:
      entries:
        - key: "my_doc_id_field"
          format: "${uuid}-${other_field}"
  sink:
    - opensearch:
       ...
       action: "create"
       document_id: "my_doc_id"
```

イベントのドキュメント ID をサブオブジェクトのフィールドに設定したい場合があります。次の例では、OpenSearch シンクプラグインで `info/id` というサブオブジェクトを使用して、ドキュメント ID を生成します。

```
sink:
  - opensearch:
       ...
       document_id: info/id
```

次のイベントが発生すると、パイプラインは `_id` フィールドに `json001` を設定したドキュメントを生成します。

```
{
   "fieldA":"arbitrary value",
   "info":{
      "id":"json001",
      "fieldA":"xyz",
      "fieldB":"def"
   }
}
```

### ルーティング ID の生成
<a name="osis-features-index-management-routing-ids"></a>

OpenSearch シンクプラグイン内の `routing_field` オプションを使用すると、ドキュメントルーティングプロパティ (`_routing`) の値を受信イベントの値に設定できます。

ルーティングは JSON ポインタ構文をサポートしているため、最上位のフィールドだけでなく、ネストされたフィールドも使用できます。

```
sink:
  - opensearch:
       ...
       routing_field: metadata/id
       document_id: id
```

次のイベントが発生すると、プラグインは `_routing` フィールドに `abcd` を設定したドキュメントを生成します。

```
{
   "id":"123",
   "metadata":{
      "id":"abcd",
      "fieldA":"valueA"
   },
   "fieldB":"valueB"
}
```

インデックスを作成するときに、パイプラインで使用できるインデックステンプレートを作成する手順については、「[インデックステンプレート](https://opensearch.org/docs/latest/im-plugin/index-templates/)」を参照してください。

## エンドツーエンドの確認応答
<a name="osis-features-e2e"></a>

OpenSearch Ingestion では、*エンドツーエンドの確認応答*を使用して、ステートレスパイプライン内のソースからシンクまでのデータの配信を追跡します。これにより、データの耐久性と信頼性を保証します。

**注記**  
現在、エンドツーエンドの確認応答をサポートしているのは [S3 ソース](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/)プラグインのみです。

エンドツーエンドの確認応答では、パイプラインのソースプラグインが*確認応答セット*を作成し、イベントのバッチを監視します。イベントがシンクに正常に送信された場合は肯定応答を受け取り、いずれかのイベントがシンクに送信できなかった場合は否定応答を受け取ります。

パイプラインコンポーネントに障害またはクラッシュが発生した場合、またはソースが確認応答を受け取れなかった場合、ソースはタイムアウトし、再試行や障害のログ記録などの必要なアクションを実行します。パイプラインに複数のシンクまたは複数のサブパイプラインが設定されている場合、イベントレベルの確認応答は、イベントが*全*サブパイプラインの*全*シンクに送信された後にのみ送信されます。シンクに DLQ が設定されている場合、エンドツーエンドの確認応答は DLQ に書き込まれたイベントも追跡します。

エンドツーエンドの確認を有効にするには、Amazon S3 ソース設定で **[追加オプション]** を展開し **[エンドツーエンドのメッセージの確認を有効にする]** を選択します。

## ソースバックプレッシャー
<a name="osis-features-backpressure"></a>

パイプラインでは、データ処理で負荷がかかっているときや、シンクが一時的にダウンしていたり、データの取り込みに時間がかかったりすると、バックプレッシャーが生じることがあります。OpenSearch Ingestion では、パイプラインで使用されているソースプラグインによってバックプレッシャーの処理方法が異なります。

### HTTP ソース
<a name="osis-features-backpressure-http"></a>

[HTTP ソース](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/http-source/)プラグインを使用するパイプラインでは、混雑しているパイプラインコンポーネントによってバックプレッシャーの処理方法が異なります。
+ **バッファ** - バッファがいっぱいになると、パイプラインはエラーコード 408 の HTTP ステータス `REQUEST_TIMEOUT` をソースエンドポイントに返し始めます。バッファが解放されると、パイプラインは HTTP イベントの処理を再開します。
+ **ソーススレッド** - すべての HTTP ソーススレッドがリクエストを実行中で負荷がかかっており、未処理のリクエストキューサイズがリクエストの最大許容数を超えた場合、パイプラインはエラーコード 429 の HTTP ステータス `TOO_MANY_REQUESTS` をソースエンドポイントに返し始めます。リクエストキューが最大許容キューサイズを下回ると、パイプラインはリクエストの処理を再開します。

### OTel ソース
<a name="osis-features-backpressure-otel"></a>

OpenTelemetry ソース ([OTel logs](https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otel-logs-source)、[OTel metrics](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/otel-metrics-source/)、および [OTel trace](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/otel-trace/)) を使用するパイプラインのバッファがいっぱいになると、パイプラインはエラーコード 408 の HTTP ステータス `REQUEST_TIMEOUT` をソースエンドポイントに返し始めます。バッファが解放されると、パイプラインはイベントの処理を再開します。

### S3 ソース
<a name="osis-features-backpressure-s3"></a>

[S3](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/) ソースを使用するパイプラインのバッファがいっぱいになると、パイプラインは SQS 通知の処理を停止します。バッファが解放されると、パイプラインは通知の処理を再開します。

シンクがダウンしている、またはデータを取り込むことができず、ソースのエンドツーエンドの確認応答が有効になっている場合、パイプラインはすべてのシンクから正常な確認を受け取るまで SQS 通知の処理を停止します。