

Setelah mempertimbangkan dengan cermat, kami memutuskan untuk menghentikan Amazon Kinesis Data Analytics untuk aplikasi SQL:

1. Mulai **1 September 2025,** kami tidak akan memberikan perbaikan bug untuk Amazon Kinesis Data Analytics untuk aplikasi SQL karena kami akan memiliki dukungan terbatas untuk itu, mengingat penghentian yang akan datang.

2. Mulai **15 Oktober 2025,** Anda tidak akan dapat membuat Kinesis Data Analytics baru untuk aplikasi SQL.

3. Kami akan menghapus aplikasi Anda mulai **27 Januari 2026**. Anda tidak akan dapat memulai atau mengoperasikan Amazon Kinesis Data Analytics untuk aplikasi SQL. Support tidak akan lagi tersedia untuk Amazon Kinesis Data Analytics untuk SQL sejak saat itu. Untuk informasi selengkapnya, lihat [Amazon Kinesis Data Analytics untuk penghentian Aplikasi SQL](discontinuation.md).

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Migrasi ke Layanan Terkelola untuk Contoh Apache Flink Studio
<a name="migrating-to-kda-studio-overview"></a>

Setelah mempertimbangkan dengan cermat, kami telah membuat keputusan untuk menghentikan Amazon Kinesis Data Analytics untuk aplikasi SQL. Untuk membantu Anda merencanakan dan bermigrasi dari Amazon Kinesis Data Analytics untuk aplikasi SQL, kami akan menghentikan penawaran secara bertahap selama 15 bulan. Ini adalah tanggal penting untuk dicatat, **1 September 2025**, **15 Oktober 2025,** dan **27 Januari** 2026.

1. Mulai **1 September 2025,** kami tidak akan memberikan perbaikan bug untuk Amazon Kinesis Data Analytics untuk aplikasi SQL karena kami akan memiliki dukungan terbatas untuk itu, mengingat penghentian yang akan datang.

1. Mulai **15 Oktober 2025,** Anda tidak akan dapat membuat Amazon Kinesis Data Analytics baru untuk aplikasi SQL. 

1. Kami akan menghapus aplikasi Anda mulai **27 Januari 2026**. Anda tidak akan dapat memulai atau mengoperasikan Amazon Kinesis Data Analytics untuk aplikasi SQL. Support tidak akan lagi tersedia untuk Amazon Kinesis Data Analytics untuk aplikasi SQL sejak saat itu. Untuk mempelajari selengkapnya, lihat [Amazon Kinesis Data Analytics untuk penghentian Aplikasi SQL](discontinuation.md).

Kami menyarankan Anda menggunakan [Amazon Managed Service untuk Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/what-is.html). Ini menggabungkan kemudahan penggunaan dengan kemampuan analitis tingkat lanjut, memungkinkan Anda membangun aplikasi pemrosesan aliran dalam hitungan menit.

Bagian ini menyediakan contoh kode dan arsitektur untuk membantu Anda memindahkan beban kerja Amazon Kinesis Data Analytics untuk aplikasi SQL ke Managed Service for Apache Flink.

Untuk informasi tambahan, lihat juga [posting AWS blog ini: Migrasi dari Amazon Kinesis Data Analytics untuk Aplikasi SQL ke Managed Service untuk Apache](https://aws.amazon.com/blogs/big-data/migrate-from-amazon-kinesis-data-analytics-for-sql-applications-to-amazon-managed-service-for-apache-flink-studio/) Flink Studio.

## Mereplikasi Kinesis Data Analytics untuk Kueri SQL di Managed Service untuk Apache Flink Studio
<a name="examples-migrating-to-kda-studio"></a>

Untuk memigrasikan beban kerja Anda ke Managed Service for Apache Flink Studio atau Managed Service for Apache Flink, bagian ini menyediakan terjemahan kueri yang dapat Anda gunakan untuk kasus penggunaan umum. 

Sebelum Anda menjelajahi contoh-contoh ini, kami sarankan Anda terlebih dahulu meninjau [Menggunakan notebook Studio dengan Layanan Terkelola untuk Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html). 

### Membuat ulang Kinesis Data Analytics untuk kueri SQL di Managed Service untuk Apache Flink Studio
<a name="examples-recreating-queries"></a>

Opsi berikut menyediakan terjemahan kueri aplikasi Kinesis Data Analytics berbasis SQL yang umum ke Managed Service for Apache Flink Studio. 

#### Aplikasi Multi-Langkah
<a name="Multi-Step-application"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "IN_APP_STREAM_001" (
   ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16), price REAL, change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_001" AS 
INSERT INTO
   "IN_APP_STREAM_001"
   SELECT
      STREAM APPROXIMATE_ARRIVAL_TIME,
      ticker_symbol,
      sector,
      price,
      change FROM "SOURCE_SQL_STREAM_001";
-- Second in-app stream and pump
CREATE 
OR REPLACE STREAM "IN_APP_STREAM_02" (ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16),
   price REAL,
   change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_02" AS 
INSERT INTO
   "IN_APP_STREAM_02"
   SELECT
      STREAM ingest_time,
      ticker_symbol,
      sector,
      price,
      change FROM "IN_APP_STREAM_001";
-- Destination in-app stream and third pump
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16),
   price REAL,
   change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_03" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM ingest_time,
      ticker_symbol,
      sector,
      price,
      change FROM "IN_APP_STREAM_02";
```

------
#### [ Managed Service for Apache Flink Studio ]

```
Query 1 - % flink.ssql DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;           
           
CREATE TABLE SOURCE_SQL_STREAM_001 (TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE,
   CHANGE DOUBLE,
   APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA 

FROM
   'timestamp' VIRTUAL,
   WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND ) 
   PARTITIONED BY (TICKER_SYMBOL) WITH (
      'connector' = 'kinesis',
      'stream' = 'kinesis-analytics-demo-stream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json',
      'json.timestamp-format.standard' = 'ISO-8601');
DROP TABLE IF EXISTS IN_APP_STREAM_001;

CREATE TABLE IN_APP_STREAM_001 ( 
   INGEST_TIME TIMESTAMP,
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE,
   CHANGE DOUBLE )
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
      'connector' = 'kinesis', 
      'stream' = 'IN_APP_STREAM_001', 
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json',
      'json.timestamp-format.standard' = 'ISO-8601');
   
DROP TABLE IF EXISTS IN_APP_STREAM_02;

CREATE TABLE IN_APP_STREAM_02 (
   INGEST_TIME TIMESTAMP, 
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE, 
   CHANGE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'IN_APP_STREAM_02',   
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
   
DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   INGEST_TIME TIMESTAMP, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), 
   PRICE DOUBLE, CHANGE DOUBLE )
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'DESTINATION_SQL_STREAM',
   'aws.region' = 'us-east-1', 
   'scan.stream.initpos' = 'LATEST',  
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');


Query 2 - % flink.ssql(type = 
update
) 
   INSERT INTO
      IN_APP_STREAM_001 
      SELECT
         APPROXIMATE_ARRIVAL_TIME AS INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         SOURCE_SQL_STREAM_001;


Query 3 - % flink.ssql(type = 
update
) 
   INSERT INTO
      IN_APP_STREAM_02 
      SELECT
         INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         IN_APP_STREAM_001;


Query 4 - % flink.ssql(type = 
update
) 
   INSERT INTO
      DESTINATION_SQL_STREAM 
      SELECT
         INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         IN_APP_STREAM_02;
