

Após uma análise cuidadosa, decidimos descontinuar o Amazon Kinesis Data Analytics para aplicativos SQL:

1. A partir de **1º de setembro de 2025,** não forneceremos nenhuma correção de bug para aplicativos do Amazon Kinesis Data Analytics para SQL porque teremos suporte limitado para ele, devido à próxima descontinuação.

2. A partir **de 15 de outubro de 2025,** você não poderá criar novos aplicativos Kinesis Data Analytics para SQL.

3. Excluiremos as aplicações a partir de **27 de janeiro de 2026**. Você não poderá mais iniciar nem operar as aplicações do Amazon Kinesis Data Analytics para SQL. A partir dessa data, não haverá mais suporte ao Amazon Kinesis Data Analytics para SQL. Para obter mais informações, consulte [Descontinuação de aplicações do Amazon Kinesis Data Analytics para SQL](discontinuation.md).

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Exemplos de migração para o Managed Service for Apache Flink Studio
<a name="migrating-to-kda-studio-overview"></a>

Após uma análise cuidadosa, tomamos a decisão de descontinuar as aplicações do Amazon Kinesis Data Analytics para SQL. Para ajudar você a se planejar e fazer a migração das aplicações do Amazon Kinesis Data Analytics para SQL, descontinuaremos a oferta gradualmente ao longo de 15 meses. Essas são datas importantes a serem observadas: **1º de setembro de 2025,** **15 de outubro de 2025** e **27 de janeiro de 2026**.

1. A partir de **1º de setembro de 2025,** não forneceremos nenhuma correção de bug para aplicativos do Amazon Kinesis Data Analytics para SQL porque teremos suporte limitado para ele, devido à próxima descontinuação.

1. A partir de **15 de outubro de 2025**, você não poderá mais criar novas aplicações do Amazon Kinesis Data Analytics para SQL. 

1. Excluiremos as aplicações a partir de **27 de janeiro de 2026**. Você não poderá mais iniciar nem operar as aplicações do Amazon Kinesis Data Analytics para SQL. A partir dessa data, não haverá mais suporte às aplicações do Amazon Kinesis Data Analytics para SQL. Para saber mais, consulte [Descontinuação de aplicações do Amazon Kinesis Data Analytics para SQL](discontinuation.md).

Recomendamos que você use o [Amazon Managed Service for Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/what-is.html). Ele combina facilidade de uso com recursos analíticos avançados, permitindo que você crie aplicações de processamento de fluxos em questão de minutos.

Esta seção fornece exemplos de código e arquitetura para ajudar você a mover workloads de aplicações do Amazon Kinesis Data Analytics para SQL para o Managed Service for Apache Flink.

Para obter mais informações, consulte também esta [postagem no blog da AWS : Migrar de aplicações do Amazon Kinesis Data Analytics para SQL para o Managed Service for Apache Flink Studio](https://aws.amazon.com/blogs/big-data/migrate-from-amazon-kinesis-data-analytics-for-sql-applications-to-amazon-managed-service-for-apache-flink-studio/).

## Replicar as consultas do Kinesis Data Analytics para SQL no Managed Service para Apache Flink Studio
<a name="examples-migrating-to-kda-studio"></a>

Para migrar suas workloads para o Managed Service for Apache Flink Studio ou Managed Service for Apache Flink, esta seção fornece traduções de consultas que você pode usar para casos de uso comuns. 

Antes de explorar esses exemplos, recomendamos que você primeiro leia [Usar um caderno do Studio com o Managed Service for Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html). 

### Recriar as consultas do Kinesis Data Analytics para SQL no Managed Service para Apache Flink Studio
<a name="examples-recreating-queries"></a>

A tabela a seguir fornece traduções de consultas comuns de aplicação do Kinesis Data Analytics baseada em SQL para o Managed Service for Apache Flink Studio. 

#### Aplicativo em várias etapas
<a name="Multi-Step-application"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "IN_APP_STREAM_001" (
   ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16), price REAL, change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_001" AS 
INSERT INTO
   "IN_APP_STREAM_001"
   SELECT
      STREAM APPROXIMATE_ARRIVAL_TIME,
      ticker_symbol,
      sector,
      price,
      change FROM "SOURCE_SQL_STREAM_001";
-- Second in-app stream and pump
CREATE 
OR REPLACE STREAM "IN_APP_STREAM_02" (ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16),
   price REAL,
   change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_02" AS 
INSERT INTO
   "IN_APP_STREAM_02"
   SELECT
      STREAM ingest_time,
      ticker_symbol,
      sector,
      price,
      change FROM "IN_APP_STREAM_001";
-- Destination in-app stream and third pump
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16),
   price REAL,
   change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_03" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM ingest_time,
      ticker_symbol,
      sector,
      price,
      change FROM "IN_APP_STREAM_02";
```

------
#### [ Managed Service for Apache Flink Studio ]

```
Query 1 - % flink.ssql DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;           
           
CREATE TABLE SOURCE_SQL_STREAM_001 (TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE,
   CHANGE DOUBLE,
   APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA 

FROM
   'timestamp' VIRTUAL,
   WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND ) 
   PARTITIONED BY (TICKER_SYMBOL) WITH (
      'connector' = 'kinesis',
      'stream' = 'kinesis-analytics-demo-stream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json',
      'json.timestamp-format.standard' = 'ISO-8601');
DROP TABLE IF EXISTS IN_APP_STREAM_001;

CREATE TABLE IN_APP_STREAM_001 ( 
   INGEST_TIME TIMESTAMP,
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE,
   CHANGE DOUBLE )
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
      'connector' = 'kinesis', 
      'stream' = 'IN_APP_STREAM_001', 
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json',
      'json.timestamp-format.standard' = 'ISO-8601');
   
DROP TABLE IF EXISTS IN_APP_STREAM_02;

CREATE TABLE IN_APP_STREAM_02 (
   INGEST_TIME TIMESTAMP, 
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE, 
   CHANGE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'IN_APP_STREAM_02',   
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
   
DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   INGEST_TIME TIMESTAMP, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), 
   PRICE DOUBLE, CHANGE DOUBLE )
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'DESTINATION_SQL_STREAM',
   'aws.region' = 'us-east-1', 
   'scan.stream.initpos' = 'LATEST',  
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');


Query 2 - % flink.ssql(type = 
update
) 
   INSERT INTO
      IN_APP_STREAM_001 
      SELECT
         APPROXIMATE_ARRIVAL_TIME AS INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         SOURCE_SQL_STREAM_001;


Query 3 - % flink.ssql(type = 
update
) 
   INSERT INTO
      IN_APP_STREAM_02 
      SELECT
         INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         IN_APP_STREAM_001;


Query 4 - % flink.ssql(type = 
update
) 
   INSERT INTO
      DESTINATION_SQL_STREAM 
      SELECT
         INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         IN_APP_STREAM_02;
