

Nach reiflicher Überlegung haben wir uns entschieden, Amazon Kinesis Data Analytics für SQL-Anwendungen einzustellen:

1. Ab dem **1. September 2025** werden wir keine Bugfixes für Amazon Kinesis Data Analytics for SQL-Anwendungen bereitstellen, da wir aufgrund der bevorstehenden Einstellung nur eingeschränkten Support dafür haben werden.

2. Ab dem **15. Oktober 2025** können Sie keine neuen Kinesis Data Analytics for SQL-Anwendungen mehr erstellen.

3. Wir werden Ihre Anwendungen ab dem **27. Januar 2026** löschen. Sie können Ihre Amazon Kinesis Data Analytics for SQL-Anwendungen nicht starten oder betreiben. Ab diesem Zeitpunkt ist kein Support mehr für Amazon Kinesis Data Analytics for SQL verfügbar. Weitere Informationen finden Sie unter [Einstellung von Amazon Kinesis Data Analytics für SQL-Anwendungen](discontinuation.md).

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Beispiel für die Migration zu einem Managed Service für Apache Flink Studio
<a name="migrating-to-kda-studio-overview"></a>

Nach reiflicher Überlegung haben wir die Entscheidung getroffen, Amazon Kinesis Data Analytics für SQL-Anwendungen einzustellen. Um Ihnen bei der Planung und Migration weg von Amazon Kinesis Data Analytics for SQL-Anwendungen zu helfen, werden wir das Angebot über einen Zeitraum von 15 Monaten schrittweise einstellen. Dies sind wichtige Daten, die es zu beachten gilt: **1. September 2025,** **15. Oktober 2025** und **27. Januar** 2026.

1. Ab dem **1. September 2025** werden wir keine Bugfixes für Amazon Kinesis Data Analytics for SQL-Anwendungen bereitstellen, da wir aufgrund der bevorstehenden Einstellung nur eingeschränkten Support dafür haben werden.

1. Ab dem **15. Oktober 2025** können Sie keine neuen Amazon Kinesis Data Analytics for SQL-Anwendungen mehr erstellen. 

1. Wir werden Ihre Anwendungen ab dem **27. Januar 2026** löschen. Sie können Ihre Amazon Kinesis Data Analytics for SQL-Anwendungen nicht starten oder betreiben. Ab diesem Zeitpunkt ist kein Support mehr für Amazon Kinesis Data Analytics for SQL-Anwendungen verfügbar. Weitere Informationen hierzu finden Sie unter [Einstellung von Amazon Kinesis Data Analytics für SQL-Anwendungen](discontinuation.md).

Wir empfehlen Ihnen, [Amazon Managed Service für Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/what-is.html) zu verwenden. Es kombiniert Benutzerfreundlichkeit mit fortschrittlichen Analysefunktionen, sodass Sie Anwendungen zur Stream-Verarbeitung in wenigen Minuten erstellen können.

Dieser Abschnitt enthält Code- und Architekturbeispiele, die Ihnen helfen, Ihre Amazon Kinesis Data Analytics for SQL-Anwendungs-Workloads auf Managed Service for Apache Flink zu migrieren.

Weitere Informationen finden Sie auch in diesem [AWS Blogbeitrag: Migration von Amazon Kinesis Data Analytics for SQL Applications zu Managed Service for Apache Flink Studio](https://aws.amazon.com/blogs/big-data/migrate-from-amazon-kinesis-data-analytics-for-sql-applications-to-amazon-managed-service-for-apache-flink-studio/).

## Replizieren von Kinesis Data Analytics for SQL-Abfragen in Managed Service für Apache Flink Studio
<a name="examples-migrating-to-kda-studio"></a>

Für die Migration Ihrer Workloads auf Managed Service für Apache Flink Studio oder Managed Service für Apache Flink finden Sie in diesem Abschnitt Abfrageübersetzungen, die Sie für allgemeine Anwendungsfälle verwenden können. 

Bevor Sie sich mit diesen Beispielen befassen, empfehlen wir Ihnen, zunächst den Artikel [Verwenden eines Studio-Notebooks mit einem Managed Service für Apache](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html) Flink zu lesen. 

### Neuerstellung von Kinesis Data Analytics für SQL-Abfragen in Managed Service für Apache Flink Studio
<a name="examples-recreating-queries"></a>

Die folgenden Optionen bieten Übersetzungen gängiger SQL-basierter Kinesis Data Analytics Analytics-Anwendungsabfragen an Managed Service for Apache Flink Studio. 

#### Mehrschrittige Anwendung
<a name="Multi-Step-application"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "IN_APP_STREAM_001" (
   ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16), price REAL, change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_001" AS 
INSERT INTO
   "IN_APP_STREAM_001"
   SELECT
      STREAM APPROXIMATE_ARRIVAL_TIME,
      ticker_symbol,
      sector,
      price,
      change FROM "SOURCE_SQL_STREAM_001";
-- Second in-app stream and pump
CREATE 
OR REPLACE STREAM "IN_APP_STREAM_02" (ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16),
   price REAL,
   change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_02" AS 
INSERT INTO
   "IN_APP_STREAM_02"
   SELECT
      STREAM ingest_time,
      ticker_symbol,
      sector,
      price,
      change FROM "IN_APP_STREAM_001";
-- Destination in-app stream and third pump
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16),
   price REAL,
   change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_03" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM ingest_time,
      ticker_symbol,
      sector,
      price,
      change FROM "IN_APP_STREAM_02";
```

------
#### [ Managed Service for Apache Flink Studio ]

```
Query 1 - % flink.ssql DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;           
           
CREATE TABLE SOURCE_SQL_STREAM_001 (TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE,
   CHANGE DOUBLE,
   APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA 

FROM
   'timestamp' VIRTUAL,
   WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND ) 
   PARTITIONED BY (TICKER_SYMBOL) WITH (
      'connector' = 'kinesis',
      'stream' = 'kinesis-analytics-demo-stream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json',
      'json.timestamp-format.standard' = 'ISO-8601');
DROP TABLE IF EXISTS IN_APP_STREAM_001;

CREATE TABLE IN_APP_STREAM_001 ( 
   INGEST_TIME TIMESTAMP,
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE,
   CHANGE DOUBLE )
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
      'connector' = 'kinesis', 
      'stream' = 'IN_APP_STREAM_001', 
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json',
      'json.timestamp-format.standard' = 'ISO-8601');
   
DROP TABLE IF EXISTS IN_APP_STREAM_02;

CREATE TABLE IN_APP_STREAM_02 (
   INGEST_TIME TIMESTAMP, 
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE, 
   CHANGE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'IN_APP_STREAM_02',   
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
   
DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   INGEST_TIME TIMESTAMP, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), 
   PRICE DOUBLE, CHANGE DOUBLE )
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'DESTINATION_SQL_STREAM',
   'aws.region' = 'us-east-1', 
   'scan.stream.initpos' = 'LATEST',  
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');


Query 2 - % flink.ssql(type = 
update
) 
   INSERT INTO
      IN_APP_STREAM_001 
      SELECT
         APPROXIMATE_ARRIVAL_TIME AS INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         SOURCE_SQL_STREAM_001;


Query 3 - % flink.ssql(type = 
update
) 
   INSERT INTO
      IN_APP_STREAM_02 
      SELECT
         INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         IN_APP_STREAM_001;


Query 4 - % flink.ssql(type = 
update
) 
   INSERT INTO
      DESTINATION_SQL_STREAM 
      SELECT
         INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         IN_APP_STREAM_02;
```

------

#### DateTime Werte transformieren
<a name="transform-date-time-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   TICKER VARCHAR(4),
   event_time TIMESTAMP,
   five_minutes_before TIMESTAMP,
   event_unix_timestamp BIGINT, 
   event_timestamp_as_char VARCHAR(50),
   event_second INTEGER);
   
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      EVENT_TIME,
      EVENT_TIME - INTERVAL '5' MINUTE,
      UNIX_TIMESTAMP(EVENT_TIME),
      TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
      EXTRACT(SECOND 
   FROM
      EVENT_TIME) 
   FROM
      "SOURCE_SQL_STREAM_001"
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   FIVE_MINUTES_BEFORE TIMESTAMP(3),   
   EVENT_UNIX_TIMESTAMP INT,
   EVENT_TIMESTAMP_AS_CHAR VARCHAR(50),
   EVENT_SECOND INT)
   
PARTITIONED BY (TICKER) WITH (
   'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream',   
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         TICKER,
         EVENT_TIME,
         EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE,
         UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP,
         DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR,
         EXTRACT(SECOND 
      FROM
         EVENT_TIME) AS EVENT_SECOND 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Einfache Warnungen
<a name="simple-alerts"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   ticker_symbol VARCHAR(4),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
   
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   price 
FROM
   "SOURCE_SQL_STREAM_001"
WHERE
   (
      ABS(Change / (Price - Change)) * 100
   )
   > 1
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(4), 
   CHANGE DOUBLE,       
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
   
Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
   WHERE
      (
         ABS(CHANGE / (PRICE - CHANGE)) * 100
      )
      > 1;
```

