

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# パーティショニングキーを理解する
<a name="dynamic-partitioning-partitioning-keys"></a>

動的パーティショニングでは、パーティショニングキーに基づいてデータをパーティショニングすることで、ストリーミング S3 データからターゲットデータセットを作成します。パーティショニングキーを使用すると、特定の値に基づいてストリーミングデータをフィルタリングできます。たとえば、顧客 ID と国に基づいてデータをフィルタリングする必要がある場合は、1 つのパーティショニングキーとして `customer_id` のデータフィールドを、また別のパーティショニングキーとして `country` のデータフィールドを指定できます。次に、(サポートされている形式を使用して) 式を指定し、動的にパーティショニングされたデータレコードの配信先となる S3 バケットプレフィックスを定義します。

パーティショニングキーは、次の方法で作成できます。
+ **Inline parsing** – このメソッドは、JSON 形式のデータレコードからパーティショニングするためのキーの抽出で、Firehose 組み込みサポートメカニズムである [jq パーサー](https://stedolan.github.io/jq/)を使用します。現在、`jq 1.6` バージョンのみをサポートしています。
+ **AWS Lambda 関数** – このメソッドは、指定された AWS Lambda 関数を使用して、パーティショニングに必要なデータフィールドを抽出して返します。

**重要**  
動的パーティショニングを有効にする場合、データをパーティショニングするには、これらのメソッドの少なくとも 1 つを設定する必要があります。これらのメソッドのいずれかを設定して、パーティショニングキーを指定することも、両方を同時に指定することもできます。

## インライン解析でパーティショニングキーを作成する
<a name="dynamic-partitioning-inline-parsing"></a>

ストリーミングデータの動的パーティショニングメソッドとしてインライン解析を設定するには、パーティショニングキーとして使用するデータレコードパラメータを選択し、それぞれの指定したパーティショニングキーの値を提供する必要があります。

次のサンプルデータレコードは、インライン解析を使用してパーティションキーを定義する方法を示しています。データは Base64 形式でエンコードされる必要があることに留意してください。[CLI の例](https://docs.aws.amazon.com/cli/latest/reference/firehose/put-record.html#examples)を参照することもできます。

```
{  
   "type": {  
    "device": "mobile",  
    "event": "user_clicked_submit_button" 
  },  
  "customer_id": "1234567890",  
  "event_timestamp": 1565382027,   #epoch timestamp  
  "region": "sample_region"  
}
```

たとえば、`customer_id` パラメータまたは `event_timestamp` パラメータに基づいてデータをパーティショニングすることを選択できます。これは、レコードが配信される S3 プレフィックスの決定に使用される各レコードの `customer_id` パラメータまたは `event_timestamp` パラメータの値が必要であることを意味します。また、式 `.type.device` を用いた `device` のように、ネストされたパラメータを選択することもできます。動的パーティショニングロジックは、複数のパラメータに依存する可能性があります。

パーティショニングキーのデータパラメータを選択した後、各パラメータを有効な jq 式にマップします。次のテーブルに、jq 式へのパラメータのマッピングを示します。


| パラメータ | ip 式 | 
| --- | --- | 
| customer\$1id | .customer\$1id | 
| device |  .type.device  | 
| year |  .event\$1timestamp\$1 strftime("%Y")  | 
| month |  .event\$1timestamp\$1 strftime("%m")  | 
| day |  .event\$1timestamp\$1 strftime("%d")  | 
| hour |  .event\$1timestamp\$1 strftime("%H")  | 

実行時に、Firehose は上の右の列を使用して、各レコードのデータに基づいてパラメータを評価します。

## AWS Lambda 関数を使用してパーティショニングキーを作成する
<a name="dynamic-partitioning-with-lambda"></a>

圧縮または暗号化されたデータレコード、または JSON 以外のファイル形式のデータの場合、統合 AWS Lambda 関数を独自のカスタムコードとともに使用してレコードを解凍、復号、または変換し、パーティショニングに必要なデータフィールドを抽出して返すことができます。これは、Firehose で現在利用できる既存の変換 Lambda 関数の拡張です。同じ Lambda 関数を使用して、動的パーティショニングに使用できるデータフィールドを変換、解析、および返すことができます。

入力から出力までのすべての読み取りレコードを再生し、レコードからパーティショニングキーを抽出する Python の Lambda 関数を処理する Firehose ストリームの例を次に示します。

```
from __future__ import print_function
import base64
import json
import datetime
 
# Signature for all Lambda functions that user must implement
def lambda_handler(firehose_records_input, context):
    print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn']
          + ", Region: " + firehose_records_input['region']
          + ", and InvocationId: " + firehose_records_input['invocationId'])
 
    # Create return value.
    firehose_records_output = {'records': []}
 
    # Create result object.
    # Go through records and process them
 
    for firehose_record_input in firehose_records_input['records']:
        # Get user payload
        payload = base64.b64decode(firehose_record_input['data'])
        json_value = json.loads(payload)
 
        print("Record that was received")
        print(json_value)
        print("\n")
        # Create output Firehose record and add modified payload and record ID to it.
        firehose_record_output = {}
        event_timestamp = datetime.datetime.fromtimestamp(json_value['eventTimestamp'])
        partition_keys = {"customerId": json_value['customerId'],
                          "year": event_timestamp.strftime('%Y'),
                          "month": event_timestamp.strftime('%m'),
                          "day": event_timestamp.strftime('%d'),
                          "hour": event_timestamp.strftime('%H'),
                          "minute": event_timestamp.strftime('%M')
                          }
 
        # Create output Firehose record and add modified payload and record ID to it.
        firehose_record_output = {'recordId': firehose_record_input['recordId'],
                                  'data': firehose_record_input['data'],
                                  'result': 'Ok',
                                  'metadata': { 'partitionKeys': partition_keys }}
 
        # Must set proper record ID
        # Add the record to the list of output records.
 
        firehose_records_output['records'].append(firehose_record_output)
 
    # At the end return processed records
    return firehose_records_output
```

入力から出力までのすべての読み取りレコードを再生し、レコードからパーティショニングキーを抽出する Go の Lambda 関数を処理する Firehose ストリームの例を次に示します。

```
package main

import (
	"fmt"
	"encoding/json"
	"time"
	"strconv"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

type DataFirehoseEventRecordData struct {
	CustomerId string `json:"customerId"`
}

func handleRequest(evnt events.DataFirehoseEvent) (events.DataFirehoseResponse, error) {

	fmt.Printf("InvocationID: %s\n", evnt.InvocationID)
	fmt.Printf("DeliveryStreamArn: %s\n", evnt.DeliveryStreamArn)
	fmt.Printf("Region: %s\n", evnt.Region)

	var response events.DataFirehoseResponse

	for _, record := range evnt.Records {
		fmt.Printf("RecordID: %s\n", record.RecordID)
		fmt.Printf("ApproximateArrivalTimestamp: %s\n", record.ApproximateArrivalTimestamp)

		var transformedRecord events.DataFirehoseResponseRecord
		transformedRecord.RecordID = record.RecordID
		transformedRecord.Result = events.DataFirehoseTransformedStateOk
		transformedRecord.Data = record.Data

		var metaData events.DataFirehoseResponseRecordMetadata
		var recordData DataFirehoseEventRecordData
		partitionKeys := make(map[string]string)

		currentTime := time.Now()
		json.Unmarshal(record.Data, &recordData)
		partitionKeys["customerId"] = recordData.CustomerId
		partitionKeys["year"] = strconv.Itoa(currentTime.Year())
		partitionKeys["month"] = strconv.Itoa(int(currentTime.Month()))
		partitionKeys["date"] = strconv.Itoa(currentTime.Day())
		partitionKeys["hour"] = strconv.Itoa(currentTime.Hour())
		partitionKeys["minute"] = strconv.Itoa(currentTime.Minute())
		metaData.PartitionKeys = partitionKeys
		transformedRecord.Metadata = metaData

		response.Records = append(response.Records, transformedRecord)
	}

	return response, nil
}

func main() {
	lambda.Start(handleRequest)
}
```