

Après mûre réflexion, nous avons décidé de mettre fin à Amazon Kinesis Data Analytics pour les applications SQL :

1. À compter du **1er septembre 2025,** nous ne fournirons aucune correction de bogue pour les applications Amazon Kinesis Data Analytics for SQL, car leur support sera limité, compte tenu de l'arrêt prochain.

2. À compter du **15 octobre 2025,** vous ne pourrez plus créer de nouvelles applications Kinesis Data Analytics for SQL.

3. Nous supprimerons vos candidatures à compter **du 27 janvier 2026**. Vous ne serez pas en mesure de démarrer ou d'utiliser vos applications Amazon Kinesis Data Analytics for SQL. Support ne sera plus disponible pour Amazon Kinesis Data Analytics for SQL à partir de cette date. Pour de plus amples informations, veuillez consulter [Arrêt d'Amazon Kinesis Data Analytics pour les applications SQL](discontinuation.md).

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Exemples de migration vers le service géré pour Apache Flink Studio
<a name="migrating-to-kda-studio-overview"></a>

Après mûre réflexion, nous avons pris la décision de mettre fin à Amazon Kinesis Data Analytics pour les applications SQL. Pour vous aider à planifier et à migrer hors d'Amazon Kinesis Data Analytics pour les applications SQL, nous supprimerons progressivement l'offre sur une période de 15 mois. Ce sont des dates importantes à noter, **le 1er septembre 2025**, **le 15 octobre 2025** et **le 27 janvier 2026**.

1. À compter du **1er septembre 2025,** nous ne fournirons aucune correction de bogue pour les applications Amazon Kinesis Data Analytics for SQL, car leur support sera limité, compte tenu de l'arrêt prochain.

1. À compter du **15 octobre 2025,** vous ne pourrez plus créer de nouvelles applications Amazon Kinesis Data Analytics for SQL. 

1. Nous supprimerons vos candidatures à compter **du 27 janvier 2026**. Vous ne serez pas en mesure de démarrer ou d'utiliser vos applications Amazon Kinesis Data Analytics for SQL. Support ne sera plus disponible pour les applications Amazon Kinesis Data Analytics for SQL à partir de cette date. Pour en savoir plus, veuillez consulter la section [Arrêt d'Amazon Kinesis Data Analytics pour les applications SQL](discontinuation.md).

Nous vous recommandons d'utiliser [Amazon Managed Service pour Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/what-is.html). Il allie facilité d'utilisation et fonctionnalités analytiques avancées, vous permettant de créer des applications de traitement de flux en quelques minutes.

Cette section fournit des exemples de code et d'architecture pour vous aider à transférer les charges de travail de vos applications Amazon Kinesis Data Analytics for SQL vers Managed Service for Apache Flink.

Pour plus d'informations, consultez également ce billet de [AWS blog : Migrer d'Amazon Kinesis Data Analytics pour les applications SQL vers le service géré pour Apache Flink](https://aws.amazon.com/blogs/big-data/migrate-from-amazon-kinesis-data-analytics-for-sql-applications-to-amazon-managed-service-for-apache-flink-studio/) Studio.

## Réplication des requêtes Kinesis Data Analytics pour SQL dans un service géré pour Apache Flink Studio
<a name="examples-migrating-to-kda-studio"></a>

Cette section fournit des conversions de requêtes que vous pouvez utiliser pour les cas d’utilisation courants lors de la migration de vos charges de travail vers le service géré pour Apache Flink Studio ou le service géré pour Apache Flink. 

Avant d'explorer ces exemples, nous vous recommandons de consulter d'abord [l'article Utilisation d'un bloc-notes Studio avec un service géré pour Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html). 

### Recréation des requêtes Kinesis Data Analytics pour SQL dans un service géré pour Apache Flink Studio
<a name="examples-recreating-queries"></a>

Les options suivantes fournissent des traductions des requêtes courantes de l'application Kinesis Data Analytics basées sur SQL vers Managed Service pour Apache Flink Studio. 

#### Application à plusieurs étapes
<a name="Multi-Step-application"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "IN_APP_STREAM_001" (
   ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16), price REAL, change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_001" AS 
INSERT INTO
   "IN_APP_STREAM_001"
   SELECT
      STREAM APPROXIMATE_ARRIVAL_TIME,
      ticker_symbol,
      sector,
      price,
      change FROM "SOURCE_SQL_STREAM_001";
-- Second in-app stream and pump
CREATE 
OR REPLACE STREAM "IN_APP_STREAM_02" (ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16),
   price REAL,
   change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_02" AS 
INSERT INTO
   "IN_APP_STREAM_02"
   SELECT
      STREAM ingest_time,
      ticker_symbol,
      sector,
      price,
      change FROM "IN_APP_STREAM_001";
-- Destination in-app stream and third pump
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16),
   price REAL,
   change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_03" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM ingest_time,
      ticker_symbol,
      sector,
      price,
      change FROM "IN_APP_STREAM_02";
```

------
#### [ Managed Service for Apache Flink Studio ]

```
Query 1 - % flink.ssql DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;           
           
CREATE TABLE SOURCE_SQL_STREAM_001 (TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE,
   CHANGE DOUBLE,
   APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA 

FROM
   'timestamp' VIRTUAL,
   WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND ) 
   PARTITIONED BY (TICKER_SYMBOL) WITH (
      'connector' = 'kinesis',
      'stream' = 'kinesis-analytics-demo-stream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json',
      'json.timestamp-format.standard' = 'ISO-8601');
DROP TABLE IF EXISTS IN_APP_STREAM_001;

CREATE TABLE IN_APP_STREAM_001 ( 
   INGEST_TIME TIMESTAMP,
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE,
   CHANGE DOUBLE )
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
      'connector' = 'kinesis', 
      'stream' = 'IN_APP_STREAM_001', 
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json',
      'json.timestamp-format.standard' = 'ISO-8601');
   
DROP TABLE IF EXISTS IN_APP_STREAM_02;

CREATE TABLE IN_APP_STREAM_02 (
   INGEST_TIME TIMESTAMP, 
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE, 
   CHANGE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'IN_APP_STREAM_02',   
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
   
DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   INGEST_TIME TIMESTAMP, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), 
   PRICE DOUBLE, CHANGE DOUBLE )
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'DESTINATION_SQL_STREAM',
   'aws.region' = 'us-east-1', 
   'scan.stream.initpos' = 'LATEST',  
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');


Query 2 - % flink.ssql(type = 
update
) 
   INSERT INTO
      IN_APP_STREAM_001 
      SELECT
         APPROXIMATE_ARRIVAL_TIME AS INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         SOURCE_SQL_STREAM_001;


Query 3 - % flink.ssql(type = 
update
) 
   INSERT INTO
      IN_APP_STREAM_02 
      SELECT
         INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         IN_APP_STREAM_001;


Query 4 - % flink.ssql(type = 
update
) 
   INSERT INTO
      DESTINATION_SQL_STREAM 
      SELECT
         INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         IN_APP_STREAM_02;
