-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathfingr.py
executable file
·735 lines (611 loc) · 22.4 KB
/
fingr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
#!/usr/bin/env python3
import os
import sys
import argparse
import logging
import asyncio
import math
import datetime
import pytz
import secrets
import string
from geopy.geocoders import Nominatim
from metno_locationforecast import Place, Forecast
import redis
import pysolar
import timezonefinder
import socket # To catch connection error
from typing import Tuple, Optional
# Release identifier and project metadata.
__version__ = "2024-10"
__url__ = "https://github.com/ways/fingr"
__license__ = "GPL3"
# Maximum number of bytes read from a client request (see handle_request).
input_limit = 30
# Legend appended below every meteogram.
weather_legend = (
    "\nLegend left axis: - Sunny ^ Scattered = Clouded =V= Thunder # Fog"
    + "\nLegend right axis: | Rain ! Sleet * Snow\n"
)
# File that receives a copy of the most recent reply (written by handle_request).
last_reply_file = "/tmp/fingr" # nosec B108
def read_useragent() -> str:
    """Return the user agent string sent to the upstream APIs.

    Met.no requires a contact address as user agent. The first non-blank
    line of ``useragent.txt`` (in the current working directory) is used.
    When the file is missing or contains no usable line, a warning is logged
    and a generic default is returned.
    """
    uafile = "useragent.txt"
    default_useragent = "default fingr useragent"

    try:
        with open(uafile, mode="r", encoding="utf-8") as f:
            for line in f:
                useragent = line.strip()
                if not useragent:
                    continue  # An empty user agent is as bad as none at all.
                # Log before returning; a statement placed after the loop
                # would only ever run for an empty file.
                logger.info("Read useragent file <%s>", uafile)
                return useragent
    except FileNotFoundError:
        logger.warning(
            "Unable to read useragent file <%s>. This is required by upstream API. You risk getting your IP banned.",
            uafile,
        )
        return default_useragent

    logger.warning(
        "Useragent file <%s> is empty. This is required by upstream API. You risk getting your IP banned.",
        uafile,
    )
    return default_useragent
def read_motdlist() -> list:
    """Load the message-of-the-day candidates from ``motd.txt``.

    Blank lines and lines starting with ``#`` are skipped. A missing file is
    logged as a warning and yields an empty list.
    """
    motdfile = "motd.txt"
    messages = []
    lines_seen = 0  # Counts every line, including skipped ones.

    try:
        with open(motdfile, mode="r", encoding="utf-8") as handle:
            for lines_seen, raw in enumerate(handle, start=1):
                text = raw.strip()
                if text and not text.startswith("#"):
                    messages.append(text)
        logger.info("Read motd file with <%s> lines.", lines_seen)
    except FileNotFoundError as err:
        logger.warning(
            "Unable to read motd list, <%s/%s>. Warning: %s", os.getcwd(), motdfile, err
        )

    return messages
def random_message(messages: list) -> str:
    """Pick a random message of the day.

    Returns the chosen message wrapped as ``"[message]\\n"``, or an empty
    string when ``messages`` is empty.
    """
    if not messages:
        return ""
    # secrets.choice covers every index, including the last one, and works
    # for a single-element list (randbelow(len - 1) did neither).
    return "[" + secrets.choice(messages) + "]\n"
def read_denylist() -> list:
    """Load the IP addresses that are refused service from ``deny.txt``.

    Blank lines and ``#`` comment lines are dropped. A missing file is logged
    as a warning and yields an empty list.
    """
    denyfile = "deny.txt"

    try:
        with open(denyfile, mode="r", encoding="utf-8") as source:
            stripped_rows = [row.strip() for row in source]
    except FileNotFoundError as err:
        logger.warning(
            "Unable to read deny list, <%s/%s>. Warning: %s", os.getcwd(), denyfile, err
        )
        return []

    # The logged count covers every line in the file, not only kept entries.
    logger.info("Read denylist with %s lines.", len(stripped_rows))
    return [
        entry for entry in stripped_rows if entry and not entry.startswith("#")
    ]
