-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwatcher_alarm.jl
62 lines (52 loc) · 1.33 KB
/
watcher_alarm.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
using ArgParse
using CSV
using DataFrames
using DuckDB
using Rembus
function command_line()
s = ArgParseSettings()
@add_arg_table! s begin
"--element", "-e"
help = "pollutant element"
arg_type = String
default = "no2"
"--thr", "-t"
help = "a value above threshold value generate an alarm"
arg_type = Float64
default = 150.0
end
return parse_args(s)
end
function set_thr(args, value)
old_value = args["thr"]
args["thr"] = value
return old_value
end
function pollution(args, fn, content)
@info "pollution from: $fn"
#thr = parse(Float64, ARGS[1])
thr = args["thr"]
el = args["element"]
df = CSV.read(content, DataFrame)
db = DuckDB.DB()
DuckDB.register_data_frame(db, df, "df")
alarms = DataFrame(
DBInterface.execute(db, "SELECT date, station_name, no2, o3 FROM df WHERE $el>$thr")
)
@info "alarms: $alarms"
for alarm in eachrow(alarms)
@publish alarm(
Dict(
"ts" => alarm.Date,
"station" => alarm.station_name,
"no2" => alarm.NO2,
"o3" => alarm.O3
)
)
end
end
args = command_line()
@shared args
@subscribe pollution
@expose set_thr
@forever