------

#### Gedrosselte Warnungen
<a name="throttled-alerts"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CHANGE_STREAM"(
   ticker_symbol VARCHAR(4),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
   
CREATE 
OR REPLACE PUMP "change_pump" AS INSERT INTO "CHANGE_STREAM"
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   price
FROM "SOURCE_SQL_STREAM_001"
WHERE
   (
      ABS(Change / (Price - Change)) * 100
   )
   > 1;
-- ** Trigger Count and Limit **
-- Counts "triggers" or those values that evaluated true against the previous where clause
-- Then provides its own limit on the number of triggers per hour per ticker symbol to what is specified in the WHERE clause

CREATE 
OR REPLACE STREAM TRIGGER_COUNT_STREAM (
   ticker_symbol VARCHAR(4),
   change REAL,
   trigger_count INTEGER);
   
CREATE 
OR REPLACE PUMP trigger_count_pump AS 
INSERT INTO
   TRIGGER_COUNT_STREAMSELECT STREAM ticker_symbol,
   change,
   trigger_count 
FROM
   (
      SELECT
         STREAM ticker_symbol,
         change,
         COUNT(*) OVER W1 as trigger_countFROM "CHANGE_STREAM" --window to perform aggregations over last minute to keep track of triggers
         WINDOW W1 AS 
         (
            PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING
         )
   )
WHERE
   trigger_count >= 1;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(4),      
   CHANGE DOUBLE, PRICE DOUBLE,
   EVENT_TIME AS PROCTIME()) 
PARTITIONED BY (TICKER_SYMBOL) 
WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');   
DROP TABLE IF EXISTS TRIGGER_COUNT_STREAM;
CREATE TABLE TRIGGER_COUNT_STREAM ( 
   TICKER_SYMBOL VARCHAR(4), 
   CHANGE DOUBLE, 
   TRIGGER_COUNT INT) 
PARTITIONED BY (TICKER_SYMBOL);

Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
   WHERE
      (
         ABS(CHANGE / (PRICE - CHANGE)) * 100
      )
      > 1;
      
Query 3 - % flink.ssql(type = 
update
) 
   SELECT * 
   FROM(
         SELECT
            TICKER_SYMBOL,
            CHANGE,
            COUNT(*) AS TRIGGER_COUNT 
         FROM
            DESTINATION_SQL_STREAM 
         GROUP BY
            TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE),
            TICKER_SYMBOL,
            CHANGE 
      )
   WHERE
      TRIGGER_COUNT > 1;
```

------

#### Aggregieren von Teilergebnissen aus einer Abfrage
<a name="aggregate-partial-results"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
   
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
   
CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS 
INSERT INTO
   "CALC_COUNT_SQL_STREAM"(
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT") 
   SELECT
      STREAM "TICKER_SYMBOL",
      STEP("SOURCE_SQL_STREAM_001",
      "ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
      COUNT(*) AS "TickerCount "
   FROM
      "SOURCE_SQL_STREAM_001" 
   GROUP BY
      STEP("SOURCE_SQL_STREAM_001". ROWTIME BY INTERVAL '1' MINUTE),
      STEP("SOURCE_SQL_STREAM_001"." APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
      TICKER_SYMBOL;
CREATE PUMP "AGGREGATED_SQL_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT") 
   SELECT
      STREAM "TICKER",
      "TRADETIME",
      SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" 
   FROM
      "CALC_COUNT_SQL_STREAM" WINDOW W1 AS 
      (
         PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING
      )
;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;
CREATE TABLE SOURCE_SQL_STREAM_001 (
   TICKER_SYMBOL VARCHAR(4),
   TRADETIME AS PROCTIME(),
   APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA 
FROM
   'timestamp' VIRTUAL,
   WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND) 
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',   
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
DROP TABLE IF EXISTS CALC_COUNT_SQL_STREAM;
CREATE TABLE CALC_COUNT_SQL_STREAM (
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP(3),
   WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND,   
   TICKERCOUNT BIGINT NOT NULL ) PARTITIONED BY (TICKER) WITH ( 
      'connector' = 'kinesis',
      'stream' = 'CALC_COUNT_SQL_STREAM',
      'aws.region' = 'us-east-1',       
      'scan.stream.initpos' = 'LATEST',
      'format' = 'csv');
DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP(3),
   WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND, 
   TICKERCOUNT BIGINT NOT NULL )
   PARTITIONED BY (TICKER) WITH ('connector' = 'kinesis', 
      'stream' = 'DESTINATION_SQL_STREAM',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'csv');

Query 2 - % flink.ssql(type = 
update
) 
   INSERT INTO
      CALC_COUNT_SQL_STREAM 
      SELECT
         TICKER,
         TO_TIMESTAMP(TRADETIME, 'yyyy-MM-dd HH:mm:ss') AS TRADETIME,
         TICKERCOUNT 
      FROM
         (
            SELECT
               TICKER_SYMBOL AS TICKER,
               DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00') AS TRADETIME,
               COUNT(*) AS TICKERCOUNT 
            FROM
               SOURCE_SQL_STREAM_001 
            GROUP BY
               TUMBLE(TRADETIME, INTERVAL '1' MINUTE),
               DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00'),
               DATE_FORMAT(APPROXIMATE_ARRIVAL_TIME, 'yyyy-MM-dd HH:mm:00'),
               TICKER_SYMBOL 
         )
;

Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      CALC_COUNT_SQL_STREAM;
      
Query 4 - % flink.ssql(type = 
update
) 
   INSERT INTO
      DESTINATION_SQL_STREAM 
      SELECT
         TICKER,
         TRADETIME,
         SUM(TICKERCOUNT) OVER W1 AS TICKERCOUNT 
      FROM
         CALC_COUNT_SQL_STREAM WINDOW W1 AS 
         (
            PARTITION BY TICKER 
         ORDER BY
            TRADETIME RANGE INTERVAL '10' MINUTE PRECEDING
         )
;

Query 5 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      DESTINATION_SQL_STREAM;
```

------

#### Umwandeln von Zeichenfolgewerten
<a name="transform-string-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM for cleaned up referrerCREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32));
CREATE 
OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM "APPROXIMATE_ARRIVAL_TIME",
   SUBSTRING("referrer", 12, 
   (
      POSITION('.com' IN "referrer") - POSITION('www.' IN "referrer") - 4
   )
) 
FROM
   "SOURCE_SQL_STREAM_001";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   referrer VARCHAR(32),
   ingest_time AS PROCTIME() ) PARTITIONED BY (referrer) 
WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ingest_time,
         substring(referrer, 12, 6) as referrer 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Ersetzen einer Teilzeichenfolge mit Regex
<a name="substring-regex"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM for cleaned up referrerCREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32));
CREATE 
OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM "APPROXIMATE_ARRIVAL_TIME",
   REGEX_REPLACE("REFERRER", 'http://', 'https://', 1, 0) 
FROM
   "SOURCE_SQL_STREAM_001";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   referrer VARCHAR(32),
   ingest_time AS PROCTIME()) 
PARTITIONED BY (referrer) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ingest_time,
         REGEXP_REPLACE(referrer, 'http', 'https') as referrer 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Regex-Protokollanalyse
<a name="regex-log-parse"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   sector VARCHAR(24),
   match1 VARCHAR(24),
   match2 VARCHAR(24));
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM T.SECTOR,
      T.REC.COLUMN1,
      T.REC.COLUMN2 
   FROM
      (
         SELECT
            STREAM SECTOR,
            REGEX_LOG_PARSE(SECTOR, '.*([E].).*([R].*)') AS REC 
         FROM
            SOURCE_SQL_STREAM_001
      )
      AS T;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   CHANGE DOUBLE, PRICE DOUBLE,
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16)) 
PARTITIONED BY (SECTOR) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
SELECT
   * 
