

慎重に検討した結果、Amazon Kinesis Data Analytics for SQL アプリケーションを中止することにしました。

1. **2025 年 9 月 1** 日以降、Amazon Kinesis Data Analytics for SQL アプリケーションのバグ修正は提供されません。これは、今後の廃止によりサポートが制限されるためです。

2. **2025 年 10 月 15** 日以降、新しい Kinesis Data Analytics for SQL アプリケーションを作成することはできません。

3. **2026 年 1 月 27 日**以降、アプリケーションは削除されます。Amazon Kinesis Data Analytics for SQL アプリケーションを起動することも操作することもできなくなります。これ以降、Amazon Kinesis Data Analytics for SQL のサポートは終了します。詳細については、「[Amazon Kinesis Data Analytics for SQL アプリケーションのサポート終了](discontinuation.md)」を参照してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Managed Service for Apache Flink Studio への移行例
<a name="migrating-to-kda-studio-overview"></a>

慎重な検討の結果、Amazon Kinesis Data Analytics for SQL アプリケーションのサポートは終了することになりました。お客様が計画的に Amazon Kinesis Data Analytics for SQL アプリケーションから移行できるように、完全なサポート終了までに 15 か月間の猶予を設け、その間に段階的に終了していく予定です。これらは、2**025 年 9 月 1 日、2**025 **年 10 月 15 日、および 202****6 年 1 月 27 **日の重要な日付です。

1. **2025 年 9 月 1** 日以降、Amazon Kinesis Data Analytics for SQL アプリケーションのバグ修正は提供されません。これは、今後の廃止によりサポートが制限されるためです。

1. **2025 年 10 月 15** 日以降、新しい Amazon Kinesis Data Analytics for SQL アプリケーションを作成することはできなくなります。

1. **2026 年 1 月 27 日**以降、アプリケーションは削除されます。Amazon Kinesis Data Analytics for SQL アプリケーションを起動することも操作することもできなくなります。これ以降、Amazon Kinesis Data Analytics for SQL アプリケーションのサポートは終了します。詳細については[Amazon Kinesis Data Analytics for SQL アプリケーションのサポート終了](discontinuation.md)を参照してください。

[Amazon Managed Service for Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/what-is.html) を使用することをお勧めします。このサービスは、使いやすさと高度な分析機能を兼ね備え、ストリーム処理アプリケーションを数分で構築できます。

このセクションでは、Amazon Kinesis Data Analytics for SQL アプリケーションのワークロードを Managed Service for Apache Flink に移行するために役立つコードとアーキテクチャの例を示します。

詳細については、[AWS ブログの記事「Migrate from Amazon Kinesis Data Analytics for SQL Applications to Managed Service for Apache Flink Studio](https://aws.amazon.com/blogs/big-data/migrate-from-amazon-kinesis-data-analytics-for-sql-applications-to-amazon-managed-service-for-apache-flink-studio/)」も参照してください。

## Managed Service for Apache Flink Studio での Kinesis Data Analytics for SQL クエリの複製
<a name="examples-migrating-to-kda-studio"></a>

Managed Service for Apache Flink Studio または Managed Service for Apache Flink にワークロードを移行するために、このセクションでは一般的なユースケースで使用できるクエリ変換について説明します。

これらの例を参照する前に、「[Managed Service for Apache Flink で Studio ノートブックを使用する](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html)」を確認することをお勧めします。

### Managed Service for Apache Flink Studio での Kinesis Data Analytics for SQL クエリの再作成
<a name="examples-recreating-queries"></a>

ここでは、一般的な SQL ベースの Kinesis Data Analytics アプリケーションクエリを Managed Service for Apache Flink Studio に変換する方法を示します。

#### マルチステップアプリケーション
<a name="Multi-Step-application"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "IN_APP_STREAM_001" (
   ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16), price REAL, change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_001" AS 
INSERT INTO
   "IN_APP_STREAM_001"
   SELECT
      STREAM APPROXIMATE_ARRIVAL_TIME,
      ticker_symbol,
      sector,
      price,
      change FROM "SOURCE_SQL_STREAM_001";
-- Second in-app stream and pump
CREATE 
OR REPLACE STREAM "IN_APP_STREAM_02" (ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16),
   price REAL,
   change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_02" AS 
INSERT INTO
   "IN_APP_STREAM_02"
   SELECT
      STREAM ingest_time,
      ticker_symbol,
      sector,
      price,
      change FROM "IN_APP_STREAM_001";
-- Destination in-app stream and third pump
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16),
   price REAL,
   change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_03" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM ingest_time,
      ticker_symbol,
      sector,
      price,
      change FROM "IN_APP_STREAM_02";
```

------
#### [ Managed Service for Apache Flink Studio ]

```
Query 1 - % flink.ssql DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;           
           
CREATE TABLE SOURCE_SQL_STREAM_001 (TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE,
   CHANGE DOUBLE,
   APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA 

FROM
   'timestamp' VIRTUAL,
   WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND ) 
   PARTITIONED BY (TICKER_SYMBOL) WITH (
      'connector' = 'kinesis',
      'stream' = 'kinesis-analytics-demo-stream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json',
      'json.timestamp-format.standard' = 'ISO-8601');
DROP TABLE IF EXISTS IN_APP_STREAM_001;

CREATE TABLE IN_APP_STREAM_001 ( 
   INGEST_TIME TIMESTAMP,
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE,
   CHANGE DOUBLE )
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
      'connector' = 'kinesis', 
      'stream' = 'IN_APP_STREAM_001', 
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json',
      'json.timestamp-format.standard' = 'ISO-8601');
   
DROP TABLE IF EXISTS IN_APP_STREAM_02;

CREATE TABLE IN_APP_STREAM_02 (
   INGEST_TIME TIMESTAMP, 
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE, 
   CHANGE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'IN_APP_STREAM_02',   
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
   
DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   INGEST_TIME TIMESTAMP, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), 
   PRICE DOUBLE, CHANGE DOUBLE )
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'DESTINATION_SQL_STREAM',
   'aws.region' = 'us-east-1', 
   'scan.stream.initpos' = 'LATEST',  
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');


Query 2 - % flink.ssql(type = 
update
) 
   INSERT INTO
      IN_APP_STREAM_001 
      SELECT
         APPROXIMATE_ARRIVAL_TIME AS INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         SOURCE_SQL_STREAM_001;


Query 3 - % flink.ssql(type = 
update
) 
   INSERT INTO
      IN_APP_STREAM_02 
      SELECT
         INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         IN_APP_STREAM_001;


Query 4 - % flink.ssql(type = 
update
) 
   INSERT INTO
      DESTINATION_SQL_STREAM 
      SELECT
         INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         IN_APP_STREAM_02;
