The slides that accompany this demo can be found here: https://speakerdeck.com/rmoff/no-more-silos-integrating-databases-and-apache-kafka
Start the environment
cd docker-compose
./scripts/setup.sh
Verify that the XStream capture process is running:
docker exec -it oracle bash -c 'sleep 1;rlwrap sqlplus sys/Admin123@//localhost:1521/ORCLCDB as sysdba'
COLUMN ACTION HEADING 'XStream Component' FORMAT A30
COLUMN SID HEADING 'Session ID' FORMAT 99999
COLUMN SERIAL# HEADING 'Session|Serial|Number' FORMAT 99999999
COLUMN PROCESS HEADING 'Operating System|Process ID' FORMAT A17
COLUMN PROCESS_NAME HEADING 'XStream|Program|Name' FORMAT A7
SELECT /*+PARAM('_module_action_old_length',0)*/ ACTION,
SID,
SERIAL#,
PROCESS,
SUBSTR(PROGRAM,INSTR(PROGRAM,'(')+1,4) PROCESS_NAME
FROM V$SESSION
WHERE MODULE ='XStream';
Expected output:
Session XStream
Serial Operating System Program
XStream Component Session ID Number Process ID Name
------------------------------ ---------- --------- ----------------- -------
DBZXOUT - Apply Reader 9 6842 3380 AS01
DBZXOUT - Apply Server 16 35968 1 TNS
DBZXOUT - Apply Server 145 13290 3382 AS02
CAP$_DBZXOUT_1 - Capture 270 62826 3384 CP01
DBZXOUT - Apply Coordinator 396 27273 3378 AP01
DBZXOUT - Propagation Send/Rcv 399 6031 3386 CX01
6 rows selected.
If you don’t see these running (particularly the Capture process), then refer to debezium-xstream-system-output.adoc
Check that Kafka Connect is running:
bash -c ' \
echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
sleep 5
done
echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"
'
Check that required connectors are loaded
curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep 'OracleConnector|JdbcSourceConnector'
"io.confluent.connect.jdbc.JdbcSourceConnector"
"io.debezium.connector.oracle.OracleConnector"
Optionally, use something like screen or tmux to have all of the following easily to hand. Or multiple Terminal tabs. Whatever works for you :)
-
ksqlDB CLI:
docker exec -it ksqldb bash -c 'echo -e "\n\n Waiting for ksqlDB to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info) ; echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then break ; fi ; sleep 5 ; done ; ksql http://ksqldb:8088'
-
SQL*Plus
docker exec -it oracle bash -c 'sleep 1;rlwrap sqlplus Debezium/dbz@localhost:1521/ORCLPDB1'
(the sleep is necessary to avoid the error "rlwrap: error: My terminal reports width=0 (is it emacs?) I can't handle this, sorry!"; see ref)
-
Oracle log
docker logs -f oracle
-
Debezium log
docker logs -f connect-debezium
COL FIRST_NAME FOR A15
COL LAST_NAME FOR A15
COL ID FOR 999
COL CREATE_TS FOR A29
COL UPDATE_TS FOR A29
SET LINESIZE 200
SELECT ID, FIRST_NAME, LAST_NAME, CREATE_TS, UPDATE_TS FROM CUSTOMERS;
ID FIRST_NAME LAST_NAME CREATE_TS UPDATE_TS
---- --------------- --------------- ----------------------------- -----------------------------
1 Rica Blaisdell 04-DEC-18 08.22.32.933376 PM 04-DEC-18 08.22.32.000000 PM
2 Ruthie Brockherst 04-DEC-18 08.22.32.953342 PM 04-DEC-18 08.22.32.000000 PM
3 Mariejeanne Cocci 04-DEC-18 08.22.32.965713 PM 04-DEC-18 08.22.32.000000 PM
4 Hashim Rumke 04-DEC-18 08.22.32.977417 PM 04-DEC-18 08.22.32.000000 PM
5 Hansiain Coda 04-DEC-18 08.22.32.979967 PM 04-DEC-18 08.22.32.000000 PM
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
  jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
  column -s : -t| sed 's/\"//g'| sort