```

------

#### Transformando valores DateTime
<a name="transform-date-time-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   TICKER VARCHAR(4),
   event_time TIMESTAMP,
   five_minutes_before TIMESTAMP,
   event_unix_timestamp BIGINT, 
   event_timestamp_as_char VARCHAR(50),
   event_second INTEGER);
   
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      EVENT_TIME,
      EVENT_TIME - INTERVAL '5' MINUTE,
      UNIX_TIMESTAMP(EVENT_TIME),
      TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
      EXTRACT(SECOND 
   FROM
      EVENT_TIME) 
   FROM
      "SOURCE_SQL_STREAM_001"
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   FIVE_MINUTES_BEFORE TIMESTAMP(3),   
   EVENT_UNIX_TIMESTAMP INT,
   EVENT_TIMESTAMP_AS_CHAR VARCHAR(50),
   EVENT_SECOND INT)
   
PARTITIONED BY (TICKER) WITH (
   'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream',   
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         TICKER,
         EVENT_TIME,
         EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE,
         UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP,
         DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR,
         EXTRACT(SECOND 
      FROM
         EVENT_TIME) AS EVENT_SECOND 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Alertas simples
<a name="simple-alerts"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   ticker_symbol VARCHAR(4),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
   
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   price 
FROM
   "SOURCE_SQL_STREAM_001"
WHERE
   (
      ABS(Change / (Price - Change)) * 100
   )
   > 1
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(4), 
   CHANGE DOUBLE,       
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
   
Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
   WHERE
      (
         ABS(CHANGE / (PRICE - CHANGE)) * 100
      )
      > 1;
```

------

#### Alertas controlados
<a name="throttled-alerts"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CHANGE_STREAM"(
   ticker_symbol VARCHAR(4),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
   
CREATE 
OR REPLACE PUMP "change_pump" AS INSERT INTO "CHANGE_STREAM"
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   price
FROM "SOURCE_SQL_STREAM_001"
WHERE
   (
      ABS(Change / (Price - Change)) * 100
   )
   > 1;
-- ** Trigger Count and Limit **
-- Counts "triggers" or those values that evaluated true against the previous where clause
-- Then provides its own limit on the number of triggers per hour per ticker symbol to what is specified in the WHERE clause

CREATE 
OR REPLACE STREAM TRIGGER_COUNT_STREAM (
   ticker_symbol VARCHAR(4),
   change REAL,
   trigger_count INTEGER);
   
CREATE 
OR REPLACE PUMP trigger_count_pump AS 
INSERT INTO
   TRIGGER_COUNT_STREAMSELECT STREAM ticker_symbol,
   change,
   trigger_count 
FROM
   (
      SELECT
         STREAM ticker_symbol,
         change,
         COUNT(*) OVER W1 as trigger_countFROM "CHANGE_STREAM" --window to perform aggregations over last minute to keep track of triggers
         WINDOW W1 AS 
         (
            PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING
         )
   )
WHERE
   trigger_count >= 1;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(4),      
   CHANGE DOUBLE, PRICE DOUBLE,
   EVENT_TIME AS PROCTIME()) 
PARTITIONED BY (TICKER_SYMBOL) 
WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');   
DROP TABLE IF EXISTS TRIGGER_COUNT_STREAM;
CREATE TABLE TRIGGER_COUNT_STREAM ( 
   TICKER_SYMBOL VARCHAR(4), 
   CHANGE DOUBLE, 
   TRIGGER_COUNT INT) 
PARTITIONED BY (TICKER_SYMBOL);

Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
   WHERE
      (
         ABS(CHANGE / (PRICE - CHANGE)) * 100
      )
      > 1;
      
Query 3 - % flink.ssql(type = 
update
) 
   SELECT * 
   FROM(
         SELECT
            TICKER_SYMBOL,
            CHANGE,
            COUNT(*) AS TRIGGER_COUNT 
         FROM
            DESTINATION_SQL_STREAM 
         GROUP BY
            TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE),
            TICKER_SYMBOL,
            CHANGE 
      )
   WHERE
      TRIGGER_COUNT > 1;
```

------

#### Agregação dos resultados parciais de uma consulta
<a name="aggregate-partial-results"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
   
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
   
CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS 
INSERT INTO
   "CALC_COUNT_SQL_STREAM"(
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT") 
   SELECT
      STREAM "TICKER_SYMBOL",
      STEP("SOURCE_SQL_STREAM_001",
      "ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
      COUNT(*) AS "TickerCount "
   FROM
      "SOURCE_SQL_STREAM_001" 
   GROUP BY
      STEP("SOURCE_SQL_STREAM_001". ROWTIME BY INTERVAL '1' MINUTE),
      STEP("SOURCE_SQL_STREAM_001"." APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
      TICKER_SYMBOL;
CREATE PUMP "AGGREGATED_SQL_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT") 
   SELECT
      STREAM "TICKER",
      "TRADETIME",
      SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" 
   FROM
      "CALC_COUNT_SQL_STREAM" WINDOW W1 AS 
      (
         PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING
      )
;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;
CREATE TABLE SOURCE_SQL_STREAM_001 (
   TICKER_SYMBOL VARCHAR(4),
   TRADETIME AS PROCTIME(),
   APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA 
FROM
   'timestamp' VIRTUAL,
   WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND) 
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',   
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
DROP TABLE IF EXISTS CALC_COUNT_SQL_STREAM;
CREATE TABLE CALC_COUNT_SQL_STREAM (
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP(3),
   WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND,   
   TICKERCOUNT BIGINT NOT NULL ) PARTITIONED BY (TICKER) WITH ( 
      'connector' = 'kinesis',
      'stream' = 'CALC_COUNT_SQL_STREAM',
      'aws.region' = 'us-east-1',       
      'scan.stream.initpos' = 'LATEST',
      'format' = 'csv');
DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP(3),
   WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND, 
   TICKERCOUNT BIGINT NOT NULL )
   PARTITIONED BY (TICKER) WITH ('connector' = 'kinesis', 
      'stream' = 'DESTINATION_SQL_STREAM',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'csv');

Query 2 - % flink.ssql(type = 
update
) 
   INSERT INTO
      CALC_COUNT_SQL_STREAM 
      SELECT
         TICKER,
         TO_TIMESTAMP(TRADETIME, 'yyyy-MM-dd HH:mm:ss') AS TRADETIME,
         TICKERCOUNT 
      FROM
         (
            SELECT
               TICKER_SYMBOL AS TICKER,
               DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00') AS TRADETIME,
               COUNT(*) AS TICKERCOUNT 
            FROM
               SOURCE_SQL_STREAM_001 
            GROUP BY
               TUMBLE(TRADETIME, INTERVAL '1' MINUTE),
               DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00'),
               DATE_FORMAT(APPROXIMATE_ARRIVAL_TIME, 'yyyy-MM-dd HH:mm:00'),
               TICKER_SYMBOL 
         )
;

Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      CALC_COUNT_SQL_STREAM;
      
Query 4 - % flink.ssql(type = 
update
) 
   INSERT INTO
      DESTINATION_SQL_STREAM 
      SELECT
         TICKER,
         TRADETIME,
         SUM(TICKERCOUNT) OVER W1 AS TICKERCOUNT 
      FROM
         CALC_COUNT_SQL_STREAM WINDOW W1 AS 
         (
            PARTITION BY TICKER 
         ORDER BY
            TRADETIME RANGE INTERVAL '10' MINUTE PRECEDING
         )
;

Query 5 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      DESTINATION_SQL_STREAM;
```

------

#### Transformação de valores de string
<a name="transform-string-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM for cleaned up referrerCREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32));
CREATE 
OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM "APPROXIMATE_ARRIVAL_TIME",
   SUBSTRING("referrer", 12, 
   (
      POSITION('.com' IN "referrer") - POSITION('www.' IN "referrer") - 4
   )
) 
FROM
   "SOURCE_SQL_STREAM_001";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   referrer VARCHAR(32),
   ingest_time AS PROCTIME() ) PARTITIONED BY (referrer) 
WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ingest_time,
         substring(referrer, 12, 6) as referrer 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Substituição de uma substring usando regex
<a name="substring-regex"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM for cleaned up referrerCREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32));
CREATE 
OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM "APPROXIMATE_ARRIVAL_TIME",
   REGEX_REPLACE("REFERRER", 'http://', 'https://', 1, 0) 
FROM
   "SOURCE_SQL_STREAM_001";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   referrer VARCHAR(32),
   ingest_time AS PROCTIME()) 
PARTITIONED BY (referrer) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ingest_time,
         REGEXP_REPLACE(referrer, 'http', 'https') as referrer 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Análise de log Regex
<a name="regex-log-parse"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   sector VARCHAR(24),
   match1 VARCHAR(24),
   match2 VARCHAR(24));
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM T.SECTOR,
      T.REC.COLUMN1,
      T.REC.COLUMN2 
   FROM
      (
         SELECT
            STREAM SECTOR,
            REGEX_LOG_PARSE(SECTOR, '.*([E].).*([R].*)') AS REC 
         FROM
            SOURCE_SQL_STREAM_001
      )
      AS T;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   CHANGE DOUBLE, PRICE DOUBLE,
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16)) 
PARTITIONED BY (SECTOR) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
SELECT
   * 
FROM
   (
      SELECT
         SECTOR,
         REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 1) AS MATCH1,
         REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 2) AS MATCH2 
      FROM
         DESTINATION_SQL_STREAM 
   )
WHERE
   MATCH1 IS NOT NULL 
   AND MATCH2 IS NOT NULL;
```

------

#### Transformando valores DateTime
<a name="transform-date-time-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( 
   TICKER VARCHAR(4),
   event_time TIMESTAMP,
   five_minutes_before TIMESTAMP,
   event_unix_timestamp BIGINT,
   event_timestamp_as_char VARCHAR(50),
   event_second INTEGER);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      EVENT_TIME,
      EVENT_TIME - INTERVAL '5' MINUTE,
      UNIX_TIMESTAMP(EVENT_TIME),
      TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
      EXTRACT(SECOND 
   FROM
      EVENT_TIME) 
   FROM
      "SOURCE_SQL_STREAM_001"
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   FIVE_MINUTES_BEFORE TIMESTAMP(3),
   EVENT_UNIX_TIMESTAMP INT,    
   EVENT_TIMESTAMP_AS_CHAR VARCHAR(50),
   EVENT_SECOND INT) PARTITIONED BY (TICKER)
WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         TICKER,
         EVENT_TIME,
         EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE,
         UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP,
         DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR,
         EXTRACT(SECOND 
      FROM
         EVENT_TIME) AS EVENT_SECOND 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Janelas e agregação
<a name="windows-aggregation"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   event_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   ticker_count INTEGER);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM EVENT_TIME,
      TICKER,
      COUNT(TICKER) AS ticker_count 
   FROM
      "SOURCE_SQL_STREAM_001" WINDOWED BY STAGGER ( PARTITION BY 
         TICKER,
         EVENT_TIME RANGE INTERVAL '1' MINUTE);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   EVENT_TIME TIMESTAMP(3),
   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '60' SECOND,    
   TICKER VARCHAR(4),
   TICKER_COUNT INT) PARTITIONED BY (TICKER) 
WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json'

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         EVENT_TIME,
         TICKER, COUNT(TICKER) AS ticker_count 
      FROM
         DESTINATION_SQL_STREAM 
      GROUP BY
         TUMBLE(EVENT_TIME,
         INTERVAL '60' second),
         EVENT_TIME, TICKER;
```

------

#### Janela em cascata usando ROWTIME
<a name="tumbling-windows-rowtime"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   MIN_PRICE REAL,
   MAX_PRICE REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      MIN(PRICE),
      MAX(PRICE)
   FROM
      "SOURCE_SQL_STREAM_001"
   GROUP BY
      TICKER,
      STEP("SOURCE_SQL_STREAM_001".
            ROWTIME BY INTERVAL '60' SECOND);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   ticker VARCHAR(4),
   price DOUBLE,
   event_time VARCHAR(32),
   processing_time AS PROCTIME()) 
PARTITIONED BY (ticker) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601') 

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ticker,
         min(price) AS MIN_PRICE,
         max(price) AS MAX_PRICE 
      FROM
         DESTINATION_SQL_STREAM 
      GROUP BY
         TUMBLE(processing_time, INTERVAL '60' second),
         ticker;
```

------

#### Recuperar os valores que ocorrem com mais frequência (TOP\$1K\$1ITEMS\$1TUMBLING)
<a name="retrieve-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT")
SELECT
   STREAM"TICKER_SYMBOL",
   STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
   COUNT(*) AS "TickerCount"
FROM
   "SOURCE_SQL_STREAM_001"
GROUP BY STEP("SOURCE_SQL_STREAM_001".
   ROWTIME BY INTERVAL '1' MINUTE),
   STEP("SOURCE_SQL_STREAM_001".
      "APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
   TICKER_SYMBOL;
CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT")
SELECT
   STREAM "TICKER",
   "TRADETIME",
   SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT"
FROM
   "CALC_COUNT_SQL_STREAM" WINDOW W1 AS 
   (
      PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING
   )
;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM ( 
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '1' SECONDS ) 
PARTITIONED BY (TICKER) WITH (
   'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      (
         SELECT
            TICKER,
            COUNT(*) as MOST_FREQUENT_VALUES,
            ROW_NUMBER() OVER (PARTITION BY TICKER 
         ORDER BY
            TICKER DESC) AS row_num 
         FROM
            DESTINATION_SQL_STREAM 
         GROUP BY
            TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE),
            TICKER
      )
   WHERE
      row_num <= 5;
```

------

#### Itens Top-K aproximados
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ITEM VARCHAR(1024), ITEM_COUNT DOUBLE);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM ITEM,
      ITEM_COUNT 
   FROM
      TABLE(TOP_K_ITEMS_TUMBLING(CURSOR(
      SELECT
         STREAM * 
      FROM
         "SOURCE_SQL_STREAM_001"), 'column1', -- name of column in single quotes10, -- number of top items60 -- tumbling window size in seconds));
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flinkssql
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 
CREATE TABLE SOURCE_SQL_STREAM_001 ( TS TIMESTAMP(3), WATERMARK FOR TS as TS - INTERVAL '5' SECOND, ITEM VARCHAR(1024), 
PRICE DOUBLE) 
   WITH ( 'connector' = 'kinesis', 'stream' = 'SOURCE_SQL_STREAM_001',
'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601');


%flink.ssql(type=update)
SELECT
   * 
FROM
   (
      SELECT
         *,
         ROW_NUMBER() OVER (PARTITION BY AGG_WINDOW 
      ORDER BY
         ITEM_COUNT DESC) as rownum 
      FROM
         (
            select
               AGG_WINDOW,
               ITEM,
               ITEM_COUNT 
            from
               (
                  select
                     TUMBLE_ROWTIME(TS, INTERVAL '60' SECONDS) as AGG_WINDOW,
                     ITEM,
                     count(*) as ITEM_COUNT 
                  FROM
                     SOURCE_SQL_STREAM_001 
                  GROUP BY
                     TUMBLE(TS, INTERVAL '60' SECONDS),
                     ITEM
               )
         )
   )
where
   rownum <= 3