```

------

#### DateTime 値の変換
<a name="transform-date-time-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   TICKER VARCHAR(4),
   event_time TIMESTAMP,
   five_minutes_before TIMESTAMP,
   event_unix_timestamp BIGINT, 
   event_timestamp_as_char VARCHAR(50),
   event_second INTEGER);
   
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      EVENT_TIME,
      EVENT_TIME - INTERVAL '5' MINUTE,
      UNIX_TIMESTAMP(EVENT_TIME),
      TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
      EXTRACT(SECOND 
   FROM
      EVENT_TIME) 
   FROM
      "SOURCE_SQL_STREAM_001"
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   FIVE_MINUTES_BEFORE TIMESTAMP(3),   
   EVENT_UNIX_TIMESTAMP INT,
   EVENT_TIMESTAMP_AS_CHAR VARCHAR(50),
   EVENT_SECOND INT)
   
PARTITIONED BY (TICKER) WITH (
   'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream',   
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         TICKER,
         EVENT_TIME,
         EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE,
         UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP,
         DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR,
         EXTRACT(SECOND 
      FROM
         EVENT_TIME) AS EVENT_SECOND 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### シンプルなアラート
<a name="simple-alerts"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   ticker_symbol VARCHAR(4),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
   
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   price 
FROM
   "SOURCE_SQL_STREAM_001"
WHERE
   (
      ABS(Change / (Price - Change)) * 100
   )
   > 1
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(4), 
   CHANGE DOUBLE,       
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
   
Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
   WHERE
      (
         ABS(CHANGE / (PRICE - CHANGE)) * 100
      )
      > 1;
```

------

#### 調整されたアラート
<a name="throttled-alerts"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CHANGE_STREAM"(
   ticker_symbol VARCHAR(4),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
   
CREATE 
OR REPLACE PUMP "change_pump" AS INSERT INTO "CHANGE_STREAM"
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   price
FROM "SOURCE_SQL_STREAM_001"
WHERE
   (
      ABS(Change / (Price - Change)) * 100
   )
   > 1;
-- ** Trigger Count and Limit **
-- Counts "triggers" or those values that evaluated true against the previous where clause
-- Then provides its own limit on the number of triggers per hour per ticker symbol to what is specified in the WHERE clause

CREATE 
OR REPLACE STREAM TRIGGER_COUNT_STREAM (
   ticker_symbol VARCHAR(4),
   change REAL,
   trigger_count INTEGER);
   
CREATE 
OR REPLACE PUMP trigger_count_pump AS 
INSERT INTO
   TRIGGER_COUNT_STREAMSELECT STREAM ticker_symbol,
   change,
   trigger_count 
FROM
   (
      SELECT
         STREAM ticker_symbol,
         change,
         COUNT(*) OVER W1 as trigger_countFROM "CHANGE_STREAM" --window to perform aggregations over last minute to keep track of triggers
         WINDOW W1 AS 
         (
            PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING
         )
   )
WHERE
   trigger_count >= 1;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(4),      
   CHANGE DOUBLE, PRICE DOUBLE,
   EVENT_TIME AS PROCTIME()) 
PARTITIONED BY (TICKER_SYMBOL) 
WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');   
DROP TABLE IF EXISTS TRIGGER_COUNT_STREAM;
CREATE TABLE TRIGGER_COUNT_STREAM ( 
   TICKER_SYMBOL VARCHAR(4), 
   CHANGE DOUBLE, 
   TRIGGER_COUNT INT) 
PARTITIONED BY (TICKER_SYMBOL);

Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
   WHERE
      (
         ABS(CHANGE / (PRICE - CHANGE)) * 100
      )
      > 1;
      
Query 3 - % flink.ssql(type = 
update
) 
   SELECT * 
   FROM(
         SELECT
            TICKER_SYMBOL,
            CHANGE,
            COUNT(*) AS TRIGGER_COUNT 
         FROM
            DESTINATION_SQL_STREAM 
         GROUP BY
            TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE),
            TICKER_SYMBOL,
            CHANGE 
      )
   WHERE
      TRIGGER_COUNT > 1;
```

------

#### 例: クエリから部分的な結果を集約する
<a name="aggregate-partial-results"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
   
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
   
CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS 
INSERT INTO
   "CALC_COUNT_SQL_STREAM"(
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT") 
   SELECT
      STREAM "TICKER_SYMBOL",
      STEP("SOURCE_SQL_STREAM_001",
      "ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
      COUNT(*) AS "TickerCount "
   FROM
      "SOURCE_SQL_STREAM_001" 
   GROUP BY
      STEP("SOURCE_SQL_STREAM_001". ROWTIME BY INTERVAL '1' MINUTE),
      STEP("SOURCE_SQL_STREAM_001"." APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
      TICKER_SYMBOL;
CREATE PUMP "AGGREGATED_SQL_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT") 
   SELECT
      STREAM "TICKER",
      "TRADETIME",
      SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" 
   FROM
      "CALC_COUNT_SQL_STREAM" WINDOW W1 AS 
      (
         PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING
      )
;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;
CREATE TABLE SOURCE_SQL_STREAM_001 (
   TICKER_SYMBOL VARCHAR(4),
   TRADETIME AS PROCTIME(),
   APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA 
FROM
   'timestamp' VIRTUAL,
   WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND) 
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',   
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
DROP TABLE IF EXISTS CALC_COUNT_SQL_STREAM;
CREATE TABLE CALC_COUNT_SQL_STREAM (
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP(3),
   WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND,   
   TICKERCOUNT BIGINT NOT NULL ) PARTITIONED BY (TICKER) WITH ( 
      'connector' = 'kinesis',
      'stream' = 'CALC_COUNT_SQL_STREAM',
      'aws.region' = 'us-east-1',       
      'scan.stream.initpos' = 'LATEST',
      'format' = 'csv');
DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP(3),
   WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND, 
   TICKERCOUNT BIGINT NOT NULL )
   PARTITIONED BY (TICKER) WITH ('connector' = 'kinesis', 
      'stream' = 'DESTINATION_SQL_STREAM',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'csv');

Query 2 - % flink.ssql(type = 
update
) 
   INSERT INTO
      CALC_COUNT_SQL_STREAM 
      SELECT
         TICKER,
         TO_TIMESTAMP(TRADETIME, 'yyyy-MM-dd HH:mm:ss') AS TRADETIME,
         TICKERCOUNT 
      FROM
         (
            SELECT
               TICKER_SYMBOL AS TICKER,
               DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00') AS TRADETIME,
               COUNT(*) AS TICKERCOUNT 
            FROM
               SOURCE_SQL_STREAM_001 
            GROUP BY
               TUMBLE(TRADETIME, INTERVAL '1' MINUTE),
               DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00'),
               DATE_FORMAT(APPROXIMATE_ARRIVAL_TIME, 'yyyy-MM-dd HH:mm:00'),
               TICKER_SYMBOL 
         )
;

Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      CALC_COUNT_SQL_STREAM;
      
Query 4 - % flink.ssql(type = 
update
) 
   INSERT INTO
      DESTINATION_SQL_STREAM 
      SELECT
         TICKER,
         TRADETIME,
         SUM(TICKERCOUNT) OVER W1 AS TICKERCOUNT 
      FROM
         CALC_COUNT_SQL_STREAM WINDOW W1 AS 
         (
            PARTITION BY TICKER 
         ORDER BY
            TRADETIME RANGE INTERVAL '10' MINUTE PRECEDING
         )
;

Query 5 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      DESTINATION_SQL_STREAM;
```

------

#### 文字列値の変換
<a name="transform-string-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM for cleaned up referrerCREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32));
CREATE 
OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM "APPROXIMATE_ARRIVAL_TIME",
   SUBSTRING("referrer", 12, 
   (
      POSITION('.com' IN "referrer") - POSITION('www.' IN "referrer") - 4
   )
) 
FROM
   "SOURCE_SQL_STREAM_001";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   referrer VARCHAR(32),
   ingest_time AS PROCTIME() ) PARTITIONED BY (referrer) 
WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ingest_time,
         substring(referrer, 12, 6) as referrer 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### 正規表現を使用した部分文字列の置き換え
<a name="substring-regex"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM for cleaned up referrerCREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32));
CREATE 
OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM "APPROXIMATE_ARRIVAL_TIME",
   REGEX_REPLACE("REFERRER", 'http://', 'https://', 1, 0) 
FROM
   "SOURCE_SQL_STREAM_001";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   referrer VARCHAR(32),
   ingest_time AS PROCTIME()) 
PARTITIONED BY (referrer) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ingest_time,
         REGEXP_REPLACE(referrer, 'http', 'https') as referrer 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### 正規表現ログ解析
<a name="regex-log-parse"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   sector VARCHAR(24),
   match1 VARCHAR(24),
   match2 VARCHAR(24));
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM T.SECTOR,
      T.REC.COLUMN1,
      T.REC.COLUMN2 
   FROM
      (
         SELECT
            STREAM SECTOR,
            REGEX_LOG_PARSE(SECTOR, '.*([E].).*([R].*)') AS REC 
         FROM
            SOURCE_SQL_STREAM_001
      )
      AS T;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   CHANGE DOUBLE, PRICE DOUBLE,
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16)) 
PARTITIONED BY (SECTOR) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
SELECT
   * 
FROM
   (
      SELECT
         SECTOR,
         REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 1) AS MATCH1,
         REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 2) AS MATCH2 
      FROM
         DESTINATION_SQL_STREAM 
   )
WHERE
   MATCH1 IS NOT NULL 
   AND MATCH2 IS NOT NULL;
```

------

#### DateTime 値の変換
<a name="transform-date-time-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( 
   TICKER VARCHAR(4),
   event_time TIMESTAMP,
   five_minutes_before TIMESTAMP,
   event_unix_timestamp BIGINT,
   event_timestamp_as_char VARCHAR(50),
   event_second INTEGER);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      EVENT_TIME,
      EVENT_TIME - INTERVAL '5' MINUTE,
      UNIX_TIMESTAMP(EVENT_TIME),
      TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
      EXTRACT(SECOND 
   FROM
      EVENT_TIME) 
   FROM
      "SOURCE_SQL_STREAM_001"
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   FIVE_MINUTES_BEFORE TIMESTAMP(3),
   EVENT_UNIX_TIMESTAMP INT,    
   EVENT_TIMESTAMP_AS_CHAR VARCHAR(50),
   EVENT_SECOND INT) PARTITIONED BY (TICKER)
WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         TICKER,
         EVENT_TIME,
         EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE,
         UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP,
         DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR,
         EXTRACT(SECOND 
      FROM
         EVENT_TIME) AS EVENT_SECOND 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### ウィンドウと集約
<a name="windows-aggregation"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   event_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   ticker_count INTEGER);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM EVENT_TIME,
      TICKER,
      COUNT(TICKER) AS ticker_count 
   FROM
      "SOURCE_SQL_STREAM_001" WINDOWED BY STAGGER ( PARTITION BY 
         TICKER,
         EVENT_TIME RANGE INTERVAL '1' MINUTE);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   EVENT_TIME TIMESTAMP(3),
   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '60' SECOND,    
   TICKER VARCHAR(4),
   TICKER_COUNT INT) PARTITIONED BY (TICKER) 
WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json'

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         EVENT_TIME,
         TICKER, COUNT(TICKER) AS ticker_count 
      FROM
         DESTINATION_SQL_STREAM 
      GROUP BY
         TUMBLE(EVENT_TIME,
         INTERVAL '60' second),
         EVENT_TIME, TICKER;
```

------

#### ROWTIME を使用したタンブリングウィンドウ
<a name="tumbling-windows-rowtime"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   MIN_PRICE REAL,
   MAX_PRICE REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      MIN(PRICE),
      MAX(PRICE)
   FROM
      "SOURCE_SQL_STREAM_001"
   GROUP BY
      TICKER,
      STEP("SOURCE_SQL_STREAM_001".
            ROWTIME BY INTERVAL '60' SECOND);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   ticker VARCHAR(4),
   price DOUBLE,
   event_time VARCHAR(32),
   processing_time AS PROCTIME()) 
PARTITIONED BY (ticker) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601') 

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ticker,
         min(price) AS MIN_PRICE,
         max(price) AS MAX_PRICE 
      FROM
         DESTINATION_SQL_STREAM 
      GROUP BY
         TUMBLE(processing_time, INTERVAL '60' second),
         ticker;
```

------

#### 最も頻繁に出現する値 (TOP\$1K\$1ITEMS\$1TUMBLING) の取得
<a name="retrieve-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT")
SELECT
   STREAM"TICKER_SYMBOL",
   STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
   COUNT(*) AS "TickerCount"
FROM
   "SOURCE_SQL_STREAM_001"
GROUP BY STEP("SOURCE_SQL_STREAM_001".
   ROWTIME BY INTERVAL '1' MINUTE),
   STEP("SOURCE_SQL_STREAM_001".
      "APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
   TICKER_SYMBOL;
CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT")
SELECT
   STREAM "TICKER",
   "TRADETIME",
   SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT"
FROM
   "CALC_COUNT_SQL_STREAM" WINDOW W1 AS 
   (
      PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING
   )
;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM ( 
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '1' SECONDS ) 
PARTITIONED BY (TICKER) WITH (
   'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      (
         SELECT
            TICKER,
            COUNT(*) as MOST_FREQUENT_VALUES,
            ROW_NUMBER() OVER (PARTITION BY TICKER 
         ORDER BY
            TICKER DESC) AS row_num 
         FROM
            DESTINATION_SQL_STREAM 
         GROUP BY
            TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE),
            TICKER
      )
   WHERE
      row_num <= 5;
```

------

#### おおよそのトップ K 項目
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ITEM VARCHAR(1024), ITEM_COUNT DOUBLE);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM ITEM,
      ITEM_COUNT 
   FROM
      TABLE(TOP_K_ITEMS_TUMBLING(CURSOR(
      SELECT
         STREAM * 
      FROM
         "SOURCE_SQL_STREAM_001"), 'column1', -- name of column in single quotes10, -- number of top items60 -- tumbling window size in seconds));
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flinkssql
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 
CREATE TABLE SOURCE_SQL_STREAM_001 ( TS TIMESTAMP(3), WATERMARK FOR TS as TS - INTERVAL '5' SECOND, ITEM VARCHAR(1024), 
PRICE DOUBLE) 
   WITH ( 'connector' = 'kinesis', 'stream' = 'SOURCE_SQL_STREAM_001',
'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601');


%flink.ssql(type=update)
SELECT
   * 
FROM
   (
      SELECT
         *,
         ROW_NUMBER() OVER (PARTITION BY AGG_WINDOW 
      ORDER BY
         ITEM_COUNT DESC) as rownum 
      FROM
         (
            select
               AGG_WINDOW,
               ITEM,
               ITEM_COUNT 
            from
               (
                  select
                     TUMBLE_ROWTIME(TS, INTERVAL '60' SECONDS) as AGG_WINDOW,
                     ITEM,
                     count(*) as ITEM_COUNT 
                  FROM
                     SOURCE_SQL_STREAM_001 
                  GROUP BY
                     TUMBLE(TS, INTERVAL '60' SECONDS),
                     ITEM
               )
         )
   )