```

------

#### Transformation DateTime des valeurs
<a name="transform-date-time-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   TICKER VARCHAR(4),
   event_time TIMESTAMP,
   five_minutes_before TIMESTAMP,
   event_unix_timestamp BIGINT, 
   event_timestamp_as_char VARCHAR(50),
   event_second INTEGER);
   
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      EVENT_TIME,
      EVENT_TIME - INTERVAL '5' MINUTE,
      UNIX_TIMESTAMP(EVENT_TIME),
      TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
      EXTRACT(SECOND 
   FROM
      EVENT_TIME) 
   FROM
      "SOURCE_SQL_STREAM_001"
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   FIVE_MINUTES_BEFORE TIMESTAMP(3),   
   EVENT_UNIX_TIMESTAMP INT,
   EVENT_TIMESTAMP_AS_CHAR VARCHAR(50),
   EVENT_SECOND INT)
   
PARTITIONED BY (TICKER) WITH (
   'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream',   
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         TICKER,
         EVENT_TIME,
         EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE,
         UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP,
         DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR,
         EXTRACT(SECOND 
      FROM
         EVENT_TIME) AS EVENT_SECOND 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Alertes simples
<a name="simple-alerts"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   ticker_symbol VARCHAR(4),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
   
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   price 
FROM
   "SOURCE_SQL_STREAM_001"
WHERE
   (
      ABS(Change / (Price - Change)) * 100
   )
   > 1
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(4), 
   CHANGE DOUBLE,       
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
   
Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
   WHERE
      (
         ABS(CHANGE / (PRICE - CHANGE)) * 100
      )
      > 1;
```

------

#### Alertes limitées
<a name="throttled-alerts"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CHANGE_STREAM"(
   ticker_symbol VARCHAR(4),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
   
CREATE 
OR REPLACE PUMP "change_pump" AS INSERT INTO "CHANGE_STREAM"
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   price
FROM "SOURCE_SQL_STREAM_001"
WHERE
   (
      ABS(Change / (Price - Change)) * 100
   )
   > 1;
-- ** Trigger Count and Limit **
-- Counts "triggers" or those values that evaluated true against the previous where clause
-- Then provides its own limit on the number of triggers per hour per ticker symbol to what is specified in the WHERE clause

CREATE 
OR REPLACE STREAM TRIGGER_COUNT_STREAM (
   ticker_symbol VARCHAR(4),
   change REAL,
   trigger_count INTEGER);
   
CREATE 
OR REPLACE PUMP trigger_count_pump AS 
INSERT INTO
   TRIGGER_COUNT_STREAMSELECT STREAM ticker_symbol,
   change,
   trigger_count 
FROM
   (
      SELECT
         STREAM ticker_symbol,
         change,
         COUNT(*) OVER W1 as trigger_countFROM "CHANGE_STREAM" --window to perform aggregations over last minute to keep track of triggers
         WINDOW W1 AS 
         (
            PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING
         )
   )
WHERE
   trigger_count >= 1;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(4),      
   CHANGE DOUBLE, PRICE DOUBLE,
   EVENT_TIME AS PROCTIME()) 
PARTITIONED BY (TICKER_SYMBOL) 
WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');   
DROP TABLE IF EXISTS TRIGGER_COUNT_STREAM;
CREATE TABLE TRIGGER_COUNT_STREAM ( 
   TICKER_SYMBOL VARCHAR(4), 
   CHANGE DOUBLE, 
   TRIGGER_COUNT INT) 
PARTITIONED BY (TICKER_SYMBOL);

Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
   WHERE
      (
         ABS(CHANGE / (PRICE - CHANGE)) * 100
      )
      > 1;
      
Query 3 - % flink.ssql(type = 
update
) 
   SELECT * 
   FROM(
         SELECT
            TICKER_SYMBOL,
            CHANGE,
            COUNT(*) AS TRIGGER_COUNT 
         FROM
            DESTINATION_SQL_STREAM 
         GROUP BY
            TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE),
            TICKER_SYMBOL,
            CHANGE 
      )
   WHERE
      TRIGGER_COUNT > 1;
```

------

#### Regroupement d’une partie des résultats à partir d’une requête
<a name="aggregate-partial-results"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
   
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
   
CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS 
INSERT INTO
   "CALC_COUNT_SQL_STREAM"(
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT") 
   SELECT
      STREAM "TICKER_SYMBOL",
      STEP("SOURCE_SQL_STREAM_001",
      "ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
      COUNT(*) AS "TickerCount "
   FROM
      "SOURCE_SQL_STREAM_001" 
   GROUP BY
      STEP("SOURCE_SQL_STREAM_001". ROWTIME BY INTERVAL '1' MINUTE),
      STEP("SOURCE_SQL_STREAM_001"." APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
      TICKER_SYMBOL;
CREATE PUMP "AGGREGATED_SQL_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT") 
   SELECT
      STREAM "TICKER",
      "TRADETIME",
      SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" 
   FROM
      "CALC_COUNT_SQL_STREAM" WINDOW W1 AS 
      (
         PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING
      )
;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;
CREATE TABLE SOURCE_SQL_STREAM_001 (
   TICKER_SYMBOL VARCHAR(4),
   TRADETIME AS PROCTIME(),
   APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA 
FROM
   'timestamp' VIRTUAL,
   WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND) 
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',   
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
DROP TABLE IF EXISTS CALC_COUNT_SQL_STREAM;
CREATE TABLE CALC_COUNT_SQL_STREAM (
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP(3),
   WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND,   
   TICKERCOUNT BIGINT NOT NULL ) PARTITIONED BY (TICKER) WITH ( 
      'connector' = 'kinesis',
      'stream' = 'CALC_COUNT_SQL_STREAM',
      'aws.region' = 'us-east-1',       
      'scan.stream.initpos' = 'LATEST',
      'format' = 'csv');
DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP(3),
   WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND, 
   TICKERCOUNT BIGINT NOT NULL )
   PARTITIONED BY (TICKER) WITH ('connector' = 'kinesis', 
      'stream' = 'DESTINATION_SQL_STREAM',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'csv');

Query 2 - % flink.ssql(type = 
update
) 
   INSERT INTO
      CALC_COUNT_SQL_STREAM 
      SELECT
         TICKER,
         TO_TIMESTAMP(TRADETIME, 'yyyy-MM-dd HH:mm:ss') AS TRADETIME,
         TICKERCOUNT 
      FROM
         (
            SELECT
               TICKER_SYMBOL AS TICKER,
               DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00') AS TRADETIME,
               COUNT(*) AS TICKERCOUNT 
            FROM
               SOURCE_SQL_STREAM_001 
            GROUP BY
               TUMBLE(TRADETIME, INTERVAL '1' MINUTE),
               DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00'),
               DATE_FORMAT(APPROXIMATE_ARRIVAL_TIME, 'yyyy-MM-dd HH:mm:00'),
               TICKER_SYMBOL 
         )
;

Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      CALC_COUNT_SQL_STREAM;
      
Query 4 - % flink.ssql(type = 
update
) 
   INSERT INTO
      DESTINATION_SQL_STREAM 
      SELECT
         TICKER,
         TRADETIME,
         SUM(TICKERCOUNT) OVER W1 AS TICKERCOUNT 
      FROM
         CALC_COUNT_SQL_STREAM WINDOW W1 AS 
         (
            PARTITION BY TICKER 
         ORDER BY
            TRADETIME RANGE INTERVAL '10' MINUTE PRECEDING
         )
;

Query 5 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      DESTINATION_SQL_STREAM;
```

------

#### Transformation de valeurs de chaîne
<a name="transform-string-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM for cleaned up referrerCREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32));
CREATE 
OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM "APPROXIMATE_ARRIVAL_TIME",
   SUBSTRING("referrer", 12, 
   (
      POSITION('.com' IN "referrer") - POSITION('www.' IN "referrer") - 4
   )
) 
FROM
   "SOURCE_SQL_STREAM_001";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   referrer VARCHAR(32),
   ingest_time AS PROCTIME() ) PARTITIONED BY (referrer) 
WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ingest_time,
         substring(referrer, 12, 6) as referrer 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Remplacement d’une sous-chaîne à l’aide de Regex
