See the slides for context.
To keep things simple, this demo uses Docker Compose to bundle up all the services you need for your CDC pipeline:
docker-compose build
docker-compose up -d
docker-compose ps
You should be able to access the Flink Web UI (http://localhost:8081), as well as Kibana (http://localhost:5601).
Start the Postgres client to have a look at the source tables and run some DML statements later:
docker-compose exec postgres env PGOPTIONS="--search_path=claims" bash -c 'psql -U $POSTGRES_USER postgres'
SELECT * FROM information_schema.tables WHERE table_schema='claims';
Start the Debezium Postgres connector using the configuration provided in the register-postgres.json file:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json
Check that the connector is running:
curl http://localhost:8083/connectors/claims-connector/status # | jq
The first time it connects to a Postgres server, Debezium takes a consistent snapshot of all database schemas, so you should see that the pre-existing records in the accident_claims table have already been pushed into your Kafka topic:
docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic pg_claims.claims.accident_claims
ℹ️ Have a quick read about the structure of these events in the Debezium documentation.
In the tab you used to start the Postgres client, you can now run some DML statements to see that the changes are propagated all the way to your Kafka topic:
INSERT INTO accident_claims (claim_total, claim_total_receipt, claim_currency, member_id, accident_date, accident_type, accident_detail, claim_date, claim_status) VALUES (500, 'PharetraMagnaVestibulum.tiff', 'AUD', 321, '2020-08-01 06:43:03', 'Collision', 'Blue Ringed Octopus', '2020-08-10 09:39:31', 'INITIAL');
UPDATE accident_claims SET claim_total_receipt = 'CorrectReceipt.pdf' WHERE claim_id = 1001;
DELETE FROM accident_claims WHERE claim_id = 1001;
In the output of your Kafka console consumer, you should now see three consecutive events with op values equal to c (an insert event), u (an update event) and d (a delete event).
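For orientation, each of these messages is a Debezium change event that carries the row state before and after the change, alongside the op code and some source metadata. The snippet below is a heavily trimmed, illustrative sketch of an update event; the field values are made up, the exact envelope depends on your connector and converter settings, and before may even be null unless the source table's replica identity is FULL:
{
  "payload": {
    "before": { "claim_id": 1001, "claim_total_receipt": "OldReceipt.pdf", "claim_status": "INITIAL" },
    "after": { "claim_id": 1001, "claim_total_receipt": "CorrectReceipt.pdf", "claim_status": "INITIAL" },
    "source": { "connector": "postgresql", "schema": "claims", "table": "accident_claims" },
    "op": "u",
    "ts_ms": 1597050000000
  }
}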
Start the Flink SQL Client:
docker-compose exec sql-client ./sql-client.sh
Register a Postgres catalog, so you can access the metadata of the external tables over JDBC:
CREATE CATALOG postgres WITH (
'type'='jdbc',
'property-version'='1',
'base-url'='jdbc:postgresql://postgres:5432/',
'default-database'='postgres',
'username'='postgres',
'password'='postgres'
);
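To double-check that the catalog is registered and that Flink can reach the source tables through it, you can run a quick, optional sanity check from the SQL client, e.g.:
SHOW CATALOGS;
SELECT * FROM postgres.postgres.`claims.members`;
The second statement issues a plain JDBC scan against Postgres, so it should return the existing members rows.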
Create a changelog table to consume the change events from the pg_claims.claims.accident_claims topic, with the same schema as the accident_claims source table, using the debezium-json format:
CREATE TABLE accident_claims
WITH (
'connector' = 'kafka',
'topic' = 'pg_claims.claims.accident_claims',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'test-consumer-group',
'format' = 'debezium-json',
'scan.startup.mode' = 'earliest-offset'
)
LIKE postgres.postgres.`claims.accident_claims` (
EXCLUDING OPTIONS);
and register a reference members table:
CREATE TABLE members
LIKE postgres.postgres.`claims.members` (
INCLUDING OPTIONS);
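If you want to confirm that the LIKE clauses picked up the expected schemas, you can describe the newly created tables; the output should mirror the columns of the Postgres source tables:
DESCRIBE accident_claims;
DESCRIBE members;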
ℹ️ For more details on the Debezium Format, refer to the Flink documentation.
You can now query the changelog source table you just created:
SELECT * FROM accident_claims;
and (same as before) insert, update and delete a record in the Postgres client to see how the query results update in (near) real-time.
ℹ️ If you're curious to see what's going on behind the scenes (i.e. how queries translate into continuously running Flink jobs) check the Flink Web UI (http://localhost:8081).
Create a sink table that will write to a new agg_insurance_costs index on Elasticsearch:
CREATE TABLE agg_insurance_costs (
es_key STRING PRIMARY KEY NOT ENFORCED,
insurance_company STRING,
accident_detail STRING,
accident_agg_cost DOUBLE
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://elasticsearch:9200',
'index' = 'agg_insurance_costs'
);
and submit a continuous query to the Flink cluster that will write the aggregated insurance costs per insurance_company, bucketed by accident_detail (in other words: what animals are causing the most harm in terms of cost):
INSERT INTO agg_insurance_costs
SELECT UPPER(SUBSTRING(m.insurance_company,0,4) || '_' || SUBSTRING (ac.accident_detail,0,4)) es_key,
m.insurance_company,
ac.accident_detail,
SUM(ac.claim_total) accident_agg_cost
FROM accident_claims ac
JOIN members m
ON ac.member_id = m.id
WHERE ac.claim_status <> 'DENIED'
GROUP BY m.insurance_company, ac.accident_detail;
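Once this job is running, you can sanity-check that aggregated documents are landing in Elasticsearch without going through Kibana, for example by querying the index directly (the results will keep changing as the data generator below inserts new claims):
curl 'http://localhost:9200/agg_insurance_costs/_search?pretty'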
Finally, create a simple dashboard in Kibana with a 1s refresh rate and use the (very rustic) postgres_datagen.sql data generator script to periodically insert some records into the Postgres source table, creating visible changes in your results:
cat ./postgres_datagen.sql | docker exec -i flink-sql-cdc_postgres_1 psql -U postgres -d postgres
And that's it!
This demo will get some polishing as CDC support in Flink matures.
If you have any questions or feedback, feel free to DM me on Twitter @morsapaes.