def get_timezone(lat: float, lon: float) -> pytz.BaseTzInfo:
    """Look up the timezone name for a coordinate and return it as a pytz zone."""
    # NOTE(review): if the lookup can return None for some coordinates,
    # pytz.timezone() will raise here — confirm against timezonefinder.
    zone_name = timezone_finder.timezone_at(lat=lat, lng=lon)
    return pytz.timezone(zone_name)
def wind_direction(deg: int) -> str:
    """Return a two-character compass direction for a bearing in degrees.

    Bearings from 0 to 360 inclusive map to one of eight 45-degree sectors;
    360 is treated as north, the same as 0. Anything outside that range
    (including NaN) yields ``" ?"``.
    """
    if not 0 <= deg <= 360:
        return " ?"

    # (exclusive upper bound, symbol) for each sector, in ascending order.
    sectors = (
        (23, " N"),
        (68, "NE"),
        (113, " E"),
        (158, "SE"),
        (203, " S"),
        (248, "SW"),
        (293, " W"),
        (338, "NW"),
    )
    for upper_bound, symbol in sectors:
        if deg < upper_bound:
            return symbol

    # 338..360 wraps around to north again.
    return " N"
def clean_input(data: str) -> str:
    """Sanitise user input: keep digits, ASCII letters and a few extra chars.

    Underscores are turned into spaces first; every character that is not
    allowed is silently dropped.
    """
    # TODO: include all weird characters for other languages
    extra_chars = "^-.,:/~¤£ øæåØÆÅéüÜÉýÝ"
    allowed = string.digits + string.ascii_letters + extra_chars

    # Underscore stands in for a space in the request.
    spaced = data.replace("_", " ")
    return "".join(filter(allowed.__contains__, spaced))
def resolve_location(
redis_client: redis.client,
data="Oslo/Norway",
) -> Tuple[float | None, float | None, str, bool]:
"""Get coordinates from location name. Return lat, long, name."""
cache = None
# Check if coordinates
if "," in data:
lat, lon = data.split(",")
try:
lat = float(lat)
lon = float(lon)
return lat, lon, f"coordinates {lat}, {lon}", False
except ValueError:
pass
lat = None
lon = None
address = None
# Check if in redis cache
if redis_client is not None:
cache = redis_client.get(data)
if cache:
lat, lon, address = cache.decode("utf-8").split("|")
lat = float(lat)
lon = float(lon)
else:
coordinate = None
try:
coordinate = geolocator.geocode(data, language="en")
except socket.timeout as err:
# nominatim.openstreetmap.org down
print("nominatim.openstreetmap.org down. %s" % err)
return None, None, "No service", False
if coordinate:
lat = coordinate.latitude
lon = coordinate.longitude
try:
address = coordinate.address.decode("utf-8")
except AttributeError:
address = coordinate.address
if lat:
# Store to redis cache as <search>: "lat,lon,address"
if not cache:
redis_client.setex(
data,
datetime.timedelta(days=7),
"|".join([str(lat), str(lon), str(address)]),
)
return lat, lon, address, cache
return None, None, "No location found", False
def fetch_weather(lat: float, lon: float, address: str = ""):
    """Fetch the forecast for a coordinate using metno-locationforecast.

    Returns ``(forecast, updated)`` where ``updated`` is whatever
    ``Forecast.update()`` returned. A non-200 status code is logged as an
    error but the forecast object is returned regardless.
    """
    forecast = Forecast(Place(address, lat, lon), user_agent=user_agent)
    update_result = forecast.update()

    status_code = forecast.json["status_code"]
    if status_code != 200:
        logger.error("Forecast response: %s", status_code)

    return forecast, update_result
