

慎重に検討した結果、Amazon Kinesis Data Analytics for SQL アプリケーションを中止することにしました。

1. **2025 年 9 月 1** 日以降、Amazon Kinesis Data Analytics for SQL アプリケーションのバグ修正は提供されません。これは、今後の廃止によりサポートが制限されるためです。

2. **2025 年 10 月 15** 日以降、新しい Kinesis Data Analytics for SQL アプリケーションを作成することはできません。

3. **2026 年 1 月 27 日**以降、アプリケーションは削除されます。Amazon Kinesis Data Analytics for SQL アプリケーションを起動することも操作することもできなくなります。これ以降、Amazon Kinesis Data Analytics for SQL のサポートは終了します。詳細については、「[Amazon Kinesis Data Analytics for SQL アプリケーションのサポート終了](discontinuation.md)」を参照してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# アプリケーション出力の設定
<a name="how-it-works-output"></a>



アプリケーションコードでは、SQL ステートメントの出力を 1 つ以上のアプリケーション内ストリームに書き込みます。必要に応じて、出力設定をアプリケーションに追加できます。 は、アプリケーション内ストリームに書き込まれたすべてを Amazon Kinesis データストリーム、Firehose 配信ストリーム、 AWS Lambda 関数などの外部宛先に保持します。

アプリケーション出力の永続化に使用できる外部宛先の数には制限があります。詳細については、「[制限](limits.md)」を参照してください。

**注記**  
エラーを精査するためにアプリケーション内エラーストリームのデータを永続化する外部宛先は、1 つにすることをお勧めします。



