Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler service #32

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions background_workers/README.md

This file was deleted.

7 changes: 7 additions & 0 deletions scheduler_service/.env.sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
DATABASE_USER=
DATABASE_PASSWORD=
DATABASE_HOST=
DATABASE_NAME=
AUTH_KEY=
FLASK_APP_PORT=7001
OPENWEATHERMAP_API_KEY=
25 changes: 25 additions & 0 deletions scheduler_service/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
blinker==1.6.3
certifi==2023.7.22
charset-normalizer==3.3.1
click==8.1.7
colorama==0.4.6
Flask==3.0.0
greenlet==3.0.1
idna==3.4
itsdangerous==2.1.2
Jinja2==3.1.2
MarkupSafe==2.1.3
numpy==1.26.1
pandas==2.1.2
psycopg2==2.9.9
python-dateutil==2.8.2
python-dotenv==1.0.0
pytz==2023.3.post1
requests==2.31.0
schedule==1.2.1
six==1.16.0
SQLAlchemy==2.0.22
typing_extensions==4.8.0
tzdata==2023.3
urllib3==2.0.7
Werkzeug==3.0.1
64 changes: 64 additions & 0 deletions scheduler_service/src/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import os
import threading
import time
import logging

import schedule
from dotenv import load_dotenv
from utils import connected_to_internet
from update_weather.update_weather import update_weather

# Env vars
load_dotenv()
DATABASE_USER = os.environ.get("DATABASE_USER")
DATABASE_PASSWORD = os.environ.get("DATABASE_PASSWORD")
DATABASE_HOST = os.environ.get("DATABASE_HOST")
DATABASE_NAME = os.environ.get("DATABASE_NAME")
AUTH_KEY = os.environ.get("AUTH_KEY")
FLASK_APP_PORT = os.environ.get("FLASK_APP_PORT")
OPENWEATHERMAP_API_KEY = os.environ.get("OPENWEATHERMAP_API_KEY")


def threaded_update_weather(weather_row_id=1):
if connected_to_internet():
thread = threading.Thread(
target=update_weather,
name="update_weather_thread",
daemon=True,
args=(
DATABASE_USER,
DATABASE_PASSWORD,
DATABASE_HOST,
DATABASE_NAME,
OPENWEATHERMAP_API_KEY,
weather_row_id,
),
)
thread.start()


# def print_hi():
# time.sleep(10)
# logging.info("Hi")


# def thread_test():
# thread = threading.Thread(
# target=print_hi,
# name="print_hi_thread",
# )
# thread.start()


if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO, format="TID-%(thread)d | %(threadName)s: %(message)s"
)
schedule.every(30).seconds.do(threaded_update_weather, 15)
schedule.every(30).seconds.do(threaded_update_weather, 1)

while True:
thread_names = ", ".join(thread.name for thread in threading.enumerate())
logging.info(f"{threading.active_count()} active threads: {thread_names}")
schedule.run_pending()
time.sleep(5)
105 changes: 105 additions & 0 deletions scheduler_service/src/update_weather/update_weather.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import logging
from datetime import datetime

import pandas as pd
import requests
from sqlalchemy import create_engine, inspect

API_BASE_URL = "https://api.openweathermap.org/data/2.5/weather"


def get_weather(lat, long, API_KEY): # Copied from etl_weather
units = "metric"
exclude = "minutely,hourly,daily,alerts" # To exclude certain weather reports, right now just using current
url = f"{API_BASE_URL}?lat={lat}&lon={long}&units={units}&appid={API_KEY}"
response = requests.get(url)
if response.status_code != 200:
print(response.json())
raise response.raise_for_status()

weather_data = response.json()
precipitation = (
0
if "rain" not in weather_data
else weather_data["rain"]["1h"] or weather_data["rain"]["3h"]
)
weather_dict = {
"Latitude": lat,
"Longitude": long,
"Temperature (C)": weather_data["main"]["temp"],
"Wind Speed (m/s)": weather_data["wind"]["speed"],
"Wind Direction": weather_data["wind"]["deg"],
"Weather": weather_data["weather"][0]["main"],
"Weather Description": weather_data["weather"][0]["description"],
"Pressure (hPa)": weather_data["main"]["pressure"],
"Precipitation (mm)": precipitation,
"Cloud Cover": weather_data["clouds"]["all"],
"Humidity": weather_data["main"]["humidity"],
}
return weather_dict


def update_weather(db_user, db_password, db_host, db_name, API_KEY, weather_row_id=1):
logging.info(f"update_weather func with weather_row_id={weather_row_id}")

assert weather_row_id >= 1
engine = create_engine(
f"postgresql+psycopg2://{db_user}:{db_password}@{db_host}/{db_name}"
)

if not inspect(engine).has_table("weather"):
raise SystemError("weather table does not exist in database")
weather_df = pd.read_sql_query(
sql="SELECT * FROM weather ORDER BY id ASC", con=engine
)
weather_df.index += 1 # Make it 1-indexed
weather_df_len = len(weather_df) + 1

if weather_row_id >= weather_df_len:
return (
f"weather_row_id {weather_row_id} exceeds number of weather rows ({len(weather_df)})",
404,
)

print(f"Updating weather data from id {weather_row_id}...")
for i in range(weather_row_id, weather_df_len):
lat = weather_df.loc[i, "lat"]
lon = weather_df.loc[i, "lon"]
weather_data = get_weather(lat, lon, API_KEY)
weather_df.loc[i, "temperature"] = weather_data["Temperature (C)"]
weather_df.loc[i, "humidity"] = weather_data["Humidity"]
weather_df.loc[i, "wind_speed"] = weather_data["Wind Speed (m/s)"]
weather_df.loc[i, "wind_direction"] = weather_data["Wind Direction"]
weather_df.loc[i, "cloud_cover"] = weather_data["Cloud Cover"]

weather_response = weather_df.to_sql(
name="weather",
con=engine,
schema="public",
if_exists="replace",
index=False,
method="multi",
)
if weather_df.shape[0] != weather_response:
raise SystemError("weather table update failed")
logging.info(
f"weather table starting at id={weather_row_id} updated at {datetime.now().strftime('%m/%d/%Y %H:%M:%S')}"
)
return


if __name__ == "__main__":
db_user = ""
db_password = ""
db_host = ""
db_name = ""
API_KEY = ""
weather_row_id = 1
update_weather(
db_user,
db_password,
db_host,
db_name,
API_KEY,
weather_row_id,
)
13 changes: 13 additions & 0 deletions scheduler_service/src/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import requests


def connected_to_internet():
try:
request = requests.get("http://www.google.com", timeout=5)
return True
except (requests.ConnectionError, requests.Timeout) as exception:
return False


if __name__ == "__main__":
pass