

Tras considerarlo detenidamente, hemos decidido dejar de utilizar Amazon Kinesis Data Analytics para aplicaciones SQL:

1. A partir del **1 de septiembre de 2025,** no proporcionaremos ninguna corrección de errores para las aplicaciones de Amazon Kinesis Data Analytics for SQL porque tendremos un soporte limitado debido a la próxima discontinuación.

2. A partir del **15 de octubre de 2025,** no podrá crear nuevas aplicaciones de Kinesis Data Analytics for SQL.

3. Eliminaremos sus aplicaciones a partir del **27 de enero de 2026**. No podrá iniciar ni utilizar sus aplicaciones de Amazon Kinesis Data Analytics para SQL. A partir de ese momento, el servicio de soporte de Amazon Kinesis Data Analytics para SQL dejará de estar disponible. Para obtener más información, consulte [Retirada de las aplicaciones de Amazon Kinesis Data Analytics para SQL](discontinuation.md).

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Ejemplos de migración a Managed Service para Apache Flink
<a name="migrating-to-kda-studio-overview"></a>

Tras considerarlo detenidamente, hemos decidido retirar las aplicaciones de Amazon Kinesis Data Analytics para SQL. Para ayudarle a planificar y migrar aplicaciones de Amazon Kinesis Data Analytics para SQL, retiraremos la oferta gradualmente a lo largo de 15 meses. Estas son fechas importantes a tener en cuenta, el **1 de septiembre de 2025,** el **15 de octubre de 2025** y el **27 de enero de 2026**.

1. A partir del **1 de septiembre de 2025,** no proporcionaremos ninguna corrección de errores para las aplicaciones de Amazon Kinesis Data Analytics for SQL porque tendremos un soporte limitado debido a la próxima discontinuación.

1. A partir del **15 de octubre de 2025**, no podrá crear nuevas aplicaciones de Amazon Kinesis Data Analytics para SQL. 

1. Eliminaremos sus aplicaciones a partir del **27 de enero de 2026**. No podrá iniciar ni utilizar sus aplicaciones de Amazon Kinesis Data Analytics para SQL. A partir de ese momento, las aplicaciones de Amazon Kinesis Data Analytics para SQL dejarán de estar disponibles. Para obtener más información, consulte [Retirada de las aplicaciones de Amazon Kinesis Data Analytics para SQL](discontinuation.md).

Le recomendamos que utilice [Amazon Managed Service para Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/what-is.html). Combina la facilidad de uso con capacidades analíticas avanzadas, lo que le permite crear aplicaciones de procesamiento de flujos en cuestión de minutos.

Esta sección proporciona ejemplos de código y arquitectura para ayudarle a migrar las cargas de trabajo de las aplicaciones de Amazon Kinesis Data Analytics para SQL a Managed Service para Apache Flink.

Para obtener más información, consulte también esta [AWS entrada de blog: Migrate from Amazon Kinesis Data Analytics for SQL Applications to Amazon Managed Service for Apache Flink Studio](https://aws.amazon.com/blogs/big-data/migrate-from-amazon-kinesis-data-analytics-for-sql-applications-to-amazon-managed-service-for-apache-flink-studio/).

## Replicación de consultas de Kinesis Data Analytics para SQL en un servicio gestionado para Apache Flink Studio
<a name="examples-migrating-to-kda-studio"></a>

Para migrar sus cargas de trabajo a Managed Service para Apache Flink Studio o Managed Service para Apache Flink, en esta sección se proporcionan traducciones de consultas que puede utilizar en casos de uso habituales. 

Antes de explorar estos ejemplos, le recomendamos que consulte [Uso de un cuaderno de Studio con Managed Service para Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html). 

### Recreación de consultas de Kinesis Data Analytics para SQL en Managed Service para Apache Flink Studio
<a name="examples-recreating-queries"></a>

Las siguientes opciones proporcionan traducciones de consultas comunes de aplicaciones de Kinesis Data Analytics basadas en SQL a Managed Service para Apache Flink Studio. 

#### Aplicación en varios pasos
<a name="Multi-Step-application"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "IN_APP_STREAM_001" (
   ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16), price REAL, change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_001" AS 
INSERT INTO
   "IN_APP_STREAM_001"
   SELECT
      STREAM APPROXIMATE_ARRIVAL_TIME,
      ticker_symbol,
      sector,
      price,
      change FROM "SOURCE_SQL_STREAM_001";
-- Second in-app stream and pump
CREATE 
OR REPLACE STREAM "IN_APP_STREAM_02" (ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16),
   price REAL,
   change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_02" AS 
INSERT INTO
   "IN_APP_STREAM_02"
   SELECT
      STREAM ingest_time,
      ticker_symbol,
      sector,
      price,
      change FROM "IN_APP_STREAM_001";
-- Destination in-app stream and third pump
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16),
   price REAL,
   change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_03" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM ingest_time,
      ticker_symbol,
      sector,
      price,
      change FROM "IN_APP_STREAM_02";
```

------
#### [ Managed Service for Apache Flink Studio ]

```
Query 1 - % flink.ssql DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;           
           
CREATE TABLE SOURCE_SQL_STREAM_001 (TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE,
   CHANGE DOUBLE,
   APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA 

FROM
   'timestamp' VIRTUAL,
   WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND ) 
   PARTITIONED BY (TICKER_SYMBOL) WITH (
      'connector' = 'kinesis',
      'stream' = 'kinesis-analytics-demo-stream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json',
      'json.timestamp-format.standard' = 'ISO-8601');
DROP TABLE IF EXISTS IN_APP_STREAM_001;

CREATE TABLE IN_APP_STREAM_001 ( 
   INGEST_TIME TIMESTAMP,
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE,
   CHANGE DOUBLE )
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
      'connector' = 'kinesis', 
      'stream' = 'IN_APP_STREAM_001', 
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json',
      'json.timestamp-format.standard' = 'ISO-8601');
   
DROP TABLE IF EXISTS IN_APP_STREAM_02;

CREATE TABLE IN_APP_STREAM_02 (
   INGEST_TIME TIMESTAMP, 
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE, 
   CHANGE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'IN_APP_STREAM_02',   
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
   
DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   INGEST_TIME TIMESTAMP, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), 
   PRICE DOUBLE, CHANGE DOUBLE )
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'DESTINATION_SQL_STREAM',
   'aws.region' = 'us-east-1', 
   'scan.stream.initpos' = 'LATEST',  
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');


Query 2 - % flink.ssql(type = 
update
) 
   INSERT INTO
      IN_APP_STREAM_001 
      SELECT
         APPROXIMATE_ARRIVAL_TIME AS INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         SOURCE_SQL_STREAM_001;


Query 3 - % flink.ssql(type = 
update
) 
   INSERT INTO
      IN_APP_STREAM_02 
      SELECT
         INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         IN_APP_STREAM_001;


Query 4 - % flink.ssql(type = 
update
) 
   INSERT INTO
      DESTINATION_SQL_STREAM 
      SELECT
         INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         IN_APP_STREAM_02;
