Skip to content

Commit

Permalink
Move IEventService definition to ZenHub.
Browse files Browse the repository at this point in the history
Moved the interface because PBDaemon is the class that actually
implements the IEventService.
  • Loading branch information
jpeacock-zenoss committed Sep 26, 2024
1 parent ba1d275 commit f1cc3e0
Show file tree
Hide file tree
Showing 13 changed files with 48 additions and 30 deletions.
2 changes: 1 addition & 1 deletion Products/DataCollector/SnmpClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
from twisted.internet.error import TimeoutError
from pynetsnmp.twistedsnmp import snmpprotocol, Snmpv3Error

from Products.ZenCollector.interfaces import IEventService
from Products.ZenEvents import Event
from Products.ZenEvents.ZenEventClasses import Status_Snmp
from Products.ZenHub.interfaces import IEventService
from Products.ZenUtils.Driver import drive
from Products.ZenUtils.snmp import (
SnmpAgentDiscoverer,
Expand Down
2 changes: 1 addition & 1 deletion Products/DataCollector/SshClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@

from Products.DataCollector import CollectorClient
from Products.DataCollector.Exceptions import LoginFailed
from Products.ZenCollector.interfaces import IEventService
from Products.ZenEvents import Event
from Products.ZenHub.interfaces import IEventService
from Products.ZenUtils.IpUtil import getHostByName
from Products.ZenUtils.Utils import getExitMessage

Expand Down
2 changes: 1 addition & 1 deletion Products/DataCollector/zenmodeler.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@
)
from Products.ZenCollector.cyberark import get_cyberark
from Products.ZenCollector.daemon import parseWorkerOptions, addWorkerOptions
from Products.ZenCollector.interfaces import IEventService
from Products.ZenEvents.ZenEventClasses import Heartbeat, Error
from Products.ZenHub.interfaces import IEventService
from Products.ZenHub.PBDaemon import FakeRemote, PBDaemon, HubDown
from Products.ZenUtils.Driver import drive, driveLater
from Products.ZenUtils.Utils import unused, zenPath, wait
Expand Down
2 changes: 1 addition & 1 deletion Products/ZenCollector/cyberark.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
from zope.component import queryUtility

from Products.ZenEvents import Event
from Products.ZenHub.interfaces import IEventService
from Products.ZenUtils.GlobalConfig import getGlobalConfiguration

from .interfaces import IEventService
from .ExpiringCache import ExpiringCache

_CFG_URL = "cyberark-url"
Expand Down
5 changes: 1 addition & 4 deletions Products/ZenCollector/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
IConfigurationDispatchingFilter,
IConfigurationListener,
IDataService,
IEventService,
IFrameworkFactory,
IStatisticsService,
ITaskSplitter,
Expand All @@ -56,7 +55,7 @@
from .utils.maintenance import MaintenanceCycle, ZenHubHeartbeatSender


@implementer(ICollector, IDataService, IEventService)
@implementer(ICollector, IDataService)
class CollectorDaemon(RRDDaemon):
"""The daemon class for the entire ZenCollector framework."""

Expand Down Expand Up @@ -110,7 +109,6 @@ def __init__(
# that collector implementors can easily retrieve a reference back here
# if needed
provideUtility(self, ICollector)
provideUtility(self, IEventService)
provideUtility(self, IDataService)

# Register the collector's own preferences object so it may be easily
Expand Down Expand Up @@ -394,7 +392,6 @@ def _startMaintenance(self):
self.options.monitor,
self.name,
self.options.heartbeatTimeout,
self._eventqueue,
)
else:
heartbeatSender = None
Expand Down
11 changes: 2 additions & 9 deletions Products/ZenCollector/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import zope.interface

# IEventService imported here for ZenPack compability
from Products.ZenHub.interfaces import IEventService # noqa: F401
from Products.ZenUtils.observable import IObservable