<a name="substring-regex"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM for cleaned up referrerCREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32));
CREATE 
OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM "APPROXIMATE_ARRIVAL_TIME",
   REGEX_REPLACE("REFERRER", 'http://', 'https://', 1, 0) 
FROM
   "SOURCE_SQL_STREAM_001";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   referrer VARCHAR(32),
   ingest_time AS PROCTIME()) 
PARTITIONED BY (referrer) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ingest_time,
         REGEXP_REPLACE(referrer, 'http', 'https') as referrer 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Analyse du journal Regex
<a name="regex-log-parse"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   sector VARCHAR(24),
   match1 VARCHAR(24),
   match2 VARCHAR(24));
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM T.SECTOR,
      T.REC.COLUMN1,
      T.REC.COLUMN2 
   FROM
      (
         SELECT
            STREAM SECTOR,
            REGEX_LOG_PARSE(SECTOR, '.*([E].).*([R].*)') AS REC 
         FROM
            SOURCE_SQL_STREAM_001
      )
      AS T;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   CHANGE DOUBLE, PRICE DOUBLE,
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16)) 
PARTITIONED BY (SECTOR) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
SELECT
   * 
FROM
   (
      SELECT
         SECTOR,
         REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 1) AS MATCH1,
         REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 2) AS MATCH2 
      FROM
         DESTINATION_SQL_STREAM 
   )
WHERE
   MATCH1 IS NOT NULL 
   AND MATCH2 IS NOT NULL;
```

------

#### Transformation DateTime des valeurs
<a name="transform-date-time-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( 
   TICKER VARCHAR(4),
   event_time TIMESTAMP,
   five_minutes_before TIMESTAMP,
   event_unix_timestamp BIGINT,
   event_timestamp_as_char VARCHAR(50),
   event_second INTEGER);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      EVENT_TIME,
      EVENT_TIME - INTERVAL '5' MINUTE,
      UNIX_TIMESTAMP(EVENT_TIME),
      TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
      EXTRACT(SECOND 
   FROM
      EVENT_TIME) 
   FROM
      "SOURCE_SQL_STREAM_001"
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   FIVE_MINUTES_BEFORE TIMESTAMP(3),
   EVENT_UNIX_TIMESTAMP INT,    
   EVENT_TIMESTAMP_AS_CHAR VARCHAR(50),
   EVENT_SECOND INT) PARTITIONED BY (TICKER)
WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         TICKER,
         EVENT_TIME,
         EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE,
         UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP,
         DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR,
         EXTRACT(SECOND 
      FROM
         EVENT_TIME) AS EVENT_SECOND 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Fenêtres et regroupement
<a name="windows-aggregation"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   event_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   ticker_count INTEGER);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM EVENT_TIME,
      TICKER,
      COUNT(TICKER) AS ticker_count 
   FROM
      "SOURCE_SQL_STREAM_001" WINDOWED BY STAGGER ( PARTITION BY 
         TICKER,
         EVENT_TIME RANGE INTERVAL '1' MINUTE);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   EVENT_TIME TIMESTAMP(3),
   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '60' SECOND,    
   TICKER VARCHAR(4),
   TICKER_COUNT INT) PARTITIONED BY (TICKER) 
WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json'

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         EVENT_TIME,
         TICKER, COUNT(TICKER) AS ticker_count 
      FROM
         DESTINATION_SQL_STREAM 
      GROUP BY
         TUMBLE(EVENT_TIME,
         INTERVAL '60' second),
         EVENT_TIME, TICKER;
```

------

#### Fenêtre bascule utilisant Rowtime
<a name="tumbling-windows-rowtime"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   MIN_PRICE REAL,
   MAX_PRICE REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      MIN(PRICE),
      MAX(PRICE)
   FROM
      "SOURCE_SQL_STREAM_001"
   GROUP BY
      TICKER,
      STEP("SOURCE_SQL_STREAM_001".
            ROWTIME BY INTERVAL '60' SECOND);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   ticker VARCHAR(4),
   price DOUBLE,
   event_time VARCHAR(32),
   processing_time AS PROCTIME()) 
PARTITIONED BY (ticker) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601') 

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ticker,
         min(price) AS MIN_PRICE,
         max(price) AS MAX_PRICE 
      FROM
         DESTINATION_SQL_STREAM 
      GROUP BY
         TUMBLE(processing_time, INTERVAL '60' second),
         ticker;
```

------

#### Récupération des valeurs les plus fréquentes (TOP\$1K\$1ITEMS\$1TUMBLING)
<a name="retrieve-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT")
SELECT
   STREAM"TICKER_SYMBOL",
   STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
   COUNT(*) AS "TickerCount"
FROM
   "SOURCE_SQL_STREAM_001"
GROUP BY STEP("SOURCE_SQL_STREAM_001".
   ROWTIME BY INTERVAL '1' MINUTE),
   STEP("SOURCE_SQL_STREAM_001".
      "APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
   TICKER_SYMBOL;
CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT")
SELECT
   STREAM "TICKER",
   "TRADETIME",
   SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT"
FROM
   "CALC_COUNT_SQL_STREAM" WINDOW W1 AS 
   (
      PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING
   )
;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM ( 
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '1' SECONDS ) 
PARTITIONED BY (TICKER) WITH (
   'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      (
         SELECT
            TICKER,
            COUNT(*) as MOST_FREQUENT_VALUES,
            ROW_NUMBER() OVER (PARTITION BY TICKER 
         ORDER BY
            TICKER DESC) AS row_num 
         FROM
            DESTINATION_SQL_STREAM 
         GROUP BY
            TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE),
            TICKER
      )
   WHERE
      row_num <= 5;
```

------

#### Éléments Top-K approximatifs
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ITEM VARCHAR(1024), ITEM_COUNT DOUBLE);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM ITEM,
      ITEM_COUNT 
   FROM
      TABLE(TOP_K_ITEMS_TUMBLING(CURSOR(
      SELECT
         STREAM * 
      FROM
         "SOURCE_SQL_STREAM_001"), 'column1', -- name of column in single quotes10, -- number of top items60 -- tumbling window size in seconds));
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flinkssql
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 
CREATE TABLE SOURCE_SQL_STREAM_001 ( TS TIMESTAMP(3), WATERMARK FOR TS as TS - INTERVAL '5' SECOND, ITEM VARCHAR(1024), 
PRICE DOUBLE) 
   WITH ( 'connector' = 'kinesis', 'stream' = 'SOURCE_SQL_STREAM_001',
'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601');


%flink.ssql(type=update)
SELECT
   * 
FROM
   (
      SELECT
         *,
         ROW_NUMBER() OVER (PARTITION BY AGG_WINDOW 
      ORDER BY
         ITEM_COUNT DESC) as rownum 
      FROM
         (
            select
               AGG_WINDOW,
               ITEM,
               ITEM_COUNT 
            from
               (
                  select
                     TUMBLE_ROWTIME(TS, INTERVAL '60' SECONDS) as AGG_WINDOW,
                     ITEM,
                     count(*) as ITEM_COUNT 
                  FROM
                     SOURCE_SQL_STREAM_001 
                  GROUP BY
                     TUMBLE(TS, INTERVAL '60' SECONDS),
                     ITEM
               )
         )
   )
where
   rownum <= 3
