diff --git a/Products/ZenUtils/MySqlZodbFactory.py b/Products/ZenUtils/MySqlZodbFactory.py index 1ede38b9b1..451a1c73a7 100644 --- a/Products/ZenUtils/MySqlZodbFactory.py +++ b/Products/ZenUtils/MySqlZodbFactory.py @@ -9,8 +9,10 @@ import logging import optparse +import time import uuid +import MySQLdb import relstorage.adapters.mysql import relstorage.options import relstorage.storage @@ -24,6 +26,7 @@ _DEFAULT_MYSQLPORT = 3306 _DEFAULT_COMMIT_LOCK_TIMEOUT = 30 +_OPERATIONAL_ERROR_RETRY_DELAY = 0.5 log = logging.getLogger("zen.MySqlZodbFactory") @@ -133,7 +136,10 @@ def getConnection(self, **kwargs): if cache_servers: relstoreParams["cache_servers"] = cache_servers - storage = relstorage.storage.RelStorage(adapter, **relstoreParams) + storage = _get_storage(adapter, relstoreParams) + if storage is None: + raise SystemExit("Unable to retrieve ZODB storage") + cache_size = kwargs.get("zodb_cachesize", 1000) db = ZODB.DB(storage, cache_size=cache_size) import Globals @@ -226,3 +232,25 @@ def buildOptions(self, parser): ), ) parser.add_option_group(group) + + +def _get_storage(adapter, params): + attempt = 0 + while attempt < 3: + try: + return relstorage.storage.RelStorage(adapter, **params) + except MySQLdb.OperationalError as ex: + error = str(ex) + # Sleep for a very short duration. Celery signal handlers + # are given short durations to complete. + time.sleep(_OPERATIONAL_ERROR_RETRY_DELAY) + attempt += 1 + except Exception as ex: + log.exception("unexpected failure") + # To avoid retrying on unexpected errors, set `attempt` to 3 to + # cause the loop to exit on the next iteration to allow the + # "else:" clause to run and cause this worker to exit. + error = str(ex) + attempt = 3 + else: + log.error("failed to initialize ZODB connection: %s", error) diff --git a/Products/ZenUtils/tests/test_MySqlZodbFactory.py b/Products/ZenUtils/tests/test_MySqlZodbFactory.py new file mode 100644 index 0000000000..b594f2d3bd --- /dev/null +++ b/Products/ZenUtils/tests/test_MySqlZodbFactory.py @@ -0,0 +1,83 @@ +############################################################################## +# +# Copyright (C) Zenoss, Inc. 2024, all rights reserved. +# +# This content is made available according to terms specified in +# License.zenoss under the directory where your Zenoss product is installed. +# +############################################################################## + +from __future__ import absolute_import, print_function + +import logging + +from unittest import TestCase + +from mock import call, Mock, patch + +from Products.ZenUtils.MySqlZodbFactory import ( + MySQLdb, + _get_storage, + _OPERATIONAL_ERROR_RETRY_DELAY, +) + +PATH = {"src": "Products.ZenUtils.MySqlZodbFactory"} + + +class TestGetStorage(TestCase): + """Test the _get_storage function.""" + + def setUp(t): + log = logging.getLogger() + log.setLevel(logging.FATAL + 1) + + def tearDown(t): + log = logging.getLogger() + log.setLevel(logging.NOTSET) + + @patch("{src}.relstorage.storage.RelStorage".format(**PATH), autospec=True) + def test_nominal(t, relstorage_): + params = {"a": 1} + adapter = Mock() + + storage = _get_storage(adapter, params) + + t.assertEqual(storage, relstorage_.return_value) + relstorage_.assert_called_with(adapter, a=1) + + @patch("{src}.time".format(**PATH), autospec=True) + @patch("{src}.relstorage.storage.RelStorage".format(**PATH), autospec=True) + def test_operational_error(t, relstorage_, time_): + params = {"a": 1} + adapter = Mock() + + ex = MySQLdb.OperationalError() + relstorage_.side_effect = ex + + sleep_calls = ( + call(_OPERATIONAL_ERROR_RETRY_DELAY), + call(_OPERATIONAL_ERROR_RETRY_DELAY), + call(_OPERATIONAL_ERROR_RETRY_DELAY), + ) + + storage = _get_storage(adapter, params) + + t.assertIsNone(storage) + time_.sleep.assert_has_calls(sleep_calls) + t.assertEqual(len(sleep_calls), relstorage_.call_count) + t.assertEqual(len(sleep_calls), time_.sleep.call_count) + + @patch("{src}.time".format(**PATH), autospec=True) + @patch("{src}.relstorage.storage.RelStorage".format(**PATH), autospec=True) + def test_unexpected_error(t, relstorage_, time_): + params = {"a": 1} + adapter = Mock() + + ex = Exception() + relstorage_.side_effect = ex + + storage = _get_storage(adapter, params) + + t.assertIsNone(storage) + t.assertEqual(1, relstorage_.call_count) + t.assertEqual(0, time_.call_count)