

After careful consideration, we have decided to discontinue Amazon Kinesis Data Analytics for SQL applications:

1. Starting **September 1, 2025**, we will not provide any bug fixes for Amazon Kinesis Data Analytics for SQL applications; in light of the upcoming discontinuation, we will provide only limited support.

2. Starting **October 15, 2025**, you will not be able to create new Kinesis Data Analytics for SQL applications.

3. Starting **January 27, 2026**, we will delete your applications. You will not be able to start or operate Amazon Kinesis Data Analytics for SQL applications, and support for Amazon Kinesis Data Analytics for SQL will no longer be available from that date. For more information, see [Amazon Kinesis Data Analytics for SQL Applications discontinuation](discontinuation.md).

This page is a machine translation of the English original. If there is any ambiguity or inconsistency between the two, the English version prevails.

# Migrating to Managed Service for Apache Flink Studio examples
<a name="migrating-to-kda-studio-overview"></a>

After careful consideration, we have decided to discontinue Amazon Kinesis Data Analytics for SQL applications. To help you plan and migrate your Amazon Kinesis Data Analytics for SQL applications, we will phase out the offering gradually over 15 months. There are three important dates to note: **September 1, 2025**, **October 15, 2025**, and **January 27, 2026**.

1. Starting **September 1, 2025**, we will not provide any bug fixes for Amazon Kinesis Data Analytics for SQL applications; in light of the upcoming discontinuation, we will provide only limited support.

1. Starting **October 15, 2025**, you will not be able to create new Amazon Kinesis Data Analytics for SQL applications.

1. Starting **January 27, 2026**, we will delete your applications. You will not be able to start or operate Amazon Kinesis Data Analytics for SQL applications, and support for Amazon Kinesis Data Analytics for SQL applications will no longer be available from that date. For more information, see [Amazon Kinesis Data Analytics for SQL Applications discontinuation](discontinuation.md).

We recommend that you use [Amazon Managed Service for Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/what-is.html). It combines ease of use with advanced analytical capabilities, enabling you to build stream processing applications in minutes.

This section provides code and architecture examples to help you move your Amazon Kinesis Data Analytics for SQL application workloads to Managed Service for Apache Flink.

For more information, see this [AWS blog post: Migrate from Amazon Kinesis Data Analytics for SQL Applications to Amazon Managed Service for Apache Flink Studio](https://aws.amazon.com/blogs/big-data/migrate-from-amazon-kinesis-data-analytics-for-sql-applications-to-amazon-managed-service-for-apache-flink-studio/).

## Replicating Kinesis Data Analytics for SQL queries in Managed Service for Apache Flink Studio
<a name="examples-migrating-to-kda-studio"></a>

To help you migrate your workloads to Managed Service for Apache Flink Studio or Managed Service for Apache Flink, this section provides query translations that you can use for common use cases.

Before exploring these examples, we recommend that you first review [Using a Studio notebook with Managed Service for Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html).

### Recreating Kinesis Data Analytics for SQL queries in Managed Service for Apache Flink Studio
<a name="examples-recreating-queries"></a>

The following options provide translations of common SQL-based Kinesis Data Analytics application queries to Managed Service for Apache Flink Studio.

#### Multi-step application
<a name="Multi-Step-application"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "IN_APP_STREAM_001" (
   ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16), price REAL, change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_001" AS 
INSERT INTO
   "IN_APP_STREAM_001"
   SELECT
      STREAM APPROXIMATE_ARRIVAL_TIME,
      ticker_symbol,
      sector,
      price,
      change FROM "SOURCE_SQL_STREAM_001";
-- Second in-app stream and pump
CREATE 
OR REPLACE STREAM "IN_APP_STREAM_02" (ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16),
   price REAL,
   change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_02" AS 
INSERT INTO
   "IN_APP_STREAM_02"
   SELECT
      STREAM ingest_time,
      ticker_symbol,
      sector,
      price,
      change FROM "IN_APP_STREAM_001";
-- Destination in-app stream and third pump
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16),
   price REAL,
   change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_03" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM ingest_time,
      ticker_symbol,
      sector,
      price,
      change FROM "IN_APP_STREAM_02";
```

------
#### [ Managed Service for Apache Flink Studio ]

```
Query 1 - % flink.ssql DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;           
           
CREATE TABLE SOURCE_SQL_STREAM_001 (TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE,
   CHANGE DOUBLE,
   APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
   WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND ) 
   PARTITIONED BY (TICKER_SYMBOL) WITH (
      'connector' = 'kinesis',
      'stream' = 'kinesis-analytics-demo-stream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json',
      'json.timestamp-format.standard' = 'ISO-8601');
DROP TABLE IF EXISTS IN_APP_STREAM_001;

CREATE TABLE IN_APP_STREAM_001 ( 
   INGEST_TIME TIMESTAMP,
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE,
   CHANGE DOUBLE )
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
      'connector' = 'kinesis', 
      'stream' = 'IN_APP_STREAM_001', 
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json',
      'json.timestamp-format.standard' = 'ISO-8601');
   
DROP TABLE IF EXISTS IN_APP_STREAM_02;

CREATE TABLE IN_APP_STREAM_02 (
   INGEST_TIME TIMESTAMP, 
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE, 
   CHANGE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'IN_APP_STREAM_02',   
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
   
DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   INGEST_TIME TIMESTAMP, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), 
   PRICE DOUBLE, CHANGE DOUBLE )
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'DESTINATION_SQL_STREAM',
   'aws.region' = 'us-east-1', 
   'scan.stream.initpos' = 'LATEST',  
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');


Query 2 - % flink.ssql(type = 
update
) 
   INSERT INTO
      IN_APP_STREAM_001 
      SELECT
         APPROXIMATE_ARRIVAL_TIME AS INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         SOURCE_SQL_STREAM_001;


Query 3 - % flink.ssql(type = 
update
) 
   INSERT INTO
      IN_APP_STREAM_02 
      SELECT
         INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         IN_APP_STREAM_001;


Query 4 - % flink.ssql(type = 
update
) 
   INSERT INTO
      DESTINATION_SQL_STREAM 
      SELECT
         INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         IN_APP_STREAM_02;
```

------
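The multi-step pattern on both sides is essentially a relay: the first hop renames the arrival timestamp to `ingest_time`, and each later in-app stream forwards records unchanged. As a rough mental model only (plain Python generators, not the Flink or Kinesis API; field names mirror the sample schema):

```python
def in_app_stream_001(source):
    # First hop: rename the arrival timestamp to ingest_time
    for rec in source:
        yield {
            "ingest_time": rec["approximate_arrival_time"],
            "ticker_symbol": rec["ticker_symbol"],
            "sector": rec["sector"],
            "price": rec["price"],
            "change": rec["change"],
        }

def in_app_stream_02(stream):
    # Second hop: forward records unchanged
    yield from stream

def destination_stream(stream):
    # Final hop: forward records to the destination
    yield from stream

source = [{"approximate_arrival_time": "2025-01-01T00:00:00",
           "ticker_symbol": "AMZN", "sector": "TECH",
           "price": 100.0, "change": 1.5}]
out = list(destination_stream(in_app_stream_02(in_app_stream_001(source))))
```

Each `CREATE PUMP` corresponds to one of these chained stages; in the Studio translation the stages become separate `INSERT INTO ... SELECT` paragraphs.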

#### Transforming DateTime values
<a name="transform-date-time-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   TICKER VARCHAR(4),
   event_time TIMESTAMP,
   five_minutes_before TIMESTAMP,
   event_unix_timestamp BIGINT, 
   event_timestamp_as_char VARCHAR(50),
   event_second INTEGER);
   
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      EVENT_TIME,
      EVENT_TIME - INTERVAL '5' MINUTE,
      UNIX_TIMESTAMP(EVENT_TIME),
      TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
      EXTRACT(SECOND 
   FROM
      EVENT_TIME) 
   FROM
      "SOURCE_SQL_STREAM_001"
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   FIVE_MINUTES_BEFORE TIMESTAMP(3),   
   EVENT_UNIX_TIMESTAMP INT,
   EVENT_TIMESTAMP_AS_CHAR VARCHAR(50),
   EVENT_SECOND INT)
   
PARTITIONED BY (TICKER) WITH (
   'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream',   
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         TICKER,
         EVENT_TIME,
         EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE,
         UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP,
         DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR,
         EXTRACT(SECOND 
      FROM
         EVENT_TIME) AS EVENT_SECOND 
      FROM
         DESTINATION_SQL_STREAM;
```

------
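The same DateTime expressions can be sanity-checked outside Flink. A minimal plain-Python sketch of what each expression computes, using a made-up sample timestamp (note that `hh` in the SQL format string is the 12-hour clock, hence `%I` below):

```python
from datetime import datetime, timedelta, timezone

event_time = datetime(2025, 1, 27, 12, 30, 45, tzinfo=timezone.utc)

# EVENT_TIME - INTERVAL '5' MINUTE
five_minutes_before = event_time - timedelta(minutes=5)

# UNIX_TIMESTAMP(EVENT_TIME): seconds since the epoch
event_unix_timestamp = int(event_time.timestamp())

# DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss'): 'hh' is 12-hour
event_timestamp_as_char = event_time.strftime("%Y-%m-%d %I:%M:%S")

# EXTRACT(SECOND FROM EVENT_TIME)
event_second = event_time.second
```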

#### Simple alerts
<a name="simple-alerts"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   ticker_symbol VARCHAR(4),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
   
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   price 
FROM
   "SOURCE_SQL_STREAM_001"
WHERE
   (
      ABS(Change / (Price - Change)) * 100
   )
   > 1
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(4), 
   CHANGE DOUBLE,       
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
   
Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
   WHERE
      (
         ABS(CHANGE / (PRICE - CHANGE)) * 100
      )
      > 1;
```

------
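The alert condition is the same in both dialects: the change relative to the previous price (`price - change`), expressed as a percentage, must exceed 1. A minimal plain-Python sketch of that filter (sample rows are made up):

```python
def is_alert(price, change, threshold=1.0):
    # Percent change relative to the previous price (price - change),
    # mirroring ABS(CHANGE / (PRICE - CHANGE)) * 100 > 1
    return abs(change / (price - change)) * 100 > threshold

rows = [
    {"ticker_symbol": "AMZN", "price": 101.0, "change": 2.0},   # ~2.02% change
    {"ticker_symbol": "TBV",  "price": 100.5, "change": 0.5},   # exactly 0.5%
]
alerts = [r for r in rows if is_alert(r["price"], r["change"])]
```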

#### Throttled alerts
<a name="throttled-alerts"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CHANGE_STREAM"(
   ticker_symbol VARCHAR(4),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
   
CREATE 
OR REPLACE PUMP "change_pump" AS INSERT INTO "CHANGE_STREAM"
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   price
FROM "SOURCE_SQL_STREAM_001"
WHERE
   (
      ABS(Change / (Price - Change)) * 100
   )
   > 1;
-- ** Trigger Count and Limit **
-- Counts "triggers" or those values that evaluated true against the previous where clause
-- Then provides its own limit on the number of triggers per hour per ticker symbol to what is specified in the WHERE clause

CREATE 
OR REPLACE STREAM TRIGGER_COUNT_STREAM (
   ticker_symbol VARCHAR(4),
   change REAL,
   trigger_count INTEGER);
   
CREATE 
OR REPLACE PUMP trigger_count_pump AS 
INSERT INTO
   TRIGGER_COUNT_STREAM 
SELECT
   STREAM ticker_symbol,
   change,
   trigger_count 
FROM
   (
      SELECT
         STREAM ticker_symbol,
         change,
         COUNT(*) OVER W1 AS trigger_count 
      FROM
         "CHANGE_STREAM" -- window to perform aggregations over last minute to keep track of triggers
         WINDOW W1 AS 
         (
            PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING
         )
   )
WHERE
   trigger_count >= 1;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(4),      
   CHANGE DOUBLE, PRICE DOUBLE,
   EVENT_TIME AS PROCTIME()) 
PARTITIONED BY (TICKER_SYMBOL) 
WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');   
DROP TABLE IF EXISTS TRIGGER_COUNT_STREAM;
CREATE TABLE TRIGGER_COUNT_STREAM ( 
   TICKER_SYMBOL VARCHAR(4), 
   CHANGE DOUBLE, 
   TRIGGER_COUNT INT) 
PARTITIONED BY (TICKER_SYMBOL);

Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
   WHERE
      (
         ABS(CHANGE / (PRICE - CHANGE)) * 100
      )
      > 1;
      
Query 3 - % flink.ssql(type = 
update
) 
   SELECT * 
   FROM(
         SELECT
            TICKER_SYMBOL,
            CHANGE,
            COUNT(*) AS TRIGGER_COUNT 
         FROM
            DESTINATION_SQL_STREAM 
         GROUP BY
            TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE),
            TICKER_SYMBOL,
            CHANGE 
      )
   WHERE
      TRIGGER_COUNT > 1;
```

------
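The throttling step counts how many times each ticker triggered within a one-minute tumbling window, then keeps only windows with more than one trigger. A minimal plain-Python sketch of that logic, treating event timestamps as epoch seconds (sample events are made up):

```python
from collections import Counter

def trigger_counts(events, window_seconds=60):
    # Count triggers per (ticker, one-minute tumbling window), mirroring
    # COUNT(*) grouped by TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE), TICKER_SYMBOL
    counts = Counter()
    for ts, ticker in events:
        window_start = ts - (ts % window_seconds)   # floor to the window boundary
        counts[(ticker, window_start)] += 1
    return counts

events = [(0, "AMZN"), (10, "AMZN"), (70, "AMZN"), (15, "TBV")]
counts = trigger_counts(events)
# WHERE TRIGGER_COUNT > 1: only windows with repeated triggers survive
throttled = {k: v for k, v in counts.items() if v > 1}
```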

#### Aggregating partial results from a query
<a name="aggregate-partial-results"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
   
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
   
CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS 
INSERT INTO
   "CALC_COUNT_SQL_STREAM"(
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT") 
   SELECT
      STREAM "TICKER_SYMBOL",
      STEP("SOURCE_SQL_STREAM_001",
      "ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
      COUNT(*) AS "TickerCount"
   FROM
      "SOURCE_SQL_STREAM_001" 
   GROUP BY
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE),
      STEP("SOURCE_SQL_STREAM_001"."APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
      TICKER_SYMBOL;
CREATE PUMP "AGGREGATED_SQL_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT") 
   SELECT
      STREAM "TICKER",
      "TRADETIME",
      SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" 
   FROM
      "CALC_COUNT_SQL_STREAM" WINDOW W1 AS 
      (
         PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING
      )
;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;
CREATE TABLE SOURCE_SQL_STREAM_001 (
   TICKER_SYMBOL VARCHAR(4),
   TRADETIME AS PROCTIME(),
   APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA 
FROM
   'timestamp' VIRTUAL,
   WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND) 
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',   
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
DROP TABLE IF EXISTS CALC_COUNT_SQL_STREAM;
CREATE TABLE CALC_COUNT_SQL_STREAM (
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP(3),
   WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND,   
   TICKERCOUNT BIGINT NOT NULL ) PARTITIONED BY (TICKER) WITH ( 
      'connector' = 'kinesis',
      'stream' = 'CALC_COUNT_SQL_STREAM',
      'aws.region' = 'us-east-1',       
      'scan.stream.initpos' = 'LATEST',
      'format' = 'csv');
DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP(3),
   WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND, 
   TICKERCOUNT BIGINT NOT NULL )
   PARTITIONED BY (TICKER) WITH ('connector' = 'kinesis', 
      'stream' = 'DESTINATION_SQL_STREAM',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'csv');

Query 2 - % flink.ssql(type = 
update
) 
   INSERT INTO
      CALC_COUNT_SQL_STREAM 
      SELECT
         TICKER,
         TO_TIMESTAMP(TRADETIME, 'yyyy-MM-dd HH:mm:ss') AS TRADETIME,
         TICKERCOUNT 
      FROM
         (
            SELECT
               TICKER_SYMBOL AS TICKER,
               DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00') AS TRADETIME,
               COUNT(*) AS TICKERCOUNT 
            FROM
               SOURCE_SQL_STREAM_001 
            GROUP BY
               TUMBLE(TRADETIME, INTERVAL '1' MINUTE),
               DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00'),
               DATE_FORMAT(APPROXIMATE_ARRIVAL_TIME, 'yyyy-MM-dd HH:mm:00'),
               TICKER_SYMBOL 
         )
;

Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      CALC_COUNT_SQL_STREAM;
      
Query 4 - % flink.ssql(type = 
update
) 
   INSERT INTO
      DESTINATION_SQL_STREAM 
      SELECT
         TICKER,
         TRADETIME,
         SUM(TICKERCOUNT) OVER W1 AS TICKERCOUNT 
      FROM
         CALC_COUNT_SQL_STREAM WINDOW W1 AS 
         (
            PARTITION BY TICKER 
         ORDER BY
            TRADETIME RANGE INTERVAL '10' MINUTE PRECEDING
         )
;

Query 5 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      DESTINATION_SQL_STREAM;
```

------
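Both translations aggregate in two stages: first a per-minute count per ticker, then a running sum over the preceding 10 minutes. A simplified plain-Python sketch of the two stages, treating timestamps as epoch seconds (sample events are made up):

```python
from collections import Counter

def per_minute_counts(events):
    # Stage 1: COUNT(*) per (ticker, one-minute bucket)
    counts = Counter()
    for ts, ticker in events:
        counts[(ticker, ts - ts % 60)] += 1
    return counts

def ten_minute_sum(counts, ticker, minute_start):
    # Stage 2: SUM(TICKERCOUNT) over the current minute and the
    # 9 preceding minute buckets for one ticker
    return sum(counts.get((ticker, m), 0)
               for m in range(minute_start - 540, minute_start + 60, 60))

events = [(5, "AMZN"), (30, "AMZN"), (65, "AMZN")]
counts = per_minute_counts(events)
total = ten_minute_sum(counts, "AMZN", 60)
```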

#### Transforming string values
<a name="transform-string-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
-- Create an output stream for the cleaned-up referrer
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32));
CREATE 
OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM "APPROXIMATE_ARRIVAL_TIME",
   SUBSTRING("referrer", 12, 
   (
      POSITION('.com' IN "referrer") - POSITION('www.' IN "referrer") - 4
   )
) 
FROM
   "SOURCE_SQL_STREAM_001";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   referrer VARCHAR(32),
   ingest_time AS PROCTIME() ) PARTITIONED BY (referrer) 
WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ingest_time,
         substring(referrer, 12, 6) as referrer 
      FROM
         DESTINATION_SQL_STREAM;
```

------
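The `SUBSTRING`/`POSITION` arithmetic pulls the name between `www.` and `.com` out of a referrer URL. A minimal plain-Python sketch of the same extraction (SQL positions are 1-based, Python slices 0-based; the sample URL is made up):

```python
def extract_domain(referrer):
    # Pull the name between 'www.' and '.com', mirroring the
    # SUBSTRING(referrer, 12, POSITION('.com' ...) - POSITION('www.' ...) - 4)
    # arithmetic in the SQL version.
    start = referrer.index("www.") + len("www.")
    end = referrer.index(".com")
    return referrer[start:end]

domain = extract_domain("http://www.amazon.com")
```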

#### Replacing a substring using Regex
<a name="substring-regex"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
-- Create an output stream for the cleaned-up referrer
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32));
CREATE 
OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM "APPROXIMATE_ARRIVAL_TIME",
   REGEX_REPLACE("REFERRER", 'http://', 'https://', 1, 0) 
FROM
   "SOURCE_SQL_STREAM_001";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   referrer VARCHAR(32),
   ingest_time AS PROCTIME()) 
PARTITIONED BY (referrer) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ingest_time,
         REGEXP_REPLACE(referrer, 'http', 'https') as referrer 
      FROM
         DESTINATION_SQL_STREAM;
```

------
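A minimal plain-Python sketch of the scheme rewrite. Note that the Flink query above replaces the literal `http`, which would also match inside an existing `https`; anchoring the pattern to the full `http://` prefix, as below, avoids rewriting URLs that are already secure (the sample URLs are made up):

```python
import re

def to_https(referrer):
    # Rewrite the scheme, a stricter variant of
    # REGEXP_REPLACE(referrer, 'http', 'https'): only a leading
    # 'http://' is replaced, so 'https://...' is left untouched.
    return re.sub(r"^http://", "https://", referrer)

url = to_https("http://www.amazon.com")
```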

#### Regex log parsing
<a name="regex-log-parse"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   sector VARCHAR(24),
   match1 VARCHAR(24),
   match2 VARCHAR(24));
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM T.SECTOR,
      T.REC.COLUMN1,
      T.REC.COLUMN2 
   FROM
      (
         SELECT
            STREAM SECTOR,
            REGEX_LOG_PARSE(SECTOR, '.*([E].).*([R].*)') AS REC 
         FROM
            SOURCE_SQL_STREAM_001
      )
      AS T;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   CHANGE DOUBLE, PRICE DOUBLE,
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16)) 
PARTITIONED BY (SECTOR) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
SELECT
   * 
FROM
   (
      SELECT
         SECTOR,
         REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 1) AS MATCH1,
         REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 2) AS MATCH2 
      FROM
         DESTINATION_SQL_STREAM 
   )
WHERE
   MATCH1 IS NOT NULL 
   AND MATCH2 IS NOT NULL;
```

------
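`REGEXP_EXTRACT(sector, pattern, n)` returns the n-th capture group, or NULL when the pattern does not match — which is why the Flink query filters on `MATCH1 IS NOT NULL`. A minimal plain-Python sketch of the same behavior; `"XENXRY"` is a contrived value chosen to fit the pattern, while a real sector such as `"TECHNOLOGY"` does not match:

```python
import re

def parse_sector(sector):
    # Mirror REGEXP_EXTRACT(sector, '.([E].).([R].)', group): return the
    # two capture groups, or (None, None) when the pattern does not match.
    m = re.search(r".([E].).([R].)", sector)
    return (m.group(1), m.group(2)) if m else (None, None)

match1, match2 = parse_sector("XENXRY")      # contrived value that matches
no1, no2 = parse_sector("TECHNOLOGY")        # real sector: no match -> NULLs
```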


#### Windows and aggregation
<a name="windows-aggregation"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   event_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   ticker_count INTEGER);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM EVENT_TIME,
      TICKER,
      COUNT(TICKER) AS ticker_count 
   FROM
      "SOURCE_SQL_STREAM_001" WINDOWED BY STAGGER ( PARTITION BY 
         TICKER,
         EVENT_TIME RANGE INTERVAL '1' MINUTE);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   EVENT_TIME TIMESTAMP(3),
   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '60' SECOND,    
   TICKER VARCHAR(4),
   TICKER_COUNT INT) PARTITIONED BY (TICKER) 
WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json');

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         EVENT_TIME,
         TICKER, COUNT(TICKER) AS ticker_count 
      FROM
         DESTINATION_SQL_STREAM 
      GROUP BY
         TUMBLE(EVENT_TIME,
         INTERVAL '60' second),
         EVENT_TIME, TICKER;
```

------

#### Tumbling window using ROWTIME
<a name="tumbling-windows-rowtime"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   MIN_PRICE REAL,
   MAX_PRICE REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      MIN(PRICE),
      MAX(PRICE)
   FROM
      "SOURCE_SQL_STREAM_001"
   GROUP BY
      TICKER,
      STEP("SOURCE_SQL_STREAM_001".
            ROWTIME BY INTERVAL '60' SECOND);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   ticker VARCHAR(4),
   price DOUBLE,
   event_time VARCHAR(32),
   processing_time AS PROCTIME()) 
PARTITIONED BY (ticker) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601') 

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ticker,
         min(price) AS MIN_PRICE,
         max(price) AS MAX_PRICE 
      FROM
         DESTINATION_SQL_STREAM 
      GROUP BY
         TUMBLE(processing_time, INTERVAL '60' second),
         ticker;
```

------
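The tumbling-window aggregation computes `MIN(price)` and `MAX(price)` per ticker for each 60-second window. A minimal plain-Python sketch of that grouping, treating timestamps as epoch seconds (sample events are made up):

```python
from collections import defaultdict

def min_max_per_window(events, window_seconds=60):
    # MIN(price)/MAX(price) per (ticker, tumbling window), mirroring
    # GROUP BY TUMBLE(processing_time, INTERVAL '60' second), ticker
    buckets = defaultdict(list)
    for ts, ticker, price in events:
        buckets[(ticker, ts - ts % window_seconds)].append(price)
    return {k: (min(v), max(v)) for k, v in buckets.items()}

stats = min_max_per_window([
    (1, "AMZN", 100.0),
    (20, "AMZN", 102.5),
    (70, "AMZN", 99.0),   # lands in the next 60-second window
])
```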

#### Retrieving the most frequently occurring values (TOP\_K\_ITEMS\_TUMBLING)
<a name="retrieve-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT")
SELECT
   STREAM"TICKER_SYMBOL",
   STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
   COUNT(*) AS "TickerCount"
FROM
   "SOURCE_SQL_STREAM_001"
GROUP BY STEP("SOURCE_SQL_STREAM_001".
   ROWTIME BY INTERVAL '1' MINUTE),
   STEP("SOURCE_SQL_STREAM_001".
      "APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
   TICKER_SYMBOL;
CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT")
SELECT
   STREAM "TICKER",
   "TRADETIME",
   SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT"
FROM
   "CALC_COUNT_SQL_STREAM" WINDOW W1 AS 
   (
      PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING
   )
;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM ( 
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '1' SECOND ) 
PARTITIONED BY (TICKER) WITH (
   'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      (
         SELECT
            TICKER,
            COUNT(*) as MOST_FREQUENT_VALUES,
            ROW_NUMBER() OVER (PARTITION BY TICKER 
         ORDER BY
            COUNT(*) DESC) AS row_num 
         FROM
            DESTINATION_SQL_STREAM 
         GROUP BY
            TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE),
            TICKER
      )
   WHERE
      row_num <= 5;
```

------

#### Approximate top-K items
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ITEM VARCHAR(1024), ITEM_COUNT DOUBLE);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM ITEM,
      ITEM_COUNT 
   FROM
      TABLE(TOP_K_ITEMS_TUMBLING(CURSOR(
      SELECT
         STREAM * 
      FROM
         "SOURCE_SQL_STREAM_001"),
      'column1', -- name of column in single quotes
      10,        -- number of top items
      60         -- tumbling window size in seconds
      ));
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;
CREATE TABLE SOURCE_SQL_STREAM_001 ( TS TIMESTAMP(3), WATERMARK FOR TS as TS - INTERVAL '5' SECOND, ITEM VARCHAR(1024), 
PRICE DOUBLE) 
   WITH ( 'connector' = 'kinesis', 'stream' = 'SOURCE_SQL_STREAM_001',
'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601');


%flink.ssql(type=update)
SELECT
   * 
FROM
   (
      SELECT
         *,
         ROW_NUMBER() OVER (PARTITION BY AGG_WINDOW 
      ORDER BY
         ITEM_COUNT DESC) as rownum 
      FROM
         (
            select
               AGG_WINDOW,
               ITEM,
               ITEM_COUNT 
            from
               (
                  select
                     TUMBLE_ROWTIME(TS, INTERVAL '60' SECONDS) as AGG_WINDOW,
                     ITEM,
                     count(*) as ITEM_COUNT 
                  FROM
                     SOURCE_SQL_STREAM_001 
                  GROUP BY
                     TUMBLE(TS, INTERVAL '60' SECONDS),
                     ITEM
               )
         )
   )
where
   rownum <= 3
```

------
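The Studio query ranks items by count inside each tumbling window and keeps the top 3. A minimal plain-Python sketch of the per-window ranking step, using `Counter.most_common` in place of the `ROW_NUMBER() ... WHERE rownum <= 3` pattern (the items list is made up and stands for one window's worth of records):

```python
from collections import Counter

def top_k_per_window(items, k=3):
    # Rank items by count within one tumbling window and keep the top k,
    # the same effect as ROW_NUMBER() OVER (... ORDER BY ITEM_COUNT DESC)
    # followed by WHERE rownum <= k.
    return Counter(items).most_common(k)

window_items = ["a", "b", "a", "c", "a", "b", "d"]
top3 = top_k_per_window(window_items)
```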

#### Parsing web logs (the W3C\_LOG\_PARSE function)
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( column1 VARCHAR(16), 
   column2 VARCHAR(16), 
   column3 VARCHAR(16), 
   column4 VARCHAR(16), 
   column5 VARCHAR(16), 
   column6 VARCHAR(16), 
   column7 VARCHAR(16));
CREATE 
OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM l.r.COLUMN1,
   l.r.COLUMN2,
   l.r.COLUMN3,
   l.r.COLUMN4,
   l.r.COLUMN5,
   l.r.COLUMN6,
   l.r.COLUMN7 
FROM
   (
      SELECT
         STREAM W3C_LOG_PARSE("log", 'COMMON') 
      FROM
         "SOURCE_SQL_STREAM_001"
   )
   AS l(r);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql(type=update)
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;
CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) 
   WITH ( 'connector' = 'kinesis', 
          'stream' = 'SOURCE_SQL_STREAM_001',
          'aws.region' = 'us-east-1',
          'scan.stream.initpos' = 'LATEST',
          'format' = 'json',
          'json.timestamp-format.standard' = 'ISO-8601');
          
% flink.ssql(type=update) 
   select
      SPLIT_INDEX(log, ' ', 0),
      SPLIT_INDEX(log, ' ', 1),
      SPLIT_INDEX(log, ' ', 2),
      SPLIT_INDEX(log, ' ', 3),
      SPLIT_INDEX(log, ' ', 4),
      SPLIT_INDEX(log, ' ', 5),
      SPLIT_INDEX(log, ' ', 6) 
   from
      SOURCE_SQL_STREAM_001;
```

------
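`SPLIT_INDEX(log, ' ', n)` returns the n-th space-separated field of the log line (or NULL when the index is out of range), which is how the Studio translation approximates `W3C_LOG_PARSE`. A minimal plain-Python sketch, using a made-up common-log-format sample line:

```python
def split_index(log, sep, index):
    # Plain-Python analogue of Flink's SPLIT_INDEX(log, ' ', n):
    # return the n-th field, or None when the index is out of range.
    parts = log.split(sep)
    return parts[index] if index < len(parts) else None

line = '127.0.0.1 - frank [10/Oct/2000:13:55:36] "GET /apache_pb.gif HTTP/1.0" 200'
fields = [split_index(line, " ", i) for i in range(7)]
```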

#### Splitting strings into multiple fields (the VARIABLE\_COLUMN\_LOG\_PARSE function)
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"( "column_A" VARCHAR(16),
   "column_B" VARCHAR(16),
   "column_C" VARCHAR(16),
   "COL_1" VARCHAR(16),
   "COL_2" VARCHAR(16),
   "COL_3" VARCHAR(16));
CREATE 
OR REPLACE PUMP "SECOND_STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM t."Col_A",
   t."Col_B",
   t."Col_C",
   t.r."COL_1",
   t.r."COL_2",
   t.r."COL_3"
FROM
   (
      SELECT
         STREAM "Col_A",
         "Col_B",
         "Col_C",
         VARIABLE_COLUMN_LOG_PARSE ("Col_E_Unstructured",
         'COL_1 TYPE VARCHAR(16),
         COL_2 TYPE VARCHAR(16),
         COL_3 TYPE VARCHAR(16)', ',') AS r 
      FROM
         "SOURCE_SQL_STREAM_001"
   )
   as t;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql(type=update)
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;
CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) 
   WITH ( 'connector' = 'kinesis',
          'stream' = 'SOURCE_SQL_STREAM_001',
          'aws.region' = 'us-east-1',
          'scan.stream.initpos' = 'LATEST',
          'format' = 'json',
          'json.timestamp-format.standard' = 'ISO-8601');
          
% flink.ssql(type=update) 
   select
      SPLIT_INDEX(log, ' ', 0),
      SPLIT_INDEX(log, ' ', 1),
      SPLIT_INDEX(log, ' ', 2),
      SPLIT_INDEX(log, ' ', 3),
      SPLIT_INDEX(log, ' ', 4),
      SPLIT_INDEX(log, ' ', 5) 
   from
      SOURCE_SQL_STREAM_001;
```

------

#### Joins
<a name="joins"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   ticker_symbol VARCHAR(4),
   "Company" varchar(20),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM ticker_symbol,
      "c"."Company",
      sector,
      change,
      price 
   FROM
      "SOURCE_SQL_STREAM_001" 
      LEFT JOIN
         "CompanyName" as "c"
         ON "SOURCE_SQL_STREAM_001".ticker_symbol = "c"."Ticker";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - %flink.ssql(type=update)
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(12),
   CHANGE INT,
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',   
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - CREATE TABLE CompanyName (
   Ticker VARCHAR(4),
   Company VARCHAR(4)) WITH ( 
      'connector' = 'filesystem',
      'path' = 's3://kda-demo-sample/TickerReference.csv',       
      'format' = 'csv' );

Query 3 - %flink.ssql(type=update)
   SELECT
      TICKER_SYMBOL,
      c.Company,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
      LEFT JOIN
         CompanyName as c 
         ON DESTINATION_SQL_STREAM.TICKER_SYMBOL = c.Ticker;
```

------

#### Errors
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   (
      price / 0
   )
   as ProblemColumn 
FROM
   "SOURCE_SQL_STREAM_001"
WHERE
   sector SIMILAR TO '%TECH%';
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - %flink.ssql(type=update)
DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   CHANGE DOUBLE, 
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - %flink.pyflink
@udf(input_types = [DataTypes.BIGINT()], result_type = DataTypes.BIGINT())
def DivideByZero(price):
    try:
        return price / 0
    except:
        return -1

st_env.register_function("DivideByZero", DivideByZero)

Query 3 - %flink.ssql(type=update)
   SELECT
      CURRENT_TIMESTAMP AS ERROR_TIME,
      * 
   FROM
      (
         SELECT
            TICKER_SYMBOL,
            SECTOR,
            CHANGE,
            DivideByZero(PRICE) as ErrorColumn 
         FROM
            DESTINATION_SQL_STREAM 
         WHERE
            SECTOR SIMILAR TO '%TECH%' 
      )
      AS ERROR_STREAM;
```

------
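The PyFlink UDF above relies on a common pattern: catch the arithmetic error inside the function and return a sentinel value (-1) rather than failing the job. Stripped of the Flink registration boilerplate, the core logic can be sketched in plain Python:

```python
def divide_by_zero(price):
    """Return price / 0, or -1 as an error sentinel when the
    division fails -- mirroring the DivideByZero UDF above."""
    try:
        return price / 0
    except ZeroDivisionError:
        return -1

# Every record hits the error path and is mapped to the sentinel
results = [divide_by_zero(p) for p in (10, 250, 0)]
```

The sentinel lets the downstream SQL keep the rest of the row while flagging the bad column, instead of losing the record entirely.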

## Migrating random cut forest workloads
<a name="examples-migrating-to-kda-studio-random-cut-forests"></a>

If you want to move workloads that use random cut forest from Kinesis Analytics for SQL to Managed Service for Apache Flink, this [AWS blog post](https://aws.amazon.com/blogs/big-data/real-time-anomaly-detection-via-random-cut-forest-in-amazon-kinesis-data-analytics/) demonstrates how to use Managed Service for Apache Flink to run an online RCF algorithm for anomaly detection.

## Replacing Kinesis Data Firehose with Kinesis Data Streams as the source
<a name="examples-firehose"></a>

For a complete tutorial, see [Converting-KDASQL-KDAStudio/](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/tree/master/Converting-KDASQL-KDAStudio).

In the following exercise, you will change your data flow to use Amazon Managed Service for Apache Flink Studio. This also means switching from Amazon Kinesis Data Firehose to Amazon Kinesis Data Streams.

First we share a typical KDA-SQL architecture, and then show how it can be replaced using Amazon Managed Service for Apache Flink Studio and Amazon Kinesis Data Streams. Alternatively, you can launch the CloudFormation template [here](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml):

### Amazon Kinesis Data Analytics-SQL and Amazon Kinesis Data Firehose
<a name="examples-firehose-legacy-setup"></a>

The following is the Amazon Kinesis Data Analytics-SQL architectural flow:

![\[Architectural flow diagram showing data movement through Amazon Kinesis services to Amazon S3.\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/legacy-sql.png)


We first examine the setup of a legacy Amazon Kinesis Data Analytics-SQL and Amazon Kinesis Data Firehose. The use case is a trading market where streaming data, consisting of stock ticker and price, streams from external sources into Amazon Kinesis systems. Amazon Kinesis Data Analytics for SQL uses the input stream to run windowed queries, such as a tumbling window, to determine the trade volume and the `max`, `min`, and `average` trade price over a one-minute window for each stock ticker.  

Amazon Kinesis Data Analytics-SQL is set up to ingest data from the Amazon Kinesis Data Firehose API. After processing, Amazon Kinesis Data Analytics-SQL sends the processed data to another Amazon Kinesis Data Firehose, which then saves the output in an Amazon S3 bucket.

In this case, you can use the Amazon Kinesis Data Generator. The Amazon Kinesis Data Generator lets you send test data to your Amazon Kinesis Data Streams or Amazon Kinesis Data Firehose delivery streams. To get started, follow the [instructions here](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). Use the CloudFormation template [here](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml) in place of the one provided in the instructions.

After you run the CloudFormation template, the output section will provide the Amazon Kinesis Data Generator URL. Log in to the portal with the Cognito user ID and password that you set up [here](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). Select the Region and the target stream name. For the current state, choose the Amazon Kinesis Data Firehose delivery stream. For the new state, choose the Amazon Kinesis Data Streams name. You can create multiple templates based on your requirements, and test a template using the **Test template** button before sending it to the target stream.

The following is a sample payload using the Amazon Kinesis Data Generator. The data generator targets the input Amazon Kinesis Firehose streams to stream the data continuously. The Amazon Kinesis SDK client can also send data from other producers. 

```
2023-02-17 09:28:07.763,"AAPL",503
2023-02-17 09:28:07.763,"AMZN",335
2023-02-17 09:28:07.763,"GOOGL",185
2023-02-17 09:28:07.763,"AAPL",1116
2023-02-17 09:28:07.763,"GOOGL",1582
```

The following JSON is used to generate a random series of trade times and dates, stock tickers, and stock prices:

```
{{date.now(YYYY-MM-DD HH:mm:ss.SSS)}},
"{{random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])}}",
{{random.number(2000)}}
```
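The template above is rendered by the Kinesis Data Generator on every record. As an illustration only, an equivalent record could be produced in plain Python like this (the ticker list and price range come from the template; the function name is made up):

```python
import random
from datetime import datetime

TICKERS = ["AAPL", "AMZN", "MSFT", "META", "GOOGL"]

def make_record(now=None, rng=random):
    """Build one CSV record shaped like the data generator template:
    millisecond timestamp, quoted random ticker, integer price up to 2000."""
    ts = (now or datetime.now()).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    return '{},"{}",{}'.format(ts, rng.choice(TICKERS), rng.randint(0, 2000))

record = make_record()
```

Each call yields one line like those in the sample payload shown earlier.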

After you choose **Send data**, the generator will begin sending mock data.

External systems stream the data to Amazon Kinesis Data Firehose. Using Amazon Kinesis Data Analytics for SQL applications, you can analyze streaming data with standard SQL. The service lets you author and run SQL code against streaming sources to perform time-series analytics, feed real-time dashboards, and create real-time metrics. Amazon Kinesis Data Analytics for SQL applications can create a destination stream from SQL queries over the input stream and send the destination stream to another Amazon Kinesis Data Firehose. The destination Amazon Kinesis Data Firehose can then send the analytics data to Amazon S3 as the final state.

Amazon Kinesis Data Analytics-SQL legacy code is based on an extension of the SQL standard.

You use the following query in Amazon Kinesis Data Analytics-SQL. You first create a destination stream for the query output. Then you use `PUMP`, an Amazon Kinesis Data Analytics repository object (an extension of the SQL standard) that provides a continuously running `INSERT INTO stream SELECT ... FROM` query functionality, which lets the results of a query be continuously entered into a named stream. 

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME TIMESTAMP,
INGEST_TIME TIMESTAMP,
TICKER VARCHAR(16),
VOLUME BIGINT,
AVG_PRICE DOUBLE,
MIN_PRICE DOUBLE,
MAX_PRICE DOUBLE);
 
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND) AS EVENT_TIME,
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS "STREAM_INGEST_TIME",
      "ticker",
       COUNT(*) AS VOLUME,
      AVG("tradePrice") AS AVG_PRICE,
      MIN("tradePrice") AS MIN_PRICE,
      MAX("tradePrice") AS MAX_PRICE 
   FROM
      "SOURCE_SQL_STREAM_001"
   GROUP BY
      "ticker",
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND),
      STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND);
```
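The pump above buckets trades into tumbling 60-second windows per ticker and computes volume plus min/avg/max price. The same aggregation can be sketched in plain Python to show what `STEP(... BY INTERVAL '60' SECOND)` and the `GROUP BY` produce (illustrative names, not the Kinesis Data Analytics API):

```python
from collections import defaultdict

def tumbling_aggregate(trades, window_seconds=60):
    """Group (epoch_seconds, ticker, price) trades into tumbling windows
    and compute volume, avg, min, and max per (window, ticker), like the
    DESTINATION_SQL_STREAM pump above."""
    buckets = defaultdict(list)
    for ts, ticker, price in trades:
        # STEP(... BY INTERVAL '60' SECOND): round down to the window start
        window_start = ts - (ts % window_seconds)
        buckets[(window_start, ticker)].append(price)
    return {
        key: {"VOLUME": len(p), "AVG_PRICE": sum(p) / len(p),
              "MIN_PRICE": min(p), "MAX_PRICE": max(p)}
        for key, p in buckets.items()
    }

# Two trades land in the first window, one in the next
trades = [(0, "AAPL", 100.0), (30, "AAPL", 110.0), (61, "AAPL", 90.0)]
agg = tumbling_aggregate(trades)
```

Each dictionary entry corresponds to one row emitted into the destination stream per window and ticker.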

The preceding SQL uses two time windows – `tradeTimestamp`, which comes from the incoming stream payload, and `ROWTIME`. `tradeTimestamp` is also called `Event Time` or `client-side time`. It is often preferable to use this time in analytics because it is the time when an event occurred. However, many event sources, such as mobile phones and web clients, do not have reliable clocks, which can lead to inaccurate times. In addition, connectivity issues can cause records to appear on a stream in an order different from the order in which the events occurred. 

In-application streams also include a special column called `ROWTIME`. It stores a timestamp when Amazon Kinesis Data Analytics inserts a row into the first in-application stream. `ROWTIME` reflects the timestamp at which Amazon Kinesis Data Analytics inserted a record into the first in-application stream after reading from the streaming source. This `ROWTIME` value is then maintained throughout your application. 

The SQL determines the count of each ticker as `volume`, along with the `min`, `max`, and `average` price over a 60-second interval. 

There are advantages and disadvantages to using each of these times in time-based windowed queries. Choose one or more of these times, and a strategy to deal with the relevant drawbacks, based on your use-case scenario. 

A two-window strategy uses two time-based windows, both `ROWTIME` and one of the other times, such as the event time.
+ Use `ROWTIME` as the first window, which controls how frequently the query emits the results, as shown in the following example. It is not used as a logical time.
+ Use one of the other times as the logical time that you want to associate with your analytics. This time represents when the event occurred. In the following example, the analytics goal is to group the records and return a count by ticker.

### Amazon Managed Service for Apache Flink Studio 
<a name="examples-studio"></a>

In the updated architecture, you replace Amazon Kinesis Data Firehose with Amazon Kinesis Data Streams. Amazon Kinesis Data Analytics for SQL applications is replaced by Amazon Managed Service for Apache Flink Studio. Apache Flink code is run interactively within an Apache Zeppelin notebook. Amazon Managed Service for Apache Flink Studio sends the aggregated trade data to an Amazon S3 bucket for storage. The steps are shown following:

This is the architectural flow of Amazon Managed Service for Apache Flink Studio:

![\[Data flow from Producer through Kinesis streams to Analytics Studio and S3 storage.\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/kda-studio.png)


#### Create a Kinesis data stream
<a name="examples-studio-create-data-stream"></a>

**To create a data stream (console)**

1. Sign in to the AWS Management Console and open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. In the navigation bar, expand the Region selector and choose a Region.

1. Choose **Create data stream**.

1. On the **Create Kinesis stream** page, enter a name for your data stream and then accept the default **On-demand** capacity mode.

   With the **On-demand** mode, you can then choose **Create Kinesis stream** to create your data stream.

   On the **Kinesis streams** page, your stream's **Status** is **Creating** while the stream is being created. When the stream is ready to use, the **Status** changes to **Active**.

1. Choose the name of your stream. The **Stream Details** page displays a summary of your stream configuration, along with monitoring information.

1. In the Amazon Kinesis Data Generator, change the Stream/delivery stream to the new Amazon Kinesis Data Streams: **TRADE_SOURCE_STREAM**.

   The JSON and payload will be the same as those you used for Amazon Kinesis Data Analytics-SQL. Use the Amazon Kinesis Data Generator to produce some sample trade payload data, and target the **TRADE_SOURCE_STREAM** data stream for this exercise:

   ```
   {{date.now(YYYY-MM-DD HH:mm:ss.SSS)}},
   "{{random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])}}",
   {{random.number(2000)}}
   ```

1. On the AWS Management Console, go to Managed Service for Apache Flink and then choose **Create application**.

1. In the left navigation pane, choose **Studio notebooks**, and then choose **Create Studio notebook**.

1. Enter a name for the Studio notebook.

1. Under **AWS Glue database**, provide an existing AWS Glue database that will define the metadata for your sources and destinations. If you don't have an AWS Glue database, choose **Create** and do the following:

   1. In the AWS Glue console, choose **Databases** under **Data catalog** from the left-hand menu.

   1. Choose **Create database**.

   1. On the **Create database** page, enter a name for the database. In the **Location - optional** section, choose **Browse Amazon S3** and select an Amazon S3 bucket. If you don't have an Amazon S3 bucket already set up, you can skip this step and come back to it later.

   1. (Optional). Enter a description for the database.

   1. Choose **Create database**.

1. Choose **Create notebook**.

1. After your notebook is created, choose **Run**.

1. After the notebook has been successfully started, launch a Zeppelin notebook by choosing **Open in Apache Zeppelin**.

1. On the Zeppelin notebook page, choose **Create new note** and name it *MarketDataFeed*.

The Flink SQL code is explained following, but first, [this is what a Zeppelin notebook screen looks like](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/open-Zeppelin-notebook.jpg). Each window within the notebook is a separate code block, and they can be run one at a time.

##### Amazon Managed Service for Apache Flink Studio code
<a name="examples-studio-code"></a>

Amazon Managed Service for Apache Flink Studio uses Zeppelin notebooks to run the code. This example maps to ssql code based on Apache Flink 1.13. The code in the Zeppelin notebook is shown following, one block at a time.  

Before running any code in your Zeppelin notebook, the Flink configuration commands must be run. If you need to change any configuration setting after running code (ssql, Python, or Scala), you must stop and restart your notebook. In this example, you must set checkpointing. Checkpointing is required so that you can stream data to a file in Amazon S3. This allows the data streaming to Amazon S3 to be flushed to a file. The following statement sets the interval to 5000 milliseconds.  

```
%flink.conf
execution.checkpointing.interval 5000
```

`%flink.conf` indicates that this block is a configuration statement. For more information on Flink configuration, including checkpointing, see [Apache Flink Checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/checkpoints/).  

The input table for the source Amazon Kinesis Data Streams is created with the following Flink ssql code. Note that the `TRADE_TIME` field stores the date/time created by the data generator.

```
%flink.ssql
     
DROP TABLE IF EXISTS TRADE_SOURCE_STREAM;
CREATE TABLE TRADE_SOURCE_STREAM (
  --`arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
  TRADE_TIME TIMESTAMP(3),
  WATERMARK FOR TRADE_TIME as TRADE_TIME - INTERVAL '5' SECOND,
  TICKER STRING,
  PRICE DOUBLE,
  STATUS STRING)
WITH (
  'connector' = 'kinesis',
  'stream' = 'TRADE_SOURCE_STREAM',
  'aws.region' = 'us-east-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv');
```

You can view the input stream with this statement:

```
%flink.ssql(type=update)
-- testing the source stream
   
select * from TRADE_SOURCE_STREAM;
```

Before the aggregate data is sent to Amazon S3, you can view it directly in Amazon Managed Service for Apache Flink with a tumbling window select query. This aggregates the trade data in a one-minute time window. Note that the %flink.ssql statement must have a (type=update) designation:

```
%flink.ssql(type=update)
   
select TUMBLE_ROWTIME(TRADE_TIME,
INTERVAL '1' MINUTE) as TRADE_WINDOW,
TICKER, COUNT(*) as VOLUME,
AVG(PRICE) as AVG_PRICE, 
MIN(PRICE) as MIN_PRICE,
MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAM
GROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;
```

You can then create your destination table in Amazon S3. You must use a watermark. A watermark is a progress metric that indicates a point in time when you are confident that no more delayed events will arrive. The reason for the watermark is to account for late arrivals. The interval `‘5’ Second` allows trades to enter the Amazon Kinesis Data Stream 5 seconds late and still be included if they have a timestamp within the window. For more information, see [Generating Watermarks](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/).   
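The watermark behavior can be modeled simply: the watermark trails the highest event time seen so far by the allowed lateness (5 seconds here), and a record counts as late once its timestamp falls behind the watermark. An illustrative Python sketch of this bounded-out-of-orderness model (not the Flink API):

```python
class BoundedOutOfOrdernessWatermark:
    """Track a watermark that lags the maximum observed event time by a
    fixed delay -- the model behind `WATERMARK FOR ... - INTERVAL '5' SECOND`."""

    def __init__(self, max_delay_seconds=5):
        self.max_delay = max_delay_seconds
        self.max_event_time = float("-inf")

    def observe(self, event_time):
        """Advance the watermark and report whether the event is on time."""
        self.max_event_time = max(self.max_event_time, event_time)
        return event_time >= self.watermark()

    def watermark(self):
        return self.max_event_time - self.max_delay

wm = BoundedOutOfOrdernessWatermark(5)
# Event at t=99 arrives out of order but within 5s; t=94 is too late
on_time = [wm.observe(t) for t in (100, 103, 99, 94)]
```

Records flagged as late would be dropped from the window rather than included in its aggregate.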

```
%flink.ssql(type=update)

DROP TABLE IF EXISTS TRADE_DESTINATION_S3;
CREATE TABLE TRADE_DESTINATION_S3 (
TRADE_WINDOW_START TIMESTAMP(3),
WATERMARK FOR TRADE_WINDOW_START as TRADE_WINDOW_START - INTERVAL '5' SECOND,
TICKER STRING, 
VOLUME BIGINT,
AVG_PRICE DOUBLE,
MIN_PRICE DOUBLE,
MAX_PRICE DOUBLE)
WITH ('connector' = 'filesystem','path' = 's3://trade-destination/','format' = 'csv');
```

This statement inserts the data into `TRADE_DESTINATION_S3`. `TUMBLE_ROWTIME` is the timestamp of the inclusive upper bound of the tumbling window.

```
%flink.ssql(type=update)

insert into TRADE_DESTINATION_S3
select TUMBLE_ROWTIME(TRADE_TIME,
INTERVAL '1' MINUTE),
TICKER, COUNT(*) as VOLUME,
AVG(PRICE) as AVG_PRICE,
MIN(PRICE) as MIN_PRICE,
MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAM
GROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;
```

Allow the statement to run for 10 to 20 minutes to accumulate some data in Amazon S3. Then abort your statement.

This closes the file in Amazon S3 so that it becomes viewable.

Here is what the contents look like:

![\[Financial data table showing stock prices and volumes for tech companies on March 1, 2023.\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/kda-studio-contents.png)


You can use the [CloudFormation template](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml) to create the infrastructure.

CloudFormation will create the following resources in your AWS account:
+ Amazon Kinesis Data Streams
+ Amazon Managed Service for Apache Flink Studio
+ AWS Glue database
+ Amazon S3 bucket
+ IAM roles and policies for Amazon Managed Service for Apache Flink Studio to access the appropriate resources

Import the notebook and change the Amazon S3 bucket name to the new Amazon S3 bucket created by CloudFormation.

![\[SQL code snippet creating a table with timestamp, ticker, volume, and price fields.\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/kda-studio-cfn.png)


##### See more
<a name="more"></a>

Here are some additional resources that you can use to learn more about using Managed Service for Apache Flink Studio:
+ [Managed Service for Apache Flink notebooks developer guide](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html) 
+ [Apache Flink 1.13 documentation](https://nightlies.apache.org/flink/flink-docs-release-1.13/) 
+ [Amazon Managed Service for Apache Flink Studio workshop](https://catalog.us-east-1.prod.workshops.aws/workshops/c342c6d1-2baf-4827-ba42-52ef9eb173f6/en-US/flink-on-kda-studio) 
+ [Apache Flink windowing](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/) 
+ [Amazon Kinesis Data Analytics developer guide – Writing from a Kinesis Data Analytics stream to an S3 bucket](https://docs.aws.amazon.com/managed-flink/latest/java/examples-s3.html) 

# Leveraging user-defined functions (UDFs)
<a name="examples-migrating-to-kda-studio-leveraging-udfs"></a>

The purpose of this pattern is to demonstrate how to leverage UDFs in Kinesis Data Analytics-Studio Zeppelin notebooks for processing data in Kinesis streams. Managed Service for Apache Flink Studio uses Apache Flink to provide advanced analytical capabilities, including processing semantics, event-time windows, extensibility using user-defined functions and custom integrations, imperative language support, durable application state, horizontal scaling, support for multiple data sources, extensible integrations, and more. These are critical for ensuring accuracy, completeness, consistency, and reliability of data stream processing, and are not available with Amazon Kinesis Data Analytics for SQL.

In this sample application, we will demonstrate how to leverage UDFs in a KDA-Studio Zeppelin notebook for processing data in the Kinesis stream. Studio notebooks for Kinesis Data Analytics let you interactively query data streams in real time, and easily build and run stream processing applications using standard SQL, Python, and Scala. With a few clicks in the AWS Management Console, you can launch a serverless notebook to query data streams and get results in seconds. For more information, see [Using a Studio notebook with Kinesis Data Analytics for Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html).

Lambda functions used for pre/post-processing of data in KDA-SQL applications:

![\[alt text not found\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/sql-udf-1.png)


User-defined functions for pre/post-processing of data using KDA-Studio Zeppelin notebooks:

![\[alt text not found\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/sql-udf.png)


## User-defined functions (UDFs)
<a name="examples-migrating-to-kda-studio-udfs"></a>

To reuse common business logic in an operator, it can be useful to reference a user-defined function to transform your data stream. This can be done either within the Managed Service for Apache Flink Studio notebook, or as an externally referenced application jar file. Utilizing user-defined functions can simplify the transformations or data enrichments that you might perform over streaming data.

In your notebook, you will reference a simple Java application jar that has functionality to anonymize personal phone numbers. You can also write Python or Scala UDFs for use within the notebook. We chose a Java application jar to highlight the functionality of importing an application jar into a Pyflink notebook.

## Environment setup
<a name="examples-migrating-to-kda-studio-setup"></a>

To follow this guide and interact with your streaming data, you will use an AWS CloudFormation script to launch the following resources:
+ Source and target Kinesis Data Streams
+ Glue database
+ IAM role
+ Managed Service for Apache Flink Studio application
+ Lambda function to start the Managed Service for Apache Flink Studio application
+ Lambda role for executing the preceding Lambda function
+ Custom resource to invoke the Lambda function

Download the CloudFormation template [here](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/cfn/kda-flink-udf.yml).

**Create the CloudFormation stack**

1. Go to the AWS Management Console and choose **CloudFormation** under the list of services.

1. On the **CloudFormation** page, choose **Stacks** and then choose **Create Stack with new resources (standard)**.

1. On the **Create stack** page, choose **Upload a template file**, and then choose the `kda-flink-udf.yml` file that you downloaded previously. Upload the file and then choose **Next**.

1. Give the template a name, such as `kinesis-UDF`, so that it is easy to remember. Update input parameters, such as the input stream, if you want a different name. Choose **Next**.

1. On the **Configure stack options** page, add **Tags** if you want to, and then choose **Next**.

1. On the **Review** page, check the boxes allowing for the creation of IAM resources and then choose **Submit**.

The CloudFormation stack may take 10 to 15 minutes to launch depending on the Region you are launching in. Once you see the `CREATE_COMPLETE` status for the entire stack, you are ready to move on.

## Working with the Managed Service for Apache Flink Studio notebook
<a name="examples-migrating-to-kda-studio-notebook"></a>

Studio notebooks for Kinesis Data Analytics let you interactively query data streams in real time, and easily build and run stream processing applications using standard SQL, Python, and Scala. With a few clicks in the AWS Management Console, you can launch a serverless notebook to query data streams and get results in seconds.

A notebook is a web-based development environment. With notebooks, you get a simple interactive development experience combined with the advanced data stream processing capabilities provided by Apache Flink. Studio notebooks use notebooks powered by Apache Zeppelin, and use Apache Flink as the stream processing engine. Studio notebooks seamlessly combine these technologies to make advanced analytics on data streams accessible to developers of all skill sets.

Apache Zeppelin provides your Studio notebooks with a complete suite of analytics tools, including the following:
+ Data visualization
+ Exporting data to files
+ Controlling the output format for easier analysis 

**Using the notebook**

1. Go to the AWS Management Console and choose **Amazon Kinesis** under the list of services.

1. On the left-hand navigation page, choose **Analytics applications** and then choose **Studio notebooks**.

1. Verify that the **KinesisDataAnalyticsStudio** notebook is running.

1. Choose the notebook and then choose **Open in Apache Zeppelin**.

1. Download the [Data Producer Zeppelin notebook](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/notebooks/Data%20Producer.zpln) file, which you will use to read data and load it into the Kinesis stream.

1. Import the `Data Producer` Zeppelin notebook. Make sure to modify the input `STREAM_NAME` and `REGION` in the notebook code. The input stream name can be found in the [CloudFormation stack output](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/cfn/kda-flink-udf.yml).

1. Run the **Data Producer** notebook by choosing the **Run this paragraph** button to insert sample data into the input Kinesis data stream.

1. While the sample data loads, download the [MaskPhoneNumber-Interactive notebook](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/notebooks/MaskPhoneNumber-interactive.zpln), which reads the input data, anonymizes phone numbers from the input stream, and stores the anonymized data into the output stream.

1. Import the `MaskPhoneNumber-interactive` Zeppelin notebook.

1. Run each paragraph in the notebook.

   1. In paragraph 1, you import a user-defined function to anonymize phone numbers.

      ```
      %flink(parallelism=1)
      import com.mycompany.app.MaskPhoneNumber
      stenv.registerFunction("MaskPhoneNumber", new MaskPhoneNumber())
      ```

   1. In the next paragraph, you create an in-memory table to read the input stream data. Make sure that the stream name and AWS Region are correct.

      ```
      %flink.ssql(type=update)
      
      DROP TABLE IF EXISTS customer_reviews;
      
      CREATE TABLE customer_reviews (
      customer_id VARCHAR,
      product VARCHAR,
      review VARCHAR,
      phone VARCHAR
      )
      WITH (
      'connector' = 'kinesis',
      'stream' = 'KinesisUDFSampleInputStream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json');
      ```

   1. Check that the data is loaded into the in-memory table.

      ```
      %flink.ssql(type=update)
      select * from customer_reviews
      ```

   1. Invoke the user-defined function to anonymize the phone numbers.

      ```
      %flink.ssql(type=update)
      select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber from customer_reviews
      ```

   1. Now that the phone numbers are masked, create a view with the masked numbers.

      ```
      %flink.ssql(type=update)
      
      DROP VIEW IF EXISTS sentiments_view;
      
      CREATE VIEW  
          sentiments_view
      AS
        select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber from customer_reviews
      ```

   1. Verify the data.

      ```
      %flink.ssql(type=update)
      select * from sentiments_view
      ```

   1. Create an in-memory table for the output Kinesis stream. Make sure that the stream name and AWS Region are correct.

      ```
      %flink.ssql(type=update)
      
      DROP TABLE IF EXISTS customer_reviews_stream_table;
      
      CREATE TABLE customer_reviews_stream_table (
      customer_id VARCHAR,
      product VARCHAR,
      review VARCHAR,
      phoneNumber varchar 
      )
      WITH (
      'connector' = 'kinesis',
      'stream' = 'KinesisUDFSampleOutputStream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'TRIM_HORIZON',
      'format' = 'json');
      ```

   1. Insert the updated records into the target Kinesis stream.

      ```
      %flink.ssql(type=update)
      INSERT INTO customer_reviews_stream_table
      SELECT customer_id, product, review, phoneNumber
      FROM sentiments_view
      ```

   1. View and verify the data from the target Kinesis stream.

      ```
      %flink.ssql(type=update)
      select * from customer_reviews_stream_table
      ```
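The actual `MaskPhoneNumber` UDF used in the steps above lives in the imported Java application jar, and its exact masking rules are defined there. As an illustration of what such an anonymization function might do, here is a hypothetical Python stand-in (not the jar's real logic):

```python
import re

def mask_phone(text):
    """Replace runs of 7+ digits (allowing dashes and spaces) with X's --
    a simple stand-in for a phone-number anonymization UDF."""
    def _mask(match):
        # Keep separators, replace only the digits
        return re.sub(r"\d", "X", match.group(0))
    return re.sub(r"\d[\d\- ]{5,}\d", _mask, text)

masked = mask_phone("Call me at 206-555-0100 after 5pm")
```

Masking the digits in place (rather than deleting the field) keeps the record shape intact for downstream consumers, as the notebook's output table does.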

## Promoting a notebook to an application
<a name="examples-migrating-to-kda-studio-notebook-promoting"></a>

Now that you have tested your notebook code interactively, you will deploy the code as a streaming application with durable state. You will first need to modify the application configuration to specify a location for your code in Amazon S3.

1. On the AWS Management Console, choose your notebook and, under **Deploy as application configuration - optional**, choose **Edit**.

1. Under **Destination for code in Amazon S3**, choose the Amazon S3 bucket that was created by the [CloudFormation script](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/cfn/kda-flink-udf.yml). The process may take a few minutes.

1. You will not be able to promote the note as is. If you try, you will get an error because `Select` statements are not supported. To avoid this issue, download the [MaskPhoneNumber-Streaming Zeppelin notebook](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/notebooks/MaskPhoneNumber-Streaming.zpln).

1. Import the `MaskPhoneNumber-streaming` Zeppelin notebook.

1. Open the note and choose **Actions for KinesisDataAnalyticsStudio**.

1. Choose **Build MaskPhoneNumber-Streaming and export to S3**. Make sure to rename the **Application Name** and not use any special characters.

1. Choose **Build and Export**. It will take a few minutes to set up the streaming application.

1. Once the build is complete, choose **Deploy using AWS console**.

1. On the next page, review the settings and make sure to choose the correct IAM role. Next, choose **Create streaming application**.

1. After a few minutes, you will see a message that the streaming application was created successfully.

 For more information on deploying applications with durable state and limits, see [Deploying as an application with durable state](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook-durable.html).

## Cleanup
<a name="examples-migrating-to-kda-studio-notebook-cleanup"></a>

Optionally, you can now [uninstall the CloudFormation stack](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/cfn-console-delete-stack.html). This will remove all of the services that you set up previously.