diff --git a/Products/DataCollector/zenmodeler.py b/Products/DataCollector/zenmodeler.py index fe592f578a..9852585726 100755 --- a/Products/DataCollector/zenmodeler.py +++ b/Products/DataCollector/zenmodeler.py @@ -7,22 +7,6 @@ # ############################################################################## -# IMPORTANT! The import of the pysamba.twisted.reactor module should come -# before any other libraries that might possibly use twisted. This will -# ensure that the proper WmiReactor is installed before anyone else grabs a -# reference to the wrong reactor. -try: - import pysamba.twisted.reactor # noqa F401 - from ZenPacks.zenoss.WindowsMonitor.WMIClient import WMIClient - from ZenPacks.zenoss.WindowsMonitor.utils import ( - addNTLMv2Option, - setNTLMv2Auth, - ) - - USE_WMI = True -except ImportError: - USE_WMI = False - import cPickle as pickle import gzip import os @@ -35,6 +19,7 @@ from random import randint import DateTime +import six import zope.component from metrology import Metrology @@ -128,8 +113,7 @@ def __init__(self, single=False): if self.options.now: self.log.debug('option "now" specified, starting immediately.') else: - # self.startDelay = randint(10, 60) * 60 - self.startDelay = randint(10, 60) * 1 + self.startDelay = randint(10, 60) * 1 # noqa: S311 self.immediate = 0 self.log.info( 'option "now" not specified, waiting %s seconds to start.', @@ -243,6 +227,42 @@ def selectPlugins(self, device, transport): @todo: determine if an event for the collector AND the device should be sent. """ + plugins = self._get_valid_plugins(device) + + # Create functions to search for what plugins we will and + # won't supply to the device + def false_result(x): + return False + + collectTest = false_result + ignoreTest = false_result + if self.options.collectPlugins: + collectTest = re.compile(self.options.collectPlugins).search + elif self.options.ignorePlugins: + ignoreTest = re.compile(self.options.ignorePlugins).search + + result = [] + for plugin in plugins: + if plugin.transport != transport: + continue + name = plugin.name() + if ignoreTest(name): + self.log.debug( + "Ignoring %s on %s because of --ignore flag", + name, + device.id, + ) + elif collectTest(name): + self.log.debug( + "Using %s on %s because of --collect flag", name, device.id + ) + result.append(plugin) + elif not self.options.collectPlugins: + self.log.debug("Using %s on %s", name, device.id) + result.append(plugin) + return result + + def _get_valid_plugins(self, device): plugins = [] valid_loaders = [] for loader in device.plugins: @@ -252,10 +272,7 @@ def selectPlugins(self, device, transport): plugins.append(plugin) valid_loaders.append(loader) except Plugins.PluginImportError as import_error: - import socket - component, _ = os.path.splitext(os.path.basename(sys.argv[0])) - collector_host = socket.gethostname() # NB: an import errror affects all devices, # so report the issue against the collector # TODO: determine if an event for the collector AND @@ -263,8 +280,8 @@ def selectPlugins(self, device, transport): evt = { "eventClass": "/Status/Update", "component": component, - "agent": collector_host, - "device": collector_host, + "agent": self.options.monitor, + "device": self.options.monitor, "severity": Error, } @@ -289,38 +306,7 @@ def selectPlugins(self, device, transport): if len(device.plugins) != len(valid_loaders): device.plugins = valid_loaders - # Create functions to search for what plugins we will and - # won't supply to the device - def false_result(x): - return False - - collectTest = false_result - ignoreTest = false_result - if self.options.collectPlugins: - collectTest = re.compile(self.options.collectPlugins).search - elif self.options.ignorePlugins: - ignoreTest = re.compile(self.options.ignorePlugins).search - - result = [] - for plugin in plugins: - if plugin.transport != transport: - continue - name = plugin.name() - if ignoreTest(name): - self.log.debug( - "Ignoring %s on %s because of --ignore flag", - name, - device.id, - ) - elif collectTest(name): - self.log.debug( - "Using %s on %s because of --collect flag", name, device.id - ) - result.append(plugin) - elif not self.options.collectPlugins: - self.log.debug("Using %s on %s", name, device.id) - result.append(plugin) - return result + return device.plugins def collectDevice(self, device): """ @@ -332,12 +318,6 @@ def collectDevice(self, device): clientTimeout = getattr(device, "zCollectorClientTimeout", 180) ip = device.manageIp timeout = clientTimeout + time.time() - if USE_WMI: - self.wmiCollect(device, ip, timeout) - else: - self.log.info( - "skipping WMI-based collection, PySamba zenpack not installed" - ) self.log.info( "Collect on device %s for collector loop #%03d", device.id, @@ -348,39 +328,6 @@ def collectDevice(self, device): self.snmpCollect(device, ip, timeout) self.portscanCollect(device, ip, timeout) - def wmiCollect(self, device, ip, timeout): - """ - Start the Windows Management Instrumentation (WMI) collector - - @param device: device to collect against - @type device: string - @param ip: IP address of device to collect against - @type ip: string - @param timeout: timeout before failing the connection - @type timeout: integer - """ - if self.options.nowmi: - return - - client = None - try: - plugins = self.selectPlugins(device, "wmi") - if not plugins: - self.log.info("No WMI plugins found for %s", device.id) - return - if self.checkCollection(device): - self.log.info("WMI collector method for device %s", device.id) - self.log.info( - "plugins: %s", ", ".join(map(lambda p: p.name(), plugins)) - ) - client = WMIClient(device, self, plugins) - if not client or not plugins: - self.log.warn("WMI collector creation failed") - return - except Exception: - self.log.exception("Error opening WMI collector") - self.addClient(client, timeout, "WMI", device.id) - def pythonCollect(self, device, ip, timeout): """ Start local Python collection client. @@ -401,7 +348,7 @@ def pythonCollect(self, device, ip, timeout): if self.checkCollection(device): self.log.info("Python collection device %s", device.id) self.log.info( - "plugins: %s", ", ".join(map(lambda p: p.name(), plugins)) + "plugins: %s", ", ".join(p.name() for p in plugins) ) client = PythonClient(device, self, plugins) if not client or not plugins: @@ -499,7 +446,7 @@ def cmdCollect(self, device, ip, timeout): self.log.warn("Shell command collector creation failed") else: self.log.info( - "plugins: %s", ", ".join(map(lambda p: p.name(), plugins)) + "plugins: %s", ", ".join(p.name() for p in plugins) ) except Exception: self.log.exception("Error opening command collector") @@ -536,7 +483,7 @@ def snmpCollect(self, device, ip, timeout): if self.checkCollection(device): self.log.info("SNMP collection device %s", hostname) self.log.info( - "plugins: %s", ", ".join(map(lambda p: p.name(), plugins)) + "plugins: %s", ", ".join(p.name() for p in plugins) ) client = SnmpClient( device.id, ip, self.options, device, self, plugins @@ -629,7 +576,7 @@ def portscanCollect(self, device, ip, timeout): "Portscan collector method for device %s", hostname ) self.log.info( - "plugins: %s", ", ".join(map(lambda p: p.name(), plugins)) + "plugins: %s", ", ".join(p.name() for p in plugins) ) client = PortscanClient( device.id, ip, self.options, device, self, plugins @@ -728,7 +675,7 @@ def processClient(driver): pluginStats.setdefault( plugin.name(), plugin.weight ) - except Exception as ex: + except Exception: # NB: don't discard the plugin, as it might be a # temporary issue. # Also, report it against the device, rather than at @@ -804,7 +751,7 @@ def processClientFinished(result): self._modeledDevicesMetric.mark() # result is now the result of remote_applyDataMaps # (from processClient) - if result and isinstance(result, (basestring, Failure)): + if result and isinstance(result, six.string_types + (Failure,)): self.log.error( "Client %s finished with message: %s", device.id, result ) @@ -831,7 +778,7 @@ def processClientFinished(result): d.addBoth(processClientFinished) def savePluginData(self, deviceName, pluginName, dataType, data): - filename = "/tmp/%s.%s.%s.pickle.gz" % ( + filename = "/tmp/%s.%s.%s.pickle.gz" % ( # noqa: S108 deviceName, pluginName, dataType, @@ -874,12 +821,12 @@ def heartbeat(self, ignored=None): ARBITRARY_BEAT = 30 reactor.callLater(ARBITRARY_BEAT, self.heartbeat) if self.options.cycle: - evt = dict( - eventClass=Heartbeat, - component="zenmodeler", - device=self.options.monitor, - timeout=self.options.heartbeatTimeout, - ) + evt = { + "eventClass": Heartbeat, + "component": "zenmodeler", + "device": self.options.monitor, + "timeout": self.options.heartbeatTimeout, + } self.sendEvent(evt) self.niceDoggie(self.cycleTime()) @@ -890,18 +837,21 @@ def heartbeat(self, ignored=None): if self.startat: # This stuff relies on ARBITRARY_BEAT being < 60s if self.timeMatches(): - # Run modeling in case we have now=False, startat is not None and local time matches the startat + # Run modeling in case we have now=False, startat is + # not None and local time matches the startat. self.started = True self.log.info("Starting modeling...") reactor.callLater(1, self.main) elif not self.isMainScheduled: - # Or run modeling by cycleTime in case we have now=False, startat is None - # and we haven't set schedule by cycleTime yet + # Or run modeling by cycleTime in case we have now=False, + # startat is None and we haven't set schedule by + # cycleTime yet. self.isMainScheduled = True reactor.callLater(self.cycleTime(), self.main) else: - # Going back to the normal modeling schedule either cron or cycleTime - # after the first immediate modeling during service startup + # Going back to the normal modeling schedule either cron + # or cycleTime after the first immediate modeling during + # service startup. self.immediate = 0 self.log.info( "Starting modeling in %s seconds.", self.startDelay @@ -984,8 +934,9 @@ def checkStop(self, unused=None): if not self.options.cycle: self.stop() self.finished = [] - # frequency of heartbeat rate could be 2 times per minute in case we have - # cron job modeling faster than 1 minute it'll be trigger a second time + # Frequency of heartbeat rate could be 2 times per minute in case + # we have cron job modeling faster than 1 minute it'll be trigger + # a second time. if runTime < 60 and self.startat is not None: yield wait(60) self.started = False @@ -1099,13 +1050,6 @@ def buildOptions(self): default=False, help="Don't fork threads for processing", ) - self.parser.add_option( - "--nowmi", - dest="nowmi", - action="store_true", - default=not USE_WMI, - help="Do not execute WMI plugins", - ) self.parser.add_option( "--parallel", dest="parallel", @@ -1220,8 +1164,6 @@ def buildOptions(self): addWorkerOptions(self.parser) TCbuildOptions(self.parser, self.usage) - if USE_WMI: - addNTLMv2Option(self.parser) def processOptions(self): """ @@ -1252,9 +1194,6 @@ def processOptions(self): self.options.startat, ) - if USE_WMI: - setNTLMv2Auth(self.options) - configFilter = parseWorkerOptions(self.options.__dict__, self.log) if configFilter: self.configFilter = configFilter