Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic pool & healthcheck query to handle broken connections #35

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 66 additions & 51 deletions events_api/events_api/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@
import dateutil.parser
import copy

from contextlib import contextmanager
from psycopg2.extras import RealDictCursor
from psycopg2 import pool

from session import session

Expand Down Expand Up @@ -114,30 +116,44 @@ def read_configuration() -> dict:
return config if isinstance(config, dict) else {}


eventdb_conn = None
eventdb_pool = None
# Using a global object for the database connection
# must be initialised once
MAX_POOL_CONNECTIONS = 4


def open_db_connection(dsn: str):
""" Open the Connection to the EventDB
def open_db_pool(dsn: str):
"""Open the Connection pool to the EventDB and saves in global obj

Args:
dsn: a Connection - String
"""
global eventdb_pool

Returns: a Database Connection
eventdb_pool = pool.ThreadedConnectionPool(1, MAX_POOL_CONNECTIONS, dsn=dsn)

"""
global eventdb_conn

eventdb_conn = psycopg2.connect(dsn=dsn)
return eventdb_conn
@contextmanager
def db_connection():

# Pool doesn't check the connection automatically
# Iterate max_connection+1 ensures that we either get a refreshed connection,
# or the problem isn't solvable that way
for _ in range(MAX_POOL_CONNECTIONS + 1):
connection = eventdb_pool.getconn()
try:
connection.cursor().execute("SELECT 1;")
break
except psycopg2.Error as exc:
log.info("Healthcheck connection failed: %s", repr(exc))
eventdb_pool.putconn(connection, close=True)
else:
raise RuntimeError(f"Cannot get a healthy connection")

def __rollback_transaction():
global eventdb_conn
log.log(DD, "Calling rollback()")
eventdb_conn.rollback()
try:
yield connection
finally:
eventdb_pool.putconn(connection)


QUERY_EVENT_SUBQUERY = {
Expand Down Expand Up @@ -245,7 +261,6 @@ def __rollback_transaction():
'label': 'Destination FQDN contains',
'exp_type': 'string'
},

# Classification
'classification-taxonomy_is': {
'sql': 'events."classification.taxonomy" ILIKE %s',
Expand Down Expand Up @@ -538,19 +553,24 @@ def query(prepared_query):
Returns: The results of the databasequery in JSON-Format.

"""
global eventdb_conn

# psycopgy2.4 does not offer 'with' for cursor()
# FUTURE: use with
cur = eventdb_conn.cursor(cursor_factory=RealDictCursor)
with db_connection() as conn:
# psycopgy2.4 does not offer 'with' for cursor()
# FUTURE: use with
cur = conn.cursor(cursor_factory=RealDictCursor)

operation = prepared_query[0]
parameters = prepared_query[1]
log.info(cur.mogrify(operation, parameters))
cur.execute(operation, parameters)
log.log(DD, "Ran query={}".format(repr(cur.query.decode('utf-8'))))
# description = cur.description
results = cur.fetchall()
operation = prepared_query[0]
parameters = prepared_query[1]
try:
log.info(cur.mogrify(operation, parameters))
cur.execute(operation, parameters)
log.log(DD, "Ran query={}".format(repr(cur.query.decode("utf-8"))))
# description = cur.description
results = cur.fetchall()
except (psycopg2.Error, AttributeError):
log.log(DD, "Calling rollback()")
conn.rollback()
raise

return results

Expand All @@ -560,7 +580,7 @@ def setup(api):
config = read_configuration()
if "logging_level" in config:
log.setLevel(config["logging_level"])
open_db_connection(config["libpg conninfo"])
open_db_pool(config["libpg conninfo"])
log.debug("Initialised DB connection for events_api.")

global QUERY_TABLE_NAME
Expand Down Expand Up @@ -601,34 +621,34 @@ def setup(api):

def _db_has_mailgen_tables():
"""Query the database to check if the mailgen tables exist."""
global eventdb_conn

# psycopgy2.4 does not offer 'with' for cursor()
# FUTURE: use with
cur = eventdb_conn.cursor(cursor_factory=RealDictCursor)

# tables `directives` and `sent` go together, so we check only one of them
cur.execute("SELECT to_regclass('public.directives')")
# this should always work if the connection to the db is okay
if cur.fetchone()['to_regclass'] == 'directives':
log.debug("Found intelmq-cb-mailgen table `directives`.")
return True
else:
return False

with db_connection() as conn:
# psycopgy2.4 does not offer 'with' for cursor()
# FUTURE: use with
cur = conn.cursor(cursor_factory=RealDictCursor)

# tables `directives` and `sent` go together, so we check only one of them
cur.execute("SELECT to_regclass('public.directives')")
# this should always work if the connection to the db is okay
if cur.fetchone()["to_regclass"] == "directives":
log.debug("Found intelmq-cb-mailgen table `directives`.")
return True
else:
return False


def _db_get_timezone():
"""Query the database for its timezone setting."""
# Original. If you change it, update the copy
# in events_api/events_api/serve.py as well.
global eventdb_conn

# psycopgy2.4 does not offer 'with' for cursor()
# FUTURE: use with
cur = eventdb_conn.cursor(cursor_factory=RealDictCursor)
with db_connection() as conn:
# psycopgy2.4 does not offer 'with' for cursor()
# FUTURE: use with
cur = conn.cursor(cursor_factory=RealDictCursor)

cur.execute("SHOW timezone")
return cur.fetchone()['TimeZone']
cur.execute("SHOW timezone")
return cur.fetchone()["TimeZone"]


@hug.get(ENDPOINT_PREFIX, examples="id=1", requires=session.token_authentication)
Expand Down Expand Up @@ -657,7 +677,6 @@ def getEvent(response, id: int = None):
rows = query(prep)
except psycopg2.Error as e:
log.error(e)
__rollback_transaction()
response.status = HTTP_INTERNAL_SERVER_ERROR
return {"error": "The query could not be processed."}

Expand Down Expand Up @@ -735,7 +754,6 @@ def search(response, **params):
rows = query(prep)
except psycopg2.Error as e:
log.error(e)
__rollback_transaction()
response.status = HTTP_INTERNAL_SERVER_ERROR
return {"error": "The query could not be processed."}

Expand Down Expand Up @@ -861,13 +879,11 @@ def stats(response, **params):

except psycopg2.Error as e:
log.error(e)
__rollback_transaction()
response.status = HTTP_INTERNAL_SERVER_ERROR
return {"error": "The query could not be processed."}

except AttributeError as e:
log.error(e)
__rollback_transaction()
response.status = HTTP_INTERNAL_SERVER_ERROR
return {"error": "Something went wrong."}

Expand All @@ -893,8 +909,7 @@ def main():
print("log effective level = \"{}\"".format(
logging.getLevelName(log.getEffectiveLevel())))

global eventdb_conn
eventdb_conn = open_db_connection(config["libpg conninfo"])
open_db_pool(config["libpg conninfo"])

# TODO: Maybe add a search interface for the CLI
# params={'t.o_after': '2017-03-01', 's.ip_in_sn': '31.25.41.74'}
Expand Down