```

------

#### Análise de logs da web (função W3C\$1LOG\$1PARSE)
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( column1 VARCHAR(16), 
   column2 VARCHAR(16), 
   column3 VARCHAR(16), 
   column4 VARCHAR(16), 
   column5 VARCHAR(16), 
   column6 VARCHAR(16), 
   column7 VARCHAR(16));
CREATE 
OR REPLACE PUMP "myPUMP" ASINSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM l.r.COLUMN1,
   l.r.COLUMN2,
   l.r.COLUMN3,
   l.r.COLUMN4,
   l.r.COLUMN5,
   l.r.COLUMN6,
   l.r.COLUMN7 
FROM
   (
      SELECT
         STREAM W3C_LOG_PARSE("log", 'COMMON') 
      FROM
         "SOURCE_SQL_STREAM_001"
   )
   AS l(r);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql(type=update)
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) 
   WITH ( 'connector' = 'kinesis', 
          'stream' = 'SOURCE_SQL_STREAM_001',
          'aws.region' = 'us-east-1',
          'scan.stream.initpos' = 'LATEST',
          'format' = 'json',
          'json.timestamp-format.standard' = 'ISO-8601');
          
% flink.ssql(type=update) 
   select
      SPLIT_INDEX(log, ' ', 0),
      SPLIT_INDEX(log, ' ', 1),
      SPLIT_INDEX(log, ' ', 2),
      SPLIT_INDEX(log, ' ', 3),
      SPLIT_INDEX(log, ' ', 4),
      SPLIT_INDEX(log, ' ', 5),
      SPLIT_INDEX(log, ' ', 6) 
   from
      SOURCE_SQL_STREAM_001;
```

------

#### Divisão de strings de caracteres em vários campos (função VARIABLE\$1COLUMN\$1LOG\$1PARSE)
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"( "column_A" VARCHAR(16),
   "column_B" VARCHAR(16),
   "column_C" VARCHAR(16),
   "COL_1" VARCHAR(16),
   "COL_2" VARCHAR(16),
   "COL_3" VARCHAR(16));
CREATE 
OR REPLACE PUMP "SECOND_STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM t."Col_A",
   t."Col_B",
   t."Col_C",
   t.r."COL_1",
   t.r."COL_2",
   t.r."COL_3"
FROM
   (
      SELECT
         STREAM "Col_A",
         "Col_B",
         "Col_C",
         VARIABLE_COLUMN_LOG_PARSE ("Col_E_Unstructured",
         'COL_1 TYPE VARCHAR(16),
         COL_2 TYPE VARCHAR(16),
         COL_3 TYPE VARCHAR(16)', ',') AS r 
      FROM
         "SOURCE_SQL_STREAM_001"
   )
   as t;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql(type=update)
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) 
   WITH ( 'connector' = 'kinesis',
          'stream' = 'SOURCE_SQL_STREAM_001',
          'aws.region' = 'us-east-1',
          'scan.stream.initpos' = 'LATEST',
          'format' = 'json',
          'json.timestamp-format.standard' = 'ISO-8601');
          
% flink.ssql(type=update) 
   select
      SPLIT_INDEX(log, ' ', 0),
      SPLIT_INDEX(log, ' ', 1),
      SPLIT_INDEX(log, ' ', 2),
      SPLIT_INDEX(log, ' ', 3),
      SPLIT_INDEX(log, ' ', 4),
      SPLIT_INDEX(log, ' ', 5)
)
from
   SOURCE_SQL_STREAM_001;
```

------

#### Junções
<a name="joins"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   ticker_symbol VARCHAR(4),
   "Company" varchar(20),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM ticker_symbol,
      "c"."Company",
      sector,
      change,
      priceFROM "SOURCE_SQL_STREAM_001" 
      LEFT JOIN
         "CompanyName" as "c"
         ON "SOURCE_SQL_STREAM_001".ticker_symbol = "c"."Ticker";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(12),
   CHANGE INT,
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',   
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - CREATE TABLE CompanyName (
   Ticker VARCHAR(4),
   Company VARCHAR(4)) WITH ( 
      'connector' = 'filesystem',
      'path' = 's3://kda-demo-sample/TickerReference.csv',       
      'format' = 'csv' );

Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      c.Company,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
      LEFT JOIN
         CompanyName as c 
         ON DESTINATION_SQL_STREAM.TICKER_SYMBOL = c.Ticker;
```

------

#### Erros
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   (
      price / 0
   )
   as ProblemColumnFROM "SOURCE_SQL_STREAM_001"
WHERE
   sector SIMILAR TO '%TECH%';
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   CHANGE DOUBLE, 
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - % flink.pyflink @udf(input_types = [DataTypes.BIGINT()],
   result_type = DataTypes.BIGINT()) def DivideByZero(price): try: price / 0 
except
: return - 1 st_env.register_function("DivideByZero",
   DivideByZero) 
   
   Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      CURRENT_TIMESTAMP AS ERROR_TIME,
      * 
   FROM
      (
         SELECT
            TICKER_SYMBOL,
            SECTOR,
            CHANGE,
            DivideByZero(PRICE) as ErrorColumn 
         FROM
            DESTINATION_SQL_STREAM 
         WHERE
            SECTOR SIMILAR TO '%TECH%' 
      )
      AS ERROR_STREAM;