```

------

#### Transformando los valores DateTime
<a name="transform-date-time-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   TICKER VARCHAR(4),
   event_time TIMESTAMP,
   five_minutes_before TIMESTAMP,
   event_unix_timestamp BIGINT, 
   event_timestamp_as_char VARCHAR(50),
   event_second INTEGER);
   
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      EVENT_TIME,
      EVENT_TIME - INTERVAL '5' MINUTE,
      UNIX_TIMESTAMP(EVENT_TIME),
      TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
      EXTRACT(SECOND 
   FROM
      EVENT_TIME) 
   FROM
      "SOURCE_SQL_STREAM_001"
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   FIVE_MINUTES_BEFORE TIMESTAMP(3),   
   EVENT_UNIX_TIMESTAMP INT,
   EVENT_TIMESTAMP_AS_CHAR VARCHAR(50),
   EVENT_SECOND INT)
   
PARTITIONED BY (TICKER) WITH (
   'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream',   
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         TICKER,
         EVENT_TIME,
         EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE,
         UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP,
         DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR,
         EXTRACT(SECOND 
      FROM
         EVENT_TIME) AS EVENT_SECOND 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Alertas simples
<a name="simple-alerts"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   ticker_symbol VARCHAR(4),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
   
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   price 
FROM
   "SOURCE_SQL_STREAM_001"
WHERE
   (
      ABS(Change / (Price - Change)) * 100
   )
   > 1
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(4), 
   CHANGE DOUBLE,       
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
   
Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
   WHERE
      (
         ABS(CHANGE / (PRICE - CHANGE)) * 100
      )
      > 1;
```

------

#### Alertas limitadas
<a name="throttled-alerts"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CHANGE_STREAM"(
   ticker_symbol VARCHAR(4),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
   
CREATE 
OR REPLACE PUMP "change_pump" AS INSERT INTO "CHANGE_STREAM"
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   price
FROM "SOURCE_SQL_STREAM_001"
WHERE
   (
      ABS(Change / (Price - Change)) * 100
   )
   > 1;
-- ** Trigger Count and Limit **
-- Counts "triggers" or those values that evaluated true against the previous where clause
-- Then provides its own limit on the number of triggers per hour per ticker symbol to what is specified in the WHERE clause

CREATE 
OR REPLACE STREAM TRIGGER_COUNT_STREAM (
   ticker_symbol VARCHAR(4),
   change REAL,
   trigger_count INTEGER);
   
CREATE 
OR REPLACE PUMP trigger_count_pump AS 
INSERT INTO
   TRIGGER_COUNT_STREAMSELECT STREAM ticker_symbol,
   change,
   trigger_count 
FROM
   (
      SELECT
         STREAM ticker_symbol,
         change,
         COUNT(*) OVER W1 as trigger_countFROM "CHANGE_STREAM" --window to perform aggregations over last minute to keep track of triggers
         WINDOW W1 AS 
         (
            PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING
         )
   )
WHERE
   trigger_count >= 1;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(4),      
   CHANGE DOUBLE, PRICE DOUBLE,
   EVENT_TIME AS PROCTIME()) 
PARTITIONED BY (TICKER_SYMBOL) 
WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');   
DROP TABLE IF EXISTS TRIGGER_COUNT_STREAM;
CREATE TABLE TRIGGER_COUNT_STREAM ( 
   TICKER_SYMBOL VARCHAR(4), 
   CHANGE DOUBLE, 
   TRIGGER_COUNT INT) 
PARTITIONED BY (TICKER_SYMBOL);

Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
   WHERE
      (
         ABS(CHANGE / (PRICE - CHANGE)) * 100
      )
      > 1;
      
Query 3 - % flink.ssql(type = 
update
) 
   SELECT * 
   FROM(
         SELECT
            TICKER_SYMBOL,
            CHANGE,
            COUNT(*) AS TRIGGER_COUNT 
         FROM
            DESTINATION_SQL_STREAM 
         GROUP BY
            TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE),
            TICKER_SYMBOL,
            CHANGE 
      )
   WHERE
      TRIGGER_COUNT > 1;
```

------

#### Agregar resultados parciales de una consulta
<a name="aggregate-partial-results"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
   
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
   
CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS 
INSERT INTO
   "CALC_COUNT_SQL_STREAM"(
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT") 
   SELECT
      STREAM "TICKER_SYMBOL",
      STEP("SOURCE_SQL_STREAM_001",
      "ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
      COUNT(*) AS "TickerCount "
   FROM
      "SOURCE_SQL_STREAM_001" 
   GROUP BY
      STEP("SOURCE_SQL_STREAM_001". ROWTIME BY INTERVAL '1' MINUTE),
      STEP("SOURCE_SQL_STREAM_001"." APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
      TICKER_SYMBOL;
CREATE PUMP "AGGREGATED_SQL_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT") 
   SELECT
      STREAM "TICKER",
      "TRADETIME",
      SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" 
   FROM
      "CALC_COUNT_SQL_STREAM" WINDOW W1 AS 
      (
         PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING
      )
;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;
CREATE TABLE SOURCE_SQL_STREAM_001 (
   TICKER_SYMBOL VARCHAR(4),
   TRADETIME AS PROCTIME(),
   APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA 
FROM
   'timestamp' VIRTUAL,
   WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND) 
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',   
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
DROP TABLE IF EXISTS CALC_COUNT_SQL_STREAM;
CREATE TABLE CALC_COUNT_SQL_STREAM (
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP(3),
   WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND,   
   TICKERCOUNT BIGINT NOT NULL ) PARTITIONED BY (TICKER) WITH ( 
      'connector' = 'kinesis',
      'stream' = 'CALC_COUNT_SQL_STREAM',
      'aws.region' = 'us-east-1',       
      'scan.stream.initpos' = 'LATEST',
      'format' = 'csv');
DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP(3),
   WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND, 
   TICKERCOUNT BIGINT NOT NULL )
   PARTITIONED BY (TICKER) WITH ('connector' = 'kinesis', 
      'stream' = 'DESTINATION_SQL_STREAM',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'csv');

Query 2 - % flink.ssql(type = 
update
) 
   INSERT INTO
      CALC_COUNT_SQL_STREAM 
      SELECT
         TICKER,
         TO_TIMESTAMP(TRADETIME, 'yyyy-MM-dd HH:mm:ss') AS TRADETIME,
         TICKERCOUNT 
      FROM
         (
            SELECT
               TICKER_SYMBOL AS TICKER,
               DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00') AS TRADETIME,
               COUNT(*) AS TICKERCOUNT 
            FROM
               SOURCE_SQL_STREAM_001 
            GROUP BY
               TUMBLE(TRADETIME, INTERVAL '1' MINUTE),
               DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00'),
               DATE_FORMAT(APPROXIMATE_ARRIVAL_TIME, 'yyyy-MM-dd HH:mm:00'),
               TICKER_SYMBOL 
         )
;

Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      CALC_COUNT_SQL_STREAM;
      
Query 4 - % flink.ssql(type = 
update
) 
   INSERT INTO
      DESTINATION_SQL_STREAM 
      SELECT
         TICKER,
         TRADETIME,
         SUM(TICKERCOUNT) OVER W1 AS TICKERCOUNT 
      FROM
         CALC_COUNT_SQL_STREAM WINDOW W1 AS 
         (
            PARTITION BY TICKER 
         ORDER BY
            TRADETIME RANGE INTERVAL '10' MINUTE PRECEDING
         )
;

Query 5 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      DESTINATION_SQL_STREAM;
```

------

#### Transformación de valores de cadena
<a name="transform-string-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM for cleaned up referrerCREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32));
CREATE 
OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM "APPROXIMATE_ARRIVAL_TIME",
   SUBSTRING("referrer", 12, 
   (
      POSITION('.com' IN "referrer") - POSITION('www.' IN "referrer") - 4
   )
) 
FROM
   "SOURCE_SQL_STREAM_001";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   referrer VARCHAR(32),
   ingest_time AS PROCTIME() ) PARTITIONED BY (referrer) 
WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ingest_time,
         substring(referrer, 12, 6) as referrer 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Sustitución de una subcadena mediante Regex
<a name="substring-regex"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM for cleaned up referrerCREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32));
CREATE 
OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM "APPROXIMATE_ARRIVAL_TIME",
   REGEX_REPLACE("REFERRER", 'http://', 'https://', 1, 0) 
FROM
   "SOURCE_SQL_STREAM_001";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   referrer VARCHAR(32),
   ingest_time AS PROCTIME()) 
PARTITIONED BY (referrer) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ingest_time,
         REGEXP_REPLACE(referrer, 'http', 'https') as referrer 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Análisis de registros de expresiones regulares
<a name="regex-log-parse"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   sector VARCHAR(24),
   match1 VARCHAR(24),
   match2 VARCHAR(24));
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM T.SECTOR,
      T.REC.COLUMN1,
      T.REC.COLUMN2 
   FROM
      (
         SELECT
            STREAM SECTOR,
            REGEX_LOG_PARSE(SECTOR, '.*([E].).*([R].*)') AS REC 
         FROM
            SOURCE_SQL_STREAM_001
      )
      AS T;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   CHANGE DOUBLE, PRICE DOUBLE,
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16)) 
PARTITIONED BY (SECTOR) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
SELECT
   * 
FROM
   (
      SELECT
         SECTOR,
         REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 1) AS MATCH1,
         REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 2) AS MATCH2 
      FROM
         DESTINATION_SQL_STREAM 
   )
WHERE
   MATCH1 IS NOT NULL 
   AND MATCH2 IS NOT NULL;
```

------

#### Transformando DateTime valores
<a name="transform-date-time-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( 
   TICKER VARCHAR(4),
   event_time TIMESTAMP,
   five_minutes_before TIMESTAMP,
   event_unix_timestamp BIGINT,
   event_timestamp_as_char VARCHAR(50),
   event_second INTEGER);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      EVENT_TIME,
      EVENT_TIME - INTERVAL '5' MINUTE,
      UNIX_TIMESTAMP(EVENT_TIME),
      TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
      EXTRACT(SECOND 
   FROM
      EVENT_TIME) 
   FROM
      "SOURCE_SQL_STREAM_001"
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   FIVE_MINUTES_BEFORE TIMESTAMP(3),
   EVENT_UNIX_TIMESTAMP INT,    
   EVENT_TIMESTAMP_AS_CHAR VARCHAR(50),
   EVENT_SECOND INT) PARTITIONED BY (TICKER)
WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         TICKER,
         EVENT_TIME,
         EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE,
         UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP,
         DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR,
         EXTRACT(SECOND 
      FROM
         EVENT_TIME) AS EVENT_SECOND 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Ventanas y agregación
<a name="windows-aggregation"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   event_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   ticker_count INTEGER);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM EVENT_TIME,
      TICKER,
      COUNT(TICKER) AS ticker_count 
   FROM
      "SOURCE_SQL_STREAM_001" WINDOWED BY STAGGER ( PARTITION BY 
         TICKER,
         EVENT_TIME RANGE INTERVAL '1' MINUTE);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   EVENT_TIME TIMESTAMP(3),
   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '60' SECOND,    
   TICKER VARCHAR(4),
   TICKER_COUNT INT) PARTITIONED BY (TICKER) 
WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json'

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         EVENT_TIME,
         TICKER, COUNT(TICKER) AS ticker_count 
      FROM
         DESTINATION_SQL_STREAM 
      GROUP BY
         TUMBLE(EVENT_TIME,
         INTERVAL '60' second),
         EVENT_TIME, TICKER;
```

------

#### Ventana de saltos con Rowtime
<a name="tumbling-windows-rowtime"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   MIN_PRICE REAL,
   MAX_PRICE REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      MIN(PRICE),
      MAX(PRICE)
   FROM
      "SOURCE_SQL_STREAM_001"
   GROUP BY
      TICKER,
      STEP("SOURCE_SQL_STREAM_001".
            ROWTIME BY INTERVAL '60' SECOND);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   ticker VARCHAR(4),
   price DOUBLE,
   event_time VARCHAR(32),
   processing_time AS PROCTIME()) 
PARTITIONED BY (ticker) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601') 

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ticker,
         min(price) AS MIN_PRICE,
         max(price) AS MAX_PRICE 
      FROM
         DESTINATION_SQL_STREAM 
      GROUP BY
         TUMBLE(processing_time, INTERVAL '60' second),
         ticker;
```

------

#### Recuperación de los valores más frecuentes (TOP\$1K\$1ITEMS\$1TUMBLING)
<a name="retrieve-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT")
SELECT
   STREAM"TICKER_SYMBOL",
   STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
   COUNT(*) AS "TickerCount"
FROM
   "SOURCE_SQL_STREAM_001"
GROUP BY STEP("SOURCE_SQL_STREAM_001".
   ROWTIME BY INTERVAL '1' MINUTE),
   STEP("SOURCE_SQL_STREAM_001".
      "APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
   TICKER_SYMBOL;
CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT")
SELECT
   STREAM "TICKER",
   "TRADETIME",
   SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT"
FROM
   "CALC_COUNT_SQL_STREAM" WINDOW W1 AS 
   (
      PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING
   )
;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM ( 
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '1' SECONDS ) 
PARTITIONED BY (TICKER) WITH (
   'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      (
         SELECT
            TICKER,
            COUNT(*) as MOST_FREQUENT_VALUES,
            ROW_NUMBER() OVER (PARTITION BY TICKER 
         ORDER BY
            TICKER DESC) AS row_num 
         FROM
            DESTINATION_SQL_STREAM 
         GROUP BY
            TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE),
            TICKER
      )
   WHERE
      row_num <= 5;
```

------

#### Elementos aproximados del Top-K
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ITEM VARCHAR(1024), ITEM_COUNT DOUBLE);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM ITEM,
      ITEM_COUNT 
   FROM
      TABLE(TOP_K_ITEMS_TUMBLING(CURSOR(
      SELECT
         STREAM * 
      FROM
         "SOURCE_SQL_STREAM_001"), 'column1', -- name of column in single quotes10, -- number of top items60 -- tumbling window size in seconds));
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flinkssql
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 
CREATE TABLE SOURCE_SQL_STREAM_001 ( TS TIMESTAMP(3), WATERMARK FOR TS as TS - INTERVAL '5' SECOND, ITEM VARCHAR(1024), 
PRICE DOUBLE) 
   WITH ( 'connector' = 'kinesis', 'stream' = 'SOURCE_SQL_STREAM_001',
'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601');


%flink.ssql(type=update)
SELECT
   * 
FROM
   (
      SELECT
         *,
         ROW_NUMBER() OVER (PARTITION BY AGG_WINDOW 
      ORDER BY
         ITEM_COUNT DESC) as rownum 
      FROM
         (
            select
               AGG_WINDOW,
               ITEM,
               ITEM_COUNT 
            from
               (
                  select
                     TUMBLE_ROWTIME(TS, INTERVAL '60' SECONDS) as AGG_WINDOW,
                     ITEM,
                     count(*) as ITEM_COUNT 
                  FROM
                     SOURCE_SQL_STREAM_001 
                  GROUP BY
                     TUMBLE(TS, INTERVAL '60' SECONDS),
                     ITEM
               )
         )
   )
where
   rownum <= 3