Expected:
source | ora-source-debezium-xstream | RUNNING | RUNNING | io.debezium.connector.oracle.OracleConnector
source | ora-source-jdbc | RUNNING | RUNNING | io.confluent.connect.jdbc.JdbcSourceConnector
In ksqlDB:
ksql> list topics;
Show contents
PRINT 'asgard.DEBEZIUM.CUSTOMERS' FROM BEGINNING;
SET AUTOCOMMIT ON;
INSERT INTO CUSTOMERS (FIRST_NAME,LAST_NAME,CLUB_STATUS) VALUES ('Rick','Astley','Bronze');
Create a stream
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM CUSTOMERS_STREAM_DBZ_SRC WITH (KAFKA_TOPIC='asgard.DEBEZIUM.CUSTOMERS', VALUE_FORMAT='AVRO');
CREATE STREAM CUSTOMERS_STREAM_JDBC_SRC WITH (KAFKA_TOPIC='ora-CUSTOMERS-jdbc', VALUE_FORMAT='AVRO');
LIST STREAMS;
It also supports nested data
DESCRIBE CUSTOMERS_STREAM_DBZ_SRC;
Pretty-print the source data to show why it is nested
echo '{"before": {"ID": 42, "FIRST_NAME": "Rick", "LAST_NAME": "Astley", "EMAIL": null, "GENDER": null, "CLUB_STATUS": "Bronze", "COMMENTS": null, "CREATE_TS": 1544000706681769, "UPDATE_TS": 1544000706000000}, "after": {"ID": 42, "FIRST_NAME": "Rick", "LAST_NAME": "Astley", "EMAIL": null, "GENDER": null, "CLUB_STATUS": "Platinum", "COMMENTS": null, "CREATE_TS": 1544000706681769, "UPDATE_TS": 1544000742000000}, "source": {"version": "0.9.0.Alpha2", "connector": "oracle", "name": "asgard", "ts_ms": 1544000742000, "txId": "6.26.734", "scn": 2796831, "snapshot": false}, "op": "u", "ts_ms": 1544000745823, "messagetopic": "asgard.DEBEZIUM.CUSTOMERS", "messagesource": "Debezium CDC from Oracle on asgard"}'|jq '.'
or use kafkacat
docker exec kafkacat kafkacat -b kafka:29092 -t asgard.DEBEZIUM.CUSTOMERS -C -u -q -o-1 -c1 -r http://schema-registry:8081 -s key=s -s value=avro|jq '.'
Look at before & after:
SELECT OP, AFTER->ID, BEFORE->CLUB_STATUS, AFTER->CLUB_STATUS FROM CUSTOMERS_STREAM_DBZ_SRC EMIT CHANGES;
r | 1 | null | bronze
r | 2 | null | platinum
r | 3 | null | bronze
r | 4 | null | platinum
r | 5 | null | platinum
c | 42 | null | Bronze
u | 42 | Bronze | Platinum
JDBC only shows what it is now:
SELECT ID, CLUB_STATUS FROM CUSTOMERS_STREAM_JDBC_SRC EMIT CHANGES;
Do an update in the database, then do a delete - note the data you get with proper CDC (Debezium) versus without it (JDBC)
UPDATE CUSTOMERS SET CLUB_STATUS='Silver' WHERE ID=2;
DELETE FROM CUSTOMERS WHERE ID=2;
Flattening data:
CREATE STREAM CUSTOMERS_STREAM_FLATTENED AS \
SELECT AFTER->ID AS ID, \
AFTER->FIRST_NAME AS FIRST_NAME, \
AFTER->LAST_NAME AS LAST_NAME, \
AFTER->EMAIL AS EMAIL, \
AFTER->GENDER AS GENDER, \
AFTER->CLUB_STATUS AS CLUB_STATUS, \
AFTER->COMMENTS AS COMMENTS \
FROM CUSTOMERS_STREAM_DBZ_SRC
EMIT CHANGES;
LIST TOPICS;
PRINT 'CUSTOMERS_STREAM_FLATTENED' FROM BEGINNING;
Checking lag
CREATE STREAM LAG_MONITOR_DBZ AS \
SELECT SOURCE->TS_MS, \
ROWTIME - SOURCE->TS_MS AS LAG, \
OP, \
SOURCE->SNAPSHOT, \
BEFORE->ID AS ID_BEFORE, \
AFTER->ID AS ID_AFTER \
FROM CUSTOMERS_STREAM_DBZ_SRC EMIT CHANGES;
SELECT ID_BEFORE, ID_AFTER, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss Z') AS SYSTEM_TIME, LAG, OP FROM LAG_MONITOR_DBZ EMIT CHANGES;
(what would be nice here is to hook up LAG_MONITOR_DBZ to Elasticsearch or InfluxDB and have a little monitoring chart)
ksql> SELECT OP, COUNT(*) AS OP_COUNT FROM CUSTOMERS_STREAM_DBZ_SRC GROUP BY OP EMIT CHANGES;
c | 1
r | 9
u | 5
d | 3
In ksqlDB:
ksql> list topics;
Show contents
PRINT 'ora-CUSTOMERS-jdbc' FROM BEGINNING;
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM CUSTOMERS_STREAM_JDBC_SRC WITH (KAFKA_TOPIC='ora-CUSTOMERS-jdbc', VALUE_FORMAT='AVRO');
ksqlDB applies the schema to the data
DESCRIBE CUSTOMERS_STREAM_JDBC_SRC;
Lag
CREATE STREAM LAG_MONITOR_JDBC AS SELECT UPDATE_TS, ROWTIME-UPDATE_TS AS LAG, ID FROM CUSTOMERS_STREAM_JDBC_SRC EMIT CHANGES;
SELECT ID, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss Z') AS SYSTEM_TIME, LAG FROM LAG_MONITOR_JDBC EMIT CHANGES;