-- pipeline_kafka--1.0.0.sql
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION pipeline_kafka" to load this file. \quit
-- Consumers added with pipeline_kafka.consume_begin
CREATE TABLE pipeline_kafka.consumers (
id serial PRIMARY KEY,
topic text NOT NULL,
relation text NOT NULL,
group_id text,
format text NOT NULL,
delimiter text,
quote text,
escape text,
batchsize integer NOT NULL,
maxbytes integer NOT NULL,
parallelism integer NOT NULL,
timeout integer NOT NULL,
UNIQUE (topic, relation)
);
CREATE TABLE pipeline_kafka.offsets (
consumer_id integer NOT NULL REFERENCES pipeline_kafka.consumers (id),
partition integer NOT NULL,
"offset" bigint NOT NULL,
PRIMARY KEY (consumer_id, partition)
);
CREATE INDEX offsets_consumer_idx ON pipeline_kafka.offsets (consumer_id);
-- Brokers added with pipeline_kafka.add_broker
CREATE TABLE pipeline_kafka.brokers (
host text PRIMARY KEY
);
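
-- Illustrative only (not executed by this script): the catalog tables above are
-- maintained by the extension's functions, but they can be inspected directly,
-- for example to check which brokers are registered and where each consumer is:
--
--   SELECT * FROM pipeline_kafka.brokers;
--   SELECT consumer_id, partition, "offset" FROM pipeline_kafka.offsets;
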
CREATE FUNCTION pipeline_kafka.consume_begin (
topic text,
relation text,
group_id text DEFAULT NULL,
format text DEFAULT 'text',
delimiter text DEFAULT E'\t',
quote text DEFAULT NULL,
escape text DEFAULT NULL,
batchsize integer DEFAULT 10000,
maxbytes integer DEFAULT 32000000, -- 32mb
parallelism integer DEFAULT 1,
timeout integer DEFAULT 250,
start_offset bigint DEFAULT NULL
)
RETURNS text
AS 'MODULE_PATHNAME', 'kafka_consume_begin'
LANGUAGE C IMMUTABLE;
CREATE FUNCTION pipeline_kafka.consume_end (
topic text,
relation text
)
RETURNS text
AS 'MODULE_PATHNAME', 'kafka_consume_end'
LANGUAGE C IMMUTABLE;
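
-- Illustrative usage of the two functions above (not executed by this script;
-- 'my_topic' and 'my_stream' are placeholder names): start consuming a Kafka
-- topic into a relation, then stop that consumer:
--
--   SELECT pipeline_kafka.consume_begin('my_topic', 'my_stream', parallelism := 4);
--   SELECT pipeline_kafka.consume_end('my_topic', 'my_stream');
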
CREATE FUNCTION pipeline_kafka.consume_begin()
RETURNS text
AS 'MODULE_PATHNAME', 'kafka_consume_begin_all'
LANGUAGE C IMMUTABLE;
CREATE FUNCTION pipeline_kafka.consume_end()
RETURNS text
AS 'MODULE_PATHNAME', 'kafka_consume_end_all'
LANGUAGE C IMMUTABLE;
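
-- Illustrative only: the zero-argument variants above start or stop every
-- consumer recorded in pipeline_kafka.consumers:
--
--   SELECT pipeline_kafka.consume_begin();
--   SELECT pipeline_kafka.consume_end();
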
CREATE FUNCTION pipeline_kafka.produce_message (
topic text,
message bytea,
partition integer DEFAULT NULL,
key bytea DEFAULT NULL
)
RETURNS text
AS 'MODULE_PATHNAME', 'kafka_produce_msg'
LANGUAGE C IMMUTABLE;
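
-- Illustrative only ('my_topic' is a placeholder): produce a single message,
-- optionally attaching a key and/or targeting a specific partition:
--
--   SELECT pipeline_kafka.produce_message('my_topic', 'hello'::bytea);
--   SELECT pipeline_kafka.produce_message('my_topic', 'hello'::bytea, key := 'k0'::bytea);
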
CREATE FUNCTION pipeline_kafka.emit_tuple()
RETURNS trigger
AS 'MODULE_PATHNAME', 'kafka_emit_tuple'
LANGUAGE C IMMUTABLE;
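
-- Hedged sketch (not executed by this script): emit_tuple is a trigger function,
-- so it is attached to a relation with CREATE TRIGGER. The trigger arguments it
-- accepts are defined by the C implementation; the example below assumes the
-- target topic is passed as the first trigger argument, and 'my_table' and
-- 'my_topic' are placeholders:
--
--   CREATE TRIGGER emit_to_kafka
--     AFTER INSERT ON my_table
--     FOR EACH ROW
--     EXECUTE PROCEDURE pipeline_kafka.emit_tuple('my_topic');
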
CREATE FUNCTION pipeline_kafka.add_broker (
host text
)
RETURNS text
AS 'MODULE_PATHNAME', 'kafka_add_broker'
LANGUAGE C IMMUTABLE;
CREATE FUNCTION pipeline_kafka.remove_broker (
host text
)
RETURNS text
AS 'MODULE_PATHNAME', 'kafka_remove_broker'
LANGUAGE C IMMUTABLE;
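
-- Illustrative only (the broker address is a placeholder): register a Kafka
-- broker to connect to, and later deregister it:
--
--   SELECT pipeline_kafka.add_broker('localhost:9092');
--   SELECT pipeline_kafka.remove_broker('localhost:9092');
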
CREATE FUNCTION pipeline_kafka.consume_begin_stream_partitioned (
topic text,
group_id text DEFAULT NULL,
format text DEFAULT 'text',
delimiter text DEFAULT E'\t',
quote text DEFAULT NULL,
escape text DEFAULT NULL,
batchsize integer DEFAULT 10000,
maxbytes integer DEFAULT 32000000, -- 32mb
parallelism integer DEFAULT 1,
timeout integer DEFAULT 250,
start_offset bigint DEFAULT NULL
)
RETURNS text
AS 'MODULE_PATHNAME', 'kafka_consume_begin_stream_partitioned'
LANGUAGE C IMMUTABLE;
CREATE FUNCTION pipeline_kafka.consume_end_stream_partitioned (
topic text
)
RETURNS text
AS 'MODULE_PATHNAME', 'kafka_consume_end_stream_partitioned'
LANGUAGE C IMMUTABLE;
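
-- Illustrative only ('my_topic' and 'my_group' are placeholders): the
-- stream-partitioned variants above take only a topic, with no target
-- relation argument:
--
--   SELECT pipeline_kafka.consume_begin_stream_partitioned('my_topic', group_id := 'my_group');
--   SELECT pipeline_kafka.consume_end_stream_partitioned('my_topic');
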
CREATE FUNCTION pipeline_kafka.topic_watermarks (topic text, partition_id integer)
RETURNS TABLE (partition integer, low_watermark bigint, high_watermark bigint)
AS 'MODULE_PATHNAME', 'kafka_topic_watermarks'
LANGUAGE C IMMUTABLE;
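
-- Illustrative only ('my_topic' is a placeholder): report the low and high
-- watermark offsets for a given topic and partition:
--
--   SELECT * FROM pipeline_kafka.topic_watermarks('my_topic', 0);
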
CREATE VIEW pipeline_kafka.consumer_lag AS
  SELECT
    c.id AS consumer_id,
    c.group_id,
    c.topic,
    o.partition,
    o.offset + 1 AS offset,
    w.high_watermark,
    w.high_watermark - ("offset" + 1) AS lag
  FROM pipeline_kafka.consumers c
  JOIN pipeline_kafka.offsets o ON c.id = o.consumer_id
  JOIN pipeline_kafka.topic_watermarks(c.topic, o.partition) w ON w.partition = o.partition;
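
-- Illustrative only: the view above shows, per consumer and partition, how far
-- the stored offset trails the topic's high watermark:
--
--   SELECT consumer_id, topic, partition, lag FROM pipeline_kafka.consumer_lag;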