```

------

#### Mengubah nilai DateTime
<a name="transform-date-time-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   TICKER VARCHAR(4),
   event_time TIMESTAMP,
   five_minutes_before TIMESTAMP,
   event_unix_timestamp BIGINT, 
   event_timestamp_as_char VARCHAR(50),
   event_second INTEGER);
   
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      EVENT_TIME,
      EVENT_TIME - INTERVAL '5' MINUTE,
      UNIX_TIMESTAMP(EVENT_TIME),
      TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
      EXTRACT(SECOND 
   FROM
      EVENT_TIME) 
   FROM
      "SOURCE_SQL_STREAM_001"
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   FIVE_MINUTES_BEFORE TIMESTAMP(3),   
   EVENT_UNIX_TIMESTAMP INT,
   EVENT_TIMESTAMP_AS_CHAR VARCHAR(50),
   EVENT_SECOND INT)
   
PARTITIONED BY (TICKER) WITH (
   'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream',   
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         TICKER,
         EVENT_TIME,
         EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE,
         UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP,
         DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR,
         EXTRACT(SECOND 
      FROM
         EVENT_TIME) AS EVENT_SECOND 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Peringatan sederhana
<a name="simple-alerts"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   ticker_symbol VARCHAR(4),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
   
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   price 
FROM
   "SOURCE_SQL_STREAM_001"
WHERE
   (
      ABS(Change / (Price - Change)) * 100
   )
   > 1
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(4), 
   CHANGE DOUBLE,       
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
   
Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
   WHERE
      (
         ABS(CHANGE / (PRICE - CHANGE)) * 100
      )
      > 1;
```

------

#### Peringatan terhambat
<a name="throttled-alerts"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CHANGE_STREAM"(
   ticker_symbol VARCHAR(4),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
   
CREATE 
OR REPLACE PUMP "change_pump" AS INSERT INTO "CHANGE_STREAM"
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   price
FROM "SOURCE_SQL_STREAM_001"
WHERE
   (
      ABS(Change / (Price - Change)) * 100
   )
   > 1;
-- ** Trigger Count and Limit **
-- Counts "triggers" or those values that evaluated true against the previous where clause
-- Then provides its own limit on the number of triggers per hour per ticker symbol to what is specified in the WHERE clause

CREATE 
OR REPLACE STREAM TRIGGER_COUNT_STREAM (
   ticker_symbol VARCHAR(4),
   change REAL,
   trigger_count INTEGER);
   
CREATE 
OR REPLACE PUMP trigger_count_pump AS 
INSERT INTO
   TRIGGER_COUNT_STREAMSELECT STREAM ticker_symbol,
   change,
   trigger_count 
FROM
   (
      SELECT
         STREAM ticker_symbol,
         change,
         COUNT(*) OVER W1 as trigger_countFROM "CHANGE_STREAM" --window to perform aggregations over last minute to keep track of triggers
         WINDOW W1 AS 
         (
            PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING
         )
   )
WHERE
   trigger_count >= 1;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(4),      
   CHANGE DOUBLE, PRICE DOUBLE,
   EVENT_TIME AS PROCTIME()) 
PARTITIONED BY (TICKER_SYMBOL) 
WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');   
DROP TABLE IF EXISTS TRIGGER_COUNT_STREAM;
CREATE TABLE TRIGGER_COUNT_STREAM ( 
   TICKER_SYMBOL VARCHAR(4), 
   CHANGE DOUBLE, 
   TRIGGER_COUNT INT) 
PARTITIONED BY (TICKER_SYMBOL);

Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
   WHERE
      (
         ABS(CHANGE / (PRICE - CHANGE)) * 100
      )
      > 1;
      
Query 3 - % flink.ssql(type = 
update
) 
   SELECT * 
   FROM(
         SELECT
            TICKER_SYMBOL,
            CHANGE,
            COUNT(*) AS TRIGGER_COUNT 
         FROM
            DESTINATION_SQL_STREAM 
         GROUP BY
            TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE),
            TICKER_SYMBOL,
            CHANGE 
      )
   WHERE
      TRIGGER_COUNT > 1;
```

------

#### Menggabungkan Hasil Sebagian dari Kueri
<a name="aggregate-partial-results"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
   
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
   
CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS 
INSERT INTO
   "CALC_COUNT_SQL_STREAM"(
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT") 
   SELECT
      STREAM "TICKER_SYMBOL",
      STEP("SOURCE_SQL_STREAM_001",
      "ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
      COUNT(*) AS "TickerCount "
   FROM
      "SOURCE_SQL_STREAM_001" 
   GROUP BY
      STEP("SOURCE_SQL_STREAM_001". ROWTIME BY INTERVAL '1' MINUTE),
      STEP("SOURCE_SQL_STREAM_001"." APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
      TICKER_SYMBOL;
CREATE PUMP "AGGREGATED_SQL_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT") 
   SELECT
      STREAM "TICKER",
      "TRADETIME",
      SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" 
   FROM
      "CALC_COUNT_SQL_STREAM" WINDOW W1 AS 
      (
         PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING
      )
;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;
CREATE TABLE SOURCE_SQL_STREAM_001 (
   TICKER_SYMBOL VARCHAR(4),
   TRADETIME AS PROCTIME(),
   APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA 
FROM
   'timestamp' VIRTUAL,
   WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND) 
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',   
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
DROP TABLE IF EXISTS CALC_COUNT_SQL_STREAM;
CREATE TABLE CALC_COUNT_SQL_STREAM (
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP(3),
   WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND,   
   TICKERCOUNT BIGINT NOT NULL ) PARTITIONED BY (TICKER) WITH ( 
      'connector' = 'kinesis',
      'stream' = 'CALC_COUNT_SQL_STREAM',
      'aws.region' = 'us-east-1',       
      'scan.stream.initpos' = 'LATEST',
      'format' = 'csv');
DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP(3),
   WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND, 
   TICKERCOUNT BIGINT NOT NULL )
   PARTITIONED BY (TICKER) WITH ('connector' = 'kinesis', 
      'stream' = 'DESTINATION_SQL_STREAM',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'csv');

Query 2 - % flink.ssql(type = 
update
) 
   INSERT INTO
      CALC_COUNT_SQL_STREAM 
      SELECT
         TICKER,
         TO_TIMESTAMP(TRADETIME, 'yyyy-MM-dd HH:mm:ss') AS TRADETIME,
         TICKERCOUNT 
      FROM
         (
            SELECT
               TICKER_SYMBOL AS TICKER,
               DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00') AS TRADETIME,
               COUNT(*) AS TICKERCOUNT 
            FROM
               SOURCE_SQL_STREAM_001 
            GROUP BY
               TUMBLE(TRADETIME, INTERVAL '1' MINUTE),
               DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00'),
               DATE_FORMAT(APPROXIMATE_ARRIVAL_TIME, 'yyyy-MM-dd HH:mm:00'),
               TICKER_SYMBOL 
         )
;

Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      CALC_COUNT_SQL_STREAM;
      
Query 4 - % flink.ssql(type = 
update
) 
   INSERT INTO
      DESTINATION_SQL_STREAM 
      SELECT
         TICKER,
         TRADETIME,
         SUM(TICKERCOUNT) OVER W1 AS TICKERCOUNT 
      FROM
         CALC_COUNT_SQL_STREAM WINDOW W1 AS 
         (
            PARTITION BY TICKER 
         ORDER BY
            TRADETIME RANGE INTERVAL '10' MINUTE PRECEDING
         )
;

Query 5 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      DESTINATION_SQL_STREAM;
```

------

#### Mengubah nilai string
<a name="transform-string-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM for cleaned up referrerCREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32));
CREATE 
OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM "APPROXIMATE_ARRIVAL_TIME",
   SUBSTRING("referrer", 12, 
   (
      POSITION('.com' IN "referrer") - POSITION('www.' IN "referrer") - 4
   )
) 
FROM
   "SOURCE_SQL_STREAM_001";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   referrer VARCHAR(32),
   ingest_time AS PROCTIME() ) PARTITIONED BY (referrer) 
WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ingest_time,
         substring(referrer, 12, 6) as referrer 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Mengganti substring menggunakan Regex
<a name="substring-regex"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM for cleaned up referrerCREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32));
CREATE 
OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM "APPROXIMATE_ARRIVAL_TIME",
   REGEX_REPLACE("REFERRER", 'http://', 'https://', 1, 0) 
