From 9e8465851506fa9758c3031a7ce42000daa1b01f Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Wed, 20 Mar 2024 15:08:23 +0100 Subject: [PATCH] Fix for failing on broken connections Basic connection pool & healthcheck usage to avoid failing on broken connections. Rollback was moved to query function to make it easier to manage the connection. --- events_api/events_api/serve.py | 117 +++++++++++++++++++-------------- 1 file changed, 66 insertions(+), 51 deletions(-) diff --git a/events_api/events_api/serve.py b/events_api/events_api/serve.py index 9459f7b..5ce6710 100644 --- a/events_api/events_api/serve.py +++ b/events_api/events_api/serve.py @@ -47,7 +47,9 @@ import dateutil.parser import copy +from contextlib import contextmanager from psycopg2.extras import RealDictCursor +from psycopg2 import pool from session import session @@ -114,30 +116,44 @@ def read_configuration() -> dict: return config if isinstance(config, dict) else {} -eventdb_conn = None +eventdb_pool = None # Using a global object for the database connection # must be initialised once +MAX_POOL_CONNECTIONS = 4 -def open_db_connection(dsn: str): - """ Open the Connection to the EventDB +def open_db_pool(dsn: str): + """Open the EventDB connection pool and store it in the global eventdb_pool Args: dsn: a Connection - String + """ + global eventdb_pool - Returns: a Database Connection + eventdb_pool = pool.ThreadedConnectionPool(1, MAX_POOL_CONNECTIONS, dsn=dsn) - """ - global eventdb_conn - eventdb_conn = psycopg2.connect(dsn=dsn) - return eventdb_conn +@contextmanager +def db_connection(): + # The pool does not check connection health when handing one out. + # MAX_POOL_CONNECTIONS + 1 attempts ensure we either get a fresh connection, + # or find out that reconnecting cannot fix the problem. + for _ in range(MAX_POOL_CONNECTIONS + 1): + connection = eventdb_pool.getconn() + try: + connection.cursor().execute("SELECT 1;") + break + except psycopg2.Error as exc: + log.info("Healthcheck connection failed: %s", repr(exc)) + 
eventdb_pool.putconn(connection, close=True) + else: + raise psycopg2.OperationalError("Cannot get a healthy connection") -def __rollback_transaction(): - global eventdb_conn - log.log(DD, "Calling rollback()") - eventdb_conn.rollback() + try: + yield connection + finally: + eventdb_pool.putconn(connection) QUERY_EVENT_SUBQUERY = { @@ -245,7 +261,6 @@ def __rollback_transaction(): 'label': 'Destination FQDN contains', 'exp_type': 'string' }, - # Classification 'classification-taxonomy_is': { 'sql': 'events."classification.taxonomy" ILIKE %s', @@ -538,19 +553,24 @@ def query(prepared_query): Returns: The results of the databasequery in JSON-Format. """ - global eventdb_conn - # psycopgy2.4 does not offer 'with' for cursor() - # FUTURE: use with - cur = eventdb_conn.cursor(cursor_factory=RealDictCursor) + with db_connection() as conn: + # psycopgy2.4 does not offer 'with' for cursor() + # FUTURE: use with + cur = conn.cursor(cursor_factory=RealDictCursor) - operation = prepared_query[0] - parameters = prepared_query[1] - log.info(cur.mogrify(operation, parameters)) - cur.execute(operation, parameters) - log.log(DD, "Ran query={}".format(repr(cur.query.decode('utf-8')))) - # description = cur.description - results = cur.fetchall() + operation = prepared_query[0] + parameters = prepared_query[1] + try: + log.info(cur.mogrify(operation, parameters)) + cur.execute(operation, parameters) + log.log(DD, "Ran query={}".format(repr(cur.query.decode("utf-8")))) + # description = cur.description + results = cur.fetchall() + except (psycopg2.Error, AttributeError): + log.log(DD, "Calling rollback()") + conn.rollback() + raise return results @@ -560,7 +580,7 @@ def setup(api): config = read_configuration() if "logging_level" in config: log.setLevel(config["logging_level"]) - open_db_connection(config["libpg conninfo"]) + open_db_pool(config["libpg conninfo"]) log.debug("Initialised DB connection for events_api.") global QUERY_TABLE_NAME @@ -601,34 +621,34 @@ def setup(api): def 
_db_has_mailgen_tables(): """Query the database to check if the mailgen tables exist.""" - global eventdb_conn - - # psycopgy2.4 does not offer 'with' for cursor() - # FUTURE: use with - cur = eventdb_conn.cursor(cursor_factory=RealDictCursor) - - # tables `directives` and `sent` go together, so we check only one of them - cur.execute("SELECT to_regclass('public.directives')") - # this should always work if the connection to the db is okay - if cur.fetchone()['to_regclass'] == 'directives': - log.debug("Found intelmq-cb-mailgen table `directives`.") - return True - else: - return False + + with db_connection() as conn: + # psycopgy2.4 does not offer 'with' for cursor() + # FUTURE: use with + cur = conn.cursor(cursor_factory=RealDictCursor) + + # tables `directives` and `sent` go together, so we check only one of them + cur.execute("SELECT to_regclass('public.directives')") + # this should always work if the connection to the db is okay + if cur.fetchone()["to_regclass"] == "directives": + log.debug("Found intelmq-cb-mailgen table `directives`.") + return True + else: + return False def _db_get_timezone(): """Query the database for its timezone setting.""" # Original. If you change it, update the copy # in events_api/events_api/serve.py as well. 
- global eventdb_conn - # psycopgy2.4 does not offer 'with' for cursor() - # FUTURE: use with - cur = eventdb_conn.cursor(cursor_factory=RealDictCursor) + with db_connection() as conn: + # psycopgy2.4 does not offer 'with' for cursor() + # FUTURE: use with + cur = conn.cursor(cursor_factory=RealDictCursor) - cur.execute("SHOW timezone") - return cur.fetchone()['TimeZone'] + cur.execute("SHOW timezone") + return cur.fetchone()["TimeZone"] @hug.get(ENDPOINT_PREFIX, examples="id=1", requires=session.token_authentication) @@ -657,7 +677,6 @@ def getEvent(response, id: int = None): rows = query(prep) except psycopg2.Error as e: log.error(e) - __rollback_transaction() response.status = HTTP_INTERNAL_SERVER_ERROR return {"error": "The query could not be processed."} @@ -735,7 +754,6 @@ def search(response, **params): rows = query(prep) except psycopg2.Error as e: log.error(e) - __rollback_transaction() response.status = HTTP_INTERNAL_SERVER_ERROR return {"error": "The query could not be processed."} @@ -861,13 +879,11 @@ def stats(response, **params): except psycopg2.Error as e: log.error(e) - __rollback_transaction() response.status = HTTP_INTERNAL_SERVER_ERROR return {"error": "The query could not be processed."} except AttributeError as e: log.error(e) - __rollback_transaction() response.status = HTTP_INTERNAL_SERVER_ERROR return {"error": "Something went wrong."} @@ -893,8 +909,7 @@ def main(): print("log effective level = \"{}\"".format( logging.getLevelName(log.getEffectiveLevel()))) - global eventdb_conn - eventdb_conn = open_db_connection(config["libpg conninfo"]) + open_db_pool(config["libpg conninfo"]) # TODO: Maybe add a search interface for the CLI # params={'t.o_after': '2017-03-01', 's.ip_in_sn': '31.25.41.74'}