FROM
   (
      SELECT
         SECTOR,
         REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 1) AS MATCH1,
         REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 2) AS MATCH2 
      FROM
         DESTINATION_SQL_STREAM 
   )
WHERE
   MATCH1 IS NOT NULL 
   AND MATCH2 IS NOT NULL;
```

------

#### DateTime Werte transformieren
<a name="transform-date-time-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( 
   TICKER VARCHAR(4),
   event_time TIMESTAMP,
   five_minutes_before TIMESTAMP,
   event_unix_timestamp BIGINT,
   event_timestamp_as_char VARCHAR(50),
   event_second INTEGER);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      EVENT_TIME,
      EVENT_TIME - INTERVAL '5' MINUTE,
      UNIX_TIMESTAMP(EVENT_TIME),
      TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
      EXTRACT(SECOND 
   FROM
      EVENT_TIME) 
   FROM
      "SOURCE_SQL_STREAM_001"
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   FIVE_MINUTES_BEFORE TIMESTAMP(3),
   EVENT_UNIX_TIMESTAMP INT,    
   EVENT_TIMESTAMP_AS_CHAR VARCHAR(50),
   EVENT_SECOND INT) PARTITIONED BY (TICKER)
WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         TICKER,
         EVENT_TIME,
         EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE,
         UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP,
         DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR,
         EXTRACT(SECOND 
      FROM
         EVENT_TIME) AS EVENT_SECOND 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Fenster und Aggregation
<a name="windows-aggregation"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   event_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   ticker_count INTEGER);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM EVENT_TIME,
      TICKER,
      COUNT(TICKER) AS ticker_count 
   FROM
      "SOURCE_SQL_STREAM_001" WINDOWED BY STAGGER ( PARTITION BY 
         TICKER,
         EVENT_TIME RANGE INTERVAL '1' MINUTE);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   EVENT_TIME TIMESTAMP(3),
   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '60' SECOND,    
   TICKER VARCHAR(4),
   TICKER_COUNT INT) PARTITIONED BY (TICKER) 
WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json'

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         EVENT_TIME,
         TICKER, COUNT(TICKER) AS ticker_count 
      FROM
         DESTINATION_SQL_STREAM 
      GROUP BY
         TUMBLE(EVENT_TIME,
         INTERVAL '60' second),
         EVENT_TIME, TICKER;
```

------

#### Rollierendes Fenster mit Rowtime
<a name="tumbling-windows-rowtime"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   MIN_PRICE REAL,
   MAX_PRICE REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      MIN(PRICE),
      MAX(PRICE)
   FROM
      "SOURCE_SQL_STREAM_001"
   GROUP BY
      TICKER,
      STEP("SOURCE_SQL_STREAM_001".
            ROWTIME BY INTERVAL '60' SECOND);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   ticker VARCHAR(4),
   price DOUBLE,
   event_time VARCHAR(32),
   processing_time AS PROCTIME()) 
PARTITIONED BY (ticker) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601') 

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ticker,
         min(price) AS MIN_PRICE,
         max(price) AS MAX_PRICE 
      FROM
         DESTINATION_SQL_STREAM 
      GROUP BY
         TUMBLE(processing_time, INTERVAL '60' second),
         ticker;
```

------

#### Die am häufigsten vorkommenden Werte werden abgerufen (TOP\$1K\$1ITEMS\$1TUMBLING)
<a name="retrieve-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT")
SELECT
   STREAM"TICKER_SYMBOL",
   STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
   COUNT(*) AS "TickerCount"
FROM
   "SOURCE_SQL_STREAM_001"
GROUP BY STEP("SOURCE_SQL_STREAM_001".
   ROWTIME BY INTERVAL '1' MINUTE),
   STEP("SOURCE_SQL_STREAM_001".
      "APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
   TICKER_SYMBOL;
CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT")
SELECT
   STREAM "TICKER",
   "TRADETIME",
   SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT"
FROM
   "CALC_COUNT_SQL_STREAM" WINDOW W1 AS 
   (
      PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING
   )
;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM ( 
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '1' SECONDS ) 
PARTITIONED BY (TICKER) WITH (
   'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      (
         SELECT
            TICKER,
            COUNT(*) as MOST_FREQUENT_VALUES,
            ROW_NUMBER() OVER (PARTITION BY TICKER 
         ORDER BY
            TICKER DESC) AS row_num 
         FROM
            DESTINATION_SQL_STREAM 
         GROUP BY
            TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE),
            TICKER
      )
   WHERE
      row_num <= 5;
```

------

#### Ungefähre Top-K-Artikel
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ITEM VARCHAR(1024), ITEM_COUNT DOUBLE);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM ITEM,
      ITEM_COUNT 
   FROM
      TABLE(TOP_K_ITEMS_TUMBLING(CURSOR(
      SELECT
         STREAM * 
      FROM
         "SOURCE_SQL_STREAM_001"), 'column1', -- name of column in single quotes10, -- number of top items60 -- tumbling window size in seconds));
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flinkssql
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 
CREATE TABLE SOURCE_SQL_STREAM_001 ( TS TIMESTAMP(3), WATERMARK FOR TS as TS - INTERVAL '5' SECOND, ITEM VARCHAR(1024), 
PRICE DOUBLE) 
   WITH ( 'connector' = 'kinesis', 'stream' = 'SOURCE_SQL_STREAM_001',
'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601');


%flink.ssql(type=update)
SELECT
   * 
FROM
   (
      SELECT
         *,
         ROW_NUMBER() OVER (PARTITION BY AGG_WINDOW 
      ORDER BY
         ITEM_COUNT DESC) as rownum 
      FROM
         (
            select
               AGG_WINDOW,
               ITEM,
               ITEM_COUNT 
            from
               (
                  select
                     TUMBLE_ROWTIME(TS, INTERVAL '60' SECONDS) as AGG_WINDOW,
                     ITEM,
                     count(*) as ITEM_COUNT 
                  FROM
                     SOURCE_SQL_STREAM_001 
                  GROUP BY
                     TUMBLE(TS, INTERVAL '60' SECONDS),
                     ITEM
               )
         )
   )
where
   rownum <= 3
```

------

#### Analysieren von Web-Protokollen (Funktion W3C\$1LOG\$1PARSE)
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( column1 VARCHAR(16), 
   column2 VARCHAR(16), 
   column3 VARCHAR(16), 
   column4 VARCHAR(16), 
   column5 VARCHAR(16), 
   column6 VARCHAR(16), 
   column7 VARCHAR(16));
CREATE 
OR REPLACE PUMP "myPUMP" ASINSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM l.r.COLUMN1,
   l.r.COLUMN2,
   l.r.COLUMN3,
   l.r.COLUMN4,
   l.r.COLUMN5,
   l.r.COLUMN6,
   l.r.COLUMN7 
FROM
   (
      SELECT
         STREAM W3C_LOG_PARSE("log", 'COMMON') 
      FROM
         "SOURCE_SQL_STREAM_001"
   )
   AS l(r);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql(type=update)
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) 
   WITH ( 'connector' = 'kinesis', 
          'stream' = 'SOURCE_SQL_STREAM_001',
          'aws.region' = 'us-east-1',
          'scan.stream.initpos' = 'LATEST',
          'format' = 'json',
          'json.timestamp-format.standard' = 'ISO-8601');
          
% flink.ssql(type=update) 
   select
      SPLIT_INDEX(log, ' ', 0),
      SPLIT_INDEX(log, ' ', 1),
      SPLIT_INDEX(log, ' ', 2),
      SPLIT_INDEX(log, ' ', 3),
      SPLIT_INDEX(log, ' ', 4),
      SPLIT_INDEX(log, ' ', 5),
      SPLIT_INDEX(log, ' ', 6) 
   from
      SOURCE_SQL_STREAM_001;
```

------

#### Aufteilen von Zeichenfolgen auf mehrere Felder (Funktion VARIABLE\$1COLUMN\$1LOG\$1PARSE)
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"( "column_A" VARCHAR(16),
   "column_B" VARCHAR(16),
   "column_C" VARCHAR(16),
   "COL_1" VARCHAR(16),
   "COL_2" VARCHAR(16),
   "COL_3" VARCHAR(16));