FROM
   "SOURCE_SQL_STREAM_001";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   referrer VARCHAR(32),
   ingest_time AS PROCTIME()) 
PARTITIONED BY (referrer) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ingest_time,
         REGEXP_REPLACE(referrer, 'http', 'https') as referrer 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Penguraian log Regex
<a name="regex-log-parse"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   sector VARCHAR(24),
   match1 VARCHAR(24),
   match2 VARCHAR(24));
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM T.SECTOR,
      T.REC.COLUMN1,
      T.REC.COLUMN2 
   FROM
      (
         SELECT
            STREAM SECTOR,
            REGEX_LOG_PARSE(SECTOR, '.*([E].).*([R].*)') AS REC 
         FROM
            SOURCE_SQL_STREAM_001
      )
      AS T;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   CHANGE DOUBLE, PRICE DOUBLE,
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16)) 
PARTITIONED BY (SECTOR) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
SELECT
   * 
FROM
   (
      SELECT
         SECTOR,
         REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 1) AS MATCH1,
         REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 2) AS MATCH2 
      FROM
         DESTINATION_SQL_STREAM 
   )
WHERE
   MATCH1 IS NOT NULL 
   AND MATCH2 IS NOT NULL;
```

------

#### Mengubah nilai DateTime
<a name="transform-date-time-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( 
   TICKER VARCHAR(4),
   event_time TIMESTAMP,
   five_minutes_before TIMESTAMP,
   event_unix_timestamp BIGINT,
   event_timestamp_as_char VARCHAR(50),
   event_second INTEGER);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      EVENT_TIME,
      EVENT_TIME - INTERVAL '5' MINUTE,
      UNIX_TIMESTAMP(EVENT_TIME),
      TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
      EXTRACT(SECOND 
   FROM
      EVENT_TIME) 
   FROM
      "SOURCE_SQL_STREAM_001"
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   FIVE_MINUTES_BEFORE TIMESTAMP(3),
   EVENT_UNIX_TIMESTAMP INT,    
   EVENT_TIMESTAMP_AS_CHAR VARCHAR(50),
   EVENT_SECOND INT) PARTITIONED BY (TICKER)
WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         TICKER,
         EVENT_TIME,
         EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE,
         UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP,
         DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR,
         EXTRACT(SECOND 
      FROM
         EVENT_TIME) AS EVENT_SECOND 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Windows dan agregasi
<a name="windows-aggregation"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   event_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   ticker_count INTEGER);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM EVENT_TIME,
      TICKER,
      COUNT(TICKER) AS ticker_count 
   FROM
      "SOURCE_SQL_STREAM_001" WINDOWED BY STAGGER ( PARTITION BY 
         TICKER,
         EVENT_TIME RANGE INTERVAL '1' MINUTE);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   EVENT_TIME TIMESTAMP(3),
   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '60' SECOND,    
   TICKER VARCHAR(4),
   TICKER_COUNT INT) PARTITIONED BY (TICKER) 
WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json'

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         EVENT_TIME,
         TICKER, COUNT(TICKER) AS ticker_count 
      FROM
         DESTINATION_SQL_STREAM 
      GROUP BY
         TUMBLE(EVENT_TIME,
         INTERVAL '60' second),
         EVENT_TIME, TICKER;
```

------

#### Jendela Tumbling menggunakan Rowtime
<a name="tumbling-windows-rowtime"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   MIN_PRICE REAL,
   MAX_PRICE REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      MIN(PRICE),
      MAX(PRICE)
   FROM
      "SOURCE_SQL_STREAM_001"
   GROUP BY
      TICKER,
      STEP("SOURCE_SQL_STREAM_001".
            ROWTIME BY INTERVAL '60' SECOND);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   ticker VARCHAR(4),
   price DOUBLE,
   event_time VARCHAR(32),
   processing_time AS PROCTIME()) 
PARTITIONED BY (ticker) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601') 

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ticker,
         min(price) AS MIN_PRICE,
         max(price) AS MAX_PRICE 
      FROM
         DESTINATION_SQL_STREAM 
      GROUP BY
         TUMBLE(processing_time, INTERVAL '60' second),
         ticker;
```

------

#### Mengambil nilai yang paling sering terjadi (TOP\$1K\$1ITEMS\$1TUMBLING)
<a name="retrieve-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT")
SELECT
   STREAM"TICKER_SYMBOL",
   STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
   COUNT(*) AS "TickerCount"
FROM
   "SOURCE_SQL_STREAM_001"
GROUP BY STEP("SOURCE_SQL_STREAM_001".
   ROWTIME BY INTERVAL '1' MINUTE),
   STEP("SOURCE_SQL_STREAM_001".
      "APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
   TICKER_SYMBOL;
CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT")
SELECT
   STREAM "TICKER",
   "TRADETIME",
   SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT"
FROM
   "CALC_COUNT_SQL_STREAM" WINDOW W1 AS 
   (
      PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING
   )
;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM ( 
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '1' SECONDS ) 
PARTITIONED BY (TICKER) WITH (
   'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      (
         SELECT
            TICKER,
            COUNT(*) as MOST_FREQUENT_VALUES,
            ROW_NUMBER() OVER (PARTITION BY TICKER 
         ORDER BY
            TICKER DESC) AS row_num 
         FROM
            DESTINATION_SQL_STREAM 
         GROUP BY
            TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE),
            TICKER
      )
   WHERE
      row_num <= 5;
```

------

#### Perkiraan item Top-K
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ITEM VARCHAR(1024), ITEM_COUNT DOUBLE);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM ITEM,
      ITEM_COUNT 
   FROM
      TABLE(TOP_K_ITEMS_TUMBLING(CURSOR(
      SELECT
         STREAM * 
      FROM
         "SOURCE_SQL_STREAM_001"), 'column1', -- name of column in single quotes10, -- number of top items60 -- tumbling window size in seconds));
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flinkssql
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 
CREATE TABLE SOURCE_SQL_STREAM_001 ( TS TIMESTAMP(3), WATERMARK FOR TS as TS - INTERVAL '5' SECOND, ITEM VARCHAR(1024), 
PRICE DOUBLE) 
   WITH ( 'connector' = 'kinesis', 'stream' = 'SOURCE_SQL_STREAM_001',
'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601');


%flink.ssql(type=update)
SELECT
   * 
FROM
   (
      SELECT
         *,
         ROW_NUMBER() OVER (PARTITION BY AGG_WINDOW 
      ORDER BY
         ITEM_COUNT DESC) as rownum 
      FROM
         (
            select
               AGG_WINDOW,
               ITEM,
               ITEM_COUNT 
            from
               (
                  select
                     TUMBLE_ROWTIME(TS, INTERVAL '60' SECONDS) as AGG_WINDOW,
                     ITEM,
                     count(*) as ITEM_COUNT 
                  FROM
                     SOURCE_SQL_STREAM_001 
                  GROUP BY
                     TUMBLE(TS, INTERVAL '60' SECONDS),
                     ITEM
               )
         )
   )
where
   rownum <= 3