def calculate_wind_chill(temperature: float, wind_speed: float):
    """Return the wind chill ("feels like") temperature as an int.

    ``temperature`` is in degrees Celsius and ``wind_speed`` in m/s (it is
    converted to km/h below). The standard wind chill index is only defined
    for temperatures at or below 10 degrees C and wind above 4.8 km/h;
    outside that domain the air temperature is returned unchanged (truncated
    to int like the computed value).
    """
    air_temp = float(temperature)
    wind_kmh = float(wind_speed) * 3.6

    # Outside the formula's domain it yields nonsense (for example a "chill"
    # warmer than the air in calm weather), and a negative wind speed would
    # produce a complex number from the fractional power.
    if air_temp > 10 or wind_kmh <= 4.8:
        return int(air_temp)

    wind_factor = wind_kmh**0.16
    return int(
        13.12
        + 0.6215 * air_temp
        - 11.37 * wind_factor
        + 0.3965 * air_temp * wind_factor
    )
def sun_up(latitude: float, longitude: float, date: datetime.datetime) -> bool:
    """Return True when the solar altitude at the given place and time is above zero."""
    altitude = pysolar.solar.get_altitude(latitude, longitude, date)
    return bool(0 < altitude)
def _sky_symbol(symbol_code):
    """Map a weather symbol code to the marker drawn on the temperature curve."""
    # Match symbols from https://api.met.no/weatherapi/weathericon/2.0/documentation
    if not symbol_code:
        return " "
    if "partlycloudy" in symbol_code:  # partly
        return "^^^"
    # Thunder must be tested before rain/sleet/snow: the thunder codes are
    # compound names such as "rainandthunder" and would otherwise always be
    # drawn as plain cloud cover.
    if "thunder" in symbol_code:  # thunder
        return "=V="
    if any(word in symbol_code for word in ("cloudy", "rain", "sleet", "snow")):
        return "==="  # clouded, rain
    if "fog" in symbol_code:  # fog
        return "###"
    if "fair" in symbol_code:  # light clouds
        return "=--"
    if "clearsky" in symbol_code:  # clear
        return "---"
    return symbol_code  # Shouldn't hit this


def _precipitation_symbol(symbol_code):
    """Return the one-character precipitation marker for a weather symbol code."""
    if "sleet" in symbol_code:  # sleet
        return "!"
    if "snow" in symbol_code:  # snow
        return "*"
    return "|"  # rain