CREATE 
OR REPLACE PUMP "SECOND_STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM t."Col_A",
   t."Col_B",
   t."Col_C",
   t.r."COL_1",
   t.r."COL_2",
   t.r."COL_3"
FROM
   (
      SELECT
         STREAM "Col_A",
         "Col_B",
         "Col_C",
         VARIABLE_COLUMN_LOG_PARSE ("Col_E_Unstructured",
         'COL_1 TYPE VARCHAR(16),
         COL_2 TYPE VARCHAR(16),
         COL_3 TYPE VARCHAR(16)', ',') AS r 
      FROM
         "SOURCE_SQL_STREAM_001"
   )
   as t;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql(type=update)
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) 
   WITH ( 'connector' = 'kinesis',
          'stream' = 'SOURCE_SQL_STREAM_001',
          'aws.region' = 'us-east-1',
          'scan.stream.initpos' = 'LATEST',
          'format' = 'json',
          'json.timestamp-format.standard' = 'ISO-8601');
          
% flink.ssql(type=update) 
   select
      SPLIT_INDEX(log, ' ', 0),
      SPLIT_INDEX(log, ' ', 1),
      SPLIT_INDEX(log, ' ', 2),
      SPLIT_INDEX(log, ' ', 3),
      SPLIT_INDEX(log, ' ', 4),
      SPLIT_INDEX(log, ' ', 5)
)
from
   SOURCE_SQL_STREAM_001;
```

------

#### Joins
<a name="joins"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   ticker_symbol VARCHAR(4),
   "Company" varchar(20),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM ticker_symbol,
      "c"."Company",
      sector,
      change,
      priceFROM "SOURCE_SQL_STREAM_001" 
      LEFT JOIN
         "CompanyName" as "c"
         ON "SOURCE_SQL_STREAM_001".ticker_symbol = "c"."Ticker";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(12),
   CHANGE INT,
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',   
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - CREATE TABLE CompanyName (
   Ticker VARCHAR(4),
   Company VARCHAR(4)) WITH ( 
      'connector' = 'filesystem',
      'path' = 's3://kda-demo-sample/TickerReference.csv',       
      'format' = 'csv' );

Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      c.Company,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
      LEFT JOIN
         CompanyName as c 
         ON DESTINATION_SQL_STREAM.TICKER_SYMBOL = c.Ticker;
```

------

#### Fehler
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   (
      price / 0
   )
   as ProblemColumnFROM "SOURCE_SQL_STREAM_001"
WHERE
   sector SIMILAR TO '%TECH%';
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   CHANGE DOUBLE, 
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - % flink.pyflink @udf(input_types = [DataTypes.BIGINT()],
   result_type = DataTypes.BIGINT()) def DivideByZero(price): try: price / 0 
except
: return - 1 st_env.register_function("DivideByZero",
   DivideByZero) 
   
   Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      CURRENT_TIMESTAMP AS ERROR_TIME,
      * 
   FROM
      (
         SELECT
            TICKER_SYMBOL,
            SECTOR,
            CHANGE,
            DivideByZero(PRICE) as ErrorColumn 
         FROM
            DESTINATION_SQL_STREAM 
         WHERE
            SECTOR SIMILAR TO '%TECH%' 
      )
      AS ERROR_STREAM;
```

------

## Migration von Random Cut Forest-Workloads
<a name="examples-migrating-to-kda-studio-random-cut-forests"></a>

Wenn Sie Workloads, die Random Cut Forest verwenden, von Kinesis Analytics für SQL zu Managed Service für Apache Flink verschieben möchten, zeigt dieser [AWS -Blogbeitrag](https://aws.amazon.com/blogs/big-data/real-time-anomaly-detection-via-random-cut-forest-in-amazon-kinesis-data-analytics/), wie Sie Managed Service für Apache Flink verwenden, um einen Online-RCF-Algorithmus zur Erkennung von Anomalien auszuführen.

## Kinesis Data Firehose als Quelle durch Kinesis Data Streams ersetzen
<a name="examples-firehose"></a>

[Eine vollständige Anleitung finden Sie unter Converting-KDAsql-/. KDAStudio](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/tree/master/Converting-KDASQL-KDAStudio)

In der folgenden Übung ändern Sie Ihren Datenfluss, um Amazon-Managed-Service für Apache Flink Studio zu verwenden. Dies bedeutet auch, von Amazon-Kinesis-Data-Firehose zu Amazon-Kinesis-Data-Streams zu wechseln.

Zunächst stellen wir eine typische KDA-SQL-Architektur vor, bevor wir zeigen, wie Sie diese mithilfe von Amazon-Managed-Service für Apache Flink Studio und Amazon-Kinesis-Data-Streams ersetzen können. [Alternativ können Sie die Vorlage auch hier starten: CloudFormation](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml)

### Amazon-Kinesis-Data-Analytics-SQL und Amazon-Kinesis-Data-Firehose
<a name="examples-firehose-legacy-setup"></a>

Hier ist der SQL-Architekturfluss von Amazon-Kinesis-Data-Analytics: 

![\[Architectural flow diagram showing data movement through Amazon Kinesis services to Amazon S3.\]](http://docs.aws.amazon.com/de_de/kinesisanalytics/latest/dev/images/legacy-sql.png)


Wir untersuchen zunächst die Einrichtung von Amazon-Kinesis-Data-Analytics-SQL und Amazon-Kinesis-Data-Firehose. Der Anwendungsfall ist ein Handelsmarkt, auf dem Handelsdaten, einschließlich Börsenticker und Preise, aus externen Quellen in Amazon-Kinesis-Systeme gestreamt werden. Amazon Kinesis Data Analytics for SQL verwendet den Input-Stream, um Fensterabfragen wie Tumbling Window auszuführen, um das Handelsvolumen und den`min`,`max`, und `average` Handelspreis über ein einminütiges Fenster für jeden Börsenticker zu ermitteln.  

Amazon-Kinesis-Data-Analytics-SQL ist so eingerichtet, dass es Daten aus der Amazon-Kinesis-Data-Firehose-API aufnimmt. Nach der Verarbeitung sendet Amazon-Kinesis-Data-Analytics-SQL die verarbeiteten Daten an eine andere Amazon-Kinesis-Data-Firehose, die dann die Ausgabe in einem Amazon-S3-Bucket speichert. 

In diesem Fall verwenden Sie den Amazon-Kinesis-Datengenerator. Mit dem Amazon-Kinesis-Datengenerator können Sie Testdaten an Ihre Amazon-Kinesis-Data-Streams oder Amazon-Kinesis-Data-Firehose-Bereitstellungsstreams senden. [Um zu beginnen, folgen Sie den Anweisungen hier.](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html) Verwenden Sie [hier](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml) die CloudFormation Vorlage anstelle der in der [Anleitung angegebenen Vorlage:](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). 

Sobald Sie die CloudFormation Vorlage ausgeführt haben, enthält der Ausgabebereich die Amazon Kinesis Data Generator-URL. Melden Sie sich mit der Cognito-Benutzer-ID und dem Passwort, die Sie [hier](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html) eingerichtet haben, beim Portal an. Wählen Sie die Region und den Namen des Zielstreams aus. Wählen Sie für den aktuellen Status die Bereitstellungsstreams von Amazon-Kinesis-Data-Firehose. Wählen Sie für den neuen Status den Namen des Amazon-Kinesis-Data-Firehose Streams. Sie können je nach Ihren Anforderungen mehrere Vorlagen erstellen und die Vorlage mithilfe der Schaltfläche **Vorlage testen** ausprobieren, bevor Sie sie an den Ziel-Stream senden.

Im Folgenden finden Sie ein Beispiel für eine Nutzlast mit Amazon-Kinesis-Datengenerator. Der Datengenerator zielt darauf ab, die eingegebenen Amazon-Kinesis-Firehose-Streams kontinuierlich zu streamen. Der Amazon-Kinesis-SDK-Client kann auch Daten von anderen Produzenten senden. 

```
2023-02-17 09:28:07.763,"AAPL",5032023-02-17 09:28:07.763,
"AMZN",3352023-02-17 09:28:07.763,
"GOOGL",1852023-02-17 09:28:07.763,
"AAPL",11162023-02-17 09:28:07.763,
"GOOGL",1582
```

Der folgende JSON-Code wird verwendet, um eine zufällige Reihe von Handelszeiten und -daten, Börsentickerdaten und Aktienkursen zu generieren:

```
date.now(YYYY-MM-DD HH:mm:ss.SSS),
"random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])",
random.number(2000)
```

Sobald Sie **Daten senden** auswählen, beginnt der Generator mit dem Senden von Mock-Daten.

Externe Systeme streamen die Daten an Amazon-Kinesis-Data-Firehose. Mit Amazon-Kinesis-Data-Analytics for SQL-Anwendungen können Sie Streaming-Daten mithilfe von Standard-SQL analysieren. Der Service ermöglicht die Erstellung und Ausführung von SQL-Code für Streaming-Quellen zum Durchführen von Zeitreihenanalysen, Füllen von Echtzeit-Dashboards und Erstellen von Echtzeitmetriken. Amazon-Kinesis-Data-Analytics for SQL-Anwendungen könnte einen Ziel-Stream aus SQL-Abfragen im Eingabe-Stream erstellen und den Ziel-Stream an eine andere Amazon-Kinesis-Data-Firehose senden. Das Ziel Amazon-Kinesis-Data-Firehose könnte die Analysedaten als Endzustand an Amazon-S3 senden.

Der Legacy-Code von Amazon-Kinesis-Data-Analytics-SQL basiert auf einer Erweiterung des SQL-Standards. 

Sie verwenden die folgende Abfrage in Amazon-Kinesis-Data-Analytics-SQL. Sie erstellen zunächst einen Ziel-Stream für die Abfrageausgabe. Dann verwenden Sie `PUMP`, ein Amazon-Kinesis-Data-Analytics-Repository-Objekt (eine Erweiterung des SQL-Standards), das eine kontinuierlich ablaufende `INSERT INTO stream SELECT ... FROM`-Abfragefunktion bietet und so die kontinuierliche Eingabe der Ergebnisse einer Abfrage in einen benannten Stream ermöglicht. 

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME TIMESTAMP,
INGEST_TIME TIMESTAMP,
TICKER VARCHAR(16),
VOLUME BIGINT,
AVG_PRICE DOUBLE,
MIN_PRICE DOUBLE,
MAX_PRICE DOUBLE);
 
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND) AS EVENT_TIME,
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS "STREAM_INGEST_TIME",
      "ticker",
       COUNT(*) AS VOLUME,
      AVG("tradePrice") AS AVG_PRICE,
      MIN("tradePrice") AS MIN_PRICE,
      MAX("tradePrice") AS MAX_PRICEFROM "SOURCE_SQL_STREAM_001"
   GROUP BY
      "ticker",
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND),
      STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND);
```