```

------

#### Análisis de registros web (función W3C\$1LOG\$1PARSE)
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( column1 VARCHAR(16), 
   column2 VARCHAR(16), 
   column3 VARCHAR(16), 
   column4 VARCHAR(16), 
   column5 VARCHAR(16), 
   column6 VARCHAR(16), 
   column7 VARCHAR(16));
CREATE 
OR REPLACE PUMP "myPUMP" ASINSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM l.r.COLUMN1,
   l.r.COLUMN2,
   l.r.COLUMN3,
   l.r.COLUMN4,
   l.r.COLUMN5,
   l.r.COLUMN6,
   l.r.COLUMN7 
FROM
   (
      SELECT
         STREAM W3C_LOG_PARSE("log", 'COMMON') 
      FROM
         "SOURCE_SQL_STREAM_001"
   )
   AS l(r);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql(type=update)
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) 
   WITH ( 'connector' = 'kinesis', 
          'stream' = 'SOURCE_SQL_STREAM_001',
          'aws.region' = 'us-east-1',
          'scan.stream.initpos' = 'LATEST',
          'format' = 'json',
          'json.timestamp-format.standard' = 'ISO-8601');
          
% flink.ssql(type=update) 
   select
      SPLIT_INDEX(log, ' ', 0),
      SPLIT_INDEX(log, ' ', 1),
      SPLIT_INDEX(log, ' ', 2),
      SPLIT_INDEX(log, ' ', 3),
      SPLIT_INDEX(log, ' ', 4),
      SPLIT_INDEX(log, ' ', 5),
      SPLIT_INDEX(log, ' ', 6) 
   from
      SOURCE_SQL_STREAM_001;
```

------

#### División de cadenas en varios campos (función VARIABLE\$1COLUMN\$1LOG\$1PARSE)
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"( "column_A" VARCHAR(16),
   "column_B" VARCHAR(16),
   "column_C" VARCHAR(16),
   "COL_1" VARCHAR(16),
   "COL_2" VARCHAR(16),
   "COL_3" VARCHAR(16));
CREATE 
OR REPLACE PUMP "SECOND_STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM t."Col_A",
   t."Col_B",
   t."Col_C",
   t.r."COL_1",
   t.r."COL_2",
   t.r."COL_3"
FROM
   (
      SELECT
         STREAM "Col_A",
         "Col_B",
         "Col_C",
         VARIABLE_COLUMN_LOG_PARSE ("Col_E_Unstructured",
         'COL_1 TYPE VARCHAR(16),
         COL_2 TYPE VARCHAR(16),
         COL_3 TYPE VARCHAR(16)', ',') AS r 
      FROM
         "SOURCE_SQL_STREAM_001"
   )
   as t;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql(type=update)
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) 
   WITH ( 'connector' = 'kinesis',
          'stream' = 'SOURCE_SQL_STREAM_001',
          'aws.region' = 'us-east-1',
          'scan.stream.initpos' = 'LATEST',
          'format' = 'json',
          'json.timestamp-format.standard' = 'ISO-8601');
          
% flink.ssql(type=update) 
   select
      SPLIT_INDEX(log, ' ', 0),
      SPLIT_INDEX(log, ' ', 1),
      SPLIT_INDEX(log, ' ', 2),
      SPLIT_INDEX(log, ' ', 3),
      SPLIT_INDEX(log, ' ', 4),
      SPLIT_INDEX(log, ' ', 5)
)
from
   SOURCE_SQL_STREAM_001;
```

------

#### Uniones
<a name="joins"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   ticker_symbol VARCHAR(4),
   "Company" varchar(20),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM ticker_symbol,
      "c"."Company",
      sector,
      change,
      priceFROM "SOURCE_SQL_STREAM_001" 
      LEFT JOIN
         "CompanyName" as "c"
         ON "SOURCE_SQL_STREAM_001".ticker_symbol = "c"."Ticker";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(12),
   CHANGE INT,
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',   
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - CREATE TABLE CompanyName (
   Ticker VARCHAR(4),
   Company VARCHAR(4)) WITH ( 
      'connector' = 'filesystem',
      'path' = 's3://kda-demo-sample/TickerReference.csv',       
      'format' = 'csv' );

Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      c.Company,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
      LEFT JOIN
         CompanyName as c 
         ON DESTINATION_SQL_STREAM.TICKER_SYMBOL = c.Ticker;
```

------

#### Errores
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   (
      price / 0
   )
   as ProblemColumnFROM "SOURCE_SQL_STREAM_001"
WHERE
   sector SIMILAR TO '%TECH%';
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   CHANGE DOUBLE, 
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - % flink.pyflink @udf(input_types = [DataTypes.BIGINT()],
   result_type = DataTypes.BIGINT()) def DivideByZero(price): try: price / 0 
except
: return - 1 st_env.register_function("DivideByZero",
   DivideByZero) 
   
   Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      CURRENT_TIMESTAMP AS ERROR_TIME,
      * 
   FROM
      (
         SELECT
            TICKER_SYMBOL,
            SECTOR,
            CHANGE,
            DivideByZero(PRICE) as ErrorColumn 
         FROM
            DESTINATION_SQL_STREAM 
         WHERE
            SECTOR SIMILAR TO '%TECH%' 
      )
      AS ERROR_STREAM;
