What do we show in this demo
- Flink SQL processing data from different storage systems
- Flink SQL using Hive Metastore as an external, persistent catalog
- Batch/Stream unification of queries in action
- Different ways to join dynamic data
- Creating Tables with DDL
The scenario is an online store receiving orders.
The order system consists of six tables (derived from the well-known TPC-H benchmark):
- orders: one row for each order
- lineitem: individual items of an order
- customer: customer data
- nation: denormalized nation data
- region: denormalized region data
- rates: exchange rates for currencies
Depending on their update characteristics (frequency, insert-only), the tables are stored in different systems:
- Kafka: orders, lineitem, rates
- MySQL: customer, nation, region
Please download the TPC-H demo data from Google Drive and extract the zip archive into the ./data folder (as ./data/*.tbl).
https://drive.google.com/file/d/15LWUBGZenWaW3R_WNxqvcTQOuA_Kgtjv
The folder will be mounted by the Docker containers.
docker-compose build
docker-compose up -d
The demo environment consists of the following components.
- Show Kafka data streams
docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic orders
docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic lineitem
docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic rates
- Show data in MySQL
docker-compose exec mysql mysql -Dsql-demo -usql-demo -pdemo-sql
SHOW TABLES;
DESCRIBE PROD_CUSTOMER;
SELECT * FROM PROD_CUSTOMER LIMIT 10;
quit;
- No files here yet
- Show in Web UI: http://localhost:9000 (user: sql-demo, password: demo-sql)
- Queries are executed on a Flink cluster
- Show Web UI: http://localhost:8081
- SQL client provides SQL environment and submits queries to Flink for execution
- Hive Metastore persistently stores catalog data
docker-compose exec sql-client ./sql-client.sh
All dynamic (Kafka-backed) tables are in the default catalog:
SHOW TABLES;
DESCRIBE prod_orders;
SELECT * FROM prod_orders;
All static (MySQL-backed) tables are in the Hive catalog:
USE CATALOG hive;
SHOW TABLES;
DESCRIBE prod_nation;
SELECT * FROM prod_nation;
- Flink SQL unifies batch and stream processing.
- Many queries can be executed on static and dynamic tables.
- Create a table backed by a file in S3.
CREATE TABLE dev_orders (
o_orderkey INTEGER,
o_custkey INTEGER,
o_orderstatus STRING,
o_totalprice DOUBLE,
o_currency STRING,
o_ordertime TIMESTAMP(3),
o_orderpriority STRING,
o_clerk STRING,
o_shippriority INTEGER,
o_comment STRING,
WATERMARK FOR o_ordertime AS o_ordertime - INTERVAL '5' MINUTE
) WITH (
'connector.type' = 'filesystem',
'connector.path' = 's3://sql-demo/orders.tbl',
'format.type' = 'csv',
'format.field-delimiter' = '|'
);
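The connector options above read pipe-delimited CSV, and the WATERMARK clause declares 5 minutes of allowed out-of-orderness. A minimal Python sketch of both mechanics, using a hypothetical sample row (the field values are made up for illustration):

```python
from datetime import datetime, timedelta

# Hypothetical sample row in the pipe-delimited format the CSV format expects.
ROW = "1|101|O|1234.56|USD|2020-04-01 10:02:30.000|1-URGENT|Clerk#1|0|note"

FIELDS = ["o_orderkey", "o_custkey", "o_orderstatus", "o_totalprice",
          "o_currency", "o_ordertime", "o_orderpriority", "o_clerk",
          "o_shippriority", "o_comment"]

def parse_order(line):
    """Split a '|'-delimited line into a dict keyed by column name."""
    return dict(zip(FIELDS, line.split("|")))

def watermark(max_event_time, delay=timedelta(minutes=5)):
    """Bounded out-of-orderness: the watermark trails the max event time seen."""
    return max_event_time - delay

order = parse_order(ROW)
ts = datetime.strptime(order["o_ordertime"], "%Y-%m-%d %H:%M:%S.%f")
print(order["o_currency"], watermark(ts))
```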
- Write a sample of a dynamic streaming table to the static table.
INSERT INTO dev_orders SELECT * FROM default_catalog.default_database.prod_orders;
- Go to the Flink UI and show the number of processed records
- Manually cancel the job
- Show the file in Minio: http://localhost:9000
- Show the data in the SQL client
SELECT * FROM dev_orders;
SELECT COUNT(*) AS rowCnt FROM dev_orders;
- Use the batch engine for better efficiency
SET execution.type=batch;
- Compute revenue per currency and minute
SELECT
CEIL(o_ordertime TO MINUTE) AS `minute`,
o_currency AS `currency`,
SUM(o_totalprice) AS `revenue`,
COUNT(*) AS `orderCnt`
FROM dev_orders
GROUP BY
o_currency,
CEIL(o_ordertime TO MINUTE);
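As a rough illustration of what this aggregation computes, here is a Python sketch over a few hypothetical order rows (CEIL of a timestamp TO MINUTE rounds it up to the next full minute):

```python
from collections import defaultdict
from datetime import datetime, timedelta

def ceil_to_minute(ts):
    """Round up to the next full minute (no-op on an exact boundary),
    mirroring CEIL(o_ordertime TO MINUTE)."""
    if ts.second == 0 and ts.microsecond == 0:
        return ts
    return ts.replace(second=0, microsecond=0) + timedelta(minutes=1)

# Hypothetical order rows: (ordertime, currency, totalprice)
orders = [
    (datetime(2020, 4, 1, 10, 0, 10), "USD", 100.0),
    (datetime(2020, 4, 1, 10, 0, 40), "USD", 50.0),
    (datetime(2020, 4, 1, 10, 1, 5),  "EUR", 75.0),
]

# (minute, currency) -> [revenue, orderCnt], like the GROUP BY result
stats = defaultdict(lambda: [0.0, 0])
for ts, cur, price in orders:
    key = (ceil_to_minute(ts), cur)
    stats[key][0] += price
    stats[key][1] += 1
```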
- Run the same query in streaming mode
SET execution.type=streaming;
- Show why we should streamify the query
SET execution.result-mode=changelog;
- Reset to previous settings
SET execution.result-mode=table;
SET execution.type=batch;
- We can streamify the query a bit with a TUMBLE window
SELECT
TUMBLE_END(o_ordertime, INTERVAL '1' MINUTE) AS `minute`,
o_currency AS `currency`,
SUM(o_totalprice) AS `revenue`,
COUNT(*) AS `orderCnt`
FROM dev_orders
GROUP BY
o_currency,
TUMBLE(o_ordertime, INTERVAL '1' MINUTE);
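TUMBLE assigns each row to exactly one fixed-size window. A small Python sketch of the window assignment, assuming the usual epoch-aligned windows:

```python
from datetime import datetime, timedelta

def tumble_end(ts, size=timedelta(minutes=1)):
    """Assign ts to its tumbling window and return the window end,
    mirroring TUMBLE_END(o_ordertime, INTERVAL '1' MINUTE).
    Windows are aligned to the epoch, half-open: [start, end)."""
    epoch = datetime(1970, 1, 1)
    offset = (ts - epoch) % size
    return ts - offset + size
```

A row at 10:00:10 thus belongs to the window ending at 10:01, and a row exactly on a boundary belongs to the window that starts there.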
- The query is still executed with the batch engine
- We can run the same query on the dynamic table
SET execution.type=streaming;
- We only change the table in the FROM clause
SELECT
TUMBLE_END(o_ordertime, INTERVAL '1' MINUTE) AS `minute`,
o_currency AS `currency`,
SUM(o_totalprice) AS `revenue`,
COUNT(*) AS `orderCnt`
FROM default_catalog.default_database.prod_orders
GROUP BY
o_currency,
TUMBLE(o_ordertime, INTERVAL '1' MINUTE);
- Dynamic tables can be joined just like static tables.
- However, joins can be inefficient if you ignore the dynamic nature of tables.
- There are common join patterns that can be executed efficiently with low resource consumption.
SET execution.type=batch;
- Show customers and their orders by region and priority for a specific day
- The query joins a snapshot of the orders table
SELECT
r_name AS `region`,
o_orderpriority AS `priority`,
COUNT(DISTINCT c_custkey) AS `number_of_customers`,
COUNT(o_orderkey) AS `number_of_orders`
FROM dev_orders
JOIN prod_customer ON o_custkey = c_custkey
JOIN prod_nation ON c_nationkey = n_nationkey
JOIN prod_region ON n_regionkey = r_regionkey
WHERE
FLOOR(o_ordertime TO DAY) = TIMESTAMP '2020-04-01 0:00:00.000'
AND NOT o_orderpriority = '4-NOT SPECIFIED'
GROUP BY r_name, o_orderpriority
ORDER BY r_name, o_orderpriority;
- We can run the same query on the prod_orders table
- Change dev_orders to prod_orders
- Remove the ORDER BY clause
SET execution.type=streaming;
SELECT
r_name AS `region`,
o_orderpriority AS `priority`,
COUNT(DISTINCT c_custkey) AS `number_of_customers`,
COUNT(o_orderkey) AS `number_of_orders`
FROM default_catalog.default_database.prod_orders
JOIN prod_customer ON o_custkey = c_custkey
JOIN prod_nation ON c_nationkey = n_nationkey
JOIN prod_region ON n_regionkey = r_regionkey
WHERE
FLOOR(o_ordertime TO DAY) = TIMESTAMP '2020-04-01 0:00:00.000'
AND NOT o_orderpriority = '4-NOT SPECIFIED'
GROUP BY r_name, o_orderpriority;
- Notable properties of this join
- The batch tables are read just once. Updates are not considered!
- All input tables are completely materialized in state.
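The state growth mentioned above can be sketched in a few lines of Python: a plain streaming join must keep every row of both inputs forever, because any future row might still match it (a simplified symmetric hash join, not Flink's actual implementation):

```python
from collections import defaultdict

class StreamingJoin:
    """Sketch of an unconstrained streaming equi-join: both inputs are
    fully materialized in state, so state grows without bound."""
    def __init__(self):
        self.left_state = defaultdict(list)   # join key -> rows seen on the left
        self.right_state = defaultdict(list)  # join key -> rows seen on the right

    def on_left(self, key, row):
        self.left_state[key].append(row)       # keep forever
        return [(row, r) for r in self.right_state[key]]

    def on_right(self, key, row):
        self.right_state[key].append(row)      # keep forever
        return [(l, row) for l in self.left_state[key]]
```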
- A common requirement is to join events of two (or more) dynamic tables that are related to each other in a temporal context, for example events that happened around the same time.
- Flink SQL features special optimizations for such joins.
- First, switch to the default catalog (which contains all dynamic tables)
USE CATALOG default_catalog;
- Join prod_lineitem and prod_orders to compute open lineitems with urgent order priority.
SELECT
o_ordertime AS `ordertime`,
o_orderkey AS `order`,
l_linenumber AS `linenumber`,
l_partkey AS `part`,
l_suppkey AS `supplier`,
l_quantity AS `quantity`
FROM prod_lineitem
JOIN prod_orders ON o_orderkey = l_orderkey
WHERE
l_ordertime BETWEEN o_ordertime - INTERVAL '5' MINUTE AND o_ordertime AND
l_linestatus = 'O' AND
o_orderpriority = '1-URGENT';
- Notable properties of the join
- Event-time semantics (processing time is also supported)
- Small state requirements: only the 5-minute tails of the streams need to be held in state.
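A Python sketch of why the state stays small: the BETWEEN bound lets the join discard rows once the watermark has passed their 5-minute matching window (simplified pruning, not Flink's actual implementation; the keys and timestamps are hypothetical):

```python
from datetime import datetime, timedelta

BOUND = timedelta(minutes=5)

class IntervalJoin:
    """Sketch of the time-windowed join between orders and lineitems:
    l_ordertime BETWEEN o_ordertime - 5 MINUTES AND o_ordertime."""
    def __init__(self):
        self.orders = []     # (o_ordertime, orderkey)
        self.lineitems = []  # (l_ordertime, orderkey)

    def on_order(self, o_time, key):
        self.orders.append((o_time, key))
        return [l for l in self.lineitems
                if o_time - BOUND <= l[0] <= o_time and l[1] == key]

    def on_lineitem(self, l_time, key):
        self.lineitems.append((l_time, key))
        return [o for o in self.orders
                if o[0] - BOUND <= l_time <= o[0] and o[1] == key]

    def on_watermark(self, wm):
        # Rows older than wm - 5min can never match again and are dropped.
        self.orders = [o for o in self.orders if o[0] >= wm - BOUND]
        self.lineitems = [l for l in self.lineitems if l[0] >= wm - BOUND]
```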
- Enriching an insert-only dynamic table with data from a static or slowly changing table
- Example query: enrich the prod_lineitem table with the current exchange rate from the prod_rates table to compute EUR-normalized amounts.
SELECT
l_proctime AS `querytime`,
l_orderkey AS `order`,
l_linenumber AS `linenumber`,
l_currency AS `currency`,
rs_rate AS `cur_rate`,
(l_extendedprice * (1 - l_discount) * (1 + l_tax)) / rs_rate AS `open_in_euro`
FROM prod_lineitem
JOIN hive.`default`.prod_rates FOR SYSTEM_TIME AS OF l_proctime ON rs_symbol = l_currency
WHERE
l_linestatus = 'O';
- Check the PROD_RATES table in MySQL and update a rate
docker-compose exec mysql mysql -Dsql-demo -usql-demo -pdemo-sql
SELECT * FROM PROD_RATES;
UPDATE PROD_RATES SET RS_TIMESTAMP = '2020-04-01 01:00:00.000', RS_RATE = 1.234 WHERE RS_SYMBOL='USD';
- Notable properties of the join
- Processing-time semantics: rates are looked up from the MySQL table as of the processing time of a row
- The MySQL table can be updated while the job is running
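These two properties can be sketched in Python: the lookup happens at processing time against the table's current content, so an update to the dimension table immediately affects subsequent rows (the rate values are hypothetical):

```python
# A dict stands in for MySQL's PROD_RATES table; it is mutable,
# just like the real table while the streaming job runs.
rates = {"USD": 1.25, "EUR": 1.0}

def enrich(price, currency):
    """Normalize a price using the rate valid *now*,
    mirroring FOR SYSTEM_TIME AS OF l_proctime."""
    return price / rates[currency]

first = enrich(100.0, "USD")   # looked up at processing time
rates["USD"] = 2.0             # table updated while the job is running
second = enrich(100.0, "USD")  # same input row value, new result
```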
- Same use case as before.
- Instead of looking up rates from MySQL, we ingest updates from another Kafka table.
- The Kafka table is accessed via a TemporalTableFunction prod_rates_temporal, which looks up the most recent exchange rate for a currency.
SELECT
l_ordertime AS `ordertime`,
l_orderkey AS `order`,
l_linenumber AS `linenumber`,
l_currency AS `currency`,
rs_rate AS `cur_rate`,
(l_extendedprice * (1 - l_discount) * (1 + l_tax)) / rs_rate AS `open_in_euro`
FROM
prod_lineitem,
LATERAL TABLE(prod_rates_temporal(l_ordertime))
WHERE rs_symbol = l_currency AND
l_linestatus = 'O';
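Conceptually, the temporal table function performs a versioned point-in-time lookup. A Python sketch with hypothetical rate versions, using binary search to find the latest rate valid at the row's event time:

```python
import bisect

# Versioned rates per currency: (valid-from time, rate), sorted by time.
# Hypothetical values; in the demo these versions come from a Kafka topic.
rate_history = {
    "USD": [(1, 1.25), (5, 2.0)],
}

def rate_as_of(currency, ts):
    """Return the latest rate whose valid-from time is <= ts,
    mirroring prod_rates_temporal(l_ordertime)."""
    versions = rate_history[currency]
    idx = bisect.bisect_right([t for t, _ in versions], ts) - 1
    if idx < 0:
        return None  # no rate known yet at ts
    return versions[idx][1]
```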
- Notable properties of the join
- Event-time semantics: rates are looked up from the temporal table (backed by a Kafka topic) as of the order time of the item.
- Rates change as records are produced into the Kafka topic
- Matching patterns on new data with low latency is a common use case for stream processing.
- SQL:2016 introduced the MATCH_RECOGNIZE clause to match patterns on tables.
- The combination of MATCH_RECOGNIZE and dynamic tables is very powerful.
- Flink has supported MATCH_RECOGNIZE for several releases.
- Find customers that have changed their delivery behavior.
- Search for a pattern where the last x lineitems had regular shipping
- But now the customer wants to pay on delivery (collect on delivery = COD)
CREATE VIEW lineitem_with_customer AS
SELECT * FROM (
SELECT
o_custkey AS `custkey`,
l_orderkey AS `orderkey`,
l_linenumber AS `linenumber`,
l_ordertime AS `ordertime`,
l_shipinstruct AS `shipinstruct`
FROM prod_lineitem
JOIN prod_orders ON o_orderkey = l_orderkey
WHERE
l_ordertime BETWEEN o_ordertime - INTERVAL '5' MINUTE AND o_ordertime
);
SELECT *
FROM lineitem_with_customer
MATCH_RECOGNIZE(
PARTITION BY custkey
ORDER BY ordertime
MEASURES
COUNT(OTHER.orderkey) AS `last regular shippings`,
LAST(OTHER.orderkey) AS `last regular order`,
LAST(OTHER.linenumber) AS `last regular linenumber`,
COD.orderkey AS `cod order`,
COD.linenumber AS `cod linenumber`
PATTERN (OTHER{5,} COD)
DEFINE
OTHER AS NOT shipinstruct = 'COLLECT COD',
COD AS shipinstruct = 'COLLECT COD'
);
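The essence of PATTERN (OTHER{5,} COD) can be sketched procedurally: per customer, ordered by time, look for at least five regular shippings immediately followed by a COLLECT COD item (a simplified first-match sketch, not Flink's actual NFA-based matching):

```python
def find_cod_switch(shipinstructs, min_regular=5):
    """Return the index of the first 'COLLECT COD' row that is preceded
    by at least `min_regular` consecutive non-COD rows for this customer,
    or None if the pattern never matches."""
    run = 0  # length of the current run of regular shippings (OTHER)
    for i, instr in enumerate(shipinstructs):
        if instr == 'COLLECT COD':
            if run >= min_regular:
                return i  # pattern OTHER{min_regular,} COD matched
            run = 0       # a too-early COD resets the run
        else:
            run += 1
    return None
```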
- Continuous queries on dynamic tables update their result with low latency.
- Results can be maintained in external systems similar to materialized views.
- Let's write some data back to Kafka
- We use the window aggregation query that we've discussed before
USE CATALOG default_catalog;
SELECT
TUMBLE_END(o_ordertime, INTERVAL '1' MINUTE) AS `minute`,
o_currency AS `currency`,
SUM(o_totalprice) AS `revenue`,
COUNT(*) AS `orderCnt`
FROM default_catalog.default_database.prod_orders
GROUP BY
o_currency,
TUMBLE(o_ordertime, INTERVAL '1' MINUTE);
- Create a Kafka-backed table minute_stats with JSON encoding
CREATE TABLE minute_stats (
`minute` TIMESTAMP(3),
`currency` STRING,
`revenueSum` DOUBLE,
`orderCnt` BIGINT,
WATERMARK FOR `minute` AS `minute` - INTERVAL '10' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'minute_stats',
'connector.properties.zookeeper.connect' = 'not-needed',
'connector.properties.bootstrap.servers' = 'kafka:9092',
'connector.startup-mode' = 'earliest-offset',
'format.type' = 'json'
);
- Automatic topic creation is enabled in Kafka
- Write the query result into the table
INSERT INTO minute_stats
SELECT
TUMBLE_END(o_ordertime, INTERVAL '1' MINUTE) AS `minute`,
o_currency AS `currency`,
SUM(o_totalprice) AS `revenue`,
COUNT(*) AS `orderCnt`
FROM default_catalog.default_database.prod_orders
GROUP BY
o_currency,
TUMBLE(o_ordertime, INTERVAL '1' MINUTE);
- Check results in Kafka
docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic minute_stats
- We would like to store and maintain the result of the following query in MySQL.
USE CATALOG hive;
SELECT
r_name AS `region`,
HOUR(o_ordertime) AS `hour_of_day`,
COUNT(DISTINCT c_custkey) AS `number_of_customers`,
COUNT(o_orderkey) AS `number_of_orders`
FROM (
SELECT CAST(o_ordertime AS TIMESTAMP) o_ordertime, o_orderkey, o_custkey
FROM default_catalog.default_database.prod_orders)
JOIN prod_customer ON o_custkey = c_custkey
JOIN prod_nation ON c_nationkey = n_nationkey
JOIN prod_region ON n_regionkey = r_regionkey
GROUP BY
r_name,
HOUR(o_ordertime);
- Create a backing table in MySQL
docker-compose exec mysql mysql -Dsql-demo -usql-demo -pdemo-sql
CREATE TABLE REGION_STATS (
region VARCHAR(20) NOT NULL,
hour_of_day BIGINT NOT NULL,
number_of_customers BIGINT NOT NULL,
number_of_orders BIGINT NOT NULL,
PRIMARY KEY (region, hour_of_day));
- Create a table in Flink
CREATE TABLE region_stats (
region STRING,
hour_of_day BIGINT,
number_of_customers BIGINT,
number_of_orders BIGINT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://mysql:3306/sql-demo',
'connector.table' = 'REGION_STATS',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'sql-demo',
'connector.password' = 'demo-sql',
'connector.write.flush.interval' = '2s',
'connector.write.flush.max-rows' = '1000'
);
- The table is configured to emit updates every two seconds (or every 1000 rows).
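The flush policy is a classic buffered-sink pattern: collect rows and write a batch when either a row limit or a time limit is hit. A hypothetical Python sketch (not the actual JDBC connector code):

```python
import time

class BufferedSink:
    """Sketch of the JDBC sink's flush policy: buffer rows and flush
    when either max_rows or the flush interval is reached."""
    def __init__(self, flush_interval_s=2.0, max_rows=1000):
        self.flush_interval_s = flush_interval_s
        self.max_rows = max_rows
        self.buffer = []
        self.flushed = []  # stands in for batched writes to MySQL
        self.last_flush = time.monotonic()

    def write(self, row):
        self.buffer.append(row)
        if (len(self.buffer) >= self.max_rows
                or time.monotonic() - self.last_flush >= self.flush_interval_s):
            self.flush()

    def flush(self):
        self.flushed.extend(self.buffer)
        self.buffer.clear()
        self.last_flush = time.monotonic()
```

Buffering trades a little latency for far fewer round trips to the database, which matters when the aggregation updates the same keys repeatedly.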
- Write the query result to MySQL
INSERT INTO region_stats
SELECT
r_name AS `region`,
HOUR(o_ordertime) AS `hour_of_day`,
COUNT(DISTINCT c_custkey) AS `number_of_customers`,
COUNT(o_orderkey) AS `number_of_orders`
FROM (
SELECT CAST(o_ordertime AS TIMESTAMP) o_ordertime, o_orderkey, o_custkey
FROM default_catalog.default_database.prod_orders)
JOIN prod_customer ON o_custkey = c_custkey
JOIN prod_nation ON c_nationkey = n_nationkey
JOIN prod_region ON n_regionkey = r_regionkey
GROUP BY
r_name,
HOUR(o_ordertime);
- Monitor query result in MySQL
docker-compose exec mysql mysql -Dsql-demo -usql-demo -pdemo-sql
SELECT * FROM REGION_STATS ORDER BY hour_of_day, region;