def format_meteogram(
    forecast,
    lat,
    lon,
    timezone,
    imperial=False,
    beaufort=False,
    offset=0,
    hourstep=1,
    screenwidth=80,
    wind_chill=False,
):
    """Format a meteogram from forecast data.

    Builds a text graph keyed by row number: row 0 holds the units, rows
    1..10 the temperature axis with sky and precipitation markers, and the
    rows below hold hours, wind direction and wind strength. The rows are
    joined in key order after a centred header and followed by the legend.

    ``forecast`` supplies ``data.intervals`` and ``place.name``; ``lat`` and
    ``lon`` are used for the daylight marker and ``timezone`` for the hour
    labels. ``imperial`` and ``beaufort`` select units, ``screenwidth`` and
    ``offset`` decide how many intervals are drawn, and ``wind_chill`` plots
    the wind chill value instead of the air temperature. ``hourstep`` is
    accepted but not used. Returns the complete meteogram as one string.
    """
    output = ""

    # Init graph
    graph = dict()
    tempheight = 11
    timeline = 13
    windline = 15
    windstrline = 16
    graph[timeline] = " "  # time
    graph[timeline + 1] = " "  # date line
    graph[windline] = " "  # wind
    graph[windstrline] = " "  # wind strength
    hourcount = int((screenwidth - 14) / 3 + offset)

    # Rain in graph:
    rainheight = 10
    rainstep = -1
    rainhigh = 0  # highest rain on graph

    # First iteration to collect temperature and rain max, min.
    iteration = 0
    temphigh = -99
    templow = 99
    tempstep = -1
    for interval in forecast.data.intervals:
        iteration += 1
        if iteration > hourcount:
            break

        if imperial:
            interval.variables["air_temperature"].convert_to("fahrenheit")
        temperature = int(interval.variables["air_temperature"].value)

        if wind_chill:
            # NOTE(review): with imperial=True the temperature is already
            # Fahrenheit here while the wind speed is still unconverted —
            # confirm which units calculate_wind_chill expects.
            wind_speed = int(interval.variables["wind_speed"].value)
            temperature = calculate_wind_chill(temperature, wind_speed)

        precipitation = 0
        try:
            precipitation = math.ceil(
                float(interval.variables["precipitation_amount"].value)
            )
            if imperial:
                precipitation = (
                    precipitation / 25.4
                )  # No convert_to for this unit in lib
        except KeyError:
            pass

        if temperature > temphigh:
            temphigh = temperature
        if temperature < templow:
            templow = temperature
        if math.ceil(precipitation) > rainhigh:
            rainhigh = precipitation

    # Scale y-axis based on first iteration. default = -1
    if tempheight <= (temphigh - templow):
        tempstep = -2
    if temphigh == templow:
        templow = temphigh - 1

    # Create temp range
    temps = []
    for t in range(int(temphigh), int(templow) - 1, tempstep):
        temps.append(t)

    # Extend temp range
    for t in range(0, tempheight):
        if len(temps) + 1 < tempheight:
            if t % 2 == 0:  # extend down
                temps.append(temps[len(temps) - 1] - abs(tempstep))
            else:  # extend up
                temps = [temps[0] + abs(tempstep)] + temps

    # write temps to graph
    for i in range(1, tempheight):
        try:
            graph[i] = str(temps[i - 1]).rjust(3, " ")
        except IndexError:  # list empty
            pass

    # create rainaxis #TODO: make this scale
    rainaxis = []
    for r in range(rainheight, 0, rainstep):
        if r <= rainhigh:  # + 1
            rainaxis.append("%2.0f mm " % r)
        else:
            rainaxis.append(" ")

    # draw graph elements:
    iteration = 0
    for interval in forecast.data.intervals:
        iteration += 1
        if iteration > hourcount:
            break

        temperature = int(interval.variables["air_temperature"].value)
        wind_from_direction = int(interval.variables["wind_from_direction"].value)

        if wind_chill:
            # Use this interval's own wind speed, read before the unit
            # conversion below, exactly as the scaling pass above did.
            temperature = calculate_wind_chill(
                temperature, int(interval.variables["wind_speed"].value)
            )

        if beaufort:
            interval.variables["wind_speed"].convert_to("beaufort")
        elif imperial:
            interval.variables["wind_speed"].convert_to("mph")
        wind_speed = int(interval.variables["wind_speed"].value)

        # Default to no rain so an interval without precipitation data
        # neither fails nor reuses the previous interval's amount.
        rain = 0
        try:
            rain = math.ceil(float(interval.variables["precipitation_amount"].value))
            if imperial:
                rain = rain / 25.4  # No convert_to for this unit in lib
        except KeyError:
            pass

        # A missing symbol code is treated as "no symbol".
        symbol_code = interval.symbol_code or ""

        # Wind on x axis
        graph[windline] += " " + (
            wind_direction(wind_from_direction) if wind_speed != 0.0 else " O"
        )

        # Wind strength on x axis
        graph[windstrline] += " " + "%2.0f" % wind_speed

        # Time on x axis
        start_time = interval.start_time.replace(
            tzinfo=pytz.timezone("UTC")
        ).astimezone(timezone)
        date = start_time.strftime("%d/%m")
        hour = start_time.strftime("%H")
        if sun_up(latitude=lat, longitude=lon, date=start_time):
            spacer = "_"
        else:
            spacer = " "

        if hour == "01":  # Date changed
            graph[timeline] = graph[timeline][:-2] + date
        else:
            graph[timeline] += spacer + hour

        # Temperatures that count as a match for a graph row.
        temptomatch = [temperature]
        if tempstep < -1:  # TODO: this should scale higher than one step
            temptomatch.append(temperature - 1)

        # for each y (temp) look for matching temp, draw graph
        for i in range(1, tempheight):  # draw temp
            try:
                # parse out the number to be compared
                tempingraph = int(graph[i][:3].strip())
                if tempingraph in temptomatch:
                    graph[i] += _sky_symbol(symbol_code)
                else:
                    graph[i] += " "
            except KeyError:
                continue

            # compare rain, and print
            # TODO: scaling
            if (rain != 0) and (rain > 10 - i):
                rainsymbol = _precipitation_symbol(symbol_code)

                # if overflow, print number at top
                if rain > 10 and i == 1:
                    rainsymbol = "%2.0f" % rain
                    graph[i] = graph[i][:-2] + rainsymbol
                else:
                    # print rain
                    graph[i] = graph[i][:-1] + rainsymbol

    graph = print_units(
        graph, screenwidth, imperial, beaufort, windline, windstrline, timeline
    )
    output += print_meteogram_header(
        forecast.place.name + (" (wind chill)" if wind_chill else ""), screenwidth
    )

    # add rain to graph
    for i in range(1, tempheight):
        try:
            graph[i] += rainaxis[i - 1]
        except IndexError:
            pass

    for k in sorted(graph.keys()):
        output += graph[k] + "\n"

    # Weather legend
    output += weather_legend

    return output