where
   rownum <= 3
```

------

#### 例: ウェブログの解析 (W3C\$1LOG\$1PARSE 関数)
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( column1 VARCHAR(16), 
   column2 VARCHAR(16), 
   column3 VARCHAR(16), 
   column4 VARCHAR(16), 
   column5 VARCHAR(16), 
   column6 VARCHAR(16), 
   column7 VARCHAR(16));
CREATE 
OR REPLACE PUMP "myPUMP" ASINSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM l.r.COLUMN1,
   l.r.COLUMN2,
   l.r.COLUMN3,
   l.r.COLUMN4,
   l.r.COLUMN5,
   l.r.COLUMN6,
   l.r.COLUMN7 
FROM
   (
      SELECT
         STREAM W3C_LOG_PARSE("log", 'COMMON') 
      FROM
         "SOURCE_SQL_STREAM_001"
   )
   AS l(r);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql(type=update)
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) 
   WITH ( 'connector' = 'kinesis', 
          'stream' = 'SOURCE_SQL_STREAM_001',
          'aws.region' = 'us-east-1',
          'scan.stream.initpos' = 'LATEST',
          'format' = 'json',
          'json.timestamp-format.standard' = 'ISO-8601');
          
% flink.ssql(type=update) 
   select
      SPLIT_INDEX(log, ' ', 0),
      SPLIT_INDEX(log, ' ', 1),
      SPLIT_INDEX(log, ' ', 2),
      SPLIT_INDEX(log, ' ', 3),
      SPLIT_INDEX(log, ' ', 4),
      SPLIT_INDEX(log, ' ', 5),
      SPLIT_INDEX(log, ' ', 6) 
   from
      SOURCE_SQL_STREAM_001;
```

------

#### 例: 複数のフィールドへの文字列の分割 (VARIABLE\$1COLUMN\$1LOG\$1PARSE 関数)
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"( "column_A" VARCHAR(16),
   "column_B" VARCHAR(16),
   "column_C" VARCHAR(16),
   "COL_1" VARCHAR(16),
   "COL_2" VARCHAR(16),
   "COL_3" VARCHAR(16));
CREATE 
OR REPLACE PUMP "SECOND_STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM t."Col_A",
   t."Col_B",
   t."Col_C",
   t.r."COL_1",
   t.r."COL_2",
   t.r."COL_3"
FROM
   (
      SELECT
         STREAM "Col_A",
         "Col_B",
         "Col_C",
         VARIABLE_COLUMN_LOG_PARSE ("Col_E_Unstructured",
         'COL_1 TYPE VARCHAR(16),
         COL_2 TYPE VARCHAR(16),
         COL_3 TYPE VARCHAR(16)', ',') AS r 
      FROM
         "SOURCE_SQL_STREAM_001"
   )
   as t;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql(type=update)
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) 
   WITH ( 'connector' = 'kinesis',
          'stream' = 'SOURCE_SQL_STREAM_001',
          'aws.region' = 'us-east-1',
          'scan.stream.initpos' = 'LATEST',
          'format' = 'json',
          'json.timestamp-format.standard' = 'ISO-8601');
          
% flink.ssql(type=update) 
   select
      SPLIT_INDEX(log, ' ', 0),
      SPLIT_INDEX(log, ' ', 1),
      SPLIT_INDEX(log, ' ', 2),
      SPLIT_INDEX(log, ' ', 3),
      SPLIT_INDEX(log, ' ', 4),
      SPLIT_INDEX(log, ' ', 5)
)
from
   SOURCE_SQL_STREAM_001;
```

------

#### Joins
<a name="joins"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   ticker_symbol VARCHAR(4),
   "Company" varchar(20),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM ticker_symbol,
      "c"."Company",
      sector,
      change,
      priceFROM "SOURCE_SQL_STREAM_001" 
      LEFT JOIN
         "CompanyName" as "c"
         ON "SOURCE_SQL_STREAM_001".ticker_symbol = "c"."Ticker";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(12),
   CHANGE INT,
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',   
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - CREATE TABLE CompanyName (
   Ticker VARCHAR(4),
   Company VARCHAR(4)) WITH ( 
      'connector' = 'filesystem',
      'path' = 's3://kda-demo-sample/TickerReference.csv',       
      'format' = 'csv' );

Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      c.Company,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
      LEFT JOIN
         CompanyName as c 
         ON DESTINATION_SQL_STREAM.TICKER_SYMBOL = c.Ticker;
```

------

#### エラー
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   (
      price / 0
   )
   as ProblemColumnFROM "SOURCE_SQL_STREAM_001"
WHERE
   sector SIMILAR TO '%TECH%';
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   CHANGE DOUBLE, 
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - % flink.pyflink @udf(input_types = [DataTypes.BIGINT()],
   result_type = DataTypes.BIGINT()) def DivideByZero(price): try: price / 0 
except
: return - 1 st_env.register_function("DivideByZero",
   DivideByZero) 
   
   Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      CURRENT_TIMESTAMP AS ERROR_TIME,
      * 
   FROM
      (
         SELECT
            TICKER_SYMBOL,
            SECTOR,
            CHANGE,
            DivideByZero(PRICE) as ErrorColumn 
         FROM
            DESTINATION_SQL_STREAM 
         WHERE
            SECTOR SIMILAR TO '%TECH%' 
      )
      AS ERROR_STREAM;