```

------

#### Mengurai Log Web (Fungsi W3C\$1LOG\$1PARSE)
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( column1 VARCHAR(16), 
   column2 VARCHAR(16), 
   column3 VARCHAR(16), 
   column4 VARCHAR(16), 
   column5 VARCHAR(16), 
   column6 VARCHAR(16), 
   column7 VARCHAR(16));
CREATE 
OR REPLACE PUMP "myPUMP" ASINSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM l.r.COLUMN1,
   l.r.COLUMN2,
   l.r.COLUMN3,
   l.r.COLUMN4,
   l.r.COLUMN5,
   l.r.COLUMN6,
   l.r.COLUMN7 
FROM
   (
      SELECT
         STREAM W3C_LOG_PARSE("log", 'COMMON') 
      FROM
         "SOURCE_SQL_STREAM_001"
   )
   AS l(r);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql(type=update)
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) 
   WITH ( 'connector' = 'kinesis', 
          'stream' = 'SOURCE_SQL_STREAM_001',
          'aws.region' = 'us-east-1',
          'scan.stream.initpos' = 'LATEST',
          'format' = 'json',
          'json.timestamp-format.standard' = 'ISO-8601');
          
% flink.ssql(type=update) 
   select
      SPLIT_INDEX(log, ' ', 0),
      SPLIT_INDEX(log, ' ', 1),
      SPLIT_INDEX(log, ' ', 2),
      SPLIT_INDEX(log, ' ', 3),
      SPLIT_INDEX(log, ' ', 4),
      SPLIT_INDEX(log, ' ', 5),
      SPLIT_INDEX(log, ' ', 6) 
   from
      SOURCE_SQL_STREAM_001;
```

------

#### Pisahkan String menjadi Beberapa Bidang (Fungsi VARIABLE\$1COLUMN\$1LOG\$1PARSE)
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"( "column_A" VARCHAR(16),
   "column_B" VARCHAR(16),
   "column_C" VARCHAR(16),
   "COL_1" VARCHAR(16),
   "COL_2" VARCHAR(16),
   "COL_3" VARCHAR(16));
CREATE 
OR REPLACE PUMP "SECOND_STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM t."Col_A",
   t."Col_B",
   t."Col_C",
   t.r."COL_1",
   t.r."COL_2",
   t.r."COL_3"
FROM
   (
      SELECT
         STREAM "Col_A",
         "Col_B",
         "Col_C",
         VARIABLE_COLUMN_LOG_PARSE ("Col_E_Unstructured",
         'COL_1 TYPE VARCHAR(16),
         COL_2 TYPE VARCHAR(16),
         COL_3 TYPE VARCHAR(16)', ',') AS r 
      FROM
         "SOURCE_SQL_STREAM_001"
   )
   as t;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql(type=update)
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) 
   WITH ( 'connector' = 'kinesis',
          'stream' = 'SOURCE_SQL_STREAM_001',
          'aws.region' = 'us-east-1',
          'scan.stream.initpos' = 'LATEST',
          'format' = 'json',
          'json.timestamp-format.standard' = 'ISO-8601');
          
% flink.ssql(type=update) 
   select
      SPLIT_INDEX(log, ' ', 0),
      SPLIT_INDEX(log, ' ', 1),
      SPLIT_INDEX(log, ' ', 2),
      SPLIT_INDEX(log, ' ', 3),
      SPLIT_INDEX(log, ' ', 4),
      SPLIT_INDEX(log, ' ', 5)
)
from
   SOURCE_SQL_STREAM_001;
```

------

#### Gabungan
<a name="joins"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   ticker_symbol VARCHAR(4),
   "Company" varchar(20),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM ticker_symbol,
      "c"."Company",
      sector,
      change,
      priceFROM "SOURCE_SQL_STREAM_001" 
      LEFT JOIN
         "CompanyName" as "c"
         ON "SOURCE_SQL_STREAM_001".ticker_symbol = "c"."Ticker";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(12),
   CHANGE INT,
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',   
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - CREATE TABLE CompanyName (
   Ticker VARCHAR(4),
   Company VARCHAR(4)) WITH ( 
      'connector' = 'filesystem',
      'path' = 's3://kda-demo-sample/TickerReference.csv',       
      'format' = 'csv' );

Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      c.Company,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
      LEFT JOIN
         CompanyName as c 
         ON DESTINATION_SQL_STREAM.TICKER_SYMBOL = c.Ticker;
```

------

#### Kesalahan
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   (
      price / 0
   )
   as ProblemColumnFROM "SOURCE_SQL_STREAM_001"
WHERE
   sector SIMILAR TO '%TECH%';
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   CHANGE DOUBLE, 
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - % flink.pyflink @udf(input_types = [DataTypes.BIGINT()],
   result_type = DataTypes.BIGINT()) def DivideByZero(price): try: price / 0 
except
: return - 1 st_env.register_function("DivideByZero",
   DivideByZero) 
   
   Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      CURRENT_TIMESTAMP AS ERROR_TIME,
      * 
   FROM
      (
         SELECT
            TICKER_SYMBOL,
            SECTOR,
            CHANGE,
            DivideByZero(PRICE) as ErrorColumn 
         FROM
            DESTINATION_SQL_STREAM 
         WHERE
            SECTOR SIMILAR TO '%TECH%' 
      )
      AS ERROR_STREAM;