これらの出力設定ごとに、以下を指定します。
+ **アプリケーション内ストリーム名** – 外部宛先で永続化するストリームです。

  Kinesis Data Analytics は、出力設定で指定されたアプリケーション内ストリームを検索します。(ストリーム名では大文字と小文字が区別され、正確に一致する必要があります）。アプリケーションコードでこのアプリケーション内ストリームが作成されていることを確認します。
+ **外部宛先** – データを Kinesis データストリーム、Firehose データ配信ストリーム、または Lambda 関数に永続化できます。ストリームまたは関数の Amazon リソースネーム (ARN) を指定します。また、Amazon Kinesis Analytics がユーザーに代わってストリームまたは Lambda 関数に書き込むために引き受けることができる IAM ロールも指定します。外部宛先に書き込むときに Kinesis Data Analytics が使用するレコード形式 (JSON, CSV) も記述します。

Kinesis Data Analytics でストリーミングまたは Lambda 宛先に書き込むことができない場合、サービスは無限に試行を続けます。これはバックプレッシャーを生み出し、アプリケーションに遅延が生じます。この問題が解決しない場合、アプリケーションは最終的に新しいデータの処理を停止します。[Kinesis Data Analytics Metrics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html) をモニタリングし、障害のアラームを設定できます。メトリクスとアラームの詳細については、[Amazon CloudWatch メトリクスを使用する](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html)と[Amazon CloudWatch アラームを作成する](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html)を参照してください。

 AWS マネジメントコンソールを使用してアプリケーション出力を設定できます。コンソールは API コールを実行して設定を保存します。

## を使用した出力の作成 AWS CLI
<a name="how-it-works-output-cli"></a>

このセクションでは、`CreateApplication` または `AddApplicationOutput` オペレーションのリクエストボディの `Outputs` セクションを作成する方法について説明します。

### Kinesis ストリーム出力を作成する
<a name="how-it-works-output-cli-streams"></a>

次の JSON フラグメントは、Amazon Kinesis データストリームの宛先を作成する `CreateApplication` リクエストボディの `Outputs` セクションを示しています。

```
"Outputs": [
   {
       "DestinationSchema": {
           "RecordFormatType": "string"
       },
       "KinesisStreamsOutput": {
           "ResourceARN": "string",
           "RoleARN": "string"
       },
       "Name": "string"
   }
 
]
```

### Firehose 配信ストリーム出力を作成する
<a name="how-it-works-output-cli-firehose"></a>

次の JSON フラグメントは、Amazon Data Firehose 配信ストリームの宛先を作成する `CreateApplication` リクエスト本文の `Outputs` セクションを示しています。

```
"Outputs": [
   {
       "DestinationSchema": {
           "RecordFormatType": "string"
       },
       "KinesisFirehoseOutput": {
           "ResourceARN": "string",
           "RoleARN": "string"
       },
       "Name": "string"
   }
]
```

### Lambda 関数出力を作成する
<a name="how-it-works-output-cli-lambda"></a>

次の JSON フラグメントは、 AWS Lambda 関数の送信先を作成するための`CreateApplication`リクエスト本文の `Outputs`セクションを示しています。

```
"Outputs": [
   {
       "DestinationSchema": {
           "RecordFormatType": "string"
       },
       "LambdaOutput": {
           "ResourceARN": "string",
           "RoleARN": "string"
       },
       "Name": "string"
   }
]
```

# 出力としての Lambda 関数の使用
<a name="how-it-works-output-lambda"></a>

を送信先 AWS Lambda として使用すると、最終送信先に送信する前に SQL 結果の後処理をより簡単に実行できます。一般的な後処理タスクには次のものがあります。
+ 複数の行を 1 つのレコードに集約する
+ 現在の結果と過去の結果を組み合わせて、遅れて届くデータに対処する
+ 情報のタイプに基づいて異なる送信先に配信する
+ レコード形式の変換 (Protobuf への変換など)
+ 文字列操作または変換
+ 分析処理後のデータの強化
+ 地理空間ユースケースのカスタム処理
+ データ暗号化

Lambda 関数は、次のようなさまざまな AWS サービスやその他の送信先に分析情報を配信できます。
+ [Amazon Simple Storage Service (Amazon S3)](https://docs.aws.amazon.com/AmazonS3/latest/userguide/)
+ カスタム API
+ [Amazon DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/)
+ [Amazon Aurora](http://aurora.apache.org/)
+ [Amazon Redshift](https://docs.aws.amazon.com/redshift/latest/dg/)
+ [Amazon Simple Notification Service (Amazon SNS)](https://docs.aws.amazon.com/sns/latest/dg/)
+ [Amazon Simple Queue Service](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/) (Amazon SQS)
+ [Amazon CloudWatch](https://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/)

Lambda アプリケーションの作成の詳細については、「[AWS Lambdaのご利用開始にあたって](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html)」を参照してください。

**Topics**
+ [出力許可としての Lambda](#how-it-works-output-lambda-perms)
+ [出力メトリクスとしての Lambda](#how-it-works-output-lambda-metrics)
+ [出力イベント入力データモデルおよびレコードレスポンスモデルとしての Lambda](#how-it-works-output-lambda-model)
+ [Lambda 出力呼び出しの頻度](#how-it-works-output-lambda-frequency)
+ [出力として使用するための Lambda 関数の追加](#how-it-works-output-lambda-procedure)
+ [出力エラーとしてよく見られる Lambda](#how-it-works-output-lambda-troubleshooting)
+ [アプリケーションの送信先の Lambda 関数の作成](how-it-works-output-lambda-functions.md)

## 出力許可としての Lambda
<a name="how-it-works-output-lambda-perms"></a>

出力として Lambda を使用するには、アプリケーションの Lambda 出力 IAM ロールに次のアクセス許可ポリシーが必要です。

```
{
   "Sid": "UseLambdaFunction",
   "Effect": "Allow",
   "Action": [
       "lambda:InvokeFunction",
       "lambda:GetFunctionConfiguration"
   ],
   "Resource": "FunctionARN"
}
```

## 出力メトリクスとしての Lambda
<a name="how-it-works-output-lambda-metrics"></a>

Amazon CloudWatch を使用して、送信バイト数、成功および失敗などをモニタリングします。出力として Lambda を使用する Kinesis Data Analytics によって出力される CloudWatch メトリクスについては、「[Amazon Kinesis Analytics Metrics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html)」を参照してください。

## 出力イベント入力データモデルおよびレコードレスポンスモデルとしての Lambda
<a name="how-it-works-output-lambda-model"></a>

Kinesis Data Analytics 出力レコードを送信する場合、Lambda 関数は、必要なイベント入力データおよびレコードレスポンスモデルに準拠している必要があります。

### イベント入力データモデル
<a name="how-it-works-output-lambda-model-request"></a>

Kinesis Data Analytics は、次のリクエストモデルの出力関数として、アプリケーションから Lambda に出力レコードを継続的に送信します。関数内では、リストを繰り返し処理し、ビジネスロジックを適用して、出力要件 (最終的な送信先に送信する前のデータ変換など) を実行します。


| フィールド | 説明 | 
| --- | --- | 
| フィールド | 説明 | 
| --- | --- | 
| フィールド | 説明 | 
| --- | --- | 
| invocationId | Lambda 呼び出し ID (ランダム GUID)。 | 
| applicationArn | Kinesis Data Analytics アプリケーションの Amazon リソースネーム (ARN)。 | 
| レコード [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/how-it-works-output-lambda.html)  | 
| recordId | レコード ID (ランダム GUID) | 
| lambdaDeliveryRecordMetadata |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/how-it-works-output-lambda.html)  | 
| data | Base64 でエンコードされた出力レコードのペイロード | 
| retryHint | 配信再試行回数 | 

**注記**  
`retryHint` は配信失敗ごとに増加する値です。この値は永続的に保持されず、アプリケーションが中断された場合にリセットされます。

### レコードレスポンスモデル
<a name="how-it-works-output-lambda-model-response"></a>

出力関数として `Ok` に (レコード ID と共に) 送信される各レコードは、 または `DeliveryFailed` のどちらかで確認される必要があり、次のパラメータを含める必要があります。それ以外の場合、Kinesis Data Analytics はそれらを配信失敗として扱います。


| フィールド | 説明 | 
| --- | --- | 
| レコード [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/how-it-works-output-lambda.html)  | 
| recordId | レコード ID は呼び出し時に Kinesis Data Analytics から Lambda に渡されます。元のレコードの ID と確認されたレコードの ID との不一致は、配信失敗として扱われます。 | 
| result | レコード配信のステータス。以下の値を指定できます。[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/how-it-works-output-lambda.html)  | 

## Lambda 出力呼び出しの頻度
<a name="how-it-works-output-lambda-frequency"></a>

Kinesis Data Analytics アプリケーションは、出力レコードをバッファして AWS Lambda 宛先関数を頻繁に呼び出します。
+ タンブリングウィンドウとしてデータ分析アプリケーション内の送信先アプリケーション内ストリームにレコードが出力された場合、送信 AWS Lambda 先関数はタンブリングウィンドウトリガーごとに呼び出されます。例えば、タンブリングウィンドウを 60 秒に設定してレコードを宛先のアプリケーション内ストリームに出力すると、Lambda 関数は、60 秒ごとに 1 回呼び出されます。
+ アプリケーション内で連続するクエリまたはスライディングウィンドウとしてレコードがアプリケーション内ストリームに出力される場合、Lambda 宛先関数は約 1 秒に 1 回呼び出されます。

**注記**  
[Lambda 関数あたりの呼び出しリクエストのペイロードサイズの制限](https://docs.aws.amazon.com/lambda/latest/dg/limits.html)が適用されます。これらの制限を超えると、出力レコードが分割され、複数の Lambda 関数呼び出しに分けて送信されます。

## 出力として使用するための Lambda 関数の追加
<a name="how-it-works-output-lambda-procedure"></a>

次の手順では、Kinesis Data Analytics アプリケーションの出力として Lambda 関数を追加する方法を示しています。

1. にサインイン AWS マネジメントコンソール し、[https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics) で Managed Service for Apache Flink コンソールを開きます。

1. リストからアプリケーションを選択し、[**Application details**] を選択します。

1. [**宛先**] セクションで、[**Connect new destination**] を選択します。

1. [**宛先**] 項目に、[**AWS Lambda 関数**] を選択します。

1. [** AWS Lambdaにレコードを配信**] セクションで、既存の Lambda 関数とバージョンを選択するか、[**新規作成**] を選択します。

1. 新しい Lambda 関数を作成する場合は、次の操作を行います。

   1. 提供されているいずれかのテンプレートのいずれかを選択します。詳細については、[アプリケーションの送信先の Lambda 関数の作成](how-it-works-output-lambda-functions.md)。

   1. [**関数の作成**] ページが新しいブラウザタブで開きます。[**Name (名前)**] ボックスで、関数にわかりやすい名前を付けます (例: **myLambdaFunction**)。

   1. アプリケーションの後処理機能のテンプレートを更新します。Lambda 関数作成の詳細については、AWS Lambda 開発者ガイドの[入門ガイド](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html)を参照してください。

   1. Kinesis Data Analytics コンソールの [**Lambda 関数**] リストで、先ほど作成した Lambda 関数を選択します。Lambda 関数のバージョンは **[\$1最新]** を選択します。

1. [**In-application stream**] セクションで、[**Choose an existing in-application stream**] を選択します。[**In-application stream name**] に、アプリケーションの出力ストリームを選択します。選択した出力ストリームからの結果は、Lambda 出力関数に送信されます。

1. 残りのフォームはデフォルト値のままにして、[**Save and continue**] を選択します。

アプリケーションはアプリケーション内ストリームから Lambda 関数にレコードを送信するようになりました。Amazon CloudWatch コンソールのデフォルトテンプレートの結果を確認できます。`AWS/KinesisAnalytics/LambdaDelivery.OkRecords` メトリクスをモニタリングして、Lambda 関数に配信されるレコードの数を確認します。

## 出力エラーとしてよく見られる Lambda
<a name="how-it-works-output-lambda-troubleshooting"></a>

以下は、Lambda 関数への配信が失敗する可能性のある一般的な理由です。
+ Lambda 関数に送信されるバッチのレコード (レコード ID 付き) の一部が Kinesis Data Analytics サービスに返されていません。
+ レスポンスにレコード ID、またはステータスフィールドのいずれかが欠落しています。
+ Lambda 関数のタイムアウトが Lambda 関数内のビジネスロジックを達成するのに十分ではありません。
+ Lambda 関数内のビジネスロジックは、すべてのエラーをキャッチしないため、処理されない例外のためにタイムアウトとバックプレッシャーが生じます。これらのメッセージは、「ポイズンピル」と呼ばれることが少なくありません。

データ配信が失敗した場合、Kinesis Data Analytics は、成功するまで同じレコードセットで Lambda 呼び出しを再試行し続けます。次の CloudWatch メトリクスを監視して、失敗から情報を得ることができます。
+ Kinesis Data Analytics アプリケーションの Output CloudWatch メトリクスとしてのLambda : 統計の中でも、特に成功と失敗の数を示します。詳細については、「[Amazon Kinesis Analytics Metrics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html)」を参照してください。
+ AWS Lambda 関数 CloudWatch メトリクスとログ。

# アプリケーションの送信先の Lambda 関数の作成
<a name="how-it-works-output-lambda-functions"></a>

Kinesis Data Analytics アプリケーションは、 AWS Lambda 関数を出力として使用できます。Kinesis Data Analytics はアプリケーションの送信先として使用する Lambda 関数を作成するテンプレートを提供します。これらのテンプレートは、アプリケーションからの後処理出力の開始点として使用します。

**Topics**
+ [Node.js での Lambda 関数の送信先の作成](#how-it-works-lambda-dest-nodejs)
+ [Python での Lambda 関数の送信先の作成](#how-it-works-lambda-dest-python)
+ [Java での Lambda 関数の送信先の作成](#how-it-works-lambda-dest-java)
+ [.NET での Lambda 関数の送信先の作成](#how-it-works-lambda-net)

## Node.js での Lambda 関数の送信先の作成
<a name="how-it-works-lambda-dest-nodejs"></a>

Lambda 関数の宛先を Node.js で作成するための次のテンプレートがコンソールで利用できます。


| 出力ブループリントとしての Lambda | 言語とバージョン | 説明 | 
| --- | --- | --- | 
| kinesis-analytics-output | Node.js 12.x | Kinesis Data Analytics アプリケーションからカスタム送信先に出力レコードを配信します。 | 

## Python での Lambda 関数の送信先の作成
<a name="how-it-works-lambda-dest-python"></a>

Lambda 関数の宛先を Python で作成するための次のテンプレートがコンソールで利用できます。


| 出力ブループリントとしての Lambda | 言語とバージョン | 説明 | 
| --- | --- | --- | 
| kinesis-analytics-output-sns | Python 2.7 | Kinesis Data Analytics アプリケーションから Amazon SNS に出力レコードを配信します。 | 
| kinesis-analytics-output-ddb | Python 2.7 | Kinesis Data Analytics アプリケーションから Amazon DynamoDB に出力レコードを配信します。 | 

## Java での Lambda 関数の送信先の作成
<a name="how-it-works-lambda-dest-java"></a>

Java で Lambda 関数の送信先を作成するには、[Java イベント](https://github.com/aws/aws-lambda-java-libs/tree/master/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events)クラスを使用します。

次のコードは、Java を使用したサンプルの送信先 Lambda 関数を示しています。

```
public class LambdaFunctionHandler
        implements RequestHandler<KinesisAnalyticsOutputDeliveryEvent, KinesisAnalyticsOutputDeliveryResponse> {

    @Override
    public KinesisAnalyticsOutputDeliveryResponse handleRequest(KinesisAnalyticsOutputDeliveryEvent event,
            Context context) {
        context.getLogger().log("InvocatonId is : " + event.invocationId);
        context.getLogger().log("ApplicationArn is : " + event.applicationArn);

        List<KinesisAnalyticsOutputDeliveryResponse.Record> records = new ArrayList<KinesisAnalyticsOutputDeliveryResponse.Record>();
        KinesisAnalyticsOutputDeliveryResponse response = new KinesisAnalyticsOutputDeliveryResponse(records);

        event.records.stream().forEach(record -> {
            context.getLogger().log("recordId is : " + record.recordId);
            context.getLogger().log("record retryHint is :" + record.lambdaDeliveryRecordMetadata.retryHint);
            // Add logic here to transform and send the record to final destination of your choice.
            response.records.add(new Record(record.recordId, KinesisAnalyticsOutputDeliveryResponse.Result.Ok));
        });
        return response;
    }

}
```

## .NET での Lambda 関数の送信先の作成
<a name="how-it-works-lambda-net"></a>

.NET で Lambda 関数の送信先を作成するには、[.NET イベント](https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents)クラスを使用します。

次のコードは、C\$1 を使用したサンプルの送信先 Lambda 関数を示しています。

```
public class Function
    {
        public KinesisAnalyticsOutputDeliveryResponse FunctionHandler(KinesisAnalyticsOutputDeliveryEvent evnt, ILambdaContext context)
        {
            context.Logger.LogLine($"InvocationId: {evnt.InvocationId}");
            context.Logger.LogLine($"ApplicationArn: {evnt.ApplicationArn}");

            var response = new KinesisAnalyticsOutputDeliveryResponse
            {
                Records = new List<KinesisAnalyticsOutputDeliveryResponse.Record>()
            };

            foreach (var record in evnt.Records)
            {
                context.Logger.LogLine($"\tRecordId: {record.RecordId}");
                context.Logger.LogLine($"\tRetryHint: {record.RecordMetadata.RetryHint}");
                context.Logger.LogLine($"\tData: {record.DecodeData()}");

                // Add logic here to send to the record to final destination of your choice.

                var deliveredRecord = new KinesisAnalyticsOutputDeliveryResponse.Record
                {
                    RecordId = record.RecordId,
                    Result = KinesisAnalyticsOutputDeliveryResponse.OK
                };
                response.Records.Add(deliveredRecord);
            }
            return response;
        }
    }
```

事前処理および宛先の Lambda 関数を .NET で作成する場合の詳細については、「[https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents](https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents)」を参照してください。

# アプリケーション出力を外部宛先で永続化する配信モデル
<a name="failover-checkpoint"></a>

Amazon Kinesis Data Analytics は、設定された宛先へのアプリケーション出力に「1 回以上」の配信モデルを使用します。アプリケーションの実行時に、Kinesis Data Analytics は内部チェックポイントを取ります。このチェックポイントは、出力レコードが宛先に配信されデータ損失がない場合のポイントを時間で示すものです。サービスでは必要に応じてチェックポイントを使用し、アプリケーション出力が少なくとも 1 回、設定された宛先に配信されたことを確認します。

通常の状況では、アプリケーションは受信データを継続的に処理します。Kinesis Data Analytics は、Kinesis データストリームや Firehose 配信ストリームなど、設定された宛先に出力を書き込みます。ただし、次の例に示すように、アプリケーションはときどき中断される可能性があります。
+ アプリケーションを停止して、後で再起動する場合。
+ 設定された宛先に Kinesis Data Analytics がアプリケーション出力を書き込むために必要な IAM ロールを削除した場合。IAM ロールがない場合、Kinesis Data Analytics にはユーザーの代わりに外部宛先に書き込むアクセス権限がありません。
+ ネットワークの停止またはその他の内部サービスの障害により、一時的にアプリケーションが実行を停止した場合。

アプリケーションが再起動すると、Kinesis Data Analytics は、障害の発生時またはその前の時点からの出力の処理および書き込みを続けます。これにより、設定された宛先にアプリケーション出力が確実に配信されます。

同じアプリケーション内ストリームから複数の宛先を設定したとします。アプリケーションが障害から復旧したら、Kinesis Data Analytics は、設定された宛先への永続的出力を、最も低速な宛先に配信された最後のレコードから再開します。これにより、同じ出力レコードが別の宛先に複数回配信される場合があります。この場合は、外部で、宛先での重複を処理する必要があります。