Skip to content

Commit

Permalink
Merge pull request #4590 from zenoss/bugfix/ZEN-35118.6x
Browse files Browse the repository at this point in the history
Remove unnecessary code in zenhub's invalidations.
  • Loading branch information
jpeacock-zenoss authored Nov 1, 2024
2 parents 16ead3d + 058f230 commit 07b36ec
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 233 deletions.
35 changes: 17 additions & 18 deletions Products/ZenHub/invalidationmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from itertools import chain
from functools import wraps

from twisted.internet.defer import inlineCallbacks
from twisted.internet.defer import inlineCallbacks, returnValue
from ZODB.POSException import POSKeyError
from zope.component import getUtility, getUtilitiesFor, subscribers

Expand All @@ -36,11 +36,10 @@
)
from .invalidations import INVALIDATIONS_PAUSED

log = logging.getLogger("zen.{}".format(__name__.split(".")[-1].lower()))
log = logging.getLogger("zen.zenhub.invalidations")


class InvalidationManager(object):

_invalidation_paused_event = {
"summary": "Invalidation processing is "
"currently paused. To resume, set "
Expand All @@ -58,15 +57,13 @@ class InvalidationManager(object):
def __init__(
self,
dmd,
log,
syncdb,
poll_invalidations,
send_event,
poll_interval=30,
):
self.__dmd = dmd
self.__syncdb = syncdb
self.log = log
self.__poll_invalidations = poll_invalidations
self.__send_event = send_event
self.poll_interval = poll_interval
Expand All @@ -80,7 +77,6 @@ def __init__(
self.__dmd
)
self.processor = getUtility(IInvalidationProcessor)
log.debug("got InvalidationProcessor %s", self.processor)
app = self.__dmd.getPhysicalRoot()
self.invalidation_pipeline = InvalidationPipeline(
app, self._invalidation_filters, self._queue
Expand All @@ -105,7 +101,7 @@ def initialize_invalidation_filters(ctx):
fltr.initialize(ctx)
invalidation_filters.append(fltr)
log.info(
"Registered %s invalidation filters.",
"registered %s invalidation filters.",
len(invalidation_filters),
)
log.info("invalidation filters: %s", invalidation_filters)
Expand All @@ -127,35 +123,38 @@ def process_invalidations(self):
now = time()
yield self._syncdb()
if self._paused():
return
returnValue(None)

oids = self._poll_invalidations()
if not oids:
log.debug("no invalidations found: oids=%s", oids)
return
log.debug("no invalidations found")
returnValue(None)

for oid in oids:
yield self.invalidation_pipeline.run(oid)

self.log.debug("Processed %s raw invalidations", len(oids))
yield self.processor.processQueue(self._queue)
handled, ignored = yield self.processor.processQueue(self._queue)
log.debug(
"processed invalidations "
"handled-count=%d, ignored-count=%d",
handled,
ignored,
)
self._queue.clear()

except Exception:
log.exception("error in process_invalidations")
finally:
self.totalEvents += 1
self.totalTime += time() - now
log.debug("end process_invalidations")

@inlineCallbacks
def _syncdb(self):
try:
self.log.debug("[processQueue] syncing....")
log.debug("syncing with ZODB ...")
yield self.__syncdb()
self.log.debug("[processQueue] synced")
log.debug("synced with ZODB")
except Exception:
self.log.warn("Unable to poll invalidations, will try again.")
log.warn("Unable to poll invalidations")

def _paused(self):
if not self._currently_paused:
Expand Down Expand Up @@ -183,7 +182,7 @@ def _poll_invalidations(self):
log.debug("poll invalidations from dmd.storage")
return self.__poll_invalidations()
except Exception:
log.exception("error in _poll_invalidations")
log.exception("failed to poll invalidations")

@inlineCallbacks
def _send_event(self, event):
Expand Down
115 changes: 49 additions & 66 deletions Products/ZenHub/invalidations.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@

import logging

from BTrees.IIBTree import IITreeSet
from twisted.internet import defer
from ZODB.utils import u64
from zope.component import adapter, getGlobalSiteManager
from zope.interface import implementer, providedBy

Expand All @@ -24,50 +22,10 @@
from .interfaces import IInvalidationProcessor, IHubCreatedEvent
from .zodb import UpdateEvent, DeletionEvent


log = logging.getLogger("zen.ZenHub")
log = logging.getLogger("zen.zenhub.invalidations")
INVALIDATIONS_PAUSED = "PAUSED"


@defer.inlineCallbacks
def betterObjectEventNotify(event):
"""
This method re-implements zope.component.event.objectEventNotify to give
more time back to the reactor. It is slightly different, but works exactly
the same for our specific use case.
"""
gsm = getGlobalSiteManager()
subscriptions = gsm.adapters.subscriptions(
map(providedBy, (event.object, event)), None
)
for subscription in subscriptions:
yield giveTimeToReactor(subscription, event.object, event)


def handle_oid(dmd, oid):
# Go pull the object out of the database
obj = dmd._p_jar[oid]
# Don't bother with all the catalog stuff; we're depending on primaryAq
# existing anyway, so only deal with it if it actually has primaryAq.
if isinstance(obj, PrimaryPathObjectManager) or isinstance(
obj, DeviceComponent
):
try:
# Try to get the object
obj = obj.__of__(dmd).primaryAq()
except (AttributeError, KeyError):
# Object has been removed from its primary path (i.e. was
# deleted), so make a DeletionEvent
log.debug("Notifying services that %r has been deleted", obj)
event = DeletionEvent(obj, oid)
else:
# Object was updated, so make an UpdateEvent
log.debug("Notifying services that %r has been updated", obj)
event = UpdateEvent(obj, oid)
# Fire the event for all interested services to pick up
return betterObjectEventNotify(event)


@implementer(IInvalidationProcessor)
class InvalidationProcessor(object):
"""
Expand All @@ -76,12 +34,10 @@ class InvalidationProcessor(object):
cause collectors to be pushed updates.
"""

_invalidation_queue = None
_hub = None
_hub_ready = None

def __init__(self):
self._invalidation_queue = IITreeSet()
self._hub_ready = defer.Deferred()
getGlobalSiteManager().registerHandler(self.onHubCreated)

Expand All @@ -93,26 +49,53 @@ def onHubCreated(self, event):
@defer.inlineCallbacks
def processQueue(self, oids):
yield self._hub_ready
i = 0
queue = self._invalidation_queue
if self._hub.dmd.pauseHubNotifications:
log.debug("notifications are currently paused")
defer.returnValue(INVALIDATIONS_PAUSED)
for i, oid in enumerate(oids):
ioid = u64(oid)
# Try pushing it into the queue, which is an IITreeSet.
# If it inserted successfully it returns 1, else 0.
if queue.insert(ioid):
# Get the deferred that does the notification
d = self._dispatch(self._hub.dmd, oid, ioid, queue)
yield d
defer.returnValue(i)
handled, ignored = 0, 0
for oid in oids:
try:
obj = self._hub.dmd._p_jar[oid]
# Don't bother with all the catalog stuff; we're depending on
# primaryAq existing anyway, so only deal with it if it
# actually has primaryAq.
if isinstance(
obj, (PrimaryPathObjectManager, DeviceComponent)
):
handled += 1
event = _get_event(self._hub.dmd, obj, oid)
yield _notify_event_subscribers(event)
else:
ignored += 1
except KeyError:
log.warning("object not found oid=%r", oid)
defer.returnValue((handled, ignored))


def _get_event(dmd, obj, oid):
try:
# Try to get the object
obj = obj.__of__(dmd).primaryAq()
except (AttributeError, KeyError):
# Object has been removed from its primary path (i.e. was
# deleted), so make a DeletionEvent
log.debug("notifying services that %r has been deleted", obj)
return DeletionEvent(obj, oid)
else:
# Object was updated, so make an UpdateEvent
log.debug("notifying services that %r has been updated", obj)
return UpdateEvent(obj, oid)

def _dispatch(self, dmd, oid, ioid, queue):
"""
Send to all the services that care by firing events.
"""

@defer.inlineCallbacks
def _notify_event_subscribers(event):
gsm = getGlobalSiteManager()
subscriptions = gsm.adapters.subscriptions(
map(providedBy, (event.object, event)), None
)
for subscription in subscriptions:
try:
return handle_oid(dmd, oid)
finally:
queue.remove(ioid)
yield giveTimeToReactor(subscription, event.object, event)
except Exception:
log.exception(
"failure in suscriber subscriber=%r event=%r",
subscription,
event,
)
8 changes: 4 additions & 4 deletions Products/ZenHub/services/PerformanceConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,13 @@ def remote_propertyItems(self):
return self.conf.propertyItems()

@onUpdate(PerformanceConf)
def perfConfUpdated(self, object, event):
if object.id == self.instance:
def perfConfUpdated(self, conf, event):
if conf.id == self.instance:
for listener in self.listeners:
listener.callRemote("setPropertyItems", object.propertyItems())
listener.callRemote("setPropertyItems", conf.propertyItems())

@onUpdate(ZenPack)
def zenPackUpdated(self, object, event):
def zenPackUpdated(self, zenpack, event):
for listener in self.listeners:
try:
listener.callRemote(
Expand Down
19 changes: 10 additions & 9 deletions Products/ZenHub/tests/test_invalidationmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,21 @@ def setUp(t):
t.dmd = Mock(
name="dmd", spec_set=["getPhysicalRoot", "pauseHubNotifications"]
)
t.log = Mock(name="log", spec_set=["debug", "warn", "info"])
t.syncdb = Mock(name="ZenHub.async_syncdb", spec_set=[])
t.poll_invalidations = Mock(
name="ZenHub.storage.poll_invalidations", spec_set=[]
)

t.send_event = Mock(ZenHub.sendEvent, name="ZenHub.sendEvent")
t.im = InvalidationManager(
t.dmd, t.log, t.syncdb, t.poll_invalidations, t.send_event
t.dmd, t.syncdb, t.poll_invalidations, t.send_event
)

def tearDown(t):
logging.disable(logging.NOTSET)

def test___init__(t):
t.assertEqual(t.im._InvalidationManager__dmd, t.dmd)
t.assertEqual(t.im.log, t.log)
t.assertEqual(t.im._InvalidationManager__syncdb, t.syncdb)
t.assertEqual(
t.im._InvalidationManager__poll_invalidations, t.poll_invalidations
Expand Down Expand Up @@ -198,16 +199,16 @@ def setUp(t):
def test_invalidation_pipeline(t):
t.invalidation_pipeline.run(t.oid)

t.assertEqual(t.sink, set([t.oid]))
t.assertEqual(t.sink, {t.oid})

def test__build_pipeline(t):
__pipeline = t.invalidation_pipeline._build_pipeline()
__pipeline.send(t.oid)

t.assertEqual(t.sink, set([t.oid]))
t.assertEqual(t.sink, {t.oid})

@patch("{src}.log".format(**PATH), autospec=True)
def test_run_handles_exceptions(t, log):
def test_run_handles_exceptions(t, log_):
"""An exception in any of the coroutines will first raise the exception
then cause StopIteration exceptions on subsequent runs.
we handle the first exception and rebuild the pipeline
Expand All @@ -219,8 +220,8 @@ def test_run_handles_exceptions(t, log):
t.invalidation_pipeline.run(x) # causes an exception
t.invalidation_pipeline.run(t.oid)

log.exception.assert_called_with(ANY)
t.assertEqual(t.sink, set([t.oid]))
log_.exception.assert_called_with(ANY)
t.assertEqual(t.sink, {t.oid})
# ensure the dereferenced pipeline is cleaned up safely
import gc

Expand Down Expand Up @@ -380,7 +381,7 @@ class set_sink_Test(TestCase):
def test_set_sink_accepts_a_set(t):
output = set()
set_sink_pipe = set_sink(output)
set_sink_pipe.send({"a", "a", "b", "c"} or ("a",))
set_sink_pipe.send({"a", "b", "c"} or ("a",))
t.assertEqual(output, {"a", "b", "c"})

def test_set_sink_accepts_a_tuple(t):
Expand Down
Loading

0 comments on commit 07b36ec

Please sign in to comment.