```

------

## Migrasi beban kerja Random Cut Forest
<a name="examples-migrating-to-kda-studio-random-cut-forests"></a>

Jika Anda ingin memindahkan beban kerja yang menggunakan Random Cut Forest dari Kinesis Analytics untuk SQL ke Managed Service untuk Apache Flink, posting [blog AWS ini](https://aws.amazon.com/blogs/big-data/real-time-anomaly-detection-via-random-cut-forest-in-amazon-kinesis-data-analytics/) menunjukkan cara menggunakan Managed Service untuk Apache Flink untuk menjalankan algoritma RCF online untuk deteksi anomali.

## Mengganti Kinesis Data Firehose sebagai sumber dengan Kinesis Data Streams
<a name="examples-firehose"></a>

Lihat [Converting-kdasql- KDAStudio](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/tree/master/Converting-KDASQL-KDAStudio)/untuk tutorial lengkap.

Dalam latihan berikut, Anda akan mengubah aliran data Anda untuk menggunakan Amazon Managed Service untuk Apache Flink Studio. Ini juga berarti beralih dari Amazon Kinesis Data Firehose ke Amazon Kinesis Data Streams.

Pertama, kami membagikan arsitektur KDA-SQL yang khas, sebelum menunjukkan bagaimana Anda dapat mengganti ini menggunakan Amazon Managed Service untuk Apache Flink Studio dan Amazon Kinesis Data Streams. Atau Anda dapat meluncurkan CloudFormation template di [sini](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml):

### Analisis Data Amazon Kinesis - SQL dan Amazon Kinesis Data Firehose
<a name="examples-firehose-legacy-setup"></a>

Berikut adalah alur arsitektur SQL Amazon Kinesis Data Analytics: 

![\[Architectural flow diagram showing data movement through Amazon Kinesis services to Amazon S3.\]](http://docs.aws.amazon.com/id_id/kinesisanalytics/latest/dev/images/legacy-sql.png)


Kami pertama-tama memeriksa penyiapan Amazon Kinesis Data Analytics-SQL dan Amazon Kinesis Data Firehose yang lama. Kasus penggunaan adalah pasar perdagangan di mana data perdagangan, termasuk ticker saham dan harga, mengalir dari sumber eksternal ke sistem Amazon Kinesis. Amazon Kinesis Data Analytics untuk SQL menggunakan aliran input untuk mengeksekusi kueri Windowed seperti jendela Tumbling untuk menentukan volume perdagangan dan`max`,, `average` dan `min` harga perdagangan selama satu menit untuk setiap ticker saham.  

Amazon Kinesis Data Analytics-SQL diatur untuk menelan data dari Amazon Kinesis Data Firehose API. Setelah diproses, Amazon Kinesis Data Analytics-SQL mengirimkan data yang diproses ke Amazon Kinesis Data Firehose lainnya, yang kemudian menyimpan output dalam bucket Amazon S3. 

Dalam hal ini, Anda menggunakan Amazon Kinesis Data Generator. Amazon Kinesis Data Generator memungkinkan Anda mengirim data pengujian ke Amazon Kinesis Data Streams atau aliran pengiriman Amazon Kinesis Data Firehose. Untuk memulai, ikuti instruksi [di sini](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). Gunakan CloudFormation templat di [sini](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml) sebagai pengganti yang disediakan dalam [instruksi:](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). 

Setelah Anda menjalankan CloudFormation template, bagian output akan memberikan url Amazon Kinesis Data Generator. [Masuk ke portal menggunakan id pengguna Cognito dan kata sandi yang Anda atur di sini.](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html) Pilih Wilayah dan nama aliran target. Untuk status saat ini, pilih aliran Pengiriman Amazon Kinesis Data Firehose. Untuk status baru, pilih nama Amazon Kinesis Data Firehose Streams. Anda dapat membuat beberapa template, tergantung pada kebutuhan Anda, dan menguji template menggunakan tombol **Uji template** sebelum mengirimnya ke aliran target.

Berikut ini adalah contoh payload menggunakan Amazon Kinesis Data Generator. Generator data menargetkan input Amazon Kinesis Firehose Streams untuk mengalirkan data secara terus menerus. Klien Amazon Kinesis SDK dapat mengirim data dari produsen lain juga. 

```
2023-02-17 09:28:07.763,"AAPL",5032023-02-17 09:28:07.763,
"AMZN",3352023-02-17 09:28:07.763,
"GOOGL",1852023-02-17 09:28:07.763,
"AAPL",11162023-02-17 09:28:07.763,
"GOOGL",1582
```

JSON berikut digunakan untuk menghasilkan serangkaian waktu dan tanggal perdagangan acak, ticker saham, dan harga saham:

```
date.now(YYYY-MM-DD HH:mm:ss.SSS),
"random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])",
random.number(2000)
```

Setelah Anda memilih **Kirim data**, generator akan mulai mengirim data tiruan.

Sistem eksternal mengalirkan data ke Amazon Kinesis Data Firehose. Menggunakan Amazon Kinesis Data Analytics untuk Aplikasi SQL, Anda dapat menganalisis data streaming menggunakan SQL standar. Layanan ini memungkinkan Anda untuk membuat dan menjalankan kode SQL terhadap sumber streaming untuk melakukan analitik deret waktu, memberi umpan dasbor waktu nyata, dan membuat metrik waktu nyata. Amazon Kinesis Data Analytics untuk Aplikasi SQL dapat membuat aliran tujuan dari kueri SQL pada aliran input dan mengirim aliran tujuan ke Amazon Kinesis Data Firehose lainnya. Tujuan Amazon Kinesis Data Firehose dapat mengirim data analitik ke Amazon S3 sebagai status akhir.

Kode warisan Amazon Kinesis Data Analytics-SQL didasarkan pada perpanjangan Standar SQL. 

Anda menggunakan kueri berikut di Amazon Kinesis Data Analytics-SQL. Anda pertama kali membuat aliran tujuan untuk output kueri. Kemudian, Anda akan menggunakan`PUMP`, yang merupakan Objek Repositori Amazon Kinesis Data Analytics (perpanjangan dari Standar SQL) yang menyediakan fungsionalitas kueri yang terus `INSERT INTO stream SELECT ... FROM` berjalan, sehingga memungkinkan hasil kueri untuk terus dimasukkan ke dalam aliran bernama. 

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME TIMESTAMP,
INGEST_TIME TIMESTAMP,
TICKER VARCHAR(16),
VOLUME BIGINT,
AVG_PRICE DOUBLE,
MIN_PRICE DOUBLE,
MAX_PRICE DOUBLE);
 
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND) AS EVENT_TIME,
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS "STREAM_INGEST_TIME",
      "ticker",
       COUNT(*) AS VOLUME,
      AVG("tradePrice") AS AVG_PRICE,
      MIN("tradePrice") AS MIN_PRICE,
      MAX("tradePrice") AS MAX_PRICEFROM "SOURCE_SQL_STREAM_001"
   GROUP BY
      "ticker",
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND),
      STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND);
```

SQL sebelumnya menggunakan dua jendela waktu — `tradeTimestamp` yang berasal dari payload aliran masuk dan `ROWTIME.tradeTimestamp` juga disebut atau. `Event Time` `client-side time` Waktu ini sering kali berguna ketika digunakan dalam analitik karena merupakan waktu ketika peristiwa terjadi. Namun, banyak sumber peristiwa, seperti ponsel dan klien web, tidak memiliki jam yang dapat diandalkan, yang dapat menyebabkan waktu yang tidak akurat. Selain itu, masalah konektivitas dapat menyebabkan catatan yang muncul di aliran tidak dalam urutan yang sama dengan peristiwa yang terjadi. 

Aliran dalam aplikasi juga menyertakan kolom khusus yang disebut. `ROWTIME` Kolom ini menyimpan stempel waktu ketika Amazon Kinesis Data Analytics memasukkan baris di aliran dalam aplikasi pertama. `ROWTIME` mencerminkan stempel waktu tempat Amazon Kinesis Data Analytics memasukkan catatan ke aliran dalam aplikasi pertama setelah membaca dari sumber streaming. Nilai `ROWTIME` ini selanjutnya dipertahankan di seluruh aplikasi Anda. 

SQL menentukan jumlah ticker sebagai`volume`,, `min``max`, dan `average` harga selama interval 60 detik. 

Menggunakan setiap waktu ini dalam kueri jendela yang berbasis waktu memiliki kelebihan dan kekurangan. Pilih satu atau lebih dari waktu-waktu ini, dan strategi untuk menangani kerugian yang relevan berdasarkan skenario kasus penggunaan Anda. 

Strategi dua jendela menggunakan dua waktu berbasis, keduanya `ROWTIME` dan salah satu waktu lainnya seperti waktu acara.
+ Gunakan `ROWTIME` sebagai jendela pertama, yang mengontrol seberapa sering kueri memancarkan hasil, seperti yang ditunjukkan dalam contoh berikut. Ini tidak digunakan sebagai waktu logis. 
+ Gunakan salah satu waktu lain yang merupakan waktu logis yang ingin Anda kaitkan dengan analitik Anda. Waktu ini mewakili kapan peristiwa terjadi. Pada contoh berikut, tujuan analitik adalah mengelompokkan catatan dan dan kembali menghitung dengan ticker 

### Layanan Dikelola Amazon untuk Apache Flink Studio 
<a name="examples-studio"></a>

Dalam arsitektur yang diperbarui, Anda mengganti Amazon Kinesis Data Firehose dengan Amazon Kinesis Data Streams. Amazon Kinesis Data Analytics untuk Aplikasi SQL digantikan oleh Amazon Managed Service untuk Apache Flink Studio. Kode Apache Flink dijalankan secara interaktif dalam Notebook Apache Zeppelin. Amazon Managed Service untuk Apache Flink Studio mengirimkan data perdagangan agregat ke bucket Amazon S3 untuk penyimpanan. Langkah-langkahnya ditunjukkan sebagai berikut:

Berikut adalah Amazon Managed Service untuk aliran arsitektur Apache Flink Studio:

![\[Data flow from Producer through Kinesis streams to Analytics Studio and S3 storage.\]](http://docs.aws.amazon.com/id_id/kinesisanalytics/latest/dev/images/kda-studio.png)


#### Buat Aliran Data Kinesis
<a name="examples-studio-create-data-stream"></a>

**Untuk membuat aliran data menggunakan konsol**

1. [Masuk ke Konsol Manajemen AWS dan buka konsol Kinesis di /kinesis. https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. Di bilah navigasi, perluas pemilih Wilayah dan pilih Wilayah.

1. Pilih **Create data stream** (Buat aliran data).

1. Pada halaman **Create Kinesis stream**, masukkan nama untuk aliran data Anda dan terima mode kapasitas **On-Demand** default. 

   Dengan mode **On-Demand**, Anda kemudian dapat memilih **Create Kinesis** stream untuk membuat aliran data Anda. 

   Pada halaman **Kinesis streams**, **Status** streaming Anda adalah **Membuat** saat aliran sedang dibuat. Saat aliran siap digunakan, **Status** berubah menjadi **Aktif**.

1. Pilih nama streaming Anda. Halaman **Detail Stream** menampilkan ringkasan konfigurasi aliran Anda, bersama dengan informasi pemantauan.

1. **Di Amazon Kinesis Data Generator, ubah Stream/delivery aliran ke Amazon Kinesis Data Streams baru: TRADE\$1SOURCE\$1STREAM.**

   JSON dan Payload akan sama seperti yang Anda gunakan untuk Amazon Kinesis Data Analytics-SQL. Gunakan Amazon Kinesis Data Generator untuk menghasilkan beberapa contoh data muatan perdagangan dan menargetkan Aliran Data **TRADE\$1SOURCE\$1STREAM** untuk latihan ini: 

   ```
   {{date.now(YYYY-MM-DD HH:mm:ss.SSS)}},
   "{{random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])}}",
   {{random.number(2000)}}
   ```

1. Saat Konsol Manajemen AWS pergi ke Managed Service for Apache Flink dan kemudian pilih **Create** Application.

1. Di panel navigasi kiri, pilih **buku catatan Studio lalu pilih Buat buku catatan** **studio**.

1. Masukkan nama untuk notebook studio.

1. Di bawah **database AWS Glue**, sediakan AWS Glue database yang ada yang akan menentukan metadata untuk sumber dan tujuan Anda. Jika Anda tidak memiliki AWS Glue database, pilih **Buat** dan lakukan hal berikut:

   1. Di konsol AWS Glue, pilih **Databases** di bawah **katalog Data** dari menu sebelah kiri.

   1. Pilih **Buat database**

   1. Di halaman **Buat database**, masukkan nama untuk database. Di bagian **Lokasi - opsional**, pilih **Jelajahi Amazon S3** dan pilih bucket Amazon S3. Jika Anda belum memiliki bucket Amazon S3 yang sudah disiapkan, Anda dapat melewati langkah ini dan kembali lagi nanti.

   1. (Opsional). Masukkan deskripsi untuk database.

   1. Pilih **Buat basis data**.

1. Pilih **Buat buku catatan**

1. Setelah buku catatan Anda dibuat, pilih **Jalankan**.

1. Setelah notebook berhasil dimulai, luncurkan notebook Zeppelin dengan memilih **Buka di Apache Zeppelin**.

1. Pada halaman Notebook Zeppelin, pilih **Buat catatan baru** dan beri nama. *MarketDataFeed*

Kode SQL Flink dijelaskan berikut, tetapi pertama-tama [inilah tampilan layar notebook Zeppelin](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/open-Zeppelin-notebook.jpg). Setiap jendela di dalam notebook adalah blok kode terpisah, dan mereka dapat dijalankan satu per satu.

##### Layanan Dikelola Amazon untuk Kode Apache Flink Studio
<a name="examples-studio-code"></a>

Amazon Managed Service untuk Apache Flink Studio menggunakan Notebook Zeppelin untuk menjalankan kode. Pemetaan dilakukan untuk contoh ini ke kode ssql berdasarkan Apache Flink 1.13. Kode di Notebook Zeppelin ditampilkan berikut, satu blok pada satu waktu.  

Sebelum menjalankan kode apa pun di Notebook Zeppelin Anda, perintah konfigurasi Flink harus dijalankan. Jika Anda perlu mengubah pengaturan konfigurasi apa pun setelah menjalankan kode (ssql, Python, atau Scala), Anda harus berhenti dan memulai ulang notebook Anda. Dalam contoh ini, Anda harus mengatur checkpointing. Checkpointing diperlukan agar Anda dapat mengalirkan data ke file di Amazon S3. Ini memungkinkan streaming data ke Amazon S3 untuk dibuang ke file. Pernyataan berikut menetapkan interval ke 5000 milidetik.  

```
%flink.conf
execution.checkpointing.interval 5000
```

`%flink.conf`menunjukkan bahwa blok ini adalah pernyataan konfigurasi. Untuk informasi selengkapnya tentang konfigurasi Flink termasuk checkpointing, lihat [Apache](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/checkpoints/) Flink Checkpointing.  

Tabel input untuk sumber Amazon Kinesis Data Streams dibuat dengan kode ssql Flink berikut. Perhatikan bahwa `TRADE_TIME` bidang menyimpan yang date/time dibuat oleh generator data.

```
%flink.ssql
     
DROP TABLE IF EXISTS TRADE_SOURCE_STREAM;
CREATE TABLE TRADE_SOURCE_STREAM (--`arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
TRADE_TIME TIMESTAMP(3),
WATERMARK FOR TRADE_TIME as TRADE_TIME - INTERVAL '5' SECOND,TICKER STRING,PRICE DOUBLE,
STATUS STRING)WITH ('connector' = 'kinesis','stream' = 'TRADE_SOURCE_STREAM',
'aws.region' = 'us-east-1','scan.stream.initpos' = 'LATEST','format' = 'csv');
```

Anda dapat melihat aliran input dengan pernyataan ini:

```
%flink.ssql(type=update)-- testing the source stream
   
select * from TRADE_SOURCE_STREAM;
```

Sebelum mengirim data agregat ke Amazon S3, Anda dapat melihatnya langsung di Amazon Managed Service untuk Apache Flink Studio dengan kueri pilih jendela tumbling. Ini mengumpulkan data perdagangan dalam jendela waktu satu menit. Perhatikan bahwa pernyataan %flink.ssql harus memiliki penunjukan (type=update):

```
%flink.ssql(type=update)
   
select TUMBLE_ROWTIME(TRADE_TIME,
INTERVAL '1' MINUTE) as TRADE_WINDOW,
TICKER, COUNT(*) as VOLUME,
AVG(PRICE) as AVG_PRICE, 
MIN(PRICE) as MIN_PRICE,
MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAMGROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;
```

Anda kemudian dapat membuat tabel untuk tujuan di Amazon S3. Anda harus menggunakan watermark. Tanda air adalah metrik kemajuan yang menunjukkan titik waktu ketika Anda yakin bahwa tidak ada lagi peristiwa yang tertunda yang akan tiba. Alasan tanda air adalah untuk memperhitungkan kedatangan yang terlambat. Interval `‘5’ Second` ini memungkinkan perdagangan untuk memasuki Amazon Kinesis Data Stream terlambat 5 detik dan masih disertakan jika mereka memiliki stempel waktu di dalam jendela. Untuk informasi selengkapnya, lihat [Membuat Tanda Air](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/).    

```
%flink.ssql(type=update)

DROP TABLE IF EXISTS TRADE_DESTINATION_S3;
CREATE TABLE TRADE_DESTINATION_S3 (
TRADE_WINDOW_START TIMESTAMP(3),
WATERMARK FOR TRADE_WINDOW_START as TRADE_WINDOW_START - INTERVAL '5' SECOND,
TICKER STRING, 
VOLUME BIGINT,
AVG_PRICE DOUBLE,
MIN_PRICE DOUBLE,
MAX_PRICE DOUBLE)
WITH ('connector' = 'filesystem','path' = 's3://trade-destination/','format' = 'csv');
```

Pernyataan ini menyisipkan data ke dalam. `TRADE_DESTINATION_S3` `TUMPLE_ROWTIME`adalah stempel waktu dari batas atas inklusif dari jendela yang jatuh.

```
%flink.ssql(type=update)

insert into TRADE_DESTINATION_S3
select TUMBLE_ROWTIME(TRADE_TIME,
INTERVAL '1' MINUTE),
TICKER, COUNT(*) as VOLUME,
AVG(PRICE) as AVG_PRICE,
MIN(PRICE) as MIN_PRICE,
MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAM
GROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;
```

Biarkan pernyataan Anda berjalan selama 10 hingga 20 menit untuk mengumpulkan beberapa data di Amazon S3. Kemudian batalkan pernyataan Anda. 

Ini menutup file di Amazon S3 sehingga dapat dilihat. 

Berikut adalah apa isinya terlihat seperti: 

![\[Financial data table showing stock prices and volumes for tech companies on March 1, 2023.\]](http://docs.aws.amazon.com/id_id/kinesisanalytics/latest/dev/images/kda-studio-contents.png)


Anda dapat menggunakan [CloudFormation template](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml) untuk membuat infrastruktur. 

CloudFormation akan membuat sumber daya berikut di AWS akun Anda:
+  Amazon Kinesis Data Streams
+  Layanan Dikelola Amazon untuk Apache Flink Studio
+  AWS Glue basis data
+  Buket Amazon S3
+  Peran dan kebijakan IAM untuk Amazon Managed Service untuk Apache Flink Studio untuk mengakses sumber daya yang sesuai

Impor notebook dan ubah nama bucket Amazon S3 dengan bucket Amazon S3 baru yang dibuat oleh. CloudFormation

![\[SQL code snippet creating a table with timestamp, ticker, volume, and price fields.\]](http://docs.aws.amazon.com/id_id/kinesisanalytics/latest/dev/images/kda-studio-cfn.png)


##### Lihat lebih banyak
<a name="more"></a>

Berikut adalah beberapa sumber daya tambahan yang dapat Anda gunakan untuk mempelajari lebih lanjut tentang penggunaan Layanan Terkelola untuk Apache Flink Studio: 
+ [Layanan Terkelola untuk Panduan Pengembang Notebook Apache Flink Studio](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html) 
+ [Apache Flink 1.13 Dokumentasi](https://nightlies.apache.org/flink/flink-docs-release-1.13/) 
+ [Layanan Terkelola untuk Workshop Apache Flink Studio](https://catalog.us-east-1.prod.workshops.aws/workshops/c342c6d1-2baf-4827-ba42-52ef9eb173f6/en-US/flink-on-kda-studio) 
+ [Jendela Apache Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/) 
+ [Panduan Pengembang Amazon Kinesis Data Analytics — Menulis dari Stream Kinesis Data Analytics ke Bucket S3](https://docs.aws.amazon.com/managed-flink/latest/java/examples-s3.html) 

# Memanfaatkan fungsi yang ditentukan pengguna () UDFs
<a name="examples-migrating-to-kda-studio-leveraging-udfs"></a>

Tujuan dari pola ini adalah untuk menunjukkan bagaimana memanfaatkan notebook Kinesis Data Analytics-Studio Zeppelin untuk memproses data UDFs dalam aliran Kinesis. Layanan Terkelola untuk Apache Flink Studio menggunakan Apache Flink untuk menyediakan kemampuan analitis tingkat lanjut, termasuk tepat sekali memproses semantik, jendela waktu acara, ekstensibilitas menggunakan fungsi yang ditentukan pengguna dan integrasi pelanggan, dukungan bahasa imperatif, status aplikasi yang tahan lama, penskalaan horizontal, dukungan untuk beberapa sumber data, integrasi yang dapat diperluas, dan banyak lagi. Ini sangat penting untuk memastikan akurasi, kelengkapan, konsistensi, dan keandalan pemrosesan aliran data dan tidak tersedia dengan Amazon Kinesis Data Analytics untuk SQL.

Dalam aplikasi sampel ini, kami akan mendemonstrasikan cara memanfaatkan UDFs notebook KDA-Studio Zeppelin untuk memproses data dalam aliran Kinesis. Notebook Studio untuk Kinesis Data Analytics memungkinkan Anda secara interaktif mengkueri aliran data secara langsung, dan dengan mudah membangun serta menjalankan aplikasi pemrosesan aliran menggunakan SQL, Python, dan Scala standar. Dengan beberapa klik Konsol Manajemen AWS, Anda dapat meluncurkan notebook tanpa server untuk menanyakan aliran data dan mendapatkan hasil dalam hitungan detik. Untuk informasi selengkapnya, lihat [Menggunakan notebook Studio dengan Kinesis Data Analytics untuk Apache](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html) Flink. 

Fungsi Lambda yang digunakan untuk pre/post memproses data dalam aplikasi KDA-SQL:

![\[alt text not found\]](http://docs.aws.amazon.com/id_id/kinesisanalytics/latest/dev/images/sql-udf-1.png)


Fungsi yang ditentukan pengguna untuk pre/post memproses data menggunakan notebook KDA-Studio Zeppelin

![\[alt text not found\]](http://docs.aws.amazon.com/id_id/kinesisanalytics/latest/dev/images/sql-udf.png)


## Fungsi yang ditentukan pengguna () UDFs
<a name="examples-migrating-to-kda-studio-udfs"></a>

Untuk menggunakan kembali logika bisnis umum menjadi operator, akan berguna untuk mereferensikan fungsi yang ditentukan pengguna untuk mengubah aliran data Anda. Ini dapat dilakukan baik dalam Managed Service for Apache Flink Studio notebook, atau sebagai file jar aplikasi yang direferensikan secara eksternal. Memanfaatkan fungsi yang ditentukan pengguna dapat menyederhanakan transformasi atau pengayaan data yang mungkin Anda lakukan melalui streaming data.

 Di notebook Anda, Anda akan mereferensikan jar aplikasi Java sederhana yang memiliki fungsi untuk menganonimkan nomor telepon pribadi. Anda juga dapat menulis Python atau Scala UDFs untuk digunakan dalam buku catatan. Kami memilih jar aplikasi Java untuk menyoroti fungsionalitas mengimpor jar aplikasi ke notebook Pyflink.

## Pengaturan lingkungan
<a name="examples-migrating-to-kda-studio-setup"></a>

Untuk mengikuti panduan ini dan berinteraksi dengan data streaming Anda, Anda akan menggunakan AWS CloudFormation skrip untuk meluncurkan sumber daya berikut:
+ Sumber dan target Kinesis Data Streams
+ Database Glue
+ IAM Role
+ Layanan Terkelola untuk Aplikasi Apache Flink Studio
+ Fungsi Lambda untuk memulai Managed Service untuk Apache Flink Studio Application
+ Peran Lambda untuk menjalankan fungsi Lambda sebelumnya
+ Sumber daya khusus untuk menjalankan fungsi Lambda

Unduh CloudFormation template [di sini](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/cfn/kda-flink-udf.yml).

**Buat CloudFormation tumpukan**

1. Pergi ke Konsol Manajemen AWS dan pilih **CloudFormation**di bawah daftar layanan.

1. Pada **CloudFormation**halaman, pilih **Stacks** dan kemudian pilih **Create Stack dengan sumber daya baru (standar)**. 

1. Pada halaman **Buat tumpukan**, pilih **Unggah File Template**, lalu pilih `kda-flink-udf.yml` yang Anda unduh sebelumnya. Unggah file dan kemudian pilih **Berikutnya**. 

1. Beri nama template, seperti `kinesis-UDF` agar mudah diingat, dan perbarui Parameter input seperti input-stream jika Anda menginginkan nama yang berbeda. Pilih **Berikutnya**.

1. Pada halaman **Configure stack options**, tambahkan **Tag** jika Anda mau, lalu pilih **Next**.

1. Pada halaman **Tinjauan**, centang kotak yang memungkinkan pembuatan sumber daya IAM dan kemudian pilih **Kirim**.

 CloudFormation Tumpukan mungkin membutuhkan waktu 10 hingga 15 menit untuk diluncurkan tergantung pada Wilayah yang Anda luncurkan. Setelah Anda melihat `CREATE_COMPLETE` status untuk seluruh tumpukan, Anda siap untuk melanjutkan.

## Bekerja dengan Layanan Terkelola untuk notebook Apache Flink Studio
<a name="examples-migrating-to-kda-studio-notebook"></a>

Notebook studio untuk Kinesis Data Analytics memungkinkan Anda melakukan kueri aliran data secara interaktif secara real time, dan dengan mudah membangun dan menjalankan aplikasi pemrosesan aliran menggunakan SQL, Python, dan Scala standar. Dengan beberapa klik Konsol Manajemen AWS, Anda dapat meluncurkan notebook tanpa server untuk menanyakan aliran data dan mendapatkan hasil dalam hitungan detik.

Notebook adalah lingkungan pengembangan berbasis web. Dengan notebook, Anda mendapatkan pengalaman pengembangan interaktif sederhana yang dikombinasikan dengan kemampuan pemrosesan aliran data canggih yang disediakan oleh Apache Flink. Notebook studio menggunakan notebook yang didukung oleh Apache Zeppelin, dan menggunakan Apache Flink sebagai mesin pemrosesan aliran. Notebook studio menggabungkan teknologi ini dengan mulus untuk membuat analitik lanjutan pada aliran data dapat diakses oleh pengembang dari semua keahlian. 

 Apache Zeppelin memberi notebook Studio Anda dengan rangkaian alat analitik lengkap, termasuk yang berikut:
+ Visualisasi Data
+ Mengekspor data ke file
+ Mengontrol format output untuk analisis yang lebih mudah 

**Menggunakan notebook**

1. Pergi ke Konsol Manajemen AWS dan pilih **Amazon Kinesis** di bawah daftar layanan.

1. Di halaman navigasi sebelah kiri, pilih **aplikasi Analytics** lalu pilih buku catatan **Studio**.

1. Verifikasi bahwa **KinesisDataAnalyticsStudio**notebook sedang berjalan.

1. Pilih notebook dan kemudian pilih **Buka di Apache Zeppelin**. 

1. Unduh file [Data Producer Zeppelin Notebook](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/notebooks/Data%20Producer.zpln) yang akan Anda gunakan untuk membaca dan memuat data ke dalam Kinesis Stream. 

1.  Impor `Data Producer` Notebook Zeppelin. Pastikan untuk memodifikasi input `STREAM_NAME` dan `REGION` dalam kode notebook. Nama aliran input dapat ditemukan di [output CloudFormation tumpukan](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/cfn/kda-flink-udf.yml). 

1. Jalankan notebook **Data Producer** dengan memilih tombol **Jalankan paragraf ini** untuk menyisipkan data sampel ke dalam input Kinesis Data Stream.

1. Saat data sampel dimuat, unduh [MaskPhoneNumber-Interactive notebook](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/notebooks/MaskPhoneNumber-interactive.zpln), yang akan membaca data input, menganonimkan nomor telepon dari aliran input dan menyimpan data anonim ke dalam aliran output.

1. Impor notebook `MaskPhoneNumber-interactive` Zeppelin. 

1. Jalankan setiap paragraf di buku catatan.

   1. Dalam paragraf 1, Anda mengimpor Fungsi yang Ditetapkan Pengguna untuk menganonimkan nomor telepon.

      ```
      %flink(parallelism=1)
      import com.mycompany.app.MaskPhoneNumber
      stenv.registerFunction("MaskPhoneNumber", new MaskPhoneNumber())
      ```

   1. Pada paragraf berikutnya, Anda membuat tabel dalam memori untuk membaca data aliran input. Pastikan nama stream dan AWS Region sudah benar. 

      ```
      %flink.ssql(type=update)
      
      DROP TABLE IF EXISTS customer_reviews;
      
      CREATE TABLE customer_reviews (
      customer_id VARCHAR,
      product VARCHAR,
      review VARCHAR,
      phone VARCHAR
      )
      WITH (
      'connector' = 'kinesis',
      'stream' = 'KinesisUDFSampleInputStream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json');
      ```

   1. Periksa apakah data dimuat ke dalam tabel dalam memori. 

      ```
      %flink.ssql(type=update)
      select * from customer_reviews
      ```

   1. Memanggil fungsi yang ditentukan pengguna untuk menganonimkan nomor telepon. 

      ```
      %flink.ssql(type=update)
      select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber from customer_reviews
      ```

   1. Sekarang setelah nomor telepon ditutup, buat tampilan dengan nomor bertopeng. 

      ```
      %flink.ssql(type=update)
      
      DROP VIEW IF EXISTS sentiments_view;
      
      CREATE VIEW  
          sentiments_view
      AS
        select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber from customer_reviews
      ```

   1. Verifikasi datanya. 

      ```
      %flink.ssql(type=update)
      select * from sentiments_view
      ```

   1. Buat tabel dalam memori untuk Output Kinesis Stream. Pastikan nama stream dan AWS Region sudah benar. 

      ```
      %flink.ssql(type=update)
      
      DROP TABLE IF EXISTS customer_reviews_stream_table;
      
      CREATE TABLE customer_reviews_stream_table (
      customer_id VARCHAR,
      product VARCHAR,
      review VARCHAR,
      phoneNumber varchar 
      )
      WITH (
      'connector' = 'kinesis',
      'stream' = 'KinesisUDFSampleOutputStream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'TRIM_HORIZON',
      'format' = 'json');
      ```

   1. Masukkan catatan yang diperbarui di Aliran Kinesis target. 

      ```
      %flink.ssql(type=update)
      INSERT INTO customer_reviews_stream_table
      SELECT customer_id, product, review, phoneNumber
      FROM sentiments_view
      ```

   1. Lihat dan verifikasi data dari Aliran Kinesis target. 

      ```
      %flink.ssql(type=update)
      select * from customer_reviews_stream_table
      ```

## Mempromosikan notebook sebagai aplikasi
<a name="examples-migrating-to-kda-studio-notebook-promoting"></a>

Sekarang setelah Anda menguji kode notebook Anda secara interaktif, Anda akan menyebarkan kode sebagai aplikasi streaming dengan status tahan lama. Anda harus terlebih dahulu memodifikasi konfigurasi Aplikasi untuk menentukan lokasi kode Anda di Amazon S3. 

1. Pada Konsol Manajemen AWS, pilih buku catatan Anda dan di **Deploy sebagai konfigurasi aplikasi - opsional**, pilih **Edit**. 

1. [Di bawah **Tujuan untuk kode di Amazon S3**, pilih bucket Amazon S3 yang dibuat oleh skrip.CloudFormation](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/cfn/kda-flink-udf.yml) Prosesnya mungkin memakan waktu beberapa menit.

1. Anda tidak akan dapat mempromosikan catatan apa adanya. Jika Anda mencoba, Anda akan mengalami kesalahan karena `Select` pernyataan tidak didukung. Untuk mencegah masalah ini, unduh Notebook [MaskPhoneNumber-Streaming Zeppelin](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/notebooks/MaskPhoneNumber-Streaming.zpln).

1. Impor `MaskPhoneNumber-streaming` Notebook Zeppelin.

1. Buka catatan dan pilih **Tindakan untuk KinesisDataAnalyticsStudio**.

1. Pilih **Build MaskPhoneNumber -Streaming dan ekspor ke S3**. Pastikan untuk mengganti nama **Nama Aplikasi** dan tidak menyertakan karakter khusus.

1. Pilih **Bangun dan Ekspor**. Ini akan memakan waktu beberapa menit untuk mengatur Aplikasi Streaming.

1.  Setelah build selesai, pilih **Deploy using AWS console**. 

1.  Pada halaman berikutnya, tinjau pengaturan dan pastikan untuk memilih peran IAM yang benar. Selanjutnya, pilih **Buat aplikasi streaming**. 

1. Setelah beberapa menit, Anda akan melihat pesan bahwa aplikasi streaming berhasil dibuat.

 Untuk informasi selengkapnya tentang penerapan aplikasi dengan status dan batas tahan lama, lihat [Menerapkan sebagai aplikasi dengan status tahan lama](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook-durable.html). 

## Pembersihan
<a name="examples-migrating-to-kda-studio-notebook-cleanup"></a>

Secara opsional, Anda sekarang dapat [menghapus tumpukan. CloudFormation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/cfn-console-delete-stack.html) Ini akan menghapus semua layanan yang Anda atur sebelumnya.