```

------

## Random Cut Forest ワークロードの移行
<a name="examples-migrating-to-kda-studio-random-cut-forests"></a>

Random Cut Forest を使用するワークロードを Kinesis Analytics for SQL から Managed Service for Apache Flink に移行することを検討している方のために、この[AWS ブログ記事では](https://aws.amazon.com/blogs/big-data/real-time-anomaly-detection-via-random-cut-forest-in-amazon-kinesis-data-analytics/)、Managed Service for Apache Flink を使用して異常検出用のオンライン RCF アルゴリズムを実行する方法を紹介します。

## ソースとしての Kinesis Data Firehose を Kinesis Data Streams に置き換える
<a name="examples-firehose"></a>

詳細なチュートリアルについては、「[Converting-KDASQL-KDAStudio/](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/tree/master/Converting-KDASQL-KDAStudio)」を参照してください。

次の演習では、Amazon Managed Service for Apache Flink Studio を使用するためにデータフローを変更します。これは、Amazon Kinesis Data Firehose から Amazon Kinesis Data Streams に切り替えることも意味します。

まずは一般的な KDA-SQL アーキテクチャを紹介し、次に Amazon Managed Service for Apache Flink Studio とAmazon Kinesis Data Streams を使用してこれを置き換える方法を示します。または、[ここで](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml) CloudFormation テンプレートを起動することもできます。

### Amazon Kinesis Data Analytics-SQL と Amazon Kinesis Data Firehose
<a name="examples-firehose-legacy-setup"></a>

Amazon Kinesis Data Analytics SQL アーキテクチャフローは次のとおりです。

![\[Architectural flow diagram showing data movement through Amazon Kinesis services to Amazon S3.\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/images/legacy-sql.png)


まず、レガシーの Amazon Kinesis Data Analytics-SQL と Amazon Kinesis Data Firehose セットアップについて調べます。このユースケースでは、株式ティッカーや価格を含む取引データが外部ソースから Amazon Kinesis システムにストリーミングされる取引市場を扱います。Amazon Kinesis Data Analytics for SQL は、入力ストリームを使用してタンブリングウィンドウなどのウィンドウクエリを実行し、各株式ティッカーの 1 分間の取引量と `min`、`max`、`average` 取引価格を特定します。  

Amazon Kinesis Data Analytics-SQL は Amazon Kinesis Data Firehose API からデータを取り込むように設定されています。処理の後、Amazon Kinesis Data Analytics-SQL は処理されたデータを別の Amazon Kinesis Data Firehose に送信します。これがその出力を Amazon S3 バケットに保存します。

この場合は、 Amazon Kinesis Data Generator を使用します。Amazon Kinesis Data Generator を使用すると、Amazon Kinesis Data Streams または Amazon Kinesis Data Firehose 配信ストリームにテストデータを送信できます。開始するには、[こちら](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)の手順に従ってください。手順に記載されているテンプレートの代わりに、[ここで](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml) CloudFormation テンプレートを使用します。 [https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)

 CloudFormation テンプレートを実行すると、出力セクションに Amazon Kinesis Data Generator の URL が表示されます。[ここ](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)で設定した Cognito ユーザー ID とパスワードを使用してポータルにログインします。リージョンとターゲットストリーム名を選択します。現在の状態については、Amazon Kinesis Data Firehose 配信ストリームを選択してください。新しい状態については、Amazon Kinesis Data Firehose 配信ストリームを選択してください。要件に応じて複数のテンプレートを作成し、ターゲットストリームに送信する前に [**テストテンプレート**] ボタンを使用すると、テンプレートをテストできます。

Amazon Kinesis Data Generator を使用したサンプルペイロードを以下に示します。Data Generator は、入力された Amazon Kinesis Firehose Streams をターゲットにして、データを継続的にストリーミングします。Amazon Kinesis SDK クライアントは、他のプロデューサーからのデータも送信できます。 

```
2023-02-17 09:28:07.763,"AAPL",5032023-02-17 09:28:07.763,
"AMZN",3352023-02-17 09:28:07.763,
"GOOGL",1852023-02-17 09:28:07.763,
"AAPL",11162023-02-17 09:28:07.763,
"GOOGL",1582
```

次の JSON を使用して、取引日時、株式ティッカー、株価をランダムに生成します。

```
date.now(YYYY-MM-DD HH:mm:ss.SSS),
"random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])",
random.number(2000)
```

**[データを送信]**を選択すると、Generator はモックデータの送信を開始します。

外部システムが Amazon Kinesis Data Firehose にデータをストリーミングします。Amazon Kinesis Data Analytics for SQL アプリケーションを使用すると、Java を使用してストリーミングデータを処理および分析できます。このサービスを使用すると、ストリーミングソースに対する SQL コードを作成して実行し、時系列分析の実行、ダッシュボードへのリアルタイムフィード、メトリクスのリアルタイム作成を行うことができます。Amazon Kinesis Data Analytics for SQL アプリケーションでは、入力ストリームの SQL クエリから送信先ストリームを作成し、その送信先ストリームを別の Amazon Kinesis Data Firehose に送信できます。送信先の Amazon Kinesis Data Firehose は、分析データを最終状態として Amazon S3 に送信できます。

Amazon Kinesis Data Analytics-SQL レガシーコードは SQL 標準の拡張に基づいています。

Amazon Kinesis Data Analytics-SQL では、次のクエリを使用します。まず、クエリ出力の送信先ストリームを作成します。次に、`PUMP` を使用します。これは Amazon Kinesis Data Analytics Repository Object (SQL 標準の拡張) で、継続的に実行される `INSERT INTO stream SELECT ... FROM` クエリ機能を提供するため、クエリの結果を名前付きストリームに継続的に入力できます。 

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME TIMESTAMP,
INGEST_TIME TIMESTAMP,
TICKER VARCHAR(16),
VOLUME BIGINT,
AVG_PRICE DOUBLE,
MIN_PRICE DOUBLE,
MAX_PRICE DOUBLE);
 
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND) AS EVENT_TIME,
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS "STREAM_INGEST_TIME",
      "ticker",
       COUNT(*) AS VOLUME,
      AVG("tradePrice") AS AVG_PRICE,
      MIN("tradePrice") AS MIN_PRICE,
      MAX("tradePrice") AS MAX_PRICEFROM "SOURCE_SQL_STREAM_001"
   GROUP BY
      "ticker",
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND),
      STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND);
```

上記の SQL では 2 つのタイムウィンドウを使用します。`tradeTimestamp` は受信ストリームのペイロードから取得され、`ROWTIME.tradeTimestamp` は `Event Time` または `client-side time` とも呼ばれます。イベントが発生した時間であるため、分析でこの時間を使用するのが望ましい場合がよくあります。しかし、携帯電話やウェブクライアントなど多くのイベントソースは信頼性の高い時計を持たないため、時間が不正確になる場合があります。さらに、接続性の問題で、レコードがイベントの発生と同じ順序でストリームに現れない場合があります。 

アプリケーション内ストリームには、`ROWTIME` という特別な行も含まれています。Amazon Kinesis Data Analytics によって最初のアプリケーション内ストリームに行が挿入されると、タイムスタンプが保存されます。`ROWTIME` は、Amazon Kinesis Data Analytics がストリーミングソースからレコードを読み取った後、最初のアプリケーション内ストリームにレコードを挿入した時点のタイムスタンプを反映します。この `ROWTIME` 値はその後、アプリケーション全体で維持されます。 

SQL は、60 秒間隔でティッカーのカウント (`volume`) と `min`、`max`、`average` 価格を特定します。 

時間ベースのウィンドウクエリでこれらの時間を使用するには、それぞれ利点と欠点があります。これらの時間を 1 つ以上選択し、またそれに伴う欠点に対処する戦略をお客様のユースケースシナリオに基づいて選択します。 

2 ウィンドウ戦略では、2 つの時間ベースの値 (両方の `ROWTIME`、イベント時間などのもう 1 つの時間) を使用します。
+ 次の例に示すように、クエリで結果を発行する頻度を制御する `ROWTIME` を最初のウィンドウとして使用します。論理時間としては使用されません。
+ 分析に関連付ける論理時間であるその他の時間のうち 1 つを使用します。この時間は、いつイベントが発生したかを示します。次の例では、分析の目的はレコードをグループ化し、ティッカーでカウントを返すことです。

### Amazon Managed Service for Apache Flink Studio 
<a name="examples-studio"></a>

更新されたアーキテクチャでは、Amazon Kinesis Data Firehose を Amazon Kinesis Data Streams に置き換えます。Amazon Kinesis Data Analytics for SQL アプリケーションは Amazon Managed Service for Apache Flink Studio に置き換えられました。Apache Flink コードは Apache Zeppelin ノートブック内でインタラクティブに実行されます。Amazon Managed Service for Apache Flink Studio は、収集した取引データを保存用の Amazon S3 バケットに送信します。その手順を以下に示します。

Amazon Managed Service for Apache Flink Studio のアーキテクチャフローは次のとおりです。