```

------

#### Analyse de journaux web (fonction W3C\$1LOG\$1PARSE)
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( column1 VARCHAR(16), 
   column2 VARCHAR(16), 
   column3 VARCHAR(16), 
   column4 VARCHAR(16), 
   column5 VARCHAR(16), 
   column6 VARCHAR(16), 
   column7 VARCHAR(16));
CREATE 
OR REPLACE PUMP "myPUMP" ASINSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM l.r.COLUMN1,
   l.r.COLUMN2,
   l.r.COLUMN3,
   l.r.COLUMN4,
   l.r.COLUMN5,
   l.r.COLUMN6,
   l.r.COLUMN7 
FROM
   (
      SELECT
         STREAM W3C_LOG_PARSE("log", 'COMMON') 
      FROM
         "SOURCE_SQL_STREAM_001"
   )
   AS l(r);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql(type=update)
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) 
   WITH ( 'connector' = 'kinesis', 
          'stream' = 'SOURCE_SQL_STREAM_001',
          'aws.region' = 'us-east-1',
          'scan.stream.initpos' = 'LATEST',
          'format' = 'json',
          'json.timestamp-format.standard' = 'ISO-8601');
          
% flink.ssql(type=update) 
   select
      SPLIT_INDEX(log, ' ', 0),
      SPLIT_INDEX(log, ' ', 1),
      SPLIT_INDEX(log, ' ', 2),
      SPLIT_INDEX(log, ' ', 3),
      SPLIT_INDEX(log, ' ', 4),
      SPLIT_INDEX(log, ' ', 5),
      SPLIT_INDEX(log, ' ', 6) 
   from
      SOURCE_SQL_STREAM_001;
```

------

#### Fractionnement de chaînes en plusieurs champs (fonction VARIABLE\$1COLUMN\$1LOG\$1PARSE)
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"( "column_A" VARCHAR(16),
   "column_B" VARCHAR(16),
   "column_C" VARCHAR(16),
   "COL_1" VARCHAR(16),
   "COL_2" VARCHAR(16),
   "COL_3" VARCHAR(16));
CREATE 
OR REPLACE PUMP "SECOND_STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM t."Col_A",
   t."Col_B",
   t."Col_C",
   t.r."COL_1",
   t.r."COL_2",
   t.r."COL_3"
FROM
   (
      SELECT
         STREAM "Col_A",
         "Col_B",
         "Col_C",
         VARIABLE_COLUMN_LOG_PARSE ("Col_E_Unstructured",
         'COL_1 TYPE VARCHAR(16),
         COL_2 TYPE VARCHAR(16),
         COL_3 TYPE VARCHAR(16)', ',') AS r 
      FROM
         "SOURCE_SQL_STREAM_001"
   )
   as t;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql(type=update)
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) 
   WITH ( 'connector' = 'kinesis',
          'stream' = 'SOURCE_SQL_STREAM_001',
          'aws.region' = 'us-east-1',
          'scan.stream.initpos' = 'LATEST',
          'format' = 'json',
          'json.timestamp-format.standard' = 'ISO-8601');
          
% flink.ssql(type=update) 
   select
      SPLIT_INDEX(log, ' ', 0),
      SPLIT_INDEX(log, ' ', 1),
      SPLIT_INDEX(log, ' ', 2),
      SPLIT_INDEX(log, ' ', 3),
      SPLIT_INDEX(log, ' ', 4),
      SPLIT_INDEX(log, ' ', 5)
)
from
   SOURCE_SQL_STREAM_001;
```

------

#### Jointures
<a name="joins"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   ticker_symbol VARCHAR(4),
   "Company" varchar(20),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM ticker_symbol,
      "c"."Company",
      sector,
      change,
      priceFROM "SOURCE_SQL_STREAM_001" 
      LEFT JOIN
         "CompanyName" as "c"
         ON "SOURCE_SQL_STREAM_001".ticker_symbol = "c"."Ticker";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(12),
   CHANGE INT,
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',   
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - CREATE TABLE CompanyName (
   Ticker VARCHAR(4),
   Company VARCHAR(4)) WITH ( 
      'connector' = 'filesystem',
      'path' = 's3://kda-demo-sample/TickerReference.csv',       
      'format' = 'csv' );

Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      c.Company,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
      LEFT JOIN
         CompanyName as c 
         ON DESTINATION_SQL_STREAM.TICKER_SYMBOL = c.Ticker;
```

------

#### Erreurs
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   (
      price / 0
   )
   as ProblemColumnFROM "SOURCE_SQL_STREAM_001"
WHERE
   sector SIMILAR TO '%TECH%';
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   CHANGE DOUBLE, 
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - % flink.pyflink @udf(input_types = [DataTypes.BIGINT()],
   result_type = DataTypes.BIGINT()) def DivideByZero(price): try: price / 0 
except
: return - 1 st_env.register_function("DivideByZero",
   DivideByZero) 
   
   Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      CURRENT_TIMESTAMP AS ERROR_TIME,
      * 
   FROM
      (
         SELECT
            TICKER_SYMBOL,
            SECTOR,
            CHANGE,
            DivideByZero(PRICE) as ErrorColumn 
         FROM
            DESTINATION_SQL_STREAM 
         WHERE
            SECTOR SIMILAR TO '%TECH%' 
      )
      AS ERROR_STREAM;