Expand Down Expand Up @@ -549,15 +551,6 @@ def writeRRD(
"""


class IEventService(zope.interface.Interface):
"""
A service that allows the sending of an event.
"""

def sendEvent(event, **kw):
pass


class IFrameworkFactory(zope.interface.Interface):
"""
An abstract factory object that allows the collector framework to be
Expand Down
6 changes: 3 additions & 3 deletions Products/ZenCollector/utils/maintenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from zope.component import getUtility

from Products.ZenEvents.ZenEventClasses import Heartbeat
from Products.ZenHub.interfaces import IEventService
from Products.ZenMessaging.queuemessaging.interfaces import IQueuePublisher

log = logging.getLogger("zen.maintenance")
Expand Down Expand Up @@ -65,17 +66,16 @@ class ZenHubHeartbeatSender(object):
Default heartbeat sender for CollectorDaemon.
"""

def __init__(self, monitor, daemon, timeout, queue):
def __init__(self, monitor, daemon, timeout):
self.__event = {
"eventClass": Heartbeat,
"device": monitor,
"component": daemon,
"timeout": timeout
}
self.__queue = queue

def heartbeat(self):
self.__queue.addHeartbeatEvent(self.__event)
getUtility(IEventService).sendHeartbeat(self.__event)


class MaintenanceCycle(object):
Expand Down
3 changes: 2 additions & 1 deletion Products/ZenEvents/SyslogMsgFilter.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@

from zope.interface import implements

from Products.ZenCollector.interfaces import ICollector, IEventService
from Products.ZenCollector.interfaces import ICollector
from Products.ZenHub.interfaces import ICollectorEventTransformer, \
IEventService, \
TRANSFORM_CONTINUE, \
TRANSFORM_DROP
from Products.ZenUtils.Utils import unused, zenPath
Expand Down
15 changes: 12 additions & 3 deletions Products/ZenHub/PBDaemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from twisted.internet import defer, reactor, task
from twisted.internet.error import ReactorNotRunning
from twisted.spread import pb
from zope.component import provideUtility
from zope.interface import implementer

from Products.ZenEvents.ZenEventClasses import (
App_Start,
Expand All @@ -43,6 +45,7 @@

from .errors import HubDown, translateError
from .events import EventClient, EventQueueManager
from .interfaces import IEventService
from .localserver import LocalServer, ZenHubStatus
from .metricpublisher import publisher
from .pinger import PingZenHub
Expand Down Expand Up @@ -78,6 +81,7 @@ def callRemote(self, *args, **kwargs):
return defer.fail(HubDown())


@implementer(IEventService)
class PBDaemon(ZenDaemon, pb.Referenceable):
"""Base class for services that connect to ZenHub."""

Expand All @@ -100,6 +104,8 @@ def __init__(
if name is not None:
self.name = self.mname = name

provideUtility(self, IEventService)

super(PBDaemon, self).__init__(noopts, keeproot)

# Configure/initialize the ZenHub client
Expand All @@ -124,7 +130,7 @@ def __init__(
for evt in self.startEvent, self.stopEvent:
evt.update(details)

self._eventqueue = EventQueueManager(self.options, self.log)
self.__eventqueue = EventQueueManager(self.options, self.log)
self._metrologyReporter = None

self.__publisher = publisher
Expand All @@ -134,7 +140,7 @@ def __init__(

self.__eventclient = EventClient(
self.options,
self._eventqueue,
self.__eventqueue,
self.generateEvent,
lambda: self.getService("EventService"),
)
Expand Down Expand Up @@ -181,7 +187,7 @@ def services(self):

def __record_queued_events_count(self):
if self.rrdStats.name:
self.rrdStats.gauge("eventQueueLength", len(self._eventqueue))
self.rrdStats.gauge("eventQueueLength", len(self.__eventqueue))

def generateEvent(self, event, **kw):
"""
Expand Down Expand Up @@ -269,6 +275,9 @@ def eventService(self):
def sendEvents(self, events):
return self.__eventclient.sendEvents(events)

def sendHeartbeat(self, event):
self.__eventclient.sendHeartbeat(event)

@defer.inlineCallbacks
def sendEvent(self, event, **kw):
yield self.__eventclient.sendEvent(event, **kw)
Expand Down
3 changes: 3 additions & 0 deletions Products/ZenHub/events/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ def sendEvent(self, event, **kw):
self.__queue.addEvent(built_event)
self.counters["eventCount"] += 1

def sendHeartbeat(self, event):
self.__queue.addHeartbeatEvent(event)

@defer.inlineCallbacks
def _last_push(self, task):
yield self._push()
Expand Down
8 changes: 5 additions & 3 deletions Products/ZenHub/events/queue/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from collections import deque
from itertools import chain

import six

from metrology import Metrology
from metrology.instruments import Gauge
from metrology.registry import registry
Expand Down Expand Up @@ -192,7 +194,7 @@ def chunk_events():
num_heartbeat_events = min(
chunk_remaining, len(prev_heartbeat_event_queue)
)
for i in xrange(num_heartbeat_events):
for _ in six.moves.range(num_heartbeat_events):
heartbeat_events.append(
prev_heartbeat_event_queue.popleft()
)
Expand All @@ -202,13 +204,13 @@ def chunk_events():
num_perf_events = min(
chunk_remaining, len(prev_perf_event_queue)
)
for i in xrange(num_perf_events):
for _ in six.moves.range(num_perf_events):
perf_events.append(prev_perf_event_queue.popleft())
chunk_remaining -= num_perf_events

events = []
num_events = min(chunk_remaining, len(prev_event_queue))
for i in xrange(num_events):
for _ in six.moves.range(num_events):
events.append(prev_event_queue.popleft())
return heartbeat_events, perf_events, events

Expand Down
15 changes: 15 additions & 0 deletions Products/ZenHub/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,21 @@ def generate(event):
"""


class IEventService(Interface):
"""
A service that allows the sending of an event.
"""

def sendEvents(events):
pass

def sendEvent(event, **kw):
pass

def sendHeartbeat(event):
pass


TRANSFORM_CONTINUE = 0
TRANSFORM_STOP = 1
TRANSFORM_DROP = 2
Expand Down
4 changes: 1 addition & 3 deletions Products/ZenHub/tests/test_PBDaemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from Products.ZenHub.PBDaemon import (
collections,
defer,
# EventQueueManager,
PBDaemon,
publisher,
)
Expand Down Expand Up @@ -91,7 +90,6 @@ def test___init__(
t.assertEqual(pbd.rrdStats, DaemonStats.return_value)
t.assertEqual(pbd.lastStats, 0)
t.assertEqual(pbd.services, _getZenHubClient.return_value.services)
t.assertEqual(pbd._eventqueue, EventQueueManager.return_value)
t.assertEqual(pbd.startEvent, startEvent.copy())
t.assertEqual(pbd.stopEvent, stopEvent.copy())

Expand Down Expand Up @@ -198,7 +196,7 @@ def setUp(t):
]

for target in patches:
patcher = patch("{src}.{}".format(target, **PATH), autospec=True)
patcher = patch("{src}.{}".format(target, **PATH), spec=True)
setattr(t, target, patcher.start())
t.addCleanup(patcher.stop)

Expand Down

0 comments on commit f1cc3e0

Please sign in to comment.