![\[Data flow from Producer through Kinesis streams to Analytics Studio and S3 storage.\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/images/kda-studio.png)


#### Kinesis データストリームを作成する
<a name="examples-studio-create-data-stream"></a>

**コンソールを使用してデータストリームを作成するには**

1. にサインイン AWS マネジメントコンソール し、[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) で Kinesis コンソールを開きます。

1. ナビゲーションバーで、リージョンセレクターを展開し、リージョンを選択します。

1. [**データストリームの作成**] を選択します。

1. [**Kinesis ストリームの作成**] ページで、データストリームの名前を入力し、デフォルトの [**オンデマンド**] 容量モードを選択します。

   **[オンデマンド]** モードの場合、**[Kinesis ストリームの作成]** を選択して、データストリームを作成することができます。

   ストリームの作成中、[**Kinesis ストリーム**] ページのストリームの**ステータス**は、**Creating** になります。ストリームを使用する準備が完了すると、**ステータス**は **Active** に変わります。

1. ストリームの名前を選択します。[**ストリームの詳細**] ページには、ストリーム設定の概要とモニタリング情報が表示されます。

1. Amazon Kinesis Data Generator で、ストリーム/配信ストリームを新しい Amazon Kinesis Data Streams **TRADE\$1SOURCE\$1STREAM** に変更します。

   JSON とペイロードは Amazon Kinesis Data Analytics-SQL に使用したものと同じになります。Amazon Kinesis Data Generator を使用してサンプルの取引ペイロードデータを作成し、この演習では **TRADE\$1SOURCE\$1STREAM** データストリームをターゲットにします。

   ```
   {{date.now(YYYY-MM-DD HH:mm:ss.SSS)}},
   "{{random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])}}",
   {{random.number(2000)}}
   ```

1. Managed Service for Apache Flink AWS マネジメントコンソール に移動し、**アプリケーションの作成**を選択します。

1. 左側のナビゲーションペインで、[**Studio ノートブック**]、[**ノートブックインスタンスの作成**] の順に選択します。

1. Studio ノートブック名を入力します。

1. [**AWS Glue データベース**] で、ソースと宛先のメタデータを定義する既存の AWS Glue データベースを指定します。 AWS Glue データベースがない場合は、**作成**を選択し、以下を実行します。

   1.  AWS Glue コンソールで、左側のメニューから**データカタログ**の****データベースを選択します。

   1. [**データベースの作成**] を選択します。

   1. [**データベースの作成**] ページで、データベースの名前を入力します。[**場所 — オプション**] セクションで、[**Amazon S3 を参照する**] を選択した上で、Amazon S3 バケットを選択します。Amazon S3 バケットをまだセットアップしていない場合は、このステップをスキップし、後に再開することができます。

   1. (オプション)。データベースの説明を入力します。

   1. **[データベースの作成]** を選択します。

1. [**ノートブックの作成)**] を選択します。

1. ノートブックを作成したら、[**実行**] を選択します。

1. ノートブックが正常に起動したら、**[Apache Zeppelin で開く]** を選択して Zeppelin ノートブックを起動します。

1. Zeppelin ノートブックのページで [**新規ノートを作成**] を選択し、MarketDataFeed と命名します。

Flink SQL コードについては以下で説明しますが、まず[ Zeppelin ノートブックの画面は次のようになります](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/open-Zeppelin-notebook.jpg)。ノートブック内の各ウィンドウは個別のコードブロックで、一度に 1 つずつ実行できます。

##### Amazon Managed Service for Apache Flink Studio Code
<a name="examples-studio-code"></a>

Amazon Managed Service for Apache Flink Studio は、Zeppelin ノートブックを使用してコードを実行します。この例では、Apache Flink 1.13 に基づく ssql コードへのマッピングが行われています。以下では、Zeppelin ノートブックのコードを 1 ブロックずつ示します。  

Zeppelin ノートブックでコードを実行する前に、Flink 設定コマンドを実行する必要があります。コード (ssql、Python、または Scala) を実行した後に設定を変更する必要がある場合は、ノートブックを停止して再起動する必要があります。この例では、チェックポイントを設定する必要があります。Amazon S3 のファイルにデータをストリーミングできるようにするには、チェックポイントが必要です。これにより、Amazon S3 へのデータストリームをファイルにフラッシュできます。以下のステートメントは、間隔を 5000 ミリ秒に設定します。  

```
%flink.conf
execution.checkpointing.interval 5000
```

`%flink.conf` は、このブロックが設定ステートメントであることを示します。チェックポイントを含む Flink 設定の詳細については、「[Apache Flink Checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/checkpoints/)」を参照してください。  

ソース Amazon Kinesis Data Streams の入力テーブルは、以下の Flink ssql コードを使用して作成されます。`TRADE_TIME` フィールドには、データジェネレーターが作成した日付/時刻が格納されることに注意してください。

```
%flink.ssql
     
DROP TABLE IF EXISTS TRADE_SOURCE_STREAM;
CREATE TABLE TRADE_SOURCE_STREAM (--`arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
TRADE_TIME TIMESTAMP(3),
WATERMARK FOR TRADE_TIME as TRADE_TIME - INTERVAL '5' SECOND,TICKER STRING,PRICE DOUBLE,
STATUS STRING)WITH ('connector' = 'kinesis','stream' = 'TRADE_SOURCE_STREAM',
'aws.region' = 'us-east-1','scan.stream.initpos' = 'LATEST','format' = 'csv');
```

入力ストリームは次のステートメントで確認できます。

```
%flink.ssql(type=update)-- testing the source stream
   
select * from TRADE_SOURCE_STREAM;
```

集計データを Amazon S3 に送信する前に、Amazon Managed Service for Apache Flink Studio でタンブリングウィンドウの選択クエリを使用してデータを直接表示できます。これにより、取引データが 1 分のタイムウィンドウに集約されます。%flink.ssql ステートメントには (type=update) という指定が必要であることに注意してください。

```
%flink.ssql(type=update)
   
select TUMBLE_ROWTIME(TRADE_TIME,
INTERVAL '1' MINUTE) as TRADE_WINDOW,
TICKER, COUNT(*) as VOLUME,
AVG(PRICE) as AVG_PRICE, 
MIN(PRICE) as MIN_PRICE,
MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAMGROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;
```

そうすると、Amazon S3 でターゲット用のテーブルを作成できます。ウォーターマークを使用する必要があります。ウォーターマークは、これ以上遅延イベントが発生しないと確信できる時点を示す進捗指標です。ウォーターマークが表示されるのは、到着が遅れた場合を考慮に入れるためです。この `‘5’ Second` 間隔により、5 秒遅れて Amazon Kinesis Data Stream に取引を入力することが可能になり、このウィンドウ内にタイムスタンプが存在する場合は取引が含まれるようになります。詳細については、「[Generating Watermarks](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/)」を参照してください。   

```
%flink.ssql(type=update)

DROP TABLE IF EXISTS TRADE_DESTINATION_S3;
CREATE TABLE TRADE_DESTINATION_S3 (
TRADE_WINDOW_START TIMESTAMP(3),
WATERMARK FOR TRADE_WINDOW_START as TRADE_WINDOW_START - INTERVAL '5' SECOND,
TICKER STRING, 
VOLUME BIGINT,
AVG_PRICE DOUBLE,
MIN_PRICE DOUBLE,
MAX_PRICE DOUBLE)
WITH ('connector' = 'filesystem','path' = 's3://trade-destination/','format' = 'csv');
```

このステートメントはデータを `TRADE_DESTINATION_S3` に挿入します。`TUMPLE_ROWTIME` はタンブリングウィンドウの上限を含むタイムスタンプです。

```
%flink.ssql(type=update)