def print_units(
    graph, screenwidth, imperial, beaufort, windline, windstrline, timeline
):
    """Write the unit labels into the graph rows and return the graph.

    Row 0 receives the temperature and rain units; the wind, wind strength
    and time rows each get a trailing label. ``graph`` is modified in place.
    """
    label_width = screenwidth - 3
    if imperial:
        graph[0] = " 'F" + "Rain (in)".rjust(label_width)
    else:
        graph[0] = " 'C" + "Rain (mm) ".rjust(label_width)

    # Beaufort takes precedence over imperial for the wind strength label.
    if beaufort:
        wind_unit = " Wind(Bft)"
    elif imperial:
        wind_unit = " Wind(mph)"
    else:
        wind_unit = " Wind(m/s)"

    graph[windline] += " Wind dir."
    graph[windstrline] += wind_unit
    graph[timeline] += " Hour"
    return graph
def print_meteogram_header(display_name, screenwidth):
    """Build the title line, centred to the screen width, ending in a newline."""
    title = "-= Meteogram for {} =-".format(display_name)
    return title.center(screenwidth) + "\n"
def format_oneliner(
    forecast, timezone, imperial=False, beaufort=False, offset=0, wind_chill=False
):
    """Build a single-line forecast: start time, place and the next-6-hours data.

    The unit and wind chill flags are accepted but not applied yet.
    TODO: remove json, respect windchill, imperial, etc.
    """
    place = forecast.place.name
    first_entry = forecast.json["data"]["properties"]["timeseries"][0]
    next6 = first_entry["data"]["next_6_hours"]

    # Only the first interval is needed, and only for its start time.
    start_time = None
    for first_interval in forecast.data.intervals:
        utc_start = first_interval.start_time.replace(tzinfo=pytz.timezone("UTC"))
        start_time = utc_start.astimezone(timezone)
        break

    return f"{start_time!s} {place!s} next 6 hours: {next6!s}"
