

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# OpenSearch Ingestion パイプラインを Amazon S3 で使用する
<a name="configure-client-s3"></a>

OpenSearch Ingestion では、Amazon S3 をソースまたは送信先として使用できます。Amazon S3 をソースとして使用する場合は、OpenSearch Ingestion パイプラインにデータを送信します。Amazon S3 を送信先として使用する場合、OpenSearch Ingestion パイプラインから 1 つまたは複数の S3 バケットにデータを書き込みます。

**Topics**
+ [ソースとしての Amazon S3](#s3-source)
+ [送信先としての Amazon S3](#s3-destination)
+ [ソースとしての Amazon S3 クロスアカウント](#fdsf)

## ソースとしての Amazon S3
<a name="s3-source"></a>

Amazon S3 をデータ処理のソースとして使用する方法には、*S3-SQS 処理*と*スケジュールされたスキャン*の 2 つがあります。

S3 にファイルが書き込まれた後にほぼリアルタイムでファイルをスキャンする必要がある場合は、S3-SQS 処理を使用します。バケット内でオブジェクトが保存または変更されるたびにイベントを発生させるように Amazon S3 バケットを設定できます。S3 バケット内のデータをバッチ処理するには、1 回限りのスケジュールされたスキャンまたは反復的なスケジュールされたスキャンを使用します。

**Topics**
+ [前提条件](#s3-prereqs)
+ [ステップ 1: パイプラインロールを設定する](#s3-pipeline-role)
+ [ステップ 2: パイプラインを作成する](#s3-pipeline)

### 前提条件
<a name="s3-prereqs"></a>

スケジュールされたスキャンと S3-SQS 処理の両方の OpenSearch Ingestion パイプラインのソースとして Amazon S3 を使用するには、最初に[SS3 バケットを作成](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html)します。

**注記**  
OpenSearch Ingestion パイプラインのソースとして使用される S3 バケットが異なる にある場合は AWS アカウント、バケットでクロスアカウント読み取りアクセス許可を有効にする必要もあります。これにより、パイプラインはデータを読み取って処理できるようになります。クロスアカウントアクセス許可を有効にするには、「*Amazon S3 User Guide*」の「[Bucket owner granting cross-account bucket permissions](https://docs.aws.amazon.com/AmazonS3/latest/userguide/example-walkthroughs-managing-access-example2.html)」を参照してください。  
S3 バケットが複数のアカウントにある場合は、`bucket_owners` マップを使用します。例については、OpenSearch ドキュメントの「[クロスアカウント S3 アクセス](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/#cross-account-s3-access)」を参照してください。

S3-SQS 処理をセットアップするには、以下のステップも実行する必要があります。

1. [Amazon SQS キューを作成します](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/step-create-queue.html)。

1. SQS キューを送信先とする S3 バケットで[イベント通知を有効にします](https://docs.aws.amazon.com/AmazonS3/latest/userguide/enable-event-notifications.html)。

### ステップ 1: パイプラインロールを設定する
<a name="s3-pipeline-role"></a>

パイプラインにデータを*プッシュ*する他のソースプラグインとは異なり、[S3 ソースプラグイン](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/)は、パイプラインがソースからデータを*取得する*読み取りベースのアーキテクチャを使用しています。

したがって、パイプラインが S3 から読み取るには、パイプラインの S3 ソース設定で、S3 バケットと Amazon SQS キューの両方にアクセスできるロールを指定する必要があります。キューからデータを読み取るために、パイプラインはこのロールを引き受けます。

**注記**  
S3 ソース設定内で指定するロールは、[パイプラインロール]()である必要があります。したがって、パイプラインロールには 2 つの異なるアクセス許可ポリシーを含める必要があります。1 つはシンクに書き込むポリシーで、もう 1 つは S3 ソースから取得するポリシーです。すべてのパイプラインコンポーネントで同じ `sts_role_arn` を使用する必要があります。

次のサンプルポリシーは、S3 をソースとして使用するために必要なアクセス許可を示しています。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action":[
          "s3:ListBucket",
          "s3:GetBucketLocation",
          "s3:GetObject"
       ],
      "Resource": "arn:aws:s3:::{{amzn-s3-demo-bucket}}/*"
    },
    {
       "Effect":"Allow",
       "Action":"s3:ListAllMyBuckets",
       "Resource":"arn:aws:s3:::*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": "arn:aws:sqs:{{us-east-1}}:{{111122223333}}:{{MyS3EventSqsQueue}}"
    }
  ]
}
```

------

 S3 ソースプラグイン設定の `sts_role_arn` オプションで指定した IAM ロールに、これらのアクセス許可をアタッチする必要があります。

```
version: "2"
source:
  s3:
    ...
    aws:
      ...
processor:
  ...
sink:
  - opensearch:
      ...
```

### ステップ 2: パイプラインを作成する
<a name="s3-pipeline"></a>

アクセス許可を設定したら、Amazon S3 のユースケースに応じて OpenSearch Ingestion パイプラインを設定できます。

#### S3-SQS 処理
<a name="s3-sqs-processing"></a>

S3-SQS 処理をセットアップするには、パイプラインを設定して S3 をソースとして指定し、Amazon SQS 通知を設定します。

```
version: "2"
s3-pipeline:
  source:
    s3:
      notification_type: "sqs"
      codec:
        newline: null
      sqs:
        queue_url: "https://sqs.{{us-east-1}}amazonaws.com/{{account-id}}/{{ingestion-queue}}"
      compression: "none"
      aws:
        region: "{{region}}"
  processor:
  - grok:
      match:
        message:
        - "%{COMMONAPACHELOG}"
  - date:
      destination: "@timestamp"
      from_time_received: true
  sink:
  - opensearch:
      hosts: ["https://search-{{domain-endpoint}}.{{us-east-1}}es.amazonaws.com"]
      index: "{{index-name}}"
      aws:
        region: "{{region}}"
```

Amazon S3 で小さなファイルを処理するときに CPU 使用率が低い場合は、`workers` オプションの値を変更してスループットを高めることを検討してください。詳細については、「[S3 plugin configuration options](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/#configuration)」を参照してください。

#### スケジュールされたスキャン
<a name="s3-scheduled-scan"></a>

スケジュールされたスキャンをセットアップするには、すべての S3 バケットまたはバケットレベルに適用されるスキャンレベルで、パイプラインにスケジュールを設定します。バケットレベルのスケジュールまたはスキャン間隔の設定は、常にスキャンレベルの設定を上書きします。

スケジュールれたスキャンは、データ移行に理想的な *1 回限りのスキャン*、またはバッチ処理に理想的な*反復的スキャン*で設定できます。

Amazon S3 から読み取るようにパイプラインを設定するには、事前設定された Amazon S3 ブループリントを使用します。パイプライン設定の `scan` の部分は、スケジュールのニーズに合わせて編集できます。詳細については、「[ブループリントの使用](pipeline-blueprint.md)」を参照してください。

**1 回限りのスキャン**

1 回限りのスケジュールされたスキャンは 1 回実行されます。パイプライン設定で、`start_time` および `end_time` を使用して、バケット内のオブジェクトをスキャンするタイミングを指定できます。`range` を使用して、バケット内のオブジェクトをスキャンする現在の時刻に相対的な間隔を指定できます。

例えば、範囲を `PT4H` に設定すると、過去 4 時間に作成されたすべてのファイルがスキャンされます。1 回限りのスキャンを 2 回目に実行するように設定するには、パイプラインを停止して再起動する必要があります。範囲を設定しない場合、開始時間と終了時間も更新する必要があります。

次の設定では、すべてのバケット、およびそれらのバケット内のすべてのオブジェクトを 1 回だけスキャンするようにセットアップされます。

```
version: "2"
log-pipeline:
  source:
    s3:
      codec:
        csv:
      compression: "none"
      aws:
        region: "{{region}}"
      acknowledgments: true
      scan:
        buckets:
          - bucket:
              name: {{my-bucket}}
              filter:
                include_prefix:
                  - {{Objects1}}/
                exclude_suffix:
                  - .jpeg
                  - .png
          - bucket:
              name: {{my-bucket-2}}
              key_prefix:
                include:
                  - {{Objects2}}/
                exclude_suffix:
                  - .jpeg
                  - .png
      delete_s3_objects_on_read: false
  processor:
    - date:
        destination: "@timestamp"
        from_time_received: true
  sink:
    - opensearch:
        hosts: ["https://{{search-domain-endpoint}}.{{us-east-1}}es.amazonaws.com"]
        index: "{{index-name}}"
        aws:
          region: "{{region}}"
        dlq:
          s3:
            bucket: "{{dlq-bucket}}"
            region: "{{us-east-1}}"
```

次の設定では、指定した時間ウィンドウですべてのバケットを 1 回だけスキャンするようにセットアップされます。したがって、S3 は、作成時間がこのウィンドウ内に収まるオブジェクトのみを処理します。

```
scan:
  start_time: 2023-01-21T18:00:00.000Z
  end_time: 2023-04-21T18:00:00.000Z
  buckets:
    - bucket:
        name: {{my-bucket-1}}
        filter:
          include:
            - {{Objects1}}/
          exclude_suffix:
            - .jpeg
            - .png
    - bucket:
        name: {{my-bucket-2}}
        filter:
          include:
            - {{Objects2}}/
          exclude_suffix:
            - .jpeg
            - .png
```

次の設定では、スキャンレベルとバケットレベルの両方で 1 回限りのスキャンがセットアップされます。バケットレベルの開始時間と終了時間は、スキャンレベルの開始時間と終了時間よりも優先されます。

```
scan:
  start_time: 2023-01-21T18:00:00.000Z
  end_time: 2023-04-21T18:00:00.000Z
  buckets:
    - bucket:
        start_time: 2023-01-21T18:00:00.000Z
        end_time: 2023-04-21T18:00:00.000Z
        name: {{my-bucket-1}}
        filter:
          include:
            - {{Objects1}}/
          exclude_suffix:
            - .jpeg
            - .png
    - bucket:
        start_time: 2023-01-21T18:00:00.000Z
        end_time: 2023-04-21T18:00:00.000Z
        name: {{my-bucket-2}}
        filter:
          include:
            - {{Objects2}}/
          exclude_suffix:
            - .jpeg
            - .png
```

パイプラインを停止すると、停止前にパイプラインによってスキャンされたオブジェクトの既存の参照が削除されます。1 つのスキャンパイプラインが停止すると、既にスキャンされている場合でも、すべてのオブジェクトが起動後に再スキャンされます。1 つのスキャンパイプラインを停止する必要がある場合は、パイプラインを再開する前に時間ウィンドウを変更することをお勧めします。

開始時刻と終了時刻でオブジェクトをフィルタリングする必要がある場合は、パイプラインの停止と開始が唯一のオプションです。開始時刻と終了時刻でフィルタリングする必要がない場合は、名前でオブジェクトをフィルタリングできます。名前で読み取ると、パイプラインを停止して開始する必要はありません。これを行うには、`include_prefix` と `exclude_suffix` を使用します。

**反復的スキャン**

反復的なスケジュールされたスキャンでは、指定した S3 バケットのスキャンがスケジュールされた間隔で定期的に実行されます。個別のバケットレベルの設定はサポートされていないため、これらの間隔はスキャンレベルでのみ設定できます。

パイプライン設定で、`interval` は反復的スキャンの頻度を 30 秒から 365 日の間で指定します。これらのスキャンのうちの最初のスキャンは、パイプラインの作成時に実行されます。`count` はスキャンインスタンスの合計数を定義します。

次の設定では、12 時間のスキャン間隔での反復的スキャンがセットアップされます。

```
scan:
  scheduling:
    interval: PT12H
    count: 4
  buckets:
    - bucket:
        name: {{my-bucket-1}}
        filter:
          include:
            - {{Objects1}}/
          exclude_suffix:
            - .jpeg
            - .png
    - bucket:
        name: {{my-bucket-2}}
        filter:
          include:
            - {{Objects2}}/
          exclude_suffix:
            - .jpeg
            - .png
```

## 送信先としての Amazon S3
<a name="s3-destination"></a>

OpenSearch Ingestion パイプラインから S3 バケットにデータを書き込むには、事前設定済み S3 ブループリントを使用して [S3 シンク](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sinks/s3/)を含むパイプラインを作成します。このパイプラインは、選択したデータを OpenSearch シンクにルーティングし、同時にすべてのデータを S3 にアーカイブするために送信します。詳細については、「[ブループリントの使用](pipeline-blueprint.md)」を参照してください。

S3 シンクを作成するとき、さまざまな[シンクコーデック](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sinks/s3/#codec)から優先フォーマットを指定できます。例えば、列形式でデータを書き込む場合は、Parquet コーデックまたは Avro コーデックを選択します。行ベースの形式の場合は、JSON または NDJSON を選択します。指定したスキーマで S3 にデータを書き込むには、[Avro 形式](https://avro.apache.org/docs/current/specification/#schema-declaration)を使用してシンクコーデック内にインラインスキーマを定義することもできます。

次の例では、S3 シンクのインラインスキーマが定義されます。

```
- s3:
  codec:
    parquet:
      schema: >
        {
           "type" : "record",
           "namespace" : "org.vpcFlowLog.examples",
           "name" : "VpcFlowLog",
           "fields" : [
             { "name" : "version", "type" : "string"},
             { "name" : "srcport", "type": "int"},
             { "name" : "dstport", "type": "int"},
             { "name" : "start", "type": "int"},
             { "name" : "end", "type": "int"},
             { "name" : "protocol", "type": "int"},
             { "name" : "packets", "type": "int"},
             { "name" : "bytes", "type": "int"},
             { "name" : "action", "type": "string"},
             { "name" : "logStatus", "type" : "string"}
           ]
         }
```

このスキーマを定義するとき、パイプラインがシンクに配信するさまざまなタイプのイベントに存在する可能性のあるすべてのキーのスーパーセットを指定します。

例えば、あるイベントでキーが欠落している可能性がある場合は、そのキーに `null` 値を付けてスキーマに追加します。null 値を宣言すると、(キーを持つイベントと持たないイベントがある) 不均一なデータをスキーマで処理できます。受信イベントにこれらのキーがある場合、その値はシンクに書き込まれます。

このスキーマ定義は、定義済みのキーのみをシンクに送信し、未定義のキーを受信イベントから削除するフィルターとして機能します。

`include_keys` と `exclude_keys` をシンクで使用して、他のシンクにルーティングされるデータをフィルタリングすることもできます。この 2 つのフィルターは相互に排他的であるため、スキーマでは一度に 1 つしか使用できません。また、ユーザー定義のスキーマと一緒に使用することもできません。

このようなフィルターでパイプラインを作成するには、事前設定済みのシンクフィルターのブループリントを使用します。詳細については、「[ブループリントの使用](pipeline-blueprint.md)」を参照してください。

## ソースとしての Amazon S3 クロスアカウント
<a name="fdsf"></a>

OpenSearch Ingestion パイプラインがソースとして別のアカウントの S3 バケットにアクセスできるように、Amazon S3 のアカウント間でアクセスを許可できます。クロスアカウントアクセスを有効にするには、「*Amazon S3 User Guide*」の「[Bucket owner granting cross-account bucket permissions](https://docs.aws.amazon.com/AmazonS3/latest/userguide/example-walkthroughs-managing-access-example2.html)」を参照してください。アクセスを付与したら、パイプラインロールに必要なアクセス許可があることを確認します。

次に、`bucket_owners` を使用して パイプラインを作成し、Amazon S3 バケットへのクロスアカウントアクセスをソースとして有効にできます。

```
s3-pipeline:
 source:
  s3:
   notification_type: "sqs"
   codec:
    csv:
     delimiter: ","
     quote_character: "\""
     detect_header: True
   sqs:
    queue_url: "https://sqs.ap-northeast-1.amazonaws.com/401447383613/test-s3-queue"
   bucket_owners:
    my-bucket-01: 123456789012
    my-bucket-02: 999999999999
   compression: "gzip"
```