insert into TRADE_DESTINATION_S3
select TUMBLE_ROWTIME(TRADE_TIME,
INTERVAL '1' MINUTE),
TICKER, COUNT(*) as VOLUME,
AVG(PRICE) as AVG_PRICE,
MIN(PRICE) as MIN_PRICE,
MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAM
GROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;
```

ステートメントを 10 ～ 20 分間実行して、Amazon S3 にデータを蓄積します。その後、ステートメントを中止します。

これにより Amazon S3 内のファイルが閉じて表示可能になります。

内容は以下のようになっています。

![\[Financial data table showing stock prices and volumes for tech companies on March 1, 2023.\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/images/kda-studio-contents.png)


この [CloudFormation テンプレート](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml)を使用してインフラストラクチャを作成できます。

CloudFormation は、 AWS アカウントに次のリソースを作成します。
+  Amazon Kinesis Data Streams
+  Amazon Managed Service for Apache Flink Studio
+  AWS Glue データベース
+  Amazon S3 バケット
+  Amazon Managed Service for Apache Flink Studio で適切なリソースにアクセスするための IAM ロールとポリシー

ノートブックをインポートし、 によって作成された新しい Amazon S3 バケットを使用して Amazon S3 バケット名を変更します CloudFormation。

![\[SQL code snippet creating a table with timestamp, ticker, volume, and price fields.\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/images/kda-studio-cfn.png)


##### 詳細を見る
<a name="more"></a>

Managed Service for Apache Flink Studio の使用方法の詳細については、次の追加リリソースを参照してください。
+ [Managed Service for Apache Flink Studio Notebooks 開発者ガイド](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html) 
+ [Apache Flink 1.13 のドキュメント](https://nightlies.apache.org/flink/flink-docs-release-1.13/) 
+ [Managed Service for Apache Flink Studio ワークショップ ](https://catalog.us-east-1.prod.workshops.aws/workshops/c342c6d1-2baf-4827-ba42-52ef9eb173f6/en-US/flink-on-kda-studio) 
+ [Apache Flink Windowing](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/) 
+ [Amazon Kinesis Data Analytics 開発者ガイド — Kinesis Data Analytics Stream から S3 バケットへの書き込み](https://docs.aws.amazon.com/managed-flink/latest/java/examples-s3.html) 

# ユーザー定義関数 (UDF) の活用
<a name="examples-migrating-to-kda-studio-leveraging-udfs"></a>

このパターンの目的は、Kinesis Data Analytics-Studio Zeppelin ノートブックの UDF を活用して Kinesis ストリームのデータを処理する方法を説明することです。Managed Service for Apache Flink Studio は、Apache Flink を使用して高度な分析機能を提供します。これには、1 回限りの処理セマンティクス、イベント時間のウィンドウ、ユーザー定義関数とカスタム統合を使用した拡張性、命令型言語サポート、永続的なアプリケーション状態、水平スケーリング、複数のデータソースのサポート、拡張可能な統合などが含まれます。これらの機能は、データストリーム処理の正確性、完全性、一貫性、信頼性を確保するために不可欠で、Amazon Kinesis Data Analytics for SQL では利用できません。

このサンプルアプリケーションでは、KDA-Studio Zeppelin ノートブックの UDF を活用して Kinesis ストリームのデータを処理する方法を紹介します。Kinesis Data Analytics 用 Studio ノートブックでは、データストリームをリアルタイムでインタラクティブにクエリし、標準 SQL、Python、Scala を使用してストリーム処理アプリケーションを簡単に構築して実行できます。を数回クリックするだけで AWS マネジメントコンソール、サーバーレスノートブックを起動してデータストリームをクエリし、数秒で結果を取得できます。詳細については、「[Studio ノートブックを Kinesis Data Analytics for Apache Flink で使用する](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html)」を参照してください。

KDA-SQL アプリケーションのデータの前処理/後処理に使用される Lambda 関数。

![\[alt text not found\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/images/sql-udf-1.png)


KDA-Studio Zeppelin ノートブックを使用してデータを前処理または後処理するためのユーザー定義関数

![\[alt text not found\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/images/sql-udf.png)


## ユーザー定義関数 (UDF)
<a name="examples-migrating-to-kda-studio-udfs"></a>

一般的なビジネスロジックをオペレータに再利用するには、ユーザー定義関数を参照してデータストリームを変換すると便利です。これは Managed Service for Apache Flink Studio ノートブック内で行うことも、外部から参照されるアプリケーション jar ファイルとして行うこともできます。ユーザー定義関数を利用すると、ストリーミングデータに対して実行する変換やデータエンリッチメントを簡略化できます。

 ノートブックでは、個人の電話番号を匿名化する機能を備えた単純な Java アプリケーション jar を参照することになります。Python や Scala の UDF を記述してノートブック内で使用することもできます。アプリケーション jar を Pyflink ノートブックにインポートする機能を強調するため、Java アプリケーション jar を選択しています。

## 環境設定
<a name="examples-migrating-to-kda-studio-setup"></a>

このガイドに従ってストリーミングデータを操作するには、 AWS CloudFormation スクリプトを使用して以下のリソースを起動します。
+ Kinesis Data Streams をソースとする場合
+ Glue データベース
+ IAM ロール
+ Managed Service for Apache Flink アプリケーション
+ Managed Service for Apache Flink Studio アプリケーションを開始する Lambda 関数
+ 上記の Lambda 関数を実行する Lambda ロール
+ Lambda 関数を呼び出すカスタムリソース

 CloudFormation テンプレートはこちらからダウンロード[してください](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/cfn/kda-flink-udf.yml)。

**CloudFormation スタックを作成する**

1. に移動 AWS マネジメントコンソール し、サービスのリストで **CloudFormation** を選択します。

1. [**クラウドの形成**] ページでは、[**スタックの作成**]、[**新しいリソースの使用 (スタンダード)**] の順に選択します。

1. **[スタックの作成]** ページで、**[テンプレートファイルをアップロード]** を選択してから、以前にダウンロードした `kda-flink-udf.yml` を選択します。ファイルを選択してから、[**次へ**] を選択します。

1. テンプレートには `kinesis-UDF` のような覚えやすい名前を付け、別の名前を付けたい場合は input-stream などの入力パラメータを更新します。[**次へ**] を選択します。

1. **[スタックオプションの設定]** ページで、必要に応じて **[タグ]** を追加し、**[次へ]** を選択します。

1. [**レビュー**] ページで IAM リソースの作成を許可するボックスにチェックを入れ、[**提出**] を選択します。

 起動するリージョンによっては、 CloudFormation スタックの起動に 10～15 分かかる場合があります。スタック全体の `CREATE_COMPLETE` ステータスが表示されたら、次に進むことができます。

## Managed Service for Apache Flink Studio ノートブックで作業する
<a name="examples-migrating-to-kda-studio-notebook"></a>

Kinesis Data Analytics 用 Studio ノートブックでは、データストリームをリアルタイムでインタラクティブにクエリし、標準 SQL、Python、Scala を使用してストリーム処理アプリケーションを簡単に構築して実行できます。を数回クリックするだけで AWS マネジメントコンソール、サーバーレスノートブックを起動してデータストリームをクエリし、数秒で結果を取得できます。

ノートブックはウェブベースの開発環境です。ノートブックでは、Apache Flink が提供する高度なデータストリーム処理機能と組み合わせて、シンプルでインタラクティブな開発環境を実現できます。Studio ノートブックは、Apache Zeppelin をベースとしたノートブックを使用し、ストリーム処理エンジンとして Apache Flink を使用しています。Studio ノートブックはこれらのテクノロジーをシームレスに組み合わせて、あらゆるスキルを持つ開発者がデータストリームの高度な分析にアクセスできるようにします。

 Apache Zeppelin は、Studio ノートブックに次のような分析ツール一式を提供します。
+ データの視覚化
+ ファイルにデータをエクスポートする
+ 分析を容易にする出力形式の制御 

**ノートブックの使用**

1. に移動 AWS マネジメントコンソール し、サービスのリストで **Amazon Kinesis** を選択します。

1. 左側のナビゲーションページで [**Analytics アプリケーション**] を選択してから[**Studio ノートブック**] を選択します。

1. **KinesisDataAnalyticsStudio** ノートブックが実行されていることを確認します。

1. ノートブックを選択し、[**Apache Zeppelin で開く**] を選択します。

1. Kinesis Stream へのデータの読み取りと読み込みに使用する[データプロデューサー Zeppelin ノートブック](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/notebooks/Data%20Producer.zpln)ファイルをダウンロードします。

1.  `Data Producer` Zeppelin ノートブックをインポートします。ノートブックで、入力 `STREAM_NAME` と `REGION` のコードを変更してください。入力ストリーム名は[CloudFormation スタック出力](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/cfn/kda-flink-udf.yml)にあります。

1. [**この段落を実行**] ボタンを選択して **Data Producer** ノートブックを実行し、入力の Kinesis Data Stream にサンプルデータを挿入します。

1. サンプルデータが読み込まれている間に、[MaskPhoneNumber-Interactive ノートブック](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/notebooks/MaskPhoneNumber-interactive.zpln)をダウンロードします。このノートブックは、入力データを読み取り、入力ストリームから電話番号を匿名化し、匿名化されたデータを出力ストリームに保存します。

1. `MaskPhoneNumber-interactive` Zeppelin ノートブックをインポートします。

1. ノートブック内の各段落を実行します。

   1. 第 1 段落では、電話番号を匿名化するユーザー定義関数をインポートします。

      ```
      %flink(parallelism=1)
      import com.mycompany.app.MaskPhoneNumber
      stenv.registerFunction("MaskPhoneNumber", new MaskPhoneNumber())
      ```

   1. 次の段落では、入力ストリームデータを読み取るためのメモリ内テーブルを作成します。ストリーム名と AWS リージョンが正しいことを確認します。

      ```
      %flink.ssql(type=update)
      
      DROP TABLE IF EXISTS customer_reviews;
      
      CREATE TABLE customer_reviews (
      customer_id VARCHAR,
      product VARCHAR,
      review VARCHAR,
      phone VARCHAR
      )
      WITH (
      'connector' = 'kinesis',
      'stream' = 'KinesisUDFSampleInputStream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json');
      ```

   1. データがメモリ内テーブルに読み込まれているか確認してください。

      ```
      %flink.ssql(type=update)
      select * from customer_reviews
      ```

   1. ユーザー定義関数を呼び出して、電話番号を匿名化します。

      ```
      %flink.ssql(type=update)
      select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber from customer_reviews
      ```

   1. 電話番号がマスクされたので、番号をマスクしたビューを作成します。

      ```
      %flink.ssql(type=update)
      
      DROP VIEW IF EXISTS sentiments_view;
      
      CREATE VIEW  
          sentiments_view
      AS
        select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber from customer_reviews
      ```

   1. データを検証します。

      ```
      %flink.ssql(type=update)
      select * from sentiments_view
      ```

   1. 出力 Kinesis Stream 用のメモリ内テーブルを作成します。ストリーム名と AWS リージョンが正しいことを確認します。

      ```
      %flink.ssql(type=update)
      
      DROP TABLE IF EXISTS customer_reviews_stream_table;
      
      CREATE TABLE customer_reviews_stream_table (
      customer_id VARCHAR,
      product VARCHAR,
      review VARCHAR,
      phoneNumber varchar 
      )
      WITH (
      'connector' = 'kinesis',
      'stream' = 'KinesisUDFSampleOutputStream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'TRIM_HORIZON',
      'format' = 'json');
      ```

   1. 更新したレコードをターゲット Kinesis Stream に挿入します。

      ```
      %flink.ssql(type=update)
      INSERT INTO customer_reviews_stream_table
      SELECT customer_id, product, review, phoneNumber
      FROM sentiments_view
      ```

   1. ターゲット Kinesis Stream のデータを表示して検証します。

      ```
      %flink.ssql(type=update)
      select * from customer_reviews_stream_table
      ```

## ノートブックをアプリケーションとしてプロモートする
<a name="examples-migrating-to-kda-studio-notebook-promoting"></a>

ノートブックのコードをインタラクティブにテストしたので、コードを耐久性の高いストリーミングアプリケーションとしてデプロイします。まず、アプリケーション設定を変更して Amazon S3 内のコードの場所を指定する必要があります。

1. でノートブックを選択し AWS マネジメントコンソール、**アプリケーション設定としてデプロイ - オプション**で編集 を選択します****。

1. [**Amazon S3 のコードの送信先**] で、[CloudFormation スクリプト](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/cfn/kda-flink-udf.yml)によって作成された Amazon S3 バケットを選択します。プロセスには数分かかることがあります。

1. ノートをそのままプロモートすることはできません。実行すると、`Select` ステートメントがサポートされていないためエラーになります。この問題を回避するには、[MaskPhoneNumber ストリーミング Zeppelin ノートブック](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/notebooks/MaskPhoneNumber-Streaming.zpln)をダウンロードしてください。

1. `MaskPhoneNumber-streaming` Zeppelin ノートブックをインポートします。

1. メモを開き、[**KinesisDataAnalyticsStudio のアクション**] を選択します。

1. **[MaskPhoneNumber-Streaming のビルド] を選択し、S3 にエクスポートします**。**アプリケーション名**を変更し、特殊文字を含めないようにしてください。

1. [**ビルドしてエクスポート**] を選択します。ストリーミングアプリケーションの設定には数分かかります。

1.  ビルドが完了したら、[** AWS コンソールを使用してデプロイ**] を選択します。

1.  次のページで設定を確認し、正しい IAM ロールを選択していることを確認します。次に、[**ストリーミングアプリケーションの作成**] を選択します。

1. 数分後、ストリーミングアプリケーションが正常に作成されたことを示すメッセージが表示されます。

 永続状態と制限のあるアプリケーションのデプロイに関する詳細については、「[永続的な状態のアプリケーションとしてデプロイする](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook-durable.html)」を参照してください。

## クリーンアップ
<a name="examples-migrating-to-kda-studio-notebook-cleanup"></a>

オプションで、[CloudFormation スタックをアンインストールできるようになりました](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/cfn-console-delete-stack.html)。これにより、以前に設定したサービスがすべて削除されます。