

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# Amazon OpenSearch Ingestion 中的管道功能概述
<a name="osis-features-overview"></a>

Amazon OpenSearch Ingestion 预置*管道*，这些管道由一个源、一个缓冲区、零个或多个处理器以及一个或多个接收器组成。提取管道由 Data Prepper 作为数据引擎提供支持。有关管道各个组件的概述，请参见 [Amazon OpenSearch Ingestion 中的关键概念](ingestion-process.md)。

以下各节概述了 Amazon OpenSearch Ingestion 中一些最常用的功能。

**注意**  
该列表不是管道可用功能的详尽列表。有关管道所有可用功能的综合文档，请参阅 [Data Prepper 文档](https://opensearch.org/docs/latest/data-prepper/pipelines/pipelines/)。请注意， OpenSearch Ingestion 对你可以使用的插件和选项施加了一些限制。有关更多信息，请参阅 [Amazon OpenSearch Ingestion 管道支持的插件和选项](pipeline-config-reference.md)。

**Topics**
+ [持久缓冲功能](#persistent-buffering)
+ [拆分](#osis-features-splitting)
+ [链接](#osis-features-chaining)
+ [死信队列](#osis-features-dlq)
+ [索引管理](#osis-features-index-management)
+ [End-to-end 确认](#osis-features-e2e)
+ [源背压](#osis-features-backpressure)

## 持久缓冲功能
<a name="persistent-buffering"></a>

持久缓冲区将您的数据存储在跨多个可用区且基于磁盘的磁盘缓冲区中，以增强数据持久性。对于所有支持的基于推送的源，您可使用持久缓冲区以摄取数据，无需设置独立的缓冲区。这些来源包括 HTTP 以及 OpenTelemetry 用于日志、跟踪和指标的来源。要启用持久缓冲功能，请在创建或更新管道时选择**启用持久缓冲区**。有关更多信息，请参阅 [创建 Amazon OpenSearch Ingestion 管道](creating-pipeline.md)。

OpenSearch Ingestion 会动态确定 OCUs 用于持久缓冲的数量，同时考虑数据源、流式传输转换和接收目标。由于它会 OCUs 为缓冲分配一些资源，因此您可能需要增加最小值和最大值 OCUs 以保持相同的摄取吞吐量。管道在缓冲区中保留数据的时间最长可达 72 小时。

如果为管道启用持久缓冲功能，则默认最大请求有效载荷大小如下所示：
+ **HTTP 源**：10 MB
+ **OpenTelemetry 来源** — 4 MB

对于 HTTP 源，您可以将最大有效载荷大小增加到 20 MB。请求有效载荷大小包括整个 HTTP 请求，通常包含多个事件。每个事件不得超过 3.5 MB。

具有持久缓冲功能的管道将配置的管道单位在计算单位和缓冲区单位之间进行拆分。如果管道使用 CPU 密集型处理器，例如 grok、键值或拆分字符串，则它将以 1:1 的比例分配单位。 buffer-to-compute否则，将按 3:1 的比例进行分配，始终优先考虑计算单位。

例如：
+ 具有 grok 和 2 个最大单位的管道：1 个计算单位和 1 个缓冲区单位
+ 具有 grok 和 5 个最大单位的管道：3 个计算单位和 2 个缓冲区单位
+ 无处理器但具有 2 个最大单位的管道：1 个计算单位和 1 个缓冲区单位
+ 无处理器但具有 4 个最大单位的管道：1 个计算单位和 3 个缓冲区单位
+ 具有 grok 和 5 个最大单位的管道：2 个计算单位和 3 个缓冲区单位

默认情况下，管道使用加密 AWS 拥有的密钥 缓冲区数据。这些管道不需要任何额外的管道角色权限。

您也可以指定客户自主管理型密钥，并将以下 IAM 权限添加到管道角色：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "KeyAccess",
            "Effect": "Allow",
            "Action": [
              "kms:Decrypt",
              "kms:GenerateDataKeyWithoutPlaintext"
            ],
            "Resource": "arn:aws:kms:us-east-1:111122223333:key/ASIAIOSFODNN7EXAMPLE"
        }
    ]
}
```

------

有关更多信息，请参阅《AWS Key Management Service 开发人员指南》**中的[客户托管密钥](https://docs.aws.amazon.com/kms/latest/developerguide/concepts.html#customer-cmk)。

**注意**  
如果禁用永久缓冲功能，则管道将开始完全基于内存缓冲运行。

## 拆分
<a name="osis-features-splitting"></a>

您可以将 OpenSearch Ingestion 管道配置为将传入事件*拆*分为子管道，从而允许您对同一个传入事件执行不同类型的处理。

以下示例管道将传入事件拆分到两个子管道。每个子管道都使用自己的处理器来丰富和操作数据，然后将数据发送到不同的 OpenSearch 索引。

```
version: "2"
log-pipeline:
  source:
    http:
    ...
  sink:
    - pipeline:
        name: "logs_enriched_one_pipeline"
    - pipeline:
        name: "logs_enriched_two_pipeline"

logs_enriched_one_pipeline:
  source:
    pipeline:
      name: "log-pipeline"
  processor:
   ...
  sink:
    - opensearch:
        # Provide a domain or collection endpoint
        # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
        aws:
          ...
        index: "enriched_one_logs"

logs_enriched_two_pipeline:
  source:
    pipeline:
      name: "log-pipeline"
  processor:
   ...
  sink:
    - opensearch:
        # Provide a domain or collection endpoint
        # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
        aws:
          ...
          index: "enriched_two_logs"
```

## 链接
<a name="osis-features-chaining"></a>

您可以将多个子管道*链接*在一起，以便分块执行数据处理和扩充。换句话说，你可以在一个子管道中使用一定的处理能力来丰富传入的事件，然后将其发送到另一个子管道，使用不同的处理器进行进一步丰富，最后将其发送到其 OpenSearch 接收器。

在以下示例中，`log_pipeline`子管道使用一组处理器丰富传入的日志事件，然后将该事件发送到名为的 OpenSearch 索引。`enriched_logs`管道将相同的事件发送到`log_advanced_pipeline`子管道，子管道对其进行处理并将其发送到名`enriched_advanced_logs`为的其他 OpenSearch 索引。

```
version: "2"
log-pipeline:
  source:
    http:
    ...
  processor:
    ...
  sink:
    - opensearch:
        # Provide a domain or collection endpoint
        # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
        aws:
          ...
          index: "enriched_logs"
    - pipeline:
        name: "log_advanced_pipeline"

log_advanced_pipeline:
  source:
    pipeline:
      name: "log-pipeline"
  processor:
   ...
  sink:
    - opensearch:
        # Provide a domain or collection endpoint
        # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
        aws:
          ...
          index: "enriched_advanced_logs"
```

## 死信队列
<a name="osis-features-dlq"></a>

死信队列 (DLQs) 是管道未能写入接收器的事件的目的地。在 OpenSearch Ingestion 中，您必须指定一个具有相应写入权限的 Amazon S3 存储桶才能用作 DLQ。您可以向管道中的每个接收器添加 DLQ 配置。当管道遇到写入错误时，它会在配置的 S3 存储桶中创建 DLQ 对象。DLQ 对象作为一组失败事件存在于 JSON 文件中。

满足以下任意条件时，管道会向 DLQ 写入事件：
+  OpenSearch 接收器**的最大重试次**数已用完。 OpenSearch 要进行此设置，摄取至少需要 16。
+ 由于出现错误条件，事件遭接收器拒绝。

### 配置
<a name="osis-features-dlq-config"></a>

要为子管道配置死信队列，请在配置接收器目标时选择**启用 S3 DLQ**。随后为队列指定所需设置。有关更多信息，请参阅 Data Prepper DLQ 文档中的[配置](https://opensearch.org/docs/latest/data-prepper/pipelines/dlq/#configuration)。

写入此 S3 DLQ 的文件采用以下命名模式：

```
dlq-v${version}-${pipelineName}-${pluginId}-${timestampIso8601}-${uniqueId}
```

有关手动配置管道角色以允许访问 DLQ 写入到 S3 存储桶的说明，请参阅 [写入 Amazon S3 或死信队列的权限](pipeline-security-overview.md#pipeline-security-dlq)。

### 示例
<a name="osis-features-dlq-example"></a>

考虑以下示例 DLQ 文件：

```
dlq-v2-apache-log-pipeline-opensearch-2023-04-05T15:26:19.152938Z-e7eb675a-f558-4048-8566-dac15a4f8343
```

以下是未能写入接收器并发送到 DLQ S3 存储桶进行进一步分析的数据示例：

```
Record_0	
pluginId            "opensearch"
pluginName          "opensearch"
pipelineName        "apache-log-pipeline"
failedData	
index		  "logs"
indexId		 null
status		  0
message		"Number of retries reached the limit of max retries (configured value 15)"
document	
log		    "sample log"
timestamp	    "2023-04-14T10:36:01.070Z"

Record_1	
pluginId            "opensearch"
pluginName          "opensearch"
pipelineName        "apache-log-pipeline"
failedData	
index               "logs"
indexId		 null
status		  0
message		"Number of retries reached the limit of max retries (configured value 15)"
document	
log                 "another sample log"
timestamp           "2023-04-14T10:36:01.071Z"
```

## 索引管理
<a name="osis-features-index-management"></a>

Amazon OpenSearch Ingestion 具有许多索引管理功能，包括以下功能。

### 创建索引
<a name="osis-features-index-management-create"></a>

您可以在管道接收器中指定索引名称， OpenSearch Ingestion 在置备管道时会创建索引。如果索引已经存在，管道会将其用于索引传入事件。如果您停止并重启管道，或者更新其 YAML 配置，如果这些索引尚不存在，则管道会尝试创建新的索引。管道始终不会删除索引。

以下示例为接收器在预调配管道时创建两个索引：

```
sink:
  - opensearch:
      index: apache_logs
  - opensearch:
      index: nginx_logs
```

### 生成索引名称和模式
<a name="osis-features-index-management-patterns"></a>

您可以使用传入事件字段的变量来生成动态索引名称。在接收器配置中，使用格式 `string${}` 表示字符串插值，并使用 JSON 指针从事件中提取字段。`index_type` 的选项是 `custom` 或 `management_disabled`。由于 OpenSearch 域名和 OpenSearch 无服务器集合`management_disabled`的`index_type`默认值为，因此可以将其保留为未设置。`custom`

例如，以下管道从传入事件中选择 `metadataType` 字段以生成索引名称。

```
pipeline:
  ...
  sink:
    opensearch:
      index: "metadata-${metadataType}"
```

以下配置继续每天或每小时生成一个新索引。

```
pipeline:
  ...
  sink:
    opensearch:
      index: "metadata-${metadataType}-%{yyyy.MM.dd}"

pipeline:
  ...
  sink:
    opensearch:
      index: "metadata-${metadataType}-%{yyyy.MM.dd.HH}"
```

索引名称也可以是以日期-时间模式作为后缀的纯字符串，例如 `my-index-%{yyyy.MM.dd}`。当接收器向发送数据时 OpenSearch，它会将日期时间模式替换为 UTC 时间，并为每天创建一个新索引，例如。`my-index-2022.01.25`有关更多信息，请参阅[DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html)课程。

该索引名称也可以是带有/不带日期-时间样式后缀的格式化字符串，例如 `my-${index}-name`。当接收器向发送数据时 OpenSearch，它会将该`"${index}"`部分替换为正在处理的事件中的值。如果格式为 `"${index1/index2/index3}"`，则使用事件中的值代替字段 `index1/index2/index3`。

### 正在生成文档 IDs
<a name="osis-features-index-management-ids"></a>

管道可以在为文档编制索引时生成文档 ID OpenSearch。它可以 IDs 从传入事件中的字段中推断出这些文档。

此示例使用传入事件的 `uuid` 字段生成文档 ID。

```
pipeline:
  ...
  sink:
    opensearch:
      index_type: custom
      index: "metadata-${metadataType}-%{yyyy.MM.dd}" 
      "document_id": "uuid"
```

在以下示例中，[添加条目](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/processors/add-entries/)处理器合并传入事件的字段 `uuid` 和 `other_field` 以生成文档 ID。

该`create`操作可确保具有相同内容的文档 IDs 不会被覆盖。管道会丢弃重复的文档，而不会出现任何重试或 DLQ 事件。由于使用此操作的管道作者的目的在于避免更新现有文档，因而这一预期十分合理。

```
pipeline:
  ...
  processor:
   - add_entries:
      entries:
        - key: "my_doc_id_field"
          format: "${uuid}-${other_field}"
  sink:
    - opensearch:
       ...
       action: "create"
       document_id: "my_doc_id"
```

您可能需要将事件的文档 ID 设置为子对象中的字段。在以下示例中，s OpenSearch ink 插件使用子对象`info/id`生成文档 ID。

```
sink:
  - opensearch:
       ...
       document_id: info/id
```

鉴于以下事件，管道将生成一个 `_id` 字段设置为 `json001` 的文档：

```
{
   "fieldA":"arbitrary value",
   "info":{
      "id":"json001",
      "fieldA":"xyz",
      "fieldB":"def"
   }
}
```

### 正在生成路由 IDs
<a name="osis-features-index-management-routing-ids"></a>

你可以使用 sin OpenSearch k 插件中的`routing_field`选项将文档路由属性 (`_routing`) 的值设置为来自传入事件的值。

路由支持 JSON 指针语法，因此嵌套字段也可用，而不仅仅是顶级字段。

```
sink:
  - opensearch:
       ...
       routing_field: metadata/id
       document_id: id
```

鉴于以下事件，插件将生成一个 `_routing` 字段设置为 `abcd` 的文档：

```
{
   "id":"123",
   "metadata":{
      "id":"abcd",
      "fieldA":"valueA"
   },
   "fieldB":"valueB"
}
```

有关创建供管道在创建索引时使用的索引模板的说明，请参阅[索引模板](https://opensearch.org/docs/latest/im-plugin/index-templates/)。

## End-to-end 确认
<a name="osis-features-e2e"></a>

OpenSearch *Ingestion 使用end-to-end确认功能跟踪无状态管道中从源到接收器的传输，从而确保数据的持久性和可靠性。*

**注意**  
目前，只有 S [3 源](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/)插件支持 end-to-end确认。

通过 end-to-end确认，管道源插件会创建一个*确认集*来监视一批事件。当这些事件成功发送到其接收器时，它会收到肯定应答，或者当任何事件无法发送到其接收器时，它会收到否定应答。

如果管道组件出现故障或崩溃，或者源未能收到确认，则源会超时并采取必要的操作，例如重试或记录失败。如果管道配置了多个接收器或多个子管道，则只有在将事件发送到*所有*子管道中的*所有*接收器之后，才会发送事件级别确认。如果接收器配置了 DLQ，则 end-to-end确认还会跟踪写入 DLQ 的事件。

要启用 end-to-end确认，请展开 Amazon S3 源配置中的**其他选项**，然后选择**启用 end-to-end消息确认**。

## 源背压
<a name="osis-features-backpressure"></a>

当管道忙于处理数据，或者其接收器暂时关闭或数据采集速度缓慢时，管道可能会遇到背压。 OpenSearch 根据管道使用的源插件，Ingestion 有不同的处理背压的方法。

### HTTP 源
<a name="osis-features-backpressure-http"></a>

使用 [HTTP 源](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/http-source/)插件的管道处理反向压力的方式会有所不同，具体取决于哪个管道组件处于拥塞状态：
+ **缓冲区** — 当缓冲区已满时，管道开始将错误代码为 408 的 HTTP 状态 `REQUEST_TIMEOUT` 返回到源端点。缓冲区被释放后，管道将重新开始处理 HTTP 事件。
+ **源线程** — 当所有 HTTP 源线程都忙于执行请求，并且未处理的请求队列大小已超过允许的最大请求数时，管道开始将错误代码为 429 的 HTTP 状态 `TOO_MANY_REQUESTS` 返回到源端点。当请求队列降至允许的最大队列大小以下时，管道将重新开始处理请求。

### OTel 来源
<a name="osis-features-backpressure-otel"></a>

当使用 OpenTelemetry 源（[OTel 日志](https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otel-logs-source)、[OTel 指标](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/otel-metrics-source/)和[OTel 跟踪](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/otel-trace/)）的管道的缓冲区已满时，管道开始向源端点返回错误代码为 408 的 HTTP 状态`REQUEST_TIMEOUT`。缓冲区被释放后，管道将重新开始处理事件。

### S3 源
<a name="osis-features-backpressure-s3"></a>

当带有 [S3](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/) 源的管道的缓冲区已满时，管道将停止处理 SQS 通知。缓冲区被释放后，管道将重新开始处理通知。

如果接收器关闭或无法采集数据，并且为源启用了 end-to-end确认功能，则管道将停止处理 SQS 通知，直到收到来自所有接收器的成功确认为止。