```

------

## Migración de cargas de trabajo del bosque de corte aleatorio
<a name="examples-migrating-to-kda-studio-random-cut-forests"></a>

Si quiere trasladar cargas de trabajo que utiliza el bosque de corte aleatorio de Kinesis Analytics para SQL a Managed Service para Apache Flink, en [esta entrada de blog de AWS](https://aws.amazon.com/blogs/big-data/real-time-anomaly-detection-via-random-cut-forest-in-amazon-kinesis-data-analytics/) se muestra cómo utilizar Managed Service para Apache Flink con el fin de ejecutar un algoritmo RCF en línea en la detección de anomalías.

## Sustitución de Kinesis Data Firehose como origen por Kinesis Data Streams
<a name="examples-firehose"></a>

Consulte [Converting-KDASQL-KDAStudio/](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/tree/master/Converting-KDASQL-KDAStudio)para ver un tutorial completo.

En el siguiente ejercicio, cambiará su flujo de datos para usar Amazon Managed Service para Apache Flink Studio. Esto también implicará cambiar de Amazon Kinesis Data Firehose a Amazon Kinesis Data Streams.

En primer lugar, compartimos una arquitectura típica de KDA-SQL, antes de mostrar cómo puede sustituirla mediante Amazon Managed Service para Apache Flink Studio y Amazon Kinesis Data Streams. [Como alternativa, puede lanzar la plantilla aquí: CloudFormation](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml)

### Amazon Kinesis Data Analytics-SQL y Amazon Kinesis Data Firehose
<a name="examples-firehose-legacy-setup"></a>

Este es el flujo de arquitectura SQL de Amazon Kinesis Data Analytics: 

![\[Architectural flow diagram showing data movement through Amazon Kinesis services to Amazon S3.\]](http://docs.aws.amazon.com/es_es/kinesisanalytics/latest/dev/images/legacy-sql.png)


En primer lugar, examinamos la configuración de Amazon Kinesis Data Analytics-SQL y Amazon Kinesis Data Firehose anteriores. El caso de uso es un mercado bursátil en el que los datos de negociación, incluidos el precio y el precio de las acciones, se transmiten desde fuentes externas a los sistemas Amazon Kinesis. Amazon Kinesis Data Analytics para SQL utiliza el flujo de entrada para ejecutar consultas en ventana, como Tumbling Window, a fin de determinar el volumen de operaciones y el precio de negociación `min`, `max` y `average` durante un período de un minuto para cada cotización bursátil.  

Amazon Kinesis Data Analytics-SQL está configurado para ingerir datos de la API Amazon Kinesis Data Firehose. Tras el procesamiento, Amazon Kinesis Data Analytics-SQL envía los datos procesados a otra Amazon Kinesis Data Firehose, que luego guarda la salida en un bucket de Amazon S3. 

En este caso, utiliza Amazon Kinesis Data Generator. Amazon Kinesis Data Generator le permite enviar datos de prueba a sus flujos de entrega de Amazon Kinesis Data Streams o Amazon Kinesis Data Firehose. Para empezar, siga las instrucciones que aparecen [aquí](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). Utilice la CloudFormation plantilla [aquí](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml) en lugar de la que se proporciona en las [instrucciones:](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). 

Una vez que ejecute la CloudFormation plantilla, la sección de resultados proporcionará la URL del generador de datos de Amazon Kinesis. Inicie sesión en el portal con el ID y la contraseña de Cognito que configuró [aquí](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). Seleccione la región y el nombre del flujo de destino. Para ver el estado actual, elija los flujos de Amazon Kinesis Data Firehose Delivery. Para ver el estado nuevo, elija los flujos de Amazon Kinesis Data Firehose. Puede crear varias plantillas, en función de sus requisitos, y probarlas con el botón **Probar plantilla** antes de enviarlas al flujo de destino.

A continuación, se presenta un ejemplo de carga útil con Amazon Kinesis Data Generator. El generador de datos se dirige a la entrada de Amazon Kinesis Firehose Streams para transmitir los datos de forma continua. El cliente del SDK de Amazon Kinesis también puede enviar datos de otros productores. 

```
2023-02-17 09:28:07.763,"AAPL",5032023-02-17 09:28:07.763,
"AMZN",3352023-02-17 09:28:07.763,
"GOOGL",1852023-02-17 09:28:07.763,
"AAPL",11162023-02-17 09:28:07.763,
"GOOGL",1582
```

El siguiente JSON se utiliza para generar una serie aleatoria de fecha y hora de negociación, cotización bursátil y precio bursátil:

```
date.now(YYYY-MM-DD HH:mm:ss.SSS),
"random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])",
random.number(2000)
```

Una vez que seleccione **Enviar datos**, el generador empezará a enviar datos simulados.

Los sistemas externos transmiten los datos a Amazon Kinesis Data Firehose. Con aplicaciones de Amazon Kinesis Data Analytics para SQL, puede analizar datos de flujo utilizando SQL estándar. El servicio le permite crear y ejecutar código SQL en orígenes de streaming para realizar análisis de series temporales, alimentar paneles en tiempo real y crear métricas en tiempo real. Las aplicaciones de Amazon Kinesis Data Analytics para SQL podrían crear un flujo de destino a partir de consultas SQL en el flujo de entrada y enviar el flujo de destino a otra Amazon Kinesis Data Firehose. El Amazon Kinesis Data Firehose de destino podría enviar los datos analíticos a Amazon S3 como estado final.

El código heredado de Amazon Kinesis Data Analytics-SQL se basa en una extensión de SQL Standard. 

Se utiliza la siguiente consulta en Amazon Kinesis Data Analytics-SQL. Primero debe crear un flujo de destino para el resultado de la consulta. A continuación, usaría `PUMP`, que es un objeto de repositorio de Amazon Kinesis Data Analytics (una extensión del estándar de SQL) que ofrece una funcionalidad de consulta `INSERT INTO stream SELECT ... FROM` en constante ejecución, que permite ingresar los resultados de una consulta de manera constante en una secuencia determinada. 

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME TIMESTAMP,
INGEST_TIME TIMESTAMP,
TICKER VARCHAR(16),
VOLUME BIGINT,
AVG_PRICE DOUBLE,
MIN_PRICE DOUBLE,
MAX_PRICE DOUBLE);
 
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND) AS EVENT_TIME,
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS "STREAM_INGEST_TIME",
      "ticker",
       COUNT(*) AS VOLUME,
      AVG("tradePrice") AS AVG_PRICE,
      MIN("tradePrice") AS MIN_PRICE,
      MAX("tradePrice") AS MAX_PRICEFROM "SOURCE_SQL_STREAM_001"
   GROUP BY
      "ticker",
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND),
      STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND);
```

El SQL anterior usa dos ventanas de tiempo, `tradeTimestamp` que proviene de la carga útil del flujo entrante y `ROWTIME.tradeTimestamp` también denominado `Event Time` o `client-side time`. Suele ser conveniente utilizar estos momentos en análisis, ya que es el momento en el que se produjo un evento. No obstante, muchas fuentes de eventos como, por ejemplo, clientes de teléfonos móviles y web, no tienen relojes de confianza, lo que puede provocar tiempos inexactos. Además, los problemas de conectividad pueden hacer que los registros aparezcan en la secuencia y no lo en el mismo orden los eventos. 

Las secuencias en la aplicación incluyen una columna especial llamada `ROWTIME`. Almacena una marca temporal cuando Amazon Kinesis Data Analytics inserta una fila en la primera secuencia en la aplicación. `ROWTIME` refleja la marca temporal en la que Amazon Kinesis Data Analytics insertó un registro en la primera secuencia en la aplicación después de leer desde el origen de streaming. Este valor `ROWTIME` se mantiene en toda su aplicación. 

SQL determina el número de ticker como `volume`, `min`, `max` y `average` lo valora en un intervalo de 60 segundos. 

Utilizar cada uno de estos tiempos en las consultas en ventana basadas en el tiempo tiene ventajas y desventajas. Le recomendamos que elija uno o varios de estos tiempos, y una estrategia para abordar las posibles desventajas en función de su caso de uso. 

Recomendamos una estrategia de dos ventanas que utilice dos ventanas basadas en el tiempo: una `ROWTIME` y una para los otros tiempos, como el tiempo de evento.
+ Utilice `ROWTIME` como la primera ventana, que controla la frecuencia con la que la consulta emite los resultados, tal y como se muestra en el siguiente ejemplo. No se utiliza como tiempo lógico. 
+ Utilice uno de los otros tiempos que es el tiempo lógico que desea asociar a su análisis. Este tiempo representa cuándo se produjo el evento. En el siguiente ejemplo, el objetivo de análisis es agrupar los registros y devolver un recuento por cada símbolo. 

### Amazon Managed Service para Apache Flink Studio 
<a name="examples-studio"></a>

En la arquitectura actualizada, se sustituye Amazon Kinesis Data Firehose por Amazon Kinesis Data Streams. Las aplicaciones de Amazon Kinesis Data Analytics para SQL se sustituyen por Amazon Managed Service para Apache Flink Studio. El código de Apache Flink se ejecuta de forma interactiva en un cuaderno Apache Zeppelin. Amazon Managed Service para Apache Flink Studio envía los datos de comercio agregado a un bucket de Amazon S3 para su almacenamiento. Los pasos se muestran a continuación:

Este es el flujo de arquitectura de Amazon Managed Service para Apache Flink:

![\[Data flow from Producer through Kinesis streams to Analytics Studio and S3 storage.\]](http://docs.aws.amazon.com/es_es/kinesisanalytics/latest/dev/images/kda-studio.png)


#### Cree de un flujo de datos de Kinesis
<a name="examples-studio-create-data-stream"></a>

**Para crear un flujo de datos con la consola**

1. [Inicie sesión en la consola de Kinesis Consola de administración de AWS y ábrala en https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)

1. En la barra de navegación, expanda el selector de regiones y seleccione una región.

1. Elija **Create data stream (Crear flujo de datos)**.

1. En la página **Crear flujo de Kinesis**, escriba un nombre para su flujo de datos y, a continuación, elija el modo de capacidad **Bajo demanda** predeterminado. 

   Con el modo **Bajo demanda**, puede seleccionar **Crear flujo de Kinesis** para crear su flujo de datos. 

   En la página **Flujos de Kinesis**, el valor **Estado** del flujo es **Creándose** mientras se crea. Cuando el flujo está listo para usarse, el valor **Estado** cambia a **Activo**.

1. Elija el nombre del flujo. La página **Detalles del flujo** muestra un resumen de la configuración del flujo, junto con información de monitoreo.

1. **En el generador de datos de Amazon Kinesis, cambie la Stream/delivery transmisión por la nueva Amazon Kinesis Data Streams: TRADE\$1SOURCE\$1STREAM.**

   El JSON y la carga útil serán los mismos que los que utilizó para Amazon Kinesis Data Analytics-SQL. Utilice el generador de datos de Amazon Kinesis para generar algunos ejemplos de datos de carga útil de negociación y diríjase al flujo de datos **TRADE\$1SOURCE\$1STREAM** para este ejercicio: 

   ```
   {{date.now(YYYY-MM-DD HH:mm:ss.SSS)}},
   "{{random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])}}",
   {{random.number(2000)}}
   ```

1.  Consola de administración de AWS **Vaya a Managed Service for Apache Flink y, a continuación, seleccione Crear aplicación.**

1. En el panel de navegación izquierdo, elija **Bloc de notas de Studio** y, a continuación, seleccione **Crear bloc de notas de Studio**.

1. Escriba el nombre del bloc de notas de Studio.

1. En **AWS Glue database**, proporcione una base de datos AWS Glue existente que defina los metadatos de sus fuentes y destinos. Si no tiene una AWS Glue base de datos, elija **Crear** y haga lo siguiente:

   1. En la consola AWS Glue, selecciona **Bases de datos** en **Catálogo de datos** en el menú de la izquierda.

   1. Elija **Crear base de datos**.

   1. En la página **Crear base de datos**, ingrese el nombre de la base de datos. En la sección **Ubicación - opcional**, elija **Examinar Amazon S3** y seleccione el bucket de Amazon S3. Si aún no tiene configurado un bucket de Amazon S3, puede omitir este paso y volver a él más tarde.

   1. (Opcional). Ingrese la descripción de la base de datos.

   1. Elija **Creación de base de datos**.

1. Elija **Crear bloc de notas**.

1. Una vez creado el bloc de notas, seleccione **Ejecutar**.

1. Una vez que el cuaderno se haya iniciado correctamente, abra un cuaderno Zeppelin seleccionando **Abrir en Apache Zeppelin**.

1. En la página del bloc de notas de Zeppelin, selecciona **Crear nueva nota** y asígnale un nombre. *MarketDataFeed*

El código SQL de Flink se explica a continuación, pero primero [así es como se ve la pantalla de un bloc de notas Zeppelin](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/open-Zeppelin-notebook.jpg). Cada ventana del bloc de notas es un bloque de códigos independiente y se pueden ejecutar de una en una.

##### Código de Amazon Managed Service para Apache Flink Studio
<a name="examples-studio-code"></a>

Amazon Managed Service para Apache Flink utiliza Zeppelin Notebooks para ejecutar el código. En este ejemplo, la asignación se realiza a código ssql basado en Apache Flink 1.13. El código del cuaderno Zeppelin se muestra debajo de un bloque a la vez.  

Antes de ejecutar cualquier código en su bloc de notas Zeppelin, debe ejecutar los comandos de configuración de Flink. Si necesita cambiar algún ajuste de configuración después de ejecutar el código (ssql, Python o Scala), tendrá que detener y reiniciar el cuaderno. En este ejemplo, tendrá que establecer puntos de control. Se requieren puntos de control para poder transmitir datos a un archivo en Amazon S3. Esto permite que los datos que se transmiten a Amazon S3 se vacíen en un archivo. La siguiente afirmación establece el intervalo en 5000 milisegundos.  

```
%flink.conf
execution.checkpointing.interval 5000
```

`%flink.conf` indica que este bloque son declaraciones de configuración. Para obtener más información sobre la configuración de Flink, incluidos los puntos de control, consulte [Puntos de control de Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/checkpoints/).  

La tabla de entrada para la fuente Amazon Kinesis Data Streams se crea con el código ssql de Flink que aparece a continuación. Tenga en cuenta que el `TRADE_TIME` campo almacena lo date/time creado por el generador de datos.

```
%flink.ssql
     
DROP TABLE IF EXISTS TRADE_SOURCE_STREAM;
CREATE TABLE TRADE_SOURCE_STREAM (--`arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
TRADE_TIME TIMESTAMP(3),
WATERMARK FOR TRADE_TIME as TRADE_TIME - INTERVAL '5' SECOND,TICKER STRING,PRICE DOUBLE,
STATUS STRING)WITH ('connector' = 'kinesis','stream' = 'TRADE_SOURCE_STREAM',
'aws.region' = 'us-east-1','scan.stream.initpos' = 'LATEST','format' = 'csv');
```

Puede ver el flujo de entrada con esta declaración:

```
%flink.ssql(type=update)-- testing the source stream
   
select * from TRADE_SOURCE_STREAM;
```

Antes de enviar los datos agregados a Amazon S3, puede verlos directamente en Amazon Managed Service para Apache Flink Studio con una consulta de selección en una ventana desplegable. Esto agrega los datos de negociación en intervalos de tiempo de un minuto. Tenga en cuenta que la sentencia %flink.ssql debe tener una designación (type=update):

```
%flink.ssql(type=update)
   
select TUMBLE_ROWTIME(TRADE_TIME,
INTERVAL '1' MINUTE) as TRADE_WINDOW,
TICKER, COUNT(*) as VOLUME,
AVG(PRICE) as AVG_PRICE, 
MIN(PRICE) as MIN_PRICE,
MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAMGROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;
```

A continuación, podrá crear una tabla para el destino en Amazon S3. Tiene que utilizar una marca de agua. Una marca de agua es una métrica de progreso que indica un momento en el que está seguro de que no se producirán más eventos retrasados. El motivo de la marca de agua es tener en cuenta las llegadas tardías. El intervalo `‘5’ Second` permite que las operaciones entren en Amazon Kinesis Data Streams con 5 segundos de retraso y que se sigan incluyendo si tienen una marca de tiempo dentro de la ventana. Para obtener más información, consulte [Generating Watermarks](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/).    

```
%flink.ssql(type=update)

DROP TABLE IF EXISTS TRADE_DESTINATION_S3;
CREATE TABLE TRADE_DESTINATION_S3 (
TRADE_WINDOW_START TIMESTAMP(3),
WATERMARK FOR TRADE_WINDOW_START as TRADE_WINDOW_START - INTERVAL '5' SECOND,
TICKER STRING, 
VOLUME BIGINT,
AVG_PRICE DOUBLE,
MIN_PRICE DOUBLE,
MAX_PRICE DOUBLE)
WITH ('connector' = 'filesystem','path' = 's3://trade-destination/','format' = 'csv');
```

Esta declaración inserta los datos en `TRADE_DESTINATION_S3`. `TUMPLE_ROWTIME` es la marca de tiempo del límite superior inclusivo de la ventana de saltos.

```
%flink.ssql(type=update)