```

------

## Migration des charges de travail Random Cut Forest
<a name="examples-migrating-to-kda-studio-random-cut-forests"></a>

Si vous souhaitez déplacer des charges de travail utilisant Random Cut Forest de Kinesis Analytics pour SQL vers le service géré pour Apache Flink, [ce billet de blog AWS](https://aws.amazon.com/blogs/big-data/real-time-anomaly-detection-via-random-cut-forest-in-amazon-kinesis-data-analytics/) explique comment utiliser le service géré pour Apache Flink afin d’exécuter un algorithme RCF en ligne en vue de détecter des anomalies.

## Remplacement de Kinesis Data Firehose en tant que source par Kinesis Data Streams
<a name="examples-firehose"></a>

Voir [KDAStudioConverting-KDASQL-/ pour un didacticiel complet](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/tree/master/Converting-KDASQL-KDAStudio).

Dans l’exercice suivant, vous allez modifier votre flux de données afin d’utiliser le service géré Amazon pour Apache Flink Studio. Cela impliquera également de passer d’Amazon Kinesis Data Firehose à Amazon Kinesis Data Streams.

Nous partageons d’abord une architecture KDA basée sur SQL typique, avant de montrer comment la remplacer à l’aide du service géré Amazon pour Apache Flink Studio et Amazon Kinesis Data Streams. Vous pouvez également lancer le CloudFormation modèle [ici](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml) :

### Amazon Kinesis Data Analytics basée sur SQL et Amazon Kinesis Data Firehose
<a name="examples-firehose-legacy-setup"></a>

Voici le flux architectural de l’application Amazon Kinesis Data Analytics basée sur SQL : 

![\[Architectural flow diagram showing data movement through Amazon Kinesis services to Amazon S3.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/legacy-sql.png)


Nous examinons d’abord la configuration d’une application héritée Amazon Kinesis Data Analytics basée sur SQL et Amazon Kinesis Data Firehose. Le cas d’utilisation concerne un marché boursier sur lequel des données commerciales, notamment des données de symbole boursier et de cours des actions, sont transmises depuis des sources externes aux systèmes Amazon Kinesis. Amazon Kinesis Data Analytics for SQL utilise le flux d'entrée pour exécuter des requêtes fenêtrées, telles que la fenêtre Tumbling`max`, afin de déterminer le volume des transactions et `min` le cours des transactions sur une fenêtre d'une minute pour chaque ticker boursier. `average`  

Amazon Kinesis Data Analytics pour SQL est configuré pour ingérer les données de l’API Amazon Kinesis Data Firehose. Après le traitement, Amazon Kinesis Data Analytics pour SQL envoie les données traitées vers un autre Amazon Kinesis Data Firehose, qui enregistre ensuite le résultat dans un compartiment Amazon S3. 

Dans ce cas, vous utilisez Amazon Kinesis Data Generator. Amazon Kinesis Data Generator vous permet d’envoyer des données de test à vos flux de diffusion Amazon Kinesis Data Streams ou Amazon Kinesis Data Firehose. Pour commencer, suivez les instructions [ici](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). Utilisez le CloudFormation modèle [ici](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml) à la place de celui fourni dans les [instructions :](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). 

Une fois le CloudFormation modèle exécuté, la section de sortie fournit l'URL d'Amazon Kinesis Data Generator. Connectez-vous au portail à l’aide de l’identifiant utilisateur et du mot de passe Cognito que vous avez définis [ici](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). Sélectionnez la région et le nom du flux cible. Pour l’état actuel, choisissez les flux de diffusion Amazon Kinesis Data Firehose. Pour l’état actuel, choisissez le nom de flux Amazon Kinesis Data Firehose. Vous pouvez créer plusieurs modèles, en fonction de vos besoins, et tester le modèle à l’aide du bouton **Tester le modèle** avant de l’envoyer au flux cible.

Vous trouverez ci-dessous un exemple de charge utile utilisant Amazon Kinesis Data Generator. Le générateur de données cible les flux Amazon Kinesis Firehose en entrée pour diffuser les données en continu. Le client Amazon Kinesis SDK peut également envoyer des données provenant d’autres producteurs. 

```
2023-02-17 09:28:07.763,"AAPL",5032023-02-17 09:28:07.763,
"AMZN",3352023-02-17 09:28:07.763,
"GOOGL",1852023-02-17 09:28:07.763,
"AAPL",11162023-02-17 09:28:07.763,
"GOOGL",1582
```

Le code JSON suivant est utilisé pour générer une série aléatoire de date et heure de transaction, de symbole boursier et de cours d’action :

```
date.now(YYYY-MM-DD HH:mm:ss.SSS),
"random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])",
random.number(2000)
```

Une fois que vous avez choisi **Envoyer les données**, le générateur commence à envoyer des données fictives.

Les systèmes externes transmettent les données à Amazon Kinesis Data Firehose. Grâce aux applications Amazon Kinesis Data Analytics pour SQL, vous pouvez analyser les données de streaming à l’aide du langage SQL standard. Ce service vous permet de créer et d’exécuter rapidement un code sur des sources de streaming afin d’effectuer des analyses de séries chronologiques, d’alimenter des tableaux de bord en temps réel et de générer des métriques en temps réel. Les applications Amazon Kinesis Data Analytics pour SQL peuvent créer un flux de destination à partir de requêtes SQL sur le flux d’entrée et envoyer le flux de destination à un autre Amazon Kinesis Data Firehose. L’Amazon Kinesis Data Firehose de destination peut envoyer les données analytiques à Amazon S3 en dernière étape.

Le code hérité d’Amazon Kinesis Data Analytics pour SQL est basé sur une extension de la norme SQL. 

Vous utilisez la requête suivante dans Amazon Kinesis Data Analytics pour SQL. Vous créez d’abord un flux de destination pour la sortie de requête. Ensuite, vous devez utiliser `PUMP`, un objet de référentiel Amazon Kinesis Data Analytics (une extension de la norme SQL) qui fournit une fonctionnalité de requête `INSERT INTO stream SELECT ... FROM` continue, pour saisir les résultats d’une requête en continu dans un flux nommé. 

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME TIMESTAMP,
INGEST_TIME TIMESTAMP,
TICKER VARCHAR(16),
VOLUME BIGINT,
AVG_PRICE DOUBLE,
MIN_PRICE DOUBLE,
MAX_PRICE DOUBLE);
 
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND) AS EVENT_TIME,
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS "STREAM_INGEST_TIME",
      "ticker",
       COUNT(*) AS VOLUME,
      AVG("tradePrice") AS AVG_PRICE,
      MIN("tradePrice") AS MIN_PRICE,
      MAX("tradePrice") AS MAX_PRICEFROM "SOURCE_SQL_STREAM_001"
   GROUP BY
      "ticker",
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND),
      STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND);
```

Le code SQL précédent utilise deux fenêtres temporelles : `tradeTimestamp` celle qui provient de la charge utile du flux entrant et `ROWTIME.tradeTimestamp` est également appelée `Event Time` ou`client-side time`. Il est souvent préférable d'utiliser cette heure dans les analyses car il s'agit du moment où un événement s'est produit. Cependant, de nombreuses sources d'événements, telles que les téléphones mobiles et les clients web, n'ont pas horloges fiables, ce qui peut entraîner des heures inexactes. En outre, des problèmes de connectivité peuvent provoquer le fait que des enregistrements figurant dans un flux ne sont pas dans le même ordre que celui où les événements se sont produits. 

Les flux intégrés à l’application incluent également une colonne spéciale appelée `ROWTIME`. Celle-ci stocke un horodatage quand Amazon Kinesis Data Analytics insère une ligne dans le premier flux intégré à l’application. `ROWTIME` reflète l’horodatage du moment où Amazon Kinesis Data Analytics a inséré un enregistrement dans le premier flux intégré à l’application après la lecture de la source de streaming. Cette valeur `ROWTIME` est ensuite gérée tout au long de votre application. 

Le code SQL détermine le nombre de tickers sous forme de`volume`, `min``max`, et le `average` prix sur un intervalle de 60 secondes. 

L'utilisation de chacun de ces types d'heure dans des requêtes à fenêtres temporelles présente des avantages et des inconvénients. Sélectionnez un ou plusieurs de ces types d’heure, et une stratégie pour traiter les inconvénients pertinents en fonction du scénario d’utilisation. 

Une stratégie à deux fenêtres utilise deux types d’heure, `ROWTIME` et l’un des autres types d’heure, comme l’heure de l’événement.
+ Utilisez `ROWTIME`comme première fenêtre, qui contrôle la fréquence à laquelle la requête émet les résultats, comme illustré dans l'exemple suivant. Cette valeur n'est pas utilisée comme une heure logique. 
+ Utilisez l'un des autres types d'heure comme heure logique à associer à vos analyses. Cette heure représente le moment où l'événement s'est produit. Dans l'exemple suivant, l'objectif de l'analyse est de regrouper les enregistrements et de renvoyer un comptage par symbole boursier. 

### Service géré Amazon pour Apache Flink Studio 
<a name="examples-studio"></a>

Dans l’architecture mise à jour, vous remplacez Amazon Kinesis Data Firehose par Amazon Kinesis Data Streams. Les applications Amazon Kinesis Data Analytics pour SQL sont remplacées par le service géré Amazon pour Apache Flink Studio. Le code Apache Flink est exécuté de manière interactive dans un bloc-notes Apache Zeppelin. Le service géré Amazon pour Apache Flink Studio envoie les données commerciales agrégées vers un compartiment Amazon S3 en vue de leur stockage. Les étapes sont indiquées ci-dessous :

Voici le flux architectural du service géré Amazon pour Apache Flink Studio :