async def handle_request(reader, writer):
    """Receive one connection, build the reply and send it.

    The request is read (at most ``input_limit`` bytes), sanitised and
    parsed for the option prefixes ``o:`` (one-liner), ``^`` (imperial),
    ``£`` (Beaufort) and ``¤`` (wind chill) plus an optional ``~<width>``
    suffix. ``help`` or an empty request returns the usage text; anything
    else is resolved to a location and answered with a forecast. The reply
    is always written and the connection closed, and a copy of the reply is
    stored in ``last_reply_file`` when that is set.
    """
    data = await reader.read(input_limit)

    # Everything the cleanup code in ``finally`` touches is bound before the
    # try block, so an early failure cannot turn into a NameError there.
    addr = writer.get_extra_info("peername")
    user_input = ""
    response = ""
    updated = None
    imperial = False
    beaufort = False
    oneliner = False
    wind_chill = False

    # Bounds for the client-supplied width: too narrow leaves no room for a
    # single column, and an unbounded value would be used as a padding width.
    default_screenwidth = 80
    min_screenwidth = 20
    max_screenwidth = 500
    screenwidth = default_screenwidth

    try:
        # The read is capped, so a multi-byte character may be cut in half;
        # drop undecodable bytes instead of failing the whole request.
        user_input = clean_input(data.decode("utf-8", errors="ignore"))

        logger.debug('%s GET "%s"', addr[0], user_input)

        # Deny list
        if addr[0] in denylist:
            logger.info('%s BLACKLISTED "%s"', addr[0], user_input)
            response = "You have been blacklisted for excessive use. Send a mail to [email protected] to be delisted."
            return

        # One-liner: strip only the leading marker, not every "o:" in the text
        if user_input.startswith("o:"):
            oneliner = True
            user_input = user_input[2:]

        # Imperial
        if user_input.startswith("^"):
            user_input = user_input[1:]
            imperial = True

        # Wind speed in the Beaufort scale
        if user_input.startswith("£"):
            user_input = user_input[1:]
            beaufort = True

        # Wind chill
        if user_input.startswith("¤"):
            user_input = user_input[1:]
            wind_chill = True

        # Screen width
        if "~" in user_input:
            pieces = user_input.split("~")
            user_input = pieces[0]
            try:
                requested_width = int(pieces[1])
            except ValueError:
                requested_width = default_screenwidth
            screenwidth = max(min_screenwidth, min(max_screenwidth, requested_width))

        if user_input == "help" or len(user_input) == 0:
            logger.info("%s help", addr[0])
            response = service_usage()
        else:
            lat, lon, address, cached_location = resolve_location(r, user_input)
            # Compare against None: latitude 0.0 (the equator) is valid.
            if lat is None:
                if address == "No service":
                    response += (
                        "Error: address service down. You can still use coordinates."
                    )
                else:
                    logger.info('%s NOTFOUND "%s"', addr[0], user_input)
                    response += "Location not found. Try help."
            else:
                timezone = get_timezone(lat, lon)
                weather_data, updated = fetch_weather(lat, lon, address)
                logger.info(
                    '%s Resolved "%s" to "%s". location cached: %s. '
                    + "Weatherdata: %s. o:%s, ^:%s, £:%s, ¤:%s",
                    addr[0],
                    user_input,
                    address,
                    bool(cached_location),
                    updated,
                    bool(oneliner),
                    bool(imperial),
                    bool(beaufort),
                    bool(wind_chill),
                )

                if not oneliner:
                    response = format_meteogram(
                        weather_data,
                        lat,
                        lon,
                        imperial=imperial,
                        beaufort=beaufort,
                        screenwidth=screenwidth,
                        wind_chill=wind_chill,
                        timezone=timezone,
                    )
                    response += random_message(motdlist)
                else:
                    response = format_oneliner(
                        weather_data,
                        timezone=timezone,
                        imperial=imperial,
                        beaufort=beaufort,
                        wind_chill=wind_chill,
                    )

    finally:
        writer.write(response.encode())
        logger.debug(
            "%s Replied with %s bytes. Weatherdata: %s", addr[0], len(response), updated
        )
        try:
            await writer.drain()
        finally:
            # Close the connection even when the client vanished mid-write.
            writer.close()

        if last_reply_file:
            with open(last_reply_file, mode="w", encoding="utf-8") as f:
                f.write(addr[0] + " " + user_input + "\n\n")
                f.write(response)