Das vorangegangene SQL verwendet zwei Zeitfenster: `tradeTimestamp` das Zeitfenster stammt aus der Payload des eingehenden Streams und `ROWTIME.tradeTimestamp` wird auch oder genannt`Event Time`. `client-side time` Häufig ist es wünschenswert, diese Zeit in Analysen zu verwenden, da dies die Zeit ist, zu der ein Ereignis aufgetreten ist. Zahlreiche Ereignisquellen, wie Smartphones und Web-Clients, besitzen jedoch keine zuverlässigen Uhren, was zu ungenauen Zeiten führen kann. Zusätzlich können Konnektivitätsprobleme dazu führen, dass Datensätze in einem Stream nicht in der gleichen Reihenfolge angezeigt werden, in der sie aufgetreten sind. 

In-App-Streams enthalten außerdem eine spezielle Spalte namens `ROWTIME`. In dieser wird ein Zeitstempel gespeichert, wenn Amazon-Kinesis-Data-Analytics eine Zeile in den ersten In-Application-Stream einfügt. `ROWTIME` spiegelt den Zeitstempel wider, zu dem Amazon-Kinesis-Data-Analytics nach dem Lesen aus der Streaming-Quelle einen Datensatz in den ersten In-Application-Stream eingefügt hat. Dieser `ROWTIME`-Wert wird anschließend in der gesamten Anwendung beibehalten. 

Das SQL bestimmt die Anzahl der Ticker als`volume`, `min``max`, und den `average` Preis über ein 60-Sekunden-Intervall. 

Die Verwendung dieser Zeiten in Abfragen mit Fenstern auf Zeitbasis hat Vor- und Nachteile. Wählen Sie eine oder mehrere dieser Zeiten und entwickeln Sie eine Strategie für den Umgang mit den relevanten Nachteilen, abhängig von Ihrem Anwendungsfall. 

Eine Zwei-Fenster-Strategie mit zwei zeitbasierten Fenster verwendet sowohl `ROWTIME` als auch eine der beiden anderen Zeiten, beispielsweise die Ereigniszeit.
+ Sie sollten `ROWTIME` als erstes Fenster verwenden, das die Häufigkeit steuert, mit der die Abfrage die Ergebnisse ausgibt, wie im folgenden Beispiel gezeigt. Sie wird nicht als logische Zeit verwendet. 
+ Sie sollten eine der beiden anderen Zeiten als logische Zeit verwenden, um sie mit Ihren Analysen zu verknüpfen. Diese Zeit stellt den Zeitpunkt dar, zu dem das Ereignis aufgetreten ist. Im folgenden Beispiel besteht das Ziel der Analyse darin, die Datensätze zu gruppieren und eine Zahl nach Ticker zurückzugeben. 

### Amazon-Managed-Service für Apache Flink 
<a name="examples-studio"></a>

In der aktualisierten Architektur ersetzen Sie Amazon-Kinesis-Data-Firehose durch Amazon-Kinesis-Data-Streams . Amazon-Kinesis-Data-Analytics for SQL-Anwendungen wird durch Amazon-Managed-Service für Apache Flink Studio ersetzt. Apache Flink-Code wird interaktiv in einem Apache Zeppelin-Notebook ausgeführt. Amazon-Managed-Service für Apache Flink Studio sendet die aggregierten Handelsdaten an einen Amazon-S3-Bucket, um sie zu speichern. Die Schritte werden im Folgenden dargestellt:

Dies ist der Architekturfluss von Amazon-Managed-Service für Apache Flink Studio:

![\[Data flow from Producer through Kinesis streams to Analytics Studio and S3 storage.\]](http://docs.aws.amazon.com/de_de/kinesisanalytics/latest/dev/images/kda-studio.png)


#### Erstellen Sie einen Kinesis Data Stream
<a name="examples-studio-create-data-stream"></a>

**So erstellen Sie einen Datenstrom mit der Konsole**

1. Melden Sie sich bei der an AWS-Managementkonsole und öffnen Sie die Kinesis-Konsole unter [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Erweitern Sie in der Navigationsleiste die Regionsauswahl und wählen Sie eine Region aus.

1. Klicken Sie auf **Create data stream (Daten-Stream erstellen)**.

1. Geben Sie auf der Seite **Kinesis-Stream erstellen** einen Namen für Ihren Datenstrom ein und wählen Sie dann den standardmäßigen Kapazitätsmodus **On-Demand**. 

   Im Modus **On-Demand** können Sie dann **Kinesis-Stream erstellen** wählen, um Ihren Datenstrom zu erstellen. 

   Auf der Seite **Kinesis streams (Kinesis-Streams)** wird für den Wert **Status** des Streams **Creating (Erstellen)** angezeigt, während der Stream erstellt wird. Sobald der Stream verwendet werden kann, ändert sich der Wert von **Status** in **Active (Aktiv)**.

1. Wählen Sie den Namen des Streams aus. Auf der Seite **Stream Details (Stream-Details)** wird eine Zusammenfassung der Stream-Konfiguration zusammen mit Überwachungsinformationen angezeigt.

1. **Ändern Sie im Amazon Kinesis Data Generator den Stream/delivery Stream in den neuen Amazon Kinesis Data Streams: TRADE\$1SOURCE\$1STREAM.**

   JSON und Nutzlast entsprechen denen, die Sie für Amazon-Kinesis-Data-Analytics-SQL verwendet haben. Verwenden Sie den Amazon-Kinesis-Datengenerator, um einige Beispiele für Handelsnutzdaten zu erstellen und verwenden Sie den **TRADE\$1SOURCE\$1STREAM**-Datenstrom als Ziel für diese Übung: 

   ```
   {{date.now(YYYY-MM-DD HH:mm:ss.SSS)}},
   "{{random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])}}",
   {{random.number(2000)}}
   ```

1.  AWS-Managementkonsole **Gehen Sie zu Managed Service for Apache Flink und wählen Sie dann Create application.**

1. Wählen Sie im Navigationsbereich links **Studio-Notebooks** aus und wählen Sie dann **Studio-Notebook erstellen**.

1. Geben Sie einen Namen für das Studio-Notebook ein.

1. Geben Sie unter **AWS -Glue-Datenbank** eine bestehende AWS Glue -Datenbank an, die die Metadaten für Ihre Quellen und Ziele definiert. Wenn Sie keine AWS Glue Datenbank haben, wählen Sie **Create** und gehen Sie wie folgt vor:

   1. Wählen Sie in der AWS Glue-Konsole im Menü auf der linken Seite unter **Datenkatalog** die Option **Datenbanken** aus.

   1. Wählen Sie **Datenbank erstellen** aus

   1. Geben Sie auf der Seite **Datenbank erstellen** einen Namen für die Datenbank ein. Wählen Sie im Abschnitt **Standort – optional** **Amazon-S3 durchsuchen** und dann den Amazon-S3-Bucket aus. Wenn noch keinen Amazon-S3-Bucket eingerichtet haben, können Sie diesen Schritt überspringen und später dazu zurückkehren.

   1. (Optional). Geben Sie eine Beschreibung für die Datenbank ein.

   1. Wählen Sie **Datenbank erstellen** aus.

1. Wählen Sie **Notebook erstellen** aus.

1. Sobald Ihr Notebook erstellt ist, wählen Sie **Ausführen**.

1. Sobald das Notebook erfolgreich gestartet wurde, starten Sie ein Zeppelin-Notebook, indem Sie **In Apache Zeppelin öffnen** wählen.

1. Wählen Sie auf der Seite Zeppelin-Notizbuch die Option Neue Notiz **erstellen** und geben Sie ihr einen Namen. *MarketDataFeed*

Der Flink-SQL-Code wird im Folgenden erklärt, aber zuerst einmal [sieht ein Zeppelin-Notebook-Bildschirm so aus](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/open-Zeppelin-notebook.jpg). Jedes Fenster im Notebook ist ein separater Codeblock und diese können einzeln ausgeführt werden.

##### Code bei Amazon-Managed-Service für Apache Flink
<a name="examples-studio-code"></a>

Amazon-Managed-Service für Apache Flink Studio verwendet Zeppelin Notebooks, um den Code auszuführen. In diesem Beispiel erfolgt die Zuordnung zum SSQL-Code, der auf Apache Flink 1.13 basiert. Der Code im Zeppelin-Notizbuch wird im Folgenden, Block für Block, angezeigt.  

Bevor Sie Code in Ihrem Zeppelin Notebook ausführen, müssen die Flink-Konfigurationsbefehle ausgeführt werden. Wenn Sie nach dem Ausführen von Code (ssql, Python oder Scala) eine Konfigurationseinstellung ändern müssen, müssen Sie Ihr Notebook beenden und neu starten. In diesem Beispiel müssen Sie Checkpointing einrichten. Checkpointing ist erforderlich, damit Sie in Amazon-S3 Daten in eine Datei streamen können. Dadurch können Daten, die zu Amazon-S3 gestreamt werden, in eine Datei geleitet werden. Die folgende Anweisung legt das Intervall auf 5000 Millisekunden fest.  

```
%flink.conf
execution.checkpointing.interval 5000
```

`%flink.conf` gibt an, dass es sich bei diesem Block um Konfigurationsanweisungen handelt. [Weitere Informationen zur Flink-Konfiguration einschließlich Checkpointing finden Sie unter Apache Flink Checkpointing.](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/checkpoints/)  

Die Eingabetabelle für die Quelle Amazon Kinesis Data Streams wird mit dem folgenden Flink-SSQL-Code erstellt. Beachten Sie, dass das `TRADE_TIME` Feld die vom Datengenerator date/time erstellten Daten speichert.

```
%flink.ssql
     
DROP TABLE IF EXISTS TRADE_SOURCE_STREAM;
CREATE TABLE TRADE_SOURCE_STREAM (--`arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
TRADE_TIME TIMESTAMP(3),
WATERMARK FOR TRADE_TIME as TRADE_TIME - INTERVAL '5' SECOND,TICKER STRING,PRICE DOUBLE,
STATUS STRING)WITH ('connector' = 'kinesis','stream' = 'TRADE_SOURCE_STREAM',
'aws.region' = 'us-east-1','scan.stream.initpos' = 'LATEST','format' = 'csv');
```

Sie können den Eingabestream mit dieser Anweisung anzeigen:

```
%flink.ssql(type=update)-- testing the source stream
   
select * from TRADE_SOURCE_STREAM;
```

Bevor Sie die aggregierten Daten an Amazon-S3 senden, können Sie sie direkt in Amazon-Managed-Service für Apache Flink Studio mit einer Auswahlabfrage im rollierenden Fenster anzeigen. Dadurch werden die Handelsdaten in einem Zeitfenster von einer Minute zusammengefasst. Beachten Sie, dass die %flink.ssql-Anweisung eine Bezeichnung (type=update) haben muss:

```
%flink.ssql(type=update)
   
select TUMBLE_ROWTIME(TRADE_TIME,
INTERVAL '1' MINUTE) as TRADE_WINDOW,
TICKER, COUNT(*) as VOLUME,
AVG(PRICE) as AVG_PRICE, 
MIN(PRICE) as MIN_PRICE,
MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAMGROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;
```

Sie können dann in Amazon-S3 eine Tabelle für das Ziel erstellen. Sie müssen ein Wasserzeichen verwenden. Ein Wasserzeichen ist eine Fortschrittsmetrik, die einen Zeitpunkt angibt, zu dem Sie sicher sind, dass keine verzögerten Ereignisse mehr eintreten werden. Das Wasserzeichen wird benötigt, damit verspätete Ankünfte berücksichtigt werden. Das Intervall von `‘5’ Second` ermöglicht es Handelsaktionen mit 5-sekündiger Verspätung in den Amazon-Kinesis Data Stream einzutreten und trotzdem aufgenommen zu werden, wenn sie einen Zeitstempel haben, der innerhalb des Fensters liegt. Weitere Informationen finden Sie unter [Wasserzeichen generieren](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/).    

```
%flink.ssql(type=update)

DROP TABLE IF EXISTS TRADE_DESTINATION_S3;
CREATE TABLE TRADE_DESTINATION_S3 (
TRADE_WINDOW_START TIMESTAMP(3),
WATERMARK FOR TRADE_WINDOW_START as TRADE_WINDOW_START - INTERVAL '5' SECOND,
TICKER STRING, 
VOLUME BIGINT,
AVG_PRICE DOUBLE,
MIN_PRICE DOUBLE,
MAX_PRICE DOUBLE)
WITH ('connector' = 'filesystem','path' = 's3://trade-destination/','format' = 'csv');
```

Diese Anweisung fügt die Daten in die `TRADE_DESTINATION_S3` ein. `TUMPLE_ROWTIME` ist der Zeitstempel der inklusiven Obergrenze des rollierenden Fensters.

```
%flink.ssql(type=update)

insert into TRADE_DESTINATION_S3
select TUMBLE_ROWTIME(TRADE_TIME,
INTERVAL '1' MINUTE),
TICKER, COUNT(*) as VOLUME,
AVG(PRICE) as AVG_PRICE,
MIN(PRICE) as MIN_PRICE,
MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAM
GROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;
```

Lassen Sie Ihre Anweisung 10 bis 20 Minuten lang laufen, um einige Daten in Amazon-S3 zu sammeln. Brechen Sie dann Ihre Anweisung ab. 

Dadurch wird die Datei in Amazon-S3 geschlossen, sodass sie angesehen werden kann. 

So sieht der Inhalt aus: 

![\[Financial data table showing stock prices and volumes for tech companies on March 1, 2023.\]](http://docs.aws.amazon.com/de_de/kinesisanalytics/latest/dev/images/kda-studio-contents.png)


Sie können die [CloudFormation -Vorlage](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml) verwenden, um die Infrastruktur zu erstellen. 

CloudFormation erstellt die folgenden Ressourcen in Ihrem AWS Konto:
+  Amazon-Kinesis-Data-Streams 
+  Amazon-Managed-Service für Apache Flink
+  AWS Glue Datenbank
+  Amazon-S3-Bucket
+  IAM-Rollen und Richtlinien für den Zugriff auf geeignete Ressourcen durch Amazon-Managed-Service für Apache Flink Studio

Importieren Sie das Notizbuch und ändern Sie den Namen des Amazon S3 S3-Buckets durch den neuen Amazon S3 S3-Bucket, der von erstellt wurde CloudFormation. 

![\[SQL code snippet creating a table with timestamp, ticker, volume, and price fields.\]](http://docs.aws.amazon.com/de_de/kinesisanalytics/latest/dev/images/kda-studio-cfn.png)


##### Weitere Informationen
<a name="more"></a>

Im Folgenden finden Sie einige zusätzliche Ressourcen, mit denen Sie mehr über die Verwendung von Managed Service für Apache Flink Studio erfahren können: 
+ [Entwicklerhandbuch für Managed Service für Apache Flink Studio-Notebooks](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html) 
+ [Dokumentation für Apache Flink 1.13](https://nightlies.apache.org/flink/flink-docs-release-1.13/) 
+ [Workshop zu Managed Service für Apache Flink Studio](https://catalog.us-east-1.prod.workshops.aws/workshops/c342c6d1-2baf-4827-ba42-52ef9eb173f6/en-US/flink-on-kda-studio) 
+ [Apache Flink Windowing](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/) 
+ [Entwicklerhandbuch für Amazon-Kinesis-Data-Analytics – Aus einem Kinesis Data Analytics Stream in einen S3 Bucket schreiben](https://docs.aws.amazon.com/managed-flink/latest/java/examples-s3.html) 

# Nutzung benutzerdefinierter Funktionen () UDFs
<a name="examples-migrating-to-kda-studio-leveraging-udfs"></a>

Der Zweck des Musters besteht darin, zu demonstrieren, wie Zeppelin-Notebooks UDFs in Kinesis Data Analytics-Studio für die Verarbeitung von Daten im Kinesis-Stream genutzt werden können. Managed Service for Apache Flink Studio verwendet Apache Flink, um erweiterte Analysefunktionen bereitzustellen, darunter Semantik zur Exact-Once-Verarbeitung, Ereigniszeitfenster, Erweiterbarkeit durch benutzerdefinierte Funktionen und Kundenintegrationen, Unterstützung für wichtige Sprachen, dauerhaften Anwendungsstatus, horizontale Skalierung, Unterstützung mehrerer Datenquellen, erweiterbare Integrationen und mehr. Diese sind entscheidend für die Sicherstellung der Genauigkeit, Vollständigkeit, Konsistenz und Zuverlässigkeit der Verarbeitung von Datenströmen und sie sind in Amazon-Kinesis-Data-Analytics for SQL nicht verfügbar.

In dieser Beispielanwendung zeigen wir, wie Sie das Zeppelin-Notebook von KDA-Studio für die Verarbeitung von Daten im Kinesis-Stream nutzen UDFs können. Mit Studio-Notebooks für Kinesis Data Analytics können Sie Datenströme interaktiv in Echtzeit abfragen und auf einfache Weise Streamverarbeitungsanwendungen mit Standard-SQL, Python und Scala erstellen und ausführen. Mit ein paar Klicks können Sie ein serverloses Notebook starten AWS-Managementkonsole, um Datenströme abzufragen und innerhalb von Sekunden Ergebnisse zu erhalten. Weitere Informationen finden Sie unter [Verwenden eines Studio-Notebooks mit Kinesis Data Analytics for Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html). 

Lambda-Funktionen, die für die pre/post Verarbeitung von Daten in KDA-SQL-Anwendungen verwendet werden:

![\[alt text not found\]](http://docs.aws.amazon.com/de_de/kinesisanalytics/latest/dev/images/sql-udf-1.png)


Benutzerdefinierte Funktionen für die pre/post Verarbeitung von Daten mit KDA-Studio Zeppelin-Notebooks

![\[alt text not found\]](http://docs.aws.amazon.com/de_de/kinesisanalytics/latest/dev/images/sql-udf.png)


## Benutzerdefinierte Funktionen () UDFs
<a name="examples-migrating-to-kda-studio-udfs"></a>

Um gängige Geschäftslogik in einem Operator wiederzuverwenden, kann es nützlich sein, auf eine benutzerdefinierte Funktion zu verweisen, um Ihren Datenstrom zu transformieren. Dies kann entweder innerhalb des Managed Service für Apache Flink Studio-Notebooks oder als extern referenzierte Anwendungs-JAR-Datei erfolgen. Die Verwendung benutzerdefinierter Funktionen kann die Transformationen oder Datenanreicherungen vereinfachen, die Sie möglicherweise bei Streaming-Daten durchführen.

 In Ihrem Notebook verweisen Sie auf eine einfache Java-Anwendungsdatei, die Funktionen zur Anonymisierung privater Telefonnummern bietet. Sie können auch Python oder Scala UDFs zur Verwendung im Notizbuch schreiben. Wir haben uns für ein Anwendungs-Jar in Java entschieden, um die Funktionalität des Imports einer Anwendungs-Jar in ein Pyflink-Notebook hervorzuheben.

## Einrichtung der Umgebung
<a name="examples-migrating-to-kda-studio-setup"></a>

Um dieser Anleitung zu folgen und mit Ihren Streaming-Daten zu interagieren, verwenden Sie ein AWS CloudFormation Skript, um die folgenden Ressourcen zu starten:
+ Kinesis Data Streams als Quelle und Ziel
+ Glue-Datenbank
+ IAM-Rolle
+ Managed Service für Apache Flink-Anwendung
+ Lambda-Funktion zum Starten der Managed Service für Apache Flink Studio-Anwendung
+ Lambda-Rolle zur Ausführung der vorherigen Lambda-Funktion
+ Benutzerdefinierte Ressource zum Aufrufen der Lambda-Funktion

[Laden Sie die CloudFormation Vorlage hier herunter.](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/cfn/kda-flink-udf.yml)

**Erstellen Sie den CloudFormation Stapel**

1. Gehen Sie zu AWS-Managementkonsole und wählen Sie **CloudFormation**unter der Liste der Dienste aus.

1. Wählen Sie auf der **CloudFormation**Seite **Stacks** und dann **Create Stack with new resources (Standard)** aus. 

1. Wählen Sie auf der Seite **Stack erstellen** die Option **Eine Vorlagendatei hochladen** und dann die Datei `kda-flink-udf.yml` aus, die Sie zuvor heruntergeladen haben. Laden Sie die Datei hoch und wählen Sie **Weiter**. 

1. Geben Sie der Vorlage einen Namen wie zum Beispiel `kinesis-UDF`, damit Sie sich diesen leicht merken können, und aktualisieren Sie Eingabeparameter wie den Eingabe-Stream, falls Sie einen anderen Namen wünschen. Wählen Sie **Weiter** aus.

1. Fügen Sie auf der Seite „**Stack-Optionen konfigurieren**“ bei Bedarf **Tags** hinzu und wählen Sie dann **Weiter**.

1. Markieren Sie auf der Seite **Überprüfen** die Kästchen, die die Erstellung von IAM-Ressourcen ermöglichen, und wählen Sie dann **Absenden** aus.

 Der Start des CloudFormation Stacks kann je nach Region, in der Sie starten, 10 bis 15 Minuten dauern. Sobald Sie den `CREATE_COMPLETE`-Status für den gesamten Stack sehen, können Sie fortfahren.

## Arbeiten mit einem Managed Service für Apache Flink Studio Notebook
<a name="examples-migrating-to-kda-studio-notebook"></a>

Studio-Notebooks für Kinesis Data Analytics ermöglichen Ihnen die interaktive Abfrage von Datenströmen in Echtzeit und die einfache Erstellung und Ausführung von Stream-Verarbeitungsanwendungen mit Standard-SQL, Python und Scala. Mit ein paar Klicks können Sie ein serverloses Notebook starten AWS-Managementkonsole, um Datenströme abzufragen und innerhalb von Sekunden Ergebnisse zu erhalten.

Ein Notebook ist eine webbasierte Entwicklungsumgebung. Notebooks bieten ein einfaches interaktives Entwicklungserlebnis in Kombination mit den fortschrittlichen Datenstromverarbeitungsfunktionen von Apache Flink. Studio-Notebooks verwenden Notebooks, die mit Apache Zeppelin betrieben werden, und verwenden Apache Flink als Stream-Verarbeitungs-Engine. Studio-Notebooks kombinieren diese Technologien nahtlos, um Entwicklern aller Qualifikationsstufen erweiterte Analysen von Datenströmen zugänglich zu machen. 

 Apache Zeppelin bietet für Ihre Studio-Notebooks eine komplette Suite von Analysetools, darunter die folgenden:
+ Datenvisualisierung
+ Exportieren der Daten in Dateien
+ Kontrolle über das Ausgabeformat zur Erleichterung von Analysen 

**Verwendung des Notebooks**

1. Gehen Sie zu AWS-Managementkonsole und wählen Sie **Amazon Kinesis** unter der Liste der Dienste aus.

1. Wählen Sie auf der linken Navigationsseite **Analytics-Anwendungen** und dann **Studio-Notebooks** aus.

1. Stellen Sie sicher, dass das **KinesisDataAnalyticsStudio**Notebook läuft.

1. Wählen Sie das Notizbuch und dann **In Apache Zeppelin öffnen** aus. 

1. Laden Sie die Datei [Datenproduzent Zeppelin-Notebook](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/notebooks/Data%20Producer.zpln) herunter, mit der Sie Daten lesen und in den Kinesis Stream laden werden. 

1.  Importieren Sie das Zeppelin-Notebook namens `Data Producer`. Achten Sie darauf, die Eingabe-`STREAM_NAME` und -`REGION` im Code des Notebooks zu ändern. Der Name des Eingabestreams ist in der [CloudFormation -Stack-Ausgabe](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/cfn/kda-flink-udf.yml) zu finden. 

1. Führen Sie das **Datenproduzenten-Notebook** aus, indem Sie auf die Schaltfläche **Diesen Absatz ausführen** klicken, um Beispieldaten in die Eingabe des Kinesis Data Streams einzufügen.

1. Laden Sie beim Laden der Beispieldaten [MaskPhoneNumber-Interactive notebook](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/notebooks/MaskPhoneNumber-interactive.zpln) herunter. Dieses Programm liest Eingabedaten, anonymisiert Telefonnummern aus dem Eingabestream und speichert anonymisierte Daten im Ausgabestrom.

1. Importieren Sie das `MaskPhoneNumber-interactive`-Zeppelin-Notizbuch. 

1. Führen Sie jeden Absatz im Notebook aus.

   1. In Absatz 1 importieren Sie eine benutzerdefinierte Funktion zur Anonymisierung von Telefonnummern.

      ```
      %flink(parallelism=1)
      import com.mycompany.app.MaskPhoneNumber
      stenv.registerFunction("MaskPhoneNumber", new MaskPhoneNumber())
      ```

   1. Im nächsten Absatz erstellen Sie eine speicherinterne Tabelle zum Lesen von Eingabestreamdaten. Stellen Sie sicher, dass der Streamname und die AWS Region korrekt sind. 

      ```
      %flink.ssql(type=update)
      
      DROP TABLE IF EXISTS customer_reviews;
      
      CREATE TABLE customer_reviews (
      customer_id VARCHAR,
      product VARCHAR,
      review VARCHAR,
      phone VARCHAR
      )
      WITH (
      'connector' = 'kinesis',
      'stream' = 'KinesisUDFSampleInputStream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json');
      ```

   1. Überprüfen Sie, ob Daten in die speicherinterne Tabelle geladen werden. 

      ```
      %flink.ssql(type=update)
      select * from customer_reviews
      ```

   1. Rufen Sie die benutzerdefinierte Funktion auf, um die Telefonnummer zu anonymisieren. 

      ```
      %flink.ssql(type=update)
      select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber from customer_reviews
      ```

   1. Nachdem die Telefonnummern maskiert sind, erstellen Sie eine Ansicht mit einer maskierten Nummer. 

      ```
      %flink.ssql(type=update)
      
      DROP VIEW IF EXISTS sentiments_view;
      
      CREATE VIEW  
          sentiments_view
      AS
        select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber from customer_reviews
      ```

   1. Überprüfen Sie die Daten. 

      ```
      %flink.ssql(type=update)
      select * from sentiments_view
      ```

   1. Erstellen Sie eine speicherinterne Tabelle für die Kinesis-Stream-Ausgabe. Stellen Sie sicher, dass Streamname und AWS Region korrekt sind. 

      ```
      %flink.ssql(type=update)
      
      DROP TABLE IF EXISTS customer_reviews_stream_table;
      
      CREATE TABLE customer_reviews_stream_table (
      customer_id VARCHAR,
      product VARCHAR,
      review VARCHAR,
      phoneNumber varchar 
      )
      WITH (
      'connector' = 'kinesis',
      'stream' = 'KinesisUDFSampleOutputStream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'TRIM_HORIZON',
      'format' = 'json');
      ```

   1. Fügen Sie aktualisierte Datensätze in den Ziel-Kinesis Stream ein. 

      ```
      %flink.ssql(type=update)
      INSERT INTO customer_reviews_stream_table
      SELECT customer_id, product, review, phoneNumber
      FROM sentiments_view
      ```

   1. Sichten und überprüfen Sie Daten aus dem Ziel-Kinesis Stream. 

      ```
      %flink.ssql(type=update)
      select * from customer_reviews_stream_table
      ```

## Werbung für ein Notebook als Anwendung
<a name="examples-migrating-to-kda-studio-notebook-promoting"></a>

Nachdem Sie jetzt Ihren Notebookcode interaktiv getestet haben, stellen Sie ihn als Streaming-Anwendung mit dauerhaftem Zustand bereit. Sie müssen zuerst die Anwendungskonfiguration ändern, um einen Speicherort für Ihren Code in Amazon-S3 anzugeben. 

1. Wählen Sie auf dem AWS-Managementkonsole Ihr Notebook aus und wählen Sie unter **Als Anwendungskonfiguration bereitstellen — optional** die Option **Bearbeiten** aus. 

1. Wählen Sie unter **Ziel für Code in Amazon-S3** den Amazon-S3-Bucket aus, der durch die [CloudFormation -Skripte](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/cfn/kda-flink-udf.yml) erstellt wurde. Der Vorgang kann einige Minuten dauern.

1. Sie können die Notiz in ihrer vorliegenden Form nicht bewerben. Wenn Sie dies versuchen, erhalten Sie eine Fehlermeldung, da `Select`-Anweisungen nicht unterstützt werden. Um dieses Problem zu vermeiden, laden Sie das [MaskPhoneNumber-Streaming Zeppelin](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/notebooks/MaskPhoneNumber-Streaming.zpln) Notebook herunter.

1. Importieren Sie das `MaskPhoneNumber-streaming`-Zeppelin-Notizbuch.

1. **Öffnen Sie die Notiz und wählen Sie Aktionen für. KinesisDataAnalyticsStudio**

1. Wähle **Build MaskPhoneNumber -Streaming und exportiere nach S3**. Achten Sie darauf, den **Anwendungsnamen** umzubenennen und keine Sonderzeichen zu verwenden.

1. Wählen Sie **Erstellen und Exportieren**. Die Einrichtung der Streaming-Anwendung dauert einige Minuten.

1.  Sobald der Build abgeschlossen ist, wählen Sie **Bereitstellen mit der AWS -Konsole**. 

1.  Überprüfen Sie auf der nächsten Seite die Einstellungen und stellen Sie sicher, dass Sie die richtige IAM-Rolle auswählen. Wählen Sie als Nächstes **Streaming-Anwendung erstellen**. 

1. Nach einigen Minuten wird die Meldung angezeigt, dass die Streaming-Anwendung erfolgreich erstellt wurde.

 Weitere Informationen zur Bereitstellung von Anwendungen mit dauerhaftem Zustand und Grenzwerten finden Sie unter [Bereitstellen als Anwendung mit dauerhaftem Zustand](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook-durable.html). 

## Bereinigen
<a name="examples-migrating-to-kda-studio-notebook-cleanup"></a>

Optional können Sie jetzt [den CloudFormation -Stack deinstallieren](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/cfn-console-delete-stack.html). Dadurch werden alle Dienste entfernt, die Sie zuvor eingerichtet haben.