![\[Data flow from Producer through Kinesis streams to Analytics Studio and S3 storage.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/kda-studio.png)


#### Créer un flux de données Kinesis
<a name="examples-studio-create-data-stream"></a>

**Pour créer un flux de données avec la console**

1. [Connectez-vous à la console Kinesis AWS Management Console et ouvrez-la à https://console.aws.amazon.com l'adresse /kinesis.](https://console.aws.amazon.com/kinesis)

1. Dans la barre de navigation, développez le sélecteur de région et choisissez une région.

1. Choisissez **Create data stream (Créer un flux de données)**.

1. Sur la page **Créer un flux Kinesis**, saisissez le nom de votre flux de données, puis acceptez le mode de capacité **à la demande** par défaut. 

   En mode **à la demande**, vous pouvez ensuite choisir **Créer un flux Kinesis** pour créer votre flux de données. 

   Dans la page **Flux Kinesis**, la valeur **Statut** de votre flux est **En création** lorsque le flux est en cours de création. Lorsque le flux est à prêt à être utilisé, le **statut** passe à **Actif**.

1. Choisissez le nom de votre flux. La page **Détails du flux** affiche un récapitulatif de la configuration de flux, ainsi que des informations de surveillance.

1. **Dans le générateur de données Amazon Kinesis, remplacez le Stream/delivery flux par le nouveau flux Amazon Kinesis Data Streams : TRADE\$1SOURCE\$1STREAM.**

   Le format JSON et la charge utile seront les mêmes que ceux que vous avez utilisés pour Amazon Kinesis Data Analytics pour SQL. Utilisez Amazon Kinesis Data Generator pour produire des exemples de données de charge utile commerciales et ciblez le flux de données **TRADE\$1SOURCE\$1STREAM** pour cet exercice : 

   ```
   {{date.now(YYYY-MM-DD HH:mm:ss.SSS)}},
   "{{random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])}}",
   {{random.number(2000)}}
   ```

1.  AWS Management Console Accédez à Managed Service for Apache Flink, puis choisissez **Create application**.

1. Dans le panneau de navigation à gauche, sélectionnez **Blocs-notes Studio**, puis **Créer un bloc-notes Studio**.

1. Saisissez un nom pour le bloc-notes Studio.

1. Sous **Base de données AWS Glue**, indiquez une base de données AWS Glue existante qui définira les métadonnées de vos sources et destinations. Si vous n'avez pas de AWS Glue base de données, choisissez **Create** et procédez comme suit :

   1. Dans la console AWS Glue, choisissez **Databases** sous **Catalogue de données** dans le menu de gauche.

   1. Sélectionnez **Créer une base de données**.

   1. Sur la page **Créer une base de données**, saisissez un nom pour la base de données. Dans la section **Emplacement - facultatif**, choisissez **Parcourir Amazon S3** et sélectionnez le compartiment Amazon S3. Si vous n'avez pas de compartiment Amazon S3 déjà configuré, vous pouvez ignorer cette étape et y revenir plus tard.

   1. (Facultatif) Saisissez une description pour la base de données.

   1. Choisissez **Créer une base de données**.

1. Choisissez **Créer un bloc-notes**.

1. Une fois votre bloc-notes créé, choisissez **Exécuter**.

1. Une fois le bloc-notes démarré avec succès, lancez un bloc-notes Zeppelin en choisissant **Ouvrir dans Apache** Zeppelin.

1. Sur la page Zeppelin Notebook, choisissez **Créer une nouvelle note** et nommez-la. *MarketDataFeed*

Le code SQL Flink est expliqué ci-dessous, mais voici d’abord [à quoi ressemble un écran de bloc-notes Zeppelin](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/open-Zeppelin-notebook.jpg). Chaque fenêtre du bloc-notes est un bloc de code distinct ; elles peuvent être exécutées une par une.

##### Code de service géré Amazon pour Apache Flink Studio
<a name="examples-studio-code"></a>

Le service géré Amazon pour Apache Flink Studio utilise les blocs-notes Zeppelin pour exécuter le code. Pour cet exemple, le mappage est effectué avec du code SSQL basé sur Apache Flink 1.13. Le code du carnet Zeppelin est affiché ci-dessous, bloc par bloc.  

Avant d’exécuter du code dans votre bloc-notes Zeppelin, les commandes de configuration Flink doivent être exécutées. Si vous devez modifier un paramètre de configuration après avoir exécuté du code (ssql, Python ou Scala), vous devez arrêter et redémarrer votre bloc-notes. Dans cet exemple, vous devez définir le point de contrôle. Un point de contrôle est nécessaire pour diffuser des données vers un fichier dans Amazon S3. Cela permet de transférer vers un fichier la diffusion de données vers Amazon S3. L'instruction suivante définit l'intervalle à 5 000 millisecondes.  

```
%flink.conf
execution.checkpointing.interval 5000
```

`%flink.conf` indique que ce bloc est constitué d’instructions de configuration. Pour plus d'informations sur la configuration de Flink, y compris le pointage de contrôle, voir [Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/checkpoints/) Checkpointing.  

La table d'entrée pour la source Amazon Kinesis Data Streams est créée avec le code Flink ssql suivant. Notez que le `TRADE_TIME` champ stocke les données date/time créées par le générateur de données.

```
%flink.ssql
     
DROP TABLE IF EXISTS TRADE_SOURCE_STREAM;
CREATE TABLE TRADE_SOURCE_STREAM (--`arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
TRADE_TIME TIMESTAMP(3),
WATERMARK FOR TRADE_TIME as TRADE_TIME - INTERVAL '5' SECOND,TICKER STRING,PRICE DOUBLE,
STATUS STRING)WITH ('connector' = 'kinesis','stream' = 'TRADE_SOURCE_STREAM',
'aws.region' = 'us-east-1','scan.stream.initpos' = 'LATEST','format' = 'csv');
```

Vous pouvez afficher le flux d’entrée avec cette instruction :

```
%flink.ssql(type=update)-- testing the source stream
   
select * from TRADE_SOURCE_STREAM;
```

Avant d’envoyer les données agrégées à Amazon S3, vous pouvez les afficher directement dans le service géré Amazon pour Apache Flink Studio à l’aide d’une requête de sélection à fenêtres bascules. Cela agrège les données de trading dans une fenêtre de temps d'une minute. Notez que l’instruction %flink.ssql doit avoir une désignation (type=update) :

```
%flink.ssql(type=update)
   
select TUMBLE_ROWTIME(TRADE_TIME,
INTERVAL '1' MINUTE) as TRADE_WINDOW,
TICKER, COUNT(*) as VOLUME,
AVG(PRICE) as AVG_PRICE, 
MIN(PRICE) as MIN_PRICE,
MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAMGROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;
```

Vous pouvez ensuite créer une table de destination dans Amazon S3. Vous devez utiliser un filigrane. Un filigrane est une mesure de progression qui indique un instant donné à partir duquel vous êtes certain qu’aucun autre événement retardé ne se produira. L’objectif du filigrane est de prendre en compte les arrivées tardives. L’intervalle `‘5’ Second` permet aux transactions d’entrer dans le flux Amazon Kinesis Data Streams avec 5 secondes de retard et d’être incluses si elles sont horodatées dans cette fenêtre. Pour plus d'informations, consultez la section [Génération de filigranes](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/).    

```
%flink.ssql(type=update)

DROP TABLE IF EXISTS TRADE_DESTINATION_S3;
CREATE TABLE TRADE_DESTINATION_S3 (
TRADE_WINDOW_START TIMESTAMP(3),
WATERMARK FOR TRADE_WINDOW_START as TRADE_WINDOW_START - INTERVAL '5' SECOND,
TICKER STRING, 
VOLUME BIGINT,
AVG_PRICE DOUBLE,
MIN_PRICE DOUBLE,
MAX_PRICE DOUBLE)
WITH ('connector' = 'filesystem','path' = 's3://trade-destination/','format' = 'csv');
```

Cette instruction insère les données dans le code `TRADE_DESTINATION_S3`. `TUMPLE_ROWTIME` correspond à l’horodatage de la limite supérieure incluse de la fenêtre bascule.

```
%flink.ssql(type=update)

insert into TRADE_DESTINATION_S3
select TUMBLE_ROWTIME(TRADE_TIME,
INTERVAL '1' MINUTE),
TICKER, COUNT(*) as VOLUME,
AVG(PRICE) as AVG_PRICE,
MIN(PRICE) as MIN_PRICE,
MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAM
GROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;
```

Exécutez votre instruction pendant 10 à 20 minutes afin d’accumuler des données dans Amazon S3. Puis annulez l’instruction. 

Cela ferme le fichier dans Amazon S3 afin qu’il soit consultable. 

Voici à quoi ressemble le contenu : 

![\[Financial data table showing stock prices and volumes for tech companies on March 1, 2023.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/kda-studio-contents.png)


Vous pouvez utiliser le [modèle CloudFormation](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml) pour créer l’infrastructure. 

CloudFormation créera les ressources suivantes dans votre AWS compte :
+  Amazon Kinesis Data Streams
+  Service géré Amazon pour Apache Flink Studio
+  AWS Glue base de données
+  Compartiment Amazon S3
+  Rôles et politiques IAM du service géré Amazon pour Apache Flink Studio pour accéder aux ressources appropriées

Importez le bloc-notes et remplacez le nom du compartiment Amazon S3 par le nouveau compartiment Amazon S3 créé par CloudFormation. 

![\[SQL code snippet creating a table with timestamp, ticker, volume, and price fields.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/kda-studio-cfn.png)


##### Voir plus
<a name="more"></a>

Voici quelques ressources supplémentaires que vous pouvez utiliser pour en savoir plus sur l'utilisation du service géré pour Apache Flink Studio : 
+ [Manuel du développeur relatifs aux blocs-notes du service géré pour Apache Flink Studio](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html) 
+ [Documentation Apache Flink 1.13](https://nightlies.apache.org/flink/flink-docs-release-1.13/) 
+ [Atelier Service géré Amazon pour Apache Flink Studio](https://catalog.us-east-1.prod.workshops.aws/workshops/c342c6d1-2baf-4827-ba42-52ef9eb173f6/en-US/flink-on-kda-studio) 
+ [Fenêtrage Apache Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/) 
+ [Guide du développeur Amazon Kinesis Data Analytics — Rédaction depuis un flux Kinesis Data Analytics vers un compartiment S3](https://docs.aws.amazon.com/managed-flink/latest/java/examples-s3.html) 

# Tirer parti des fonctions définies par l'utilisateur () UDFs
<a name="examples-migrating-to-kda-studio-leveraging-udfs"></a>

L'objectif de ce modèle est de montrer comment tirer parti UDFs des blocs-notes Zeppelin de Kinesis Data Analytics-Studio pour traiter les données du flux Kinesis. Le service géré pour Apache Flink Studio utilise Apache Flink pour fournir des fonctionnalités analytiques avancées, notamment une sémantique de traitement unique, des fenêtres temporelles, une extensibilité grâce à des fonctions définies par l'utilisateur et des intégrations client, une prise en charge linguistique impérative, un état durable des applications, une mise à l'échelle horizontale, la prise en charge de plusieurs sources de données, des intégrations extensibles, etc. Ces fonctionnalités sont essentielles pour garantir l’exactitude, l’exhaustivité, la cohérence et la fiabilité du traitement des flux de données et ne sont pas disponibles avec Amazon Kinesis Data Analytics pour SQL.

Dans cet exemple d'application, nous allons montrer comment tirer parti UDFs du bloc-notes Zeppelin de KDA-Studio pour traiter les données du flux Kinesis. Les blocs-notes Studio pour Kinesis Data Analytics vous permettent d’interroger des flux de données de manière interactive en temps réel, et de créer et d’exécuter facilement des applications de traitement de flux à l’aide des normes SQL, Python et Scala. En quelques clics AWS Management Console, vous pouvez lancer un bloc-notes sans serveur pour interroger des flux de données et obtenir des résultats en quelques secondes. Pour plus d’informations, consultez [Utilisation d’un bloc-notes Studio avec Kinesis Data Analytics pour Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html). 

Fonctions Lambda utilisées pour le pre/post traitement des données dans les applications KDA-SQL :

![\[alt text not found\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/sql-udf-1.png)


Fonctions définies par l'utilisateur pour le pre/post traitement des données à l'aide des ordinateurs portables KDA-Studio Zeppelin

![\[alt text not found\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/sql-udf.png)


## Fonctions définies par l'utilisateur () UDFs
<a name="examples-migrating-to-kda-studio-udfs"></a>

Pour réutiliser une logique métier courante dans un opérateur, il peut être utile de référencer une fonction définie par l’utilisateur afin de transformer votre flux de données. Vous pouvez le faire soit dans le bloc-notes du service géré pour Apache Flink Studio, soit sous forme de fichier jar d’application référencé en externe. L’utilisation de fonctions définies par l’utilisateur peut simplifier les transformations ou les enrichissements de données que vous pouvez effectuer sur des données de streaming.

 Dans votre bloc-notes, vous ferez référence à un simple fichier jar d’application Java doté de fonctionnalités permettant d’anonymiser les numéros de téléphone personnels. Vous pouvez également écrire du Python ou du Scala UDFs à utiliser dans le bloc-notes. Nous avons choisi un fichier jar d’application Java pour mettre en évidence la fonctionnalité d’importation d’un fichier jar d’application dans un bloc-notes Pyflink.

## Configuration de l’environnement
<a name="examples-migrating-to-kda-studio-setup"></a>

Pour suivre ce guide et interagir avec vos données de streaming, vous allez utiliser un AWS CloudFormation script pour lancer les ressources suivantes :
+ Flux de données sources et cibles Kinesis
+ Base de données Glue
+ Rôle IAM
+ Service géré pour l’application Apache Flink Studio
+ Fonction Lambda pour démarrer le service géré pour l’application Apache Flink Studio
+ Rôle Lambda pour exécuter la fonction Lambda précédente
+ Ressource personnalisée pour appeler la fonction Lambda

Téléchargez le CloudFormation modèle [ici](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/cfn/kda-flink-udf.yml).

**Créez la CloudFormation pile**

1. Accédez à AWS Management Console et choisissez **CloudFormation**dans la liste des services.

1. Sur la **CloudFormation**page, choisissez **Stacks**, puis **Create Stack with new resources (standard).** 

1. Sur la page **Créer une pile**, choisissez **Télécharger un fichier modèle**, puis choisissez le fichier `kda-flink-udf.yml` que vous avez téléchargé précédemment. Chargez le fichier, puis sélectionnez **Suivant**. 

1. Donnez un nom au modèle, par exemple pour `kinesis-UDF` afin qu’il soit facile à mémoriser, et mettez à jour les paramètres d’entrée tels que input-stream si vous souhaitez un autre nom. Choisissez **Suivant**.

1. Sur la page **Configurer les options de pile**, ajoutez des **balises** si vous le souhaitez, puis choisissez **Next**.

1. Sur la page **Révision**, cochez les cases permettant la création de ressources IAM, puis choisissez **Envoyer**.

 Le lancement de la CloudFormation pile peut prendre de 10 à 15 minutes selon la région dans laquelle vous le lancez. Une fois que vous voyez le statut `CREATE_COMPLETE` pour l’ensemble de la pile, vous êtes prêt à continuer.

## Utilisation du bloc-notes du service géré pour Apache Flink Studio
<a name="examples-migrating-to-kda-studio-notebook"></a>

Les blocs-notes Studio pour Kinesis Data Analytics vous permettent d’interroger des flux de données de manière interactive en temps réel, et de créer et d’exécuter facilement des applications de traitement de flux à l’aide des normes SQL, Python et Scala. En quelques clics AWS Management Console, vous pouvez lancer un bloc-notes sans serveur pour interroger des flux de données et obtenir des résultats en quelques secondes.

Un bloc-notes est un environnement de développement basé sur le Web. Grâce aux blocs-notes, vous bénéficiez d’une expérience de développement interactive simple associée aux capacités avancées de traitement des flux de données fournies par Apache Flink. Les blocs-notes Studio utilisent des blocs-notes alimentés par Apache Zeppelin et utilisent Apache Flink comme moteur de traitement des flux. Les blocs-notes Studio combinent parfaitement ces technologies pour rendre les analyses avancées sur les flux de données accessibles aux développeurs de tous niveaux de compétences. 

 Apache Zeppelin fournit à vos blocs-notes Studio une suite complète d’outils d’analyse, y compris les outils suivants :
+ Visualisation des données
+ Exportation de données dans des fichiers
+ Contrôle du format de sortie pour une analyse simplifiée 

**Utilisation du bloc-notes**

1. Accédez au AWS Management Console et sélectionnez **Amazon Kinesis** dans la liste des services.

1. Dans le volet de navigation de gauche, choisissez **Applications d’analyse**, puis **Blocs-notes Studio**.

1. Vérifiez que le **KinesisDataAnalyticsStudio**bloc-notes fonctionne.

1. Sélectionnez le bloc-notes, puis choisissez **Ouvrir dans Apache Zeppelin**. 

1. Téléchargez le fichier [Data Producer Zeppelin Notebook](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/notebooks/Data%20Producer.zpln) que vous utiliserez pour lire et charger des données dans le flux Kinesis. 

1.  Importez le bloc-notes Zeppelin `Data Producer`. Assurez-vous de modifier l’entrée `STREAM_NAME` et `REGION` dans le code du bloc-notes. Le nom du flux d’entrée se trouve dans la [sortie de pile CloudFormation](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/cfn/kda-flink-udf.yml). 

1. Exécutez le bloc-notes **Data Producer** en cliquant sur le bouton **Exécuter ce paragraphe** pour insérer des exemples de données dans le flux de données Kinesis d’entrée.

1. Pendant le chargement des exemples de données, téléchargez [MaskPhoneNumber-Interactive Notebook](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/notebooks/MaskPhoneNumber-interactive.zpln), qui lira les données d'entrée, anonymisera les numéros de téléphone du flux d'entrée et stockera les données anonymisées dans le flux de sortie.

1. Importez le bloc-notes Zeppelin `MaskPhoneNumber-interactive`. 

1. Exécutez chaque paragraphe du bloc-notes.

   1. Au paragraphe 1, vous importez une fonction définie par l'utilisateur pour anonymiser les numéros de téléphone.

      ```
      %flink(parallelism=1)
      import com.mycompany.app.MaskPhoneNumber
      stenv.registerFunction("MaskPhoneNumber", new MaskPhoneNumber())
      ```

   1. Dans le paragraphe suivant, vous créez une table en mémoire pour lire les données du flux d’entrée. Assurez-vous que le nom du flux et AWS la région sont corrects. 

      ```
      %flink.ssql(type=update)
      
      DROP TABLE IF EXISTS customer_reviews;
      
      CREATE TABLE customer_reviews (
      customer_id VARCHAR,
      product VARCHAR,
      review VARCHAR,
      phone VARCHAR
      )
      WITH (
      'connector' = 'kinesis',
      'stream' = 'KinesisUDFSampleInputStream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json');
      ```

   1. Vérifiez si les données sont chargées dans la table en mémoire. 

      ```
      %flink.ssql(type=update)
      select * from customer_reviews
      ```

   1. Invoquez la fonction définie par l’utilisateur pour anonymiser le numéro de téléphone. 

      ```
      %flink.ssql(type=update)
      select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber from customer_reviews
      ```

   1. Maintenant que les numéros de téléphone sont masqués, créez une vue avec un numéro masqué. 

      ```
      %flink.ssql(type=update)
      
      DROP VIEW IF EXISTS sentiments_view;
      
      CREATE VIEW  
          sentiments_view
      AS
        select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber from customer_reviews
      ```

   1. Vérifiez les données. 

      ```
      %flink.ssql(type=update)
      select * from sentiments_view
      ```

   1. Créez une table en mémoire pour le flux Kinesis en sortie. Assurez-vous que le nom du flux et AWS la région sont corrects. 

      ```
      %flink.ssql(type=update)
      
      DROP TABLE IF EXISTS customer_reviews_stream_table;
      
      CREATE TABLE customer_reviews_stream_table (
      customer_id VARCHAR,
      product VARCHAR,
      review VARCHAR,
      phoneNumber varchar 
      )
      WITH (
      'connector' = 'kinesis',
      'stream' = 'KinesisUDFSampleOutputStream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'TRIM_HORIZON',
      'format' = 'json');
      ```

   1. Insérez les enregistrements mis à jour dans le flux Kinesis cible. 

      ```
      %flink.ssql(type=update)
      INSERT INTO customer_reviews_stream_table
      SELECT customer_id, product, review, phoneNumber
      FROM sentiments_view
      ```

   1. Affichez et vérifiez les données du flux Kinesis cible. 

      ```
      %flink.ssql(type=update)
      select * from customer_reviews_stream_table
      ```

## Promotion d’un bloc-notes en tant qu’application
<a name="examples-migrating-to-kda-studio-notebook-promoting"></a>

Maintenant que vous avez testé le code de votre bloc-notes de manière interactive, vous allez le déployer en tant qu’application de streaming durable. Vous devez d’abord modifier la configuration de l’application pour indiquer l’emplacement de votre code dans Amazon S3. 

1. Sur le AWS Management Console, choisissez votre bloc-notes et dans **Déployer en tant que configuration d'application (facultatif)**, sélectionnez **Modifier**. 

1. Sous **Destination du code dans Amazon S3**, choisissez le compartiment Amazon S3 créé par les [scripts CloudFormation](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/cfn/kda-flink-udf.yml). Ce processus peut prendre quelques minutes.

1. Vous ne pourrez pas promouvoir la note telle quelle. Si vous essayez, vous obtenez une erreur car les instructions `Select` ne sont pas prises en charge. Pour éviter ce problème, téléchargez le carnet [Zeppelin MaskPhoneNumber -Streaming](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/notebooks/MaskPhoneNumber-Streaming.zpln).

1. Importez le bloc-notes Zeppelin `MaskPhoneNumber-streaming`.

1. Ouvrez la note et choisissez **Actions pour KinesisDataAnalyticsStudio**.

1. Choisissez **Build MaskPhoneNumber -Streaming et exportez vers S3**. Assurez-vous de modifier le **nom d’application** et de ne pas inclure de caractères spéciaux.

1. Choisissez **Générer et exporter**. La configuration de l’application de streaming prendra quelques minutes.

1.  Une fois la génération terminée, choisissez **Déployer à l’aide de la console AWS **. 

1.  Sur la page suivante, vérifiez les paramètres et assurez-vous de choisir le rôle IAM approprié. Ensuite, choisissez **Créer une application de streaming**. 

1. Après quelques minutes, vous verrez un message indiquant que l’application de streaming a été créée avec succès.

 Pour plus d’informations sur le déploiement d’applications à état durable et avec limitations, consultez [Déploiement en tant qu’application à état durable](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook-durable.html). 

## Nettoyage
<a name="examples-migrating-to-kda-studio-notebook-cleanup"></a>

Si vous le souhaitez, vous pouvez à présent [désinstaller la pile CloudFormation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/cfn-console-delete-stack.html). Cela supprimera tous les services que vous avez configurés précédemment.