```

------

## Migração de workloads do Random Cut Forest
<a name="examples-migrating-to-kda-studio-random-cut-forests"></a>

Se você deseja mover workloads que usam Random Cut Forest do Kinesis Analytics para SQL para o Managed Service for Apache Flink, esta [AWS postagem do blog](https://aws.amazon.com/blogs/big-data/real-time-anomaly-detection-via-random-cut-forest-in-amazon-kinesis-data-analytics/) demonstra como usar o Managed Service for Apache Flink para executar um algoritmo RCF on-line para detecção de anomalias.

## Substituição do Kinesis Data Firehose como fonte pelo Kinesis Data Streams
<a name="examples-firehose"></a>

Veja [Converting-KDASQL- KDAStudio](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/tree/master/Converting-KDASQL-KDAStudio)/para um tutorial completo.

No exercício a seguir, você alterará seu fluxo de dados para usar o Amazon Managed Service for Apache Flink. Isso também significa mudar do Amazon Kinesis Data Firehose para o Amazon Kinesis Data Streams.

Primeiro, compartilhamos uma arquitetura típica do KDA-SQL, antes de mostrar como você pode substituí-la usando o Amazon Managed Service for Apache Flink e o Amazon Kinesis Data Streams. Como alternativa, você pode iniciar o CloudFormation modelo [aqui](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml):

### Amazon Kinesis Data Analytics-SQL e Amazon Kinesis Data Firehose
<a name="examples-firehose-legacy-setup"></a>

Aqui está o fluxo arquitetônico SQL do Amazon Kinesis Data Analytics: 

![\[Architectural flow diagram showing data movement through Amazon Kinesis services to Amazon S3.\]](http://docs.aws.amazon.com/pt_br/kinesisanalytics/latest/dev/images/legacy-sql.png)


Primeiro, examinamos a configuração de um Amazon Kinesis Data Analytics-SQL e do Amazon Kinesis Data Firehose legados. O caso de uso é um mercado de negociação em que os dados de negociação, incluindo cotação e preço das ações, são transmitidos de fontes externas para os sistemas Amazon Kinesis. O Amazon Kinesis Data Analytics para SQL usa o fluxo de entrada para executar consultas em janelas, como a janela em cascata, para determinar o volume e os preços `min`, `max` e `average` de negociação em uma janela de um minuto para cada ticker de ação.  

O Amazon Kinesis Data Analytics-SQL está configurado para ingerir dados da API do Amazon Kinesis Data Firehose. Após o processamento, o Amazon Kinesis Data Analytics-SQL envia os dados processados para outro Amazon Kinesis Data Firehose, que então salva a saída em um bucket do Amazon S3. 

Nesse caso, você usa o Amazon Kinesis Data Generator. O Amazon Kinesis Data Generator permite que você envie dados de teste para seus streams de entrega do Amazon Kinesis Data Streams ou do Amazon Kinesis Data Firehose. Para começar, siga as instruções [aqui](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). Use o CloudFormation modelo [aqui](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml) no lugar do fornecido nas [instruções:](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). 

Depois de executar o CloudFormation modelo, a seção de saída fornecerá a URL do Amazon Kinesis Data Generator. Faça login no portal usando a ID de usuário e a senha do Cognito que você configurou [aqui](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). Selecione a região e o nome do stream de destino. Para o estado atual, escolha os streams de entrega do Amazon Kinesis Data Firehose. Para o novo estado, escolha nome dos streams do Amazon Kinesis Data Firehose. Você pode criar vários modelos, dependendo dos seus requisitos, e testar o modelo usando o botão **Testar modelo** antes de enviá-lo para o stream de destino.

Veja a seguir um exemplo de carga útil usando o Amazon Kinesis Data Generator. O gerador de dados tem como alvo a entrada dos streams do Amazon Kinesis Firehose para transmitir os dados continuamente. O cliente SDK do Amazon Kinesis também pode enviar dados de outros produtores. 

```
2023-02-17 09:28:07.763,"AAPL",5032023-02-17 09:28:07.763,
"AMZN",3352023-02-17 09:28:07.763,
"GOOGL",1852023-02-17 09:28:07.763,
"AAPL",11162023-02-17 09:28:07.763,
"GOOGL",1582
```

O JSON a seguir é usado para gerar uma série aleatória de data e hora da negociação, código de negociação da ação e preço da ação:

```
date.now(YYYY-MM-DD HH:mm:ss.SSS),
"random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])",
random.number(2000)
```

Depois de escolher **Enviar dados**, o gerador começará a enviar dados simulados.

Os sistemas externos transmitem os dados para o Amazon Kinesis Data Firehose. Usando o Amazon Kinesis Data Analytics para aplicativos SQL, você pode analisar dados de transmissão usando o SQL padrão. O serviço permite que você crie e execute código SQL em fontes de streaming para realizar análises de séries temporais, alimentar painéis em tempo real e criar métricas em tempo real. O Amazon Kinesis Data Analytics para aplicativos SQL pode criar um stream de destino a partir de consultas SQL no stream de entrada e enviar o stream de destino para outro Amazon Kinesis Data Firehose. O Amazon Kinesis Data Firehose de destino pode enviar os dados analíticos para o Amazon S3 como estado final.

O código legado do Amazon Kinesis Data Analytics-SQL é baseado em uma extensão do padrão SQL. 

Você usa a consulta a seguir no Amazon Kinesis Data Analytics-SQL. Primeiro, você cria um stream de destino para a saída da consulta. Então, você usa `PUMP`, que é um objeto de repositório do Amazon Kinesis Data Analytics (uma extensão do padrão SQL) que fornece uma função de consulta de execução contínua `INSERT INTO stream SELECT ... FROM`, permitindo assim que os resultados de uma consulta sejam inseridos continuamente em um fluxo nomeado. 

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME TIMESTAMP,
INGEST_TIME TIMESTAMP,
TICKER VARCHAR(16),
VOLUME BIGINT,
AVG_PRICE DOUBLE,
MIN_PRICE DOUBLE,
MAX_PRICE DOUBLE);
 
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND) AS EVENT_TIME,
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS "STREAM_INGEST_TIME",
      "ticker",
       COUNT(*) AS VOLUME,
      AVG("tradePrice") AS AVG_PRICE,
      MIN("tradePrice") AS MIN_PRICE,
      MAX("tradePrice") AS MAX_PRICEFROM "SOURCE_SQL_STREAM_001"
   GROUP BY
      "ticker",
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND),
      STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND);
```

O SQL acima usa duas janelas de tempo: `tradeTimestamp`, que vem da carga útil do fluxo de entrada, e `ROWTIME.tradeTimestamp`, que também é chamada de `Event Time` ou `client-side time`. Muitas vezes, é desejável usar esse horário em análises, porque é o momento em que um evento ocorreu. No entanto, muitas fontes de eventos, como celulares e clientes da Web, não têm relógios confiáveis, o que pode levar a horários imprecisos. Além disso, problemas de conectividade podem levar a registros que aparecem em um stream não na mesma ordem em que os eventos ocorreram. 

Os streams no aplicativo também incluem uma coluna especial chamada `ROWTIME`. Ela armazena um timestamp quando o Amazon Kinesis Data Analytics insere uma linha no primeiro stream do aplicativo. `ROWTIME` reflete o timestamp no qual o Amazon Kinesis Data Analytics inseriu um registro no primeiro stream no aplicativo após ler a partir da fonte de streaming. Esse valor `ROWTIME` então é mantido em todo o aplicativo. 

O SQL determina a contagem do ticker como preço `volume`, `min`, `max` e `average` em um intervalo de 60 segundos. 

Usar cada um desses horários nas consultas em janelas baseadas em horário tem vantagens e desvantagens. Escolha um ou mais desses horários e uma estratégia para lidar com as relevantes desvantagens de acordo com o caso de uso. 

Uma estratégia de duas janelas que usam dois horários, o `ROWTIME` e um dos outros horários como a hora do evento.
+ Use o `ROWTIME` como a primeira janela, que controla a frequência com que a consulta emite os resultados, como mostrado no exemplo a seguir. Ele não é usado como um horário lógico. 
+ Use um dos outros horários lógicos que deseja associar à sua análise. Esse horário representa quando o evento ocorreu. No exemplo a seguir, o objetivo da análise é agrupar os registros e retornar a contagem pelo marcador. 

### Amazon Managed Service for Apache Flink Studio 
<a name="examples-studio"></a>

Na arquitetura atualizada, você substitui o Amazon Kinesis Data Firehose pelo Amazon Kinesis Data Streams. O Amazon Kinesis Data Analytics para aplicativos SQL foi substituído pelo Amazon Managed Service for Apache Flink Studio. O código do Apache Flink é executado interativamente em um caderno de notas Apache Zeppelin. O Amazon Managed Service for Apache Flink Studio envia os dados comerciais agregados em um bucket do Amazon S3 para armazenamento. As etapas são mostradas a seguir:

Aqui está o fluxo arquitetônico do Amazon Managed Service for Apache Flink Studio:

![\[Data flow from Producer through Kinesis streams to Analytics Studio and S3 storage.\]](http://docs.aws.amazon.com/pt_br/kinesisanalytics/latest/dev/images/kda-studio.png)


#### Criar um fluxo de dados Kinesis
<a name="examples-studio-create-data-stream"></a>

**Para criar um fluxo de dados usando o console**

1. [Faça login no Console de gerenciamento da AWS e abra o console do Kinesis em https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)

1. Na barra de navegação, expanda o seletor de região e escolha uma região.

1. Selecione **Criar fluxo de dados**.

1. Na página **Criar stream Kinesis**, insira um nome para seu fluxo de dados e aceite o modo de capacidade **sob demanda** padrão. 

   No modo **sob demanda**, pode-se, em seguida, escolher **Criar fluxo do Kinesis** para criar o fluxo de dados. 

   Na página **Fluxos do Kinesis**, o **Status** do fluxo é **Criando** enquanto o fluxo está sendo criado. Quando o fluxo estiver pronto para ser usado, o **Status** mudará para **Ativo**.

1. Escolha o nome do fluxo. A página **Detalhes do fluxo** exibe um resumo da configuração do fluxo com informações de monitoramento.

1. **No Amazon Kinesis Data Generator, altere o stream para o novo Amazon Kinesis Data Stream/delivery Streams: TRADE\$1SOURCE\$1STREAM.**

   O JSON e a carga útil serão os mesmos que você usou para o Amazon Kinesis Data Analytics-SQL. Use o Amazon Kinesis Data Generator para produzir alguns exemplos de dados de carga útil de negociação e direcionar o fluxo de dados **TRADE\$1SOURCE\$1STREAM** para este exercício: 

   ```
   {{date.now(YYYY-MM-DD HH:mm:ss.SSS)}},
   "{{random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])}}",
   {{random.number(2000)}}
   ```

1. Em seguida, Console de gerenciamento da AWS vá para Managed Service for Apache Flink e escolha **Create** application.

1. No painel de navegação à esquerda, escolha **Cadernos de nota Studio** e selecione **Criar caderno de notas studio**.

1. Insira um nome para o caderno de notas studio.

1. Em **Banco de dados Glue AWS **, forneça um banco de dados AWS Glue existente que definirá os metadados para suas fontes e destinos. Se você não tiver um AWS Glue banco de dados, escolha **Criar** e faça o seguinte:

   1. No console AWS Glue, escolha **Bancos de** **dados em Catálogo** de dados no menu à esquerda.

   1. Escolha **Criar banco de dados**

   1. Na página **Criar banco de dados**, insira um nome para o banco de dados. Na seção **Localização – opcional**, escolha **Procurar no Amazon S3** e selecione o bucket do Amazon S3. Se ainda não tiver um bucket do Amazon S3 configurado, você pode pular essa etapa e retornar posteriormente.

   1. (Optional). Insira uma descrição para o banco de dados.

   1. Selecione **Criar banco de dados**.

1. Escolha **Criar bloco de anotações**.

1. Depois que seu caderno de notas for criado, escolha **Executar**.

1. Depois que o caderno for iniciado com sucesso, inicialize um caderno do Zeppelin escolhendo **Abrir no Apache Zeppelin**.

1. Na página do Caderno Zeppelin, escolha **Criar nova nota** e dê um nome a ela. *MarketDataFeed*

O código SQL do Flink é explicado a seguir, mas primeiro [essa é a aparência da tela de um caderno de notas Zeppelin](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/open-Zeppelin-notebook.jpg). Cada janela dentro do caderno de notas é um bloco de código separado e elas podem ser executadas uma de cada vez.

##### Código do Amazon Managed Service for Apache Flink Studio
<a name="examples-studio-code"></a>

O Amazon Managed Service for Apache Flink Studio usa os cadernos de nota Zeppelin para executar o código. O mapeamento é feito neste exemplo para código ssql baseado no Apache Flink 1.13. O código no caderno do Zeppelin é mostrado abaixo, um bloco por vez.  

Antes de executar qualquer código em seu caderno de notas Zeppelin, os comandos de configuração do Flink devem ser executados. Se precisar alterar qualquer configuração após executar o código (ssql, Python ou Scala), você precisará parar e reiniciar o caderno. Neste exemplo, você precisará definir o ponto de verificação. É necessário um ponto de verificação para que você possa transmitir dados em um arquivo no Amazon S3. Isso permite que o fluxo de dados para o Amazon S3 seja transferido para um arquivo. A instrução abaixo define o intervalo para 5.000 milissegundos.  

```
%flink.conf
execution.checkpointing.interval 5000
```

`%flink.conf` indica que esse bloco são instruções de configuração. Para obter mais informações sobre a configuração do Flink, incluindo pontos de verificação, consulte [Definição de pontos de verificação do Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/checkpoints/).  

A tabela de entrada para o Amazon Kinesis Data Streams de origem é criada com o código ssql do Flink abaixo. Observe que o `TRADE_TIME` campo armazena o date/time criado pelo gerador de dados.

```
%flink.ssql
     
DROP TABLE IF EXISTS TRADE_SOURCE_STREAM;
CREATE TABLE TRADE_SOURCE_STREAM (--`arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
TRADE_TIME TIMESTAMP(3),
WATERMARK FOR TRADE_TIME as TRADE_TIME - INTERVAL '5' SECOND,TICKER STRING,PRICE DOUBLE,
STATUS STRING)WITH ('connector' = 'kinesis','stream' = 'TRADE_SOURCE_STREAM',
'aws.region' = 'us-east-1','scan.stream.initpos' = 'LATEST','format' = 'csv');
```

Você pode visualizar o fluxo de entrada com esta instrução:

```
%flink.ssql(type=update)-- testing the source stream
   
select * from TRADE_SOURCE_STREAM;
```

Antes de enviar os dados agregados para o Amazon S3, você pode visualizá-los diretamente no Amazon Managed Service for Apache Flink com uma janela em cascata para selecionar uma consulta. Isso agrega os dados de negociação em uma janela de um minuto. Observe que a instrução %flink.ssql deve ter uma designação (tipo=atualizar):

```
%flink.ssql(type=update)
   
select TUMBLE_ROWTIME(TRADE_TIME,
INTERVAL '1' MINUTE) as TRADE_WINDOW,
TICKER, COUNT(*) as VOLUME,
AVG(PRICE) as AVG_PRICE, 
MIN(PRICE) as MIN_PRICE,
MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAMGROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;
```

Em seguida, você poderá criar uma tabela para o destino no Amazon S3. Você precisa usar uma marca d'água. Uma marca d'água é uma métrica de progresso que indica um momento em que você tem certeza de que não haverá mais eventos atrasados. A marca d'água é para contabilizar chegadas tardias. O intervalo `‘5’ Second` permite que as negociações entrem no Amazon Kinesis Data Stream com 5 segundos de atraso e ainda sejam incluídas se tiverem um registro de data e hora na janela. Para obter mais informações, consulte [Geração de marcas d'água](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/).    

```
%flink.ssql(type=update)

DROP TABLE IF EXISTS TRADE_DESTINATION_S3;
CREATE TABLE TRADE_DESTINATION_S3 (
TRADE_WINDOW_START TIMESTAMP(3),
WATERMARK FOR TRADE_WINDOW_START as TRADE_WINDOW_START - INTERVAL '5' SECOND,
TICKER STRING, 
VOLUME BIGINT,
AVG_PRICE DOUBLE,
MIN_PRICE DOUBLE,
MAX_PRICE DOUBLE)
WITH ('connector' = 'filesystem','path' = 's3://trade-destination/','format' = 'csv');
```

Essa instrução insere os dados no `TRADE_DESTINATION_S3`. `TUMPLE_ROWTIME` é o time stamp do limite superior inclusivo da janela em cascata.

```
%flink.ssql(type=update)

