This demo uses Materialize to keep track of and explore what's happening at Data Council Austin 2022 based on Twitter activity. It was mostly an excuse to play around with ✨Hex✨! Once the event is over, it can be adjusted to track something else with some tweaks to the data generator.
You can find the published app here, as well as a notebook export in the repo (`hex-data-council.ipynb`).
The pipeline uses Docker Compose to make it easier to bundle up all the services feeding into Hex:
- **Data generator**

  The generator script listens to tweets about Data Council in real time using the Twitter API v2. For details, check the `data-generator` directory.

- **Redpanda**

  The data generator produces JSON-formatted events to three different topics in Redpanda: `dc_tweets`, `dc_users` and `dc_places`. You can think of Redpanda as the source of truth: the system that stores and distributes the Twitter data downstream.

- **Materialize**

  Materialize is set up to consume, transform and combine the Twitter data streaming in from Redpanda. If you're completely new to Materialize, you can refer to our getting started guide for a quick rundown.
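As a rough sketch of how the pipeline's events could be shaped (the real logic lives in the `data-generator` directory; the helper below and its exact field handling are assumptions for illustration), each filtered-stream response can be split into per-topic events, with users and places keyed by `id` so they can later be consumed as upsert streams:

```python
def split_stream_response(response: dict) -> dict:
    """Split a Twitter API v2 filtered-stream payload into per-topic events.

    Hypothetical helper: the real generator may shape events differently.
    Tweets are append-only; users and places are emitted as (key, value)
    pairs so Materialize can consume them with ENVELOPE UPSERT.
    """
    events = {"dc_tweets": [], "dc_users": [], "dc_places": []}
    # the main tweet object always rides along in "data"
    events["dc_tweets"].append(response["data"])
    # expanded objects (users, places) arrive under "includes"
    includes = response.get("includes", {})
    for user in includes.get("users", []):
        events["dc_users"].append((user["id"], user))
    for place in includes.get("places", []):
        events["dc_places"].append((place["id"], place))
    return events
```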
If you want to spin the demo up, you'll need to register an app in the Twitter Developer Portal to get hold of an auth token (`BEARER_TOKEN`):
```shell
# Export the credentials
export BEARER_TOKEN='<your_bearer_token>'

# Start the setup
docker-compose up -d

# Is everything really up and running?
docker-compose ps
```
To tap into and manage Redpanda, you can use the `rpk` CLI. For example, to check that the topics have been created, run:

```shell
docker-compose exec redpanda rpk topic list
```

and that there's data landing from the `data-generator`:

```shell
docker-compose exec redpanda rpk topic consume dc_tweets
docker-compose exec redpanda rpk topic consume dc_users

# This topic isn't really used, since it gets close to no data
docker-compose exec redpanda rpk topic consume dc_places
```
The first step to consume JSON events from Redpanda in Materialize is to create a Kafka source (since Redpanda is Kafka API-compatible) for each of the topics we're interested in:
```sql
CREATE SOURCE rp_twitter_tweets
FROM KAFKA BROKER 'redpanda:9092' TOPIC 'dc_tweets'
FORMAT BYTES;

CREATE SOURCE rp_twitter_users
FROM KAFKA BROKER 'redpanda:9092' TOPIC 'dc_users'
FORMAT BYTES
ENVELOPE UPSERT;

CREATE SOURCE rp_twitter_places
FROM KAFKA BROKER 'redpanda:9092' TOPIC 'dc_places'
FORMAT BYTES
ENVELOPE UPSERT;
```
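The `ENVELOPE UPSERT` clause tells Materialize to treat a topic as a changelog keyed by the Kafka message key: the latest value per key wins, and a null value deletes the key. In Python terms (a toy model of the semantics, not Materialize's actual implementation):

```python
def upsert_compact(events):
    """Fold a keyed event stream into its latest state per key,
    mimicking what ENVELOPE UPSERT does for dc_users and dc_places."""
    state = {}
    for key, value in events:
        if value is None:
            state.pop(key, None)  # a null payload deletes the key
        else:
            state[key] = value    # later values overwrite earlier ones
    return state
```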
The source data is stored as raw bytes, so we need to do some casting to convert it to a readable format (and appropriate data types):
```sql
CREATE MATERIALIZED VIEW twitter_tweets AS
SELECT (data->>'id')::bigint AS tweet_id,
       (data->'referenced_tweets'->0->>'type')::string AS tweet_type,
       (data->>'text')::string AS tweet_text,
       (data->'referenced_tweets'->0->>'id')::string AS tweet_id_rr,
       (data->>'author_id')::bigint AS user_id,
       (data->'geo'->>'place_id')::string AS place_id,
       (data->>'created_at')::timestamp AS created_at
FROM (SELECT CONVERT_FROM(data, 'utf8')::jsonb AS data FROM rp_twitter_tweets);

CREATE MATERIALIZED VIEW twitter_users AS
SELECT (data->>'id')::bigint AS user_id,
       (data->>'username')::string AS username,
       (data->>'name')::string AS user_name,
       (data->>'location')::string AS location
FROM (SELECT CONVERT_FROM(data, 'utf8')::jsonb AS data FROM rp_twitter_users);

CREATE MATERIALIZED VIEW twitter_places AS
SELECT (data->0->>'id')::string AS place_id,
       (data->0->>'name')::string AS place_name,
       (data->0->>'full_name')::string AS place_full_name,
       (data->0->>'place_type')::string AS place_type
FROM (SELECT CONVERT_FROM(data, 'utf8')::jsonb AS data FROM rp_twitter_places);
```
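For intuition, the `twitter_tweets` view above does roughly what this Python does (a hypothetical mirror, not code from the demo): decode the raw bytes, parse the JSON, then extract and cast individual fields, with missing fields coming out as `None` just as the SQL `->>` operator yields `NULL`:

```python
import json
from datetime import datetime

def parse_tweet(raw: bytes) -> dict:
    # CONVERT_FROM(data, 'utf8')::jsonb
    data = json.loads(raw.decode("utf-8"))
    # referenced_tweets and geo may be absent, like ->0 yielding NULL in SQL
    ref = (data.get("referenced_tweets") or [{}])[0]
    geo = data.get("geo") or {}
    return {
        "tweet_id": int(data["id"]),            # (data->>'id')::bigint
        "tweet_type": ref.get("type"),
        "tweet_text": data.get("text"),
        "tweet_id_rr": ref.get("id"),
        "user_id": int(data["author_id"]),
        "place_id": geo.get("place_id"),
        # (data->>'created_at')::timestamp
        "created_at": datetime.fromisoformat(
            data["created_at"].replace("Z", "+00:00")
        ),
    }
```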
Then, we can get straight into creating the base views and materialized views that will support our Twitter exploration:
```sql
CREATE VIEW twitter_tweets_enriched AS
SELECT tweet_text AS tweet,
       username,
       CASE WHEN tweet_type = 'quoted' THEN 'quoted retweet'
            WHEN tweet_type = 'replied_to' THEN 'tweet reply'
            ELSE 'tweet'
       END AS tweet_type,
       created_at
FROM twitter_tweets tt
-- This is a streaming join!
JOIN twitter_users tu ON tt.user_id = tu.user_id;
```
```sql
CREATE MATERIALIZED VIEW agg_tweets AS
SELECT COUNT(tweet) AS total_tweets,
       username
FROM twitter_tweets_enriched
GROUP BY username;

CREATE MATERIALIZED VIEW agg_users AS
SELECT user_id,
       COUNT(tweet_id) AS total_tweets
FROM twitter_tweets
GROUP BY user_id;

CREATE MATERIALIZED VIEW tweets_hourly AS
SELECT date_bin(interval '1 hours', created_at, '2022-03-22') AS time_bucket,
       COUNT(tweet_id) AS total_tweets
FROM twitter_tweets
GROUP BY 1;
```
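The `date_bin` call snaps each `created_at` down to the start of its hour, measured from the `'2022-03-22'` origin. A minimal Python equivalent of that bucketing (assumed semantics; Materialize's `date_bin` docs cover the edge cases):

```python
from datetime import datetime, timedelta

def date_bin(stride: timedelta, ts: datetime, origin: datetime) -> datetime:
    """Snap ts down to the nearest stride boundary measured from origin."""
    elapsed = ts - origin
    # integer floor division of timedeltas counts whole strides elapsed
    return origin + (elapsed // stride) * stride
```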
To enable a little trick in Hex, we also need to create a static table to hold the Twitter usernames submitted through the UI:

```sql
CREATE TABLE users_not_there (
    username STRING
);
```
This was my first time using Hex and I have to say: I'm here for it. I'll follow up this demo with a blog post walking through the magic behind it.