insert into TRADE_DESTINATION_S3
select TUMBLE_ROWTIME(TRADE_TIME,
INTERVAL '1' MINUTE),
TICKER, COUNT(*) as VOLUME,
AVG(PRICE) as AVG_PRICE,
MIN(PRICE) as MIN_PRICE,
MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAM
GROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;
```

Deje que su estado de cuenta se ejecute durante 10 a 20 minutos para acumular algunos datos en Amazon S3. A continuación, aborte su instrucción. 

Esto cierra el archivo en Amazon S3 para que se pueda ver. 

Este es el aspecto del contenido: 

![\[Financial data table showing stock prices and volumes for tech companies on March 1, 2023.\]](http://docs.aws.amazon.com/es_es/kinesisanalytics/latest/dev/images/kda-studio-contents.png)


Puede usar la [plantilla de CloudFormation](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml) para crear la infraestructura. 

CloudFormation creará los siguientes recursos en su AWS cuenta:
+  Amazon Kinesis Data Streams
+  Amazon Managed Service para Apache Flink Studio
+  AWS Glue base de datos
+  Bucket de Amazon S3
+  Roles y políticas de IAM para que Amazon Managed Service para Apache Flink Studio acceda a los recursos adecuados

Importe el bloc de notas y cambie el nombre del bucket de Amazon S3 por el nuevo bucket de Amazon S3 creado por CloudFormation. 

![\[SQL code snippet creating a table with timestamp, ticker, volume, and price fields.\]](http://docs.aws.amazon.com/es_es/kinesisanalytics/latest/dev/images/kda-studio-cfn.png)


##### Ver más
<a name="more"></a>

Estos son algunos recursos adicionales que puede utilizar para obtener más información sobre el uso de Managed Service para Apache Flink Studio: 
+ [Guía para desarrolladores de Managed Service para Apache Flink Studio Notebooks](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html) 
+ [Documentación de Apache Flink 1.13](https://nightlies.apache.org/flink/flink-docs-release-1.13/) 
+ [Taller de Managed Service for Apache Flink Studio ](https://catalog.us-east-1.prod.workshops.aws/workshops/c342c6d1-2baf-4827-ba42-52ef9eb173f6/en-US/flink-on-kda-studio) 
+ [Creación de ventanas de Apache Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/) 
+ [Guía para desarrolladores de Amazon Kinesis Data Analytics: escritura desde un flujo de Kinesis Data Analytics a un bucket de S3](https://docs.aws.amazon.com/managed-flink/latest/java/examples-s3.html) 

# Aprovechar las funciones definidas por el usuario () UDFs
<a name="examples-migrating-to-kda-studio-leveraging-udfs"></a>

El propósito del patrón es demostrar cómo aprovechar las libretas Zeppelin de Kinesis Data Analytics-Studio para procesar datos UDFs en la transmisión de Kinesis. Managed Service para Apache Flink Studio utiliza Apache Flink para proporcionar capacidades analíticas avanzadas, que incluyen semántica de procesamiento de una sola vez, ventanas temporales de eventos, extensibilidad mediante funciones definidas por el usuario e integraciones de clientes, compatibilidad con lenguajes imperativos, estado de aplicación duradero, escalado horizontal, soporte para múltiples orígenes de datos, integraciones extensibles y más. Son fundamentales para garantizar la precisión, la integridad, la coherencia y la fiabilidad del procesamiento de los flujos de datos y no están disponibles con Amazon Kinesis Data Analytics para SQL.

En este ejemplo de aplicación, demostraremos cómo aprovechar UDFs el cuaderno Zeppelin de KDA-Studio para procesar datos en la transmisión de Kinesis. Los blocs de notas de Studio para Kinesis Data Analytics le permiten consultar flujos de datos de forma interactiva en tiempo real y crear y ejecutar fácilmente aplicaciones de procesamiento de flujos mediante SQL, Python y Scala estándares. Con unos pocos clics Consola de administración de AWS, puede abrir un bloc de notas sin servidor para consultar flujos de datos y obtener resultados en cuestión de segundos. Para obtener más información, consulte [Uso de un bloc de notas de Studio con Kinesis Data Analytics para Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html). 

Funciones Lambda utilizadas para el pre/post procesamiento de datos en aplicaciones KDA-SQL:

![\[alt text not found\]](http://docs.aws.amazon.com/es_es/kinesisanalytics/latest/dev/images/sql-udf-1.png)


Funciones definidas por el usuario para el pre/post procesamiento de datos con los cuadernos Zeppelin de KDA-Studio

![\[alt text not found\]](http://docs.aws.amazon.com/es_es/kinesisanalytics/latest/dev/images/sql-udf.png)


## Funciones UDFs definidas por el usuario ()
<a name="examples-migrating-to-kda-studio-udfs"></a>

Para reutilizar la lógica empresarial habitual en un operador, puede resultar útil hacer referencia a una función definida por el usuario para transformar el flujo de datos. Esto se puede hacer desde el bloc de notas Managed Service para Apache Flink Studio o como un archivo jar de aplicación con referencia externa. El uso de funciones definidas por el usuario puede simplificar las transformaciones o los enriquecimientos de datos que se podrían realizar a través del flujo de datos.

 En su bloc de notas, hará referencia a un sencillo contenedor de aplicaciones Java que tiene la funcionalidad de anonimizar números de teléfono personales. También puedes escribir Python o Scala UDFs para usarlos en el cuaderno. Elegimos una aplicación Java jar para resaltar la funcionalidad de importar una aplicación jar a un bloc de notas de Pyflink.

## Configuración del entorno
<a name="examples-migrating-to-kda-studio-setup"></a>

Para seguir esta guía e interactuar con sus datos de flujo, utilizará un script AWS CloudFormation para lanzar los siguientes recursos:
+ Flujo de datos de origen y destino de Kinesis
+ Base de datos Glue
+ rol de IAM
+ Aplicación Managed Service para Apache Flink Studio
+ Función de Lambda para iniciar la aplicación Managed Service para Apache Flink Studio
+ Rol de Lambda para ejecutar la anterior función de Lambda
+ Recurso personalizado para invocar la función de Lambda

Descarga la CloudFormation plantilla [aquí.](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/cfn/kda-flink-udf.yml)

**Crea la CloudFormation pila**

1. Ve a Consola de administración de AWS y elige **CloudFormation**en la lista de servicios.

1. En la **CloudFormation**página, selecciona **Pilas y,** a continuación, selecciona **Crear pila con nuevos recursos (estándar).** 

1. En la página **Crear pila**, elija **Cargar un archivo de plantilla** y, a continuación, elija el `kda-flink-udf.yml` que haya descargado anteriormente. Elija el archivo y después elija **Siguiente**. 

1. Asigne un nombre a la plantilla, como `kinesis-UDF` de modo que sea fácil de recordar, y actualice los parámetros de entrada, como flujo de entrada, si desea un nombre diferente. Elija **Siguiente**.

1. En la página **Configurar opciones de pila**, añada **Etiquetas** si lo desea y, a continuación, seleccione **Siguiente**.

1. En la página de **revisión**, marque las casillas que permiten la creación de recursos de IAM y, a continuación, seleccione **Enviar**.

 El lanzamiento de la CloudFormation pila puede tardar entre 10 y 15 minutos, en función de la región en la que lo hagas. Cuando vea el estado `CREATE_COMPLETE` de toda la pila, estará listo para continuar.

## Uso del bloc de notas de Managed Service para Apache Flink Studio
<a name="examples-migrating-to-kda-studio-notebook"></a>

Los blocs de notas de Studio para Kinesis Data Analytics le permiten consultar flujos de datos de forma interactiva en tiempo real y crear y ejecutar fácilmente aplicaciones de procesamiento de flujos mediante SQL, Python y Scala estándar. Con unos pocos clics Consola de administración de AWS, puedes abrir una libreta sin servidor para consultar flujos de datos y obtener resultados en cuestión de segundos.

Un bloc de notas es un entorno de desarrollo basado en la web. Con los blocs de notas, obtiene una experiencia de desarrollo interactiva sencilla combinada con las capacidades avanzadas de procesamiento de flujos de datos que proporciona Apache Flink. Los cuadernos de Studio utilizan la tecnología Apache Zeppelin y utilizan Apache Flink como motor de procesamiento de flujos. Los blocs de notas de Studio combinan estas tecnologías a la perfección para que los desarrolladores con todas las habilidades puedan acceder a los análisis avanzados de los flujos de datos. 

 Apache Zeppelin proporciona a sus blocs de notas de Studio un conjunto completo de herramientas de análisis, entre las que se incluyen las siguientes:
+ Visualización de datos
+ Exportación de datos a un archivo CSV
+ Control del formato de salida para facilitar el análisis 

**Uso del bloc de notas**

1. Vaya a Amazon Kinesis Consola de administración de AWS y elija **Amazon Kinesis** en la lista de servicios.

1. En la página de navegación de la izquierda, elija **Aplicaciones de análisis** y, a continuación, elija **blocs de notas de Studio**.

1. Compruebe que el **KinesisDataAnalyticsStudio**portátil esté funcionando.

1. Elija el bloc de notas y, a continuación, elija **Abrir en Apache Zeppelin**. 

1. Descargue el archivo de [Data Producer Zeppelin Notebook](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/notebooks/Data%20Producer.zpln) que utilizará para leer y cargar datos en Kinesis Stream. 

1.  Importe el bloc de notas Zeppelin `Data Producer`. Asegúrese de modificar la entrada `STREAM_NAME` y `REGION` en el código del bloc de notas. El nombre del flujo de entrada se encuentra en la [salida de la pila CloudFormation](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/cfn/kda-flink-udf.yml). 

1. Ejecute el bloc de notas **Data Producer** pulsando el botón **Ejecutar este párrafo** para insertar datos de muestra en la entrada de Kinesis Data Stream.

1. Mientras se cargan los datos de muestra, descargue [MaskPhoneNumber-Interactive Notebook](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/notebooks/MaskPhoneNumber-interactive.zpln), que leerá los datos de entrada, anonimizará los números de teléfono del flujo de entrada y almacenará los datos anónimos en el flujo de salida.

1. Importe el bloc de notas Zeppelin `MaskPhoneNumber-interactive`. 

1. Ejecute cada párrafo del bloc de notas.

   1. En el párrafo 1, se importa una función definida por el usuario para anonimizar los números de teléfono.

      ```
      %flink(parallelism=1)
      import com.mycompany.app.MaskPhoneNumber
      stenv.registerFunction("MaskPhoneNumber", new MaskPhoneNumber())
      ```

   1. En el siguiente párrafo, creará una tabla en memoria para leer los datos del flujo de entrada. Asegúrese de que el nombre de la transmisión y la región sean correctos. AWS 

      ```
      %flink.ssql(type=update)
      
      DROP TABLE IF EXISTS customer_reviews;
      
      CREATE TABLE customer_reviews (
      customer_id VARCHAR,
      product VARCHAR,
      review VARCHAR,
      phone VARCHAR
      )
      WITH (
      'connector' = 'kinesis',
      'stream' = 'KinesisUDFSampleInputStream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json');
      ```

   1. Compruebe si los datos están cargados en la tabla en memoria. 

      ```
      %flink.ssql(type=update)
      select * from customer_reviews
      ```

   1. Invoque la función definida por el usuario para anonimizar el número de teléfono. 

      ```
      %flink.ssql(type=update)
      select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber from customer_reviews
      ```

   1. Ahora que los números de teléfono están enmascarados, cree una vista con un número enmascarado. 

      ```
      %flink.ssql(type=update)
      
      DROP VIEW IF EXISTS sentiments_view;
      
      CREATE VIEW  
          sentiments_view
      AS
        select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber from customer_reviews
      ```

   1. Compruebe los datos. 

      ```
      %flink.ssql(type=update)
      select * from sentiments_view
      ```

   1. Cree una tabla en memoria para la salida de Kinesis Stream. Asegúrese de que el nombre de la transmisión y AWS la región sean correctos. 

      ```
      %flink.ssql(type=update)
      
      DROP TABLE IF EXISTS customer_reviews_stream_table;
      
      CREATE TABLE customer_reviews_stream_table (
      customer_id VARCHAR,
      product VARCHAR,
      review VARCHAR,
      phoneNumber varchar 
      )
      WITH (
      'connector' = 'kinesis',
      'stream' = 'KinesisUDFSampleOutputStream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'TRIM_HORIZON',
      'format' = 'json');
      ```

   1. Inserte registros actualizados en el flujo de Kinesis de destino. 

      ```
      %flink.ssql(type=update)
      INSERT INTO customer_reviews_stream_table
      SELECT customer_id, product, review, phoneNumber
      FROM sentiments_view
      ```

   1. Vea y verifique los datos del flujo de Kinesis de destino. 

      ```
      %flink.ssql(type=update)
      select * from customer_reviews_stream_table
      ```

## Promoción de un bloc de notas como aplicación
<a name="examples-migrating-to-kda-studio-notebook-promoting"></a>

Ahora que ha probado el código de su bloc de notas de forma interactiva, implementará el código como una aplicación de flujo con un estado duradero. Primero tendrá que modificar la configuración de la aplicación para especificar una ubicación para su código en Amazon S3. 

1. En Consola de administración de AWS, elija su bloc de notas y, en **Implementar como configuración de aplicación (opcional)**, elija **Editar**. 

1. En **Destino del código en Amazon S3**, elija el bucket de Amazon S3 que crearon los [CloudFormation scripts](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/cfn/kda-flink-udf.yml). El proceso puede demorar unos minutos.

1. No podrá promocionar la nota tal como está. Si lo intenta, se producirá un error ya que no se admiten las instrucciones `Select`. Para evitar este problema, descargue el cuaderno [MaskPhoneNumber-Streaming Zeppelin](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/notebooks/MaskPhoneNumber-Streaming.zpln).

1. Importe el bloc de notas Zeppelin `MaskPhoneNumber-streaming`.

1. **Abre la nota y selecciona Acciones para. KinesisDataAnalyticsStudio**

1. Elija **Build MaskPhoneNumber -Streaming y exporte a S3**. Asegúrese de cambiar el **nombre de la aplicación** y de no incluir caracteres especiales.

1. Seleccione **Crear y exportar**. La configuración de la aplicación de flujo tardará unos minutos.

1.  Una vez que se complete la compilación, elija **Implementar mediante la consola de AWS **. 

1.  En la página siguiente, revise la configuración y asegúrese de elegir el rol de IAM correcto. A continuación, seleccione **Crear aplicación de streaming**. 

1. Después de unos minutos, verá un mensaje que indica que la aplicación de flujo se creó correctamente.

 Para obtener más información sobre la implementación de aplicaciones con un estado y límites duraderos, consulte [Implementación como una aplicación con un estado duradero](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook-durable.html). 

## Limpieza
<a name="examples-migrating-to-kda-studio-notebook-cleanup"></a>

Si lo desea, ahora puede [desinstalar la pila CloudFormation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/cfn-console-delete-stack.html). Esto eliminará todos los servicios que configuró anteriormente.