insert into TRADE_DESTINATION_S3
select TUMBLE_ROWTIME(TRADE_TIME,
INTERVAL '1' MINUTE),
TICKER, COUNT(*) as VOLUME,
AVG(PRICE) as AVG_PRICE,
MIN(PRICE) as MIN_PRICE,
MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAM
GROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;
```

Deixe sua instrução ser executada por 10 a 20 minutos para acumular alguns dados no Amazon S3. Então aborte sua instrução. 

Isso fecha o arquivo no Amazon S3 para que fique visível. 

Aqui está a aparência do conteúdo: 

![\[Financial data table showing stock prices and volumes for tech companies on March 1, 2023.\]](http://docs.aws.amazon.com/pt_br/kinesisanalytics/latest/dev/images/kda-studio-contents.png)


Você pode usar o [modelo CloudFormation](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml) para criar a infraestrutura. 

CloudFormation criará os seguintes recursos em sua AWS conta:
+  Amazon Kinesis Data Streams
+  Amazon Managed Service for Apache Flink Studio
+  AWS Glue banco de dados
+  Bucket do Amazon S3.
+  Perfis e políticas do IAM para o Amazon Managed Service for Apache Flink Studio para acessar os recursos adequados

Importe o notebook e altere o nome do bucket do Amazon S3 com o novo bucket do Amazon S3 criado por. CloudFormation

![\[SQL code snippet creating a table with timestamp, ticker, volume, and price fields.\]](http://docs.aws.amazon.com/pt_br/kinesisanalytics/latest/dev/images/kda-studio-cfn.png)


##### Veja mais
<a name="more"></a>

Aqui estão alguns recursos adicionais que você pode usar para saber mais sobre o uso do Managed Service for Apache Flink Studio: 
+ [Guia para desenvolvedores de serviços gerenciados para o caderno de notas Apache Flink Studio](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html) 
+ [Documentação do Apache Flink 1.13](https://nightlies.apache.org/flink/flink-docs-release-1.13/) 
+ [Managed Service for Apache Flink Studio Workshop ](https://catalog.us-east-1.prod.workshops.aws/workshops/c342c6d1-2baf-4827-ba42-52ef9eb173f6/en-US/flink-on-kda-studio) 
+ [Janelas do Apache Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/) 
+ [Guia do desenvolvedor do Amazon Kinesis Data Analytics — Como escrever um stream do Kinesis Data Analytics para um bucket S3](https://docs.aws.amazon.com/managed-flink/latest/java/examples-s3.html) 

# Aproveitando funções definidas pelo usuário () UDFs
<a name="examples-migrating-to-kda-studio-leveraging-udfs"></a>

O objetivo do padrão é demonstrar como usar os notebooks Kinesis Data Analytics-Studio UDFs Zeppelin para processar dados no stream do Kinesis. O Managed Service for Apache Flink Studio usa o Apache Flink para fornecer funcionalidades analíticas avançadas, incluindo semântica de processamento exatamente uma vez, janelas de horário de eventos, extensibilidade usando funções definidas pelo usuário e integrações personalizadas, suporte a linguagens imperativas, estado durável da aplicação, escalabilidade horizontal, suporte a várias fontes de dados, integrações extensíveis e muito mais. Eles são essenciais para garantir a precisão, integridade, consistência e confiabilidade do processamento de fluxos de dados e não estão disponíveis com o Amazon Kinesis Data Analytics para SQL.

Neste aplicativo de exemplo, demonstraremos como usar UDFs o notebook KDA-Studio Zeppelin para processar dados no stream do Kinesis. Os cadernos de notas Studio para Kinesis Data Analytics permitem que você consulte interativamente fluxos de dados em tempo real e crie e execute facilmente aplicativos de processamento de streams usando SQL, Python e Scala padrão. Com alguns cliques no Console de gerenciamento da AWS, você pode iniciar um notebook sem servidor para consultar fluxos de dados e obter resultados em segundos. Para obter mais informações, consulte [Usar um caderno de notas Studio com o Kinesis Data Analytics para Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html). 

Funções Lambda usadas para pre/post processamento de dados em aplicativos KDA-SQL:

![\[alt text not found\]](http://docs.aws.amazon.com/pt_br/kinesisanalytics/latest/dev/images/sql-udf-1.png)


Funções definidas pelo usuário para pre/post processamento de dados usando notebooks KDA-Studio Zeppelin

![\[alt text not found\]](http://docs.aws.amazon.com/pt_br/kinesisanalytics/latest/dev/images/sql-udf.png)


## Funções definidas pelo usuário () UDFs
<a name="examples-migrating-to-kda-studio-udfs"></a>

Para reutilizar a lógica comercial comum em um operador, pode ser útil fazer referência a uma função definida pelo usuário para transformar seu fluxo de dados. Isso pode ser feito no caderno de notas Managed Service for Apache Flink Studio ou como um arquivo jar de aplicativo referenciado externamente. A utilização de funções definidas pelo usuário pode simplificar as transformações ou enriquecimentos de dados que você pode realizar em fluxo de dados.

 Em seu caderno de notas, você fará referência a um simples jar de aplicativos Java que tem a funcionalidade de anonimizar números de telefone pessoais. Você também pode escrever em Python ou Scala UDFs para uso no notebook. Escolhemos um jar de aplicativo Java para destacar a funcionalidade de importar um jar de aplicativo em um caderno de notas Pyflink.

## Configuração do ambiente
<a name="examples-migrating-to-kda-studio-setup"></a>

Para seguir este guia e interagir com seus dados de streaming, você usará um script do AWS CloudFormation para inicializar os seguintes recursos:
+ Kinesis Data Streams como origem e destino
+ Banco de dados Glue
+ perfil do IAM
+ Aplicativo do Managed Service for Apache Flink Studio
+ Função do Lambda para iniciar o aplicativo Managed Service for Apache Flink Studio
+ Perfil do Lambda para executar a função do Lambda acima
+ Recurso personalizado para invocar a função do Lambda

Baixe o CloudFormation modelo [aqui](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/cfn/kda-flink-udf.yml).

**Crie a CloudFormation pilha**

1. Vá até Console de gerenciamento da AWS e escolha **CloudFormation**abaixo da lista de serviços.

1. Na **CloudFormation**página, escolha **Pilhas** e, em seguida, escolha **Criar pilha com novos recursos (padrão)**. 

1. Na página **Criar pilha**, escolha **Carregar um arquivo de modelo** e, em seguida, escolha o `kda-flink-udf.yml` que você baixou anteriormente. Faça o upload do arquivo e escolha **Próximo**. 

1. Dê um nome ao modelo, como `kinesis-UDF`, para que seja fácil de lembrar, e atualize os parâmetros de entrada, como fluxo de entrada, se quiser um nome diferente. Escolha **Próximo**.

1. Na página **Configurar opções de pilha**, adicione **Etiquetas** se desejar e escolha **Próximo**.

1. Na página **Revisão**, marque as caixas que permitem a criação de recursos do IAM e escolha **Enviar**.

 A CloudFormation pilha pode levar de 10 a 15 minutos para ser lançada, dependendo da região em que você está lançando. Depois de ver o status `CREATE_COMPLETE` de toda a pilha, você está pronto para continuar.

## Trabalhar com o caderno de notas Managed Service for Apache Flink Studio
<a name="examples-migrating-to-kda-studio-notebook"></a>

Os cadernos de notas Studio para Kinesis Data Analytics permitem que você consulte interativamente fluxos de dados em tempo real e crie e execute facilmente aplicativos de processamento de streams usando SQL, Python e Scala padrão. Com alguns cliques no Console de gerenciamento da AWS, você pode iniciar um notebook sem servidor para consultar fluxos de dados e obter resultados em segundos.

Um caderno de notas é um ambiente de desenvolvimento baseado na web. Com o caderno de notas, você obtém uma experiência simples de desenvolvimento interativo combinada com os recursos avançados de processamento de fluxo de dados fornecidos pelo Apache Flink. Os cadernos do Studio são baseados em Apache Zeppelin e usam o Apache Flink como mecanismo de processamento de fluxos. Os cadernos de notas Studio combinam perfeitamente essas tecnologias para tornar a análise avançada em fluxos de dados acessível a desenvolvedores de todos os conjuntos de habilidades. 

 O Apache Zeppelin fornece aos seus cadernos de notas Studio um conjunto completo de ferramentas de análise, incluindo as seguintes:
+ Visualização de dados
+ Exportar dados para arquivos
+ Controlar o formato da saída para análise mais fácil 

**Uso de caderno de notas**

1. Acesse Console de gerenciamento da AWS e escolha **Amazon Kinesis** na lista de serviços.

1. Na página de navegação à esquerda, escolha **Aplicativos Analytics** e, em seguida, escolha **Cadernos de notas Studio**.

1. Verifique se o **KinesisDataAnalyticsStudio**notebook está funcionando.

1. Escolha o caderno de notas e, em seguida, escolha **Abrir no Apache Zeppelin**. 

1. Faça o download do arquivo do [Caderno de notas produtor de dados Zeppelin](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/notebooks/Data%20Producer.zpln), que você usará para ler e carregar dados no Kinesis Stream. 

1.  Importe o caderno de notas Zeppelin `Data Producer`. Certifique-se de modificar a entrada `STREAM_NAME` e `REGION` o código do caderno de notas. O nome do fluxo de entrada pode ser encontrado na [saída da pilha CloudFormation](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/cfn/kda-flink-udf.yml). 

1. Execute o caderno de notas **Produtor de dados** escolhendo o botão **Executar este parágrafo** para inserir dados de amostra no Kinesis Data Stream de entrada.

1. Enquanto os dados de amostra são carregados, baixe o [notebook MaskPhoneNumber -Interactive](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/notebooks/MaskPhoneNumber-interactive.zpln), que lerá os dados de entrada, anonimizará os números de telefone do fluxo de entrada e armazenará dados anônimos no fluxo de saída.

1. Importe o caderno de notas Zeppelin `MaskPhoneNumber-interactive`. 

1. Execute cada parágrafo no caderno de notas.

   1. No parágrafo 1, importe uma Função Definida pelo Usuário para anonimizar os números de telefone.

      ```
      %flink(parallelism=1)
      import com.mycompany.app.MaskPhoneNumber
      stenv.registerFunction("MaskPhoneNumber", new MaskPhoneNumber())
      ```

   1. No próximo parágrafo, você cria uma tabela na memória para ler os dados do fluxo de entrada. Verifique se o nome do stream e a AWS região estão corretos. 

      ```
      %flink.ssql(type=update)
      
      DROP TABLE IF EXISTS customer_reviews;
      
      CREATE TABLE customer_reviews (
      customer_id VARCHAR,
      product VARCHAR,
      review VARCHAR,
      phone VARCHAR
      )
      WITH (
      'connector' = 'kinesis',
      'stream' = 'KinesisUDFSampleInputStream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json');
      ```

   1. Verifique se os dados estão carregados na tabela na memória. 

      ```
      %flink.ssql(type=update)
      select * from customer_reviews
      ```

   1. Invoque a função definida pelo usuário para anonimizar o número de telefone. 

      ```
      %flink.ssql(type=update)
      select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber from customer_reviews
      ```

   1. Agora que os números de telefone estão mascarados, crie uma visualização com um número mascarado. 

      ```
      %flink.ssql(type=update)
      
      DROP VIEW IF EXISTS sentiments_view;
      
      CREATE VIEW  
          sentiments_view
      AS
        select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber from customer_reviews
      ```

   1. Verifique os dados. 

      ```
      %flink.ssql(type=update)
      select * from sentiments_view
      ```

   1. Crie uma tabela na memória para a saída do Kinesis Stream. Verifique se o nome do stream e a AWS região estão corretos. 

      ```
      %flink.ssql(type=update)
      
      DROP TABLE IF EXISTS customer_reviews_stream_table;
      
      CREATE TABLE customer_reviews_stream_table (
      customer_id VARCHAR,
      product VARCHAR,
      review VARCHAR,
      phoneNumber varchar 
      )
      WITH (
      'connector' = 'kinesis',
      'stream' = 'KinesisUDFSampleOutputStream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'TRIM_HORIZON',
      'format' = 'json');
      ```

   1. Insira registros atualizados no Kinesis Stream de destino. 

      ```
      %flink.ssql(type=update)
      INSERT INTO customer_reviews_stream_table
      SELECT customer_id, product, review, phoneNumber
      FROM sentiments_view
      ```

   1. Visualize e verifique os dados do Kinesis Stream de destino. 

      ```
      %flink.ssql(type=update)
      select * from customer_reviews_stream_table
      ```

## Promover um caderno de notas como aplicativo
<a name="examples-migrating-to-kda-studio-notebook-promoting"></a>

Agora que você testou o código do seu caderno de notas de forma interativa, você implantará o código como um aplicativo de streaming com estado durável. Você precisará primeiro modificar a configuração do aplicativo para especificar um local para seu código no Amazon S3. 

1. No Console de gerenciamento da AWS, escolha seu notebook e, em **Implantar como configuração do aplicativo - opcional**, escolha **Editar**. 

1. Em **Destino para código no Amazon S3**, escolha o bucket do Amazon S3 que foi criado pelos [scripts CloudFormation](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/cfn/kda-flink-udf.yml). O processo pode levar alguns minutos.

1. Não será possível promover a nota do jeito que está. Se você tentar, receberá um erro como as declarações `Select` não são suportadas. Para evitar esse problema, baixe o notebook [MaskPhoneNumber-Streaming Zeppelin](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/notebooks/MaskPhoneNumber-Streaming.zpln).

1. Importe o caderno de notas Zeppelin `MaskPhoneNumber-streaming`.

1. Abra a nota e escolha **Ações para KinesisDataAnalyticsStudio**.

1. Escolha **Build MaskPhoneNumber -Streaming e exporte para o S3**. Certifique-se de renomear o **Nome do aplicativo** e não incluir caracteres especiais.

1. Escolha **Criar e exportar**. Levará alguns minutos para configurar o aplicativo de streaming.

1.  Quando a compilação estiver concluída, escolha **Implantar usando o console AWS **. 

1.  Na próxima página, revise as configurações e certifique-se de escolher o perfil do IAM adequado. Em seguida, escolha **Criar aplicativo de streaming**. 

1. Depois de alguns minutos, você verá uma mensagem informando que o aplicativo de streaming foi criado com sucesso.

 Para obter mais informações sobre a implantação de aplicativos com estado durável e limites, consulte [Implantação como um aplicativo com estado durável](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook-durable.html). 

## Limpeza
<a name="examples-migrating-to-kda-studio-notebook-cleanup"></a>

Opcionalmente, agora você pode [desinstalar a pilha CloudFormation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/cfn-console-delete-stack.html). Isso removerá todos os serviços que você configurou anteriormente.