async def main(args):
    """Connect to redis, bind the listening socket and serve requests.

    Exits the process with status 1 when redis does not answer a ping. The
    redis client is published through the module-level ``r``.
    """
    global r

    redis_host = args.redis_host
    redis_port = args.redis_port

    logger.info("Connecting to redis host %s port %s", redis_host, redis_port)
    r = redis.Redis(host=redis_host, port=redis_port)

    try:
        r.ping()
    except redis.exceptions.ConnectionError:
        logger.error(
            "Unable to connect to redis at <%s>:<%s>", redis_host, redis_port
        )
        sys.exit(1)
    logger.info("Redis connected")

    logger.info("Starting on port %s", args.port)
    server = await asyncio.start_server(handle_request, args.host, args.port)

    bound = server.sockets[0].getsockname()
    logger.info("Ready to serve on address %s:%s", bound[0], bound[1])

    async with server:
        await server.serve_forever()
def service_usage() -> str:
    """Return the help text sent for ``help`` or empty requests."""
    # NOTE(review): the "[email protected]" tokens below look like e-mail
    # obfuscation residue rather than the intended addresses and examples —
    # confirm against the upstream file before releasing.
    return """Weather via finger, graph.no
* Code: https://github.com/ways/fingr/
* https://nominatim.org/ is used for location lookup.
* https://www.yr.no/ is used for weather data.
* Hosted by Copyleft Solutions AS: https://copyleft.no/
* Contact: [email protected]
Usage:
finger [email protected]
Using coordinates:
finger 59.1,[email protected]
Using imperial units:
finger ^[email protected]
Using the Beaufort wind scale:
finger £[email protected]
Ask for wider output, longer forecast (~<screen width>):
finger [email protected]
Specify another location when names conflict:
finger "oslo, united states"@graph.no
Display "wind chill" / "feels like" temperature:
finger ¤[email protected]
No graph, just a one-line forecast (needs improvement):
finger o:[email protected]
Hammering will get you blacklisted. Remember the data doesn't change more than once an hour.
News:
* Launched in 2012
* 2021-05: total rewrite due to API changes. Much better location searching, proper hour-by-hour for most of the world.
"""
# Module-level setup. These statements run at import time, and their order
# matters: the file readers below log through ``logger``, and the geolocator
# needs ``user_agent``.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S %z"
)
# Root logger, shared by every function in this file.
logger = logging.getLogger()
# Lists and settings read from files in the current working directory.
denylist = read_denylist()
motdlist = read_motdlist()
user_agent = read_useragent()
r = None # Redis client; assigned in main()
# Shared lookup helpers used by resolve_location() and get_timezone().
geolocator = Nominatim(user_agent=user_agent, timeout=3)
timezone_finder = timezonefinder.TimezoneFinder()
if __name__ == "__main__":
    # Command-line entry point: parse the options, then run the server.
    parser = argparse.ArgumentParser(description="fingr")
    parser.add_argument("-v", "--verbose", dest="verbose", action="store_true")
    parser.add_argument(
        "-o", "--host", dest="host", default="127.0.0.1", action="store"
    )
    # type=int so a port given on the command line matches the int default.
    parser.add_argument(
        "-p", "--port", dest="port", default=7979, type=int, action="store"
    )
    parser.add_argument(
        "-r", "--redis_host", dest="redis_host", default="localhost", action="store"
    )
    parser.add_argument(
        "-n", "--redis_port", dest="redis_port", default=6379, type=int, action="store"
    )

    args = parser.parse_args()

    # --verbose enables the debug messages that are otherwise filtered out
    # by the INFO level set in logging.basicConfig().
    if args.verbose:
        logger.setLevel(logging.DEBUG)

    asyncio.run(main(args))