02 Watermarks

💡 This example will show how to use WATERMARKs to work with timestamps in records.

The source table (doctor_sightings) is backed by the faker connector, which continuously generates rows in memory based on Java Faker expressions.

The previous recipe showed how a TUMBLE group window makes it simple to aggregate time-series data.

The Doctor is a renegade time lord who travels through space and time in a TARDIS. As different versions of the Doctor travel through time, various people log their sightings. We want to track how many times each version of the Doctor is seen each minute. Unlike the previous recipe, these records have an embedded timestamp we need to use to perform our calculation.

More often than not, data comes with embedded timestamps that we want to use for our time-series calculations. We call such a timestamp an event-time attribute.

Event time represents when something actually happened in the real world. It is unique because it is quasi-monotonically increasing: we generally see things that happened earlier before we see things that happened later. Of course, data will never be perfectly ordered (systems go down, networks are laggy, doctor sightings take time to postmark and mail), so there will be some out-of-orderness in our data.

Flink can account for this variability using a WATERMARK attribute in the table's DDL. The watermark marks a column as the table's event-time attribute and tells Flink how out of order we expect our data to be.

In the Doctor's case, we expect all records to arrive within 15 seconds of when the sighting occurs.
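To make the 15-second bound concrete, here is a small sketch of the arithmetic behind the watermark. The timestamp literal is hypothetical and chosen only for illustration; the query does not read from any table.

```sql
-- Hypothetical value: assume the latest sighting_time seen so far is 12:01:10.
-- The watermark trails that timestamp by the 15-second bound.
SELECT
    TIMESTAMP '2021-01-01 12:01:10.000' AS latest_sighting_time,
    TIMESTAMP '2021-01-01 12:01:10.000' - INTERVAL '15' SECOND AS watermark;
```

Under this assumption the watermark is 12:00:55, which has not yet passed the end of the 12:00 to 12:01 window, so a one-minute aggregate for that window would still be held back. It is only emitted once later sightings push the watermark past the end of the window.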

Script

CREATE TABLE doctor_sightings (
  doctor        STRING,
  sighting_time TIMESTAMP(3),
  -- sighting_time is the event-time attribute; rows may arrive up to 15 seconds out of order
  WATERMARK FOR sighting_time AS sighting_time - INTERVAL '15' SECONDS
)
WITH (
  'connector' = 'faker',
  'fields.doctor.expression' = '#{dr_who.the_doctors}',
  -- generate timestamps up to 15 seconds in the past
  'fields.sighting_time.expression' = '#{date.past ''15'',''SECONDS''}'
);

SELECT 
    doctor,
    TUMBLE_ROWTIME(sighting_time, INTERVAL '1' MINUTE) AS sighting_time,
    COUNT(*) AS sightings
FROM doctor_sightings
GROUP BY 
    TUMBLE(sighting_time, INTERVAL '1' MINUTE),
    doctor;
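
To see how the watermark advances relative to each record, a query along these lines can help. It is a minimal sketch that assumes a Flink version providing the CURRENT_WATERMARK function (available in recent releases) and reuses the doctor_sightings table defined above.

```sql
-- Show each sighting next to the watermark at the time the row is processed.
-- current_watermark is NULL until the first watermark has been emitted.
SELECT
    doctor,
    sighting_time,
    CURRENT_WATERMARK(sighting_time) AS current_watermark
FROM doctor_sightings;
```

Rows whose sighting_time is at or before current_watermark would count as late for event-time operations such as the tumbling window above.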

Example Output

[Screenshot: 02_watermarks]