diff --git a/Products/ZenCollector/configcache/tasks/deviceconfig.py b/Products/ZenCollector/configcache/tasks/deviceconfig.py index 5a7e8d6883..508774710d 100644 --- a/Products/ZenCollector/configcache/tasks/deviceconfig.py +++ b/Products/ZenCollector/configcache/tasks/deviceconfig.py @@ -132,6 +132,21 @@ def buildDeviceConfig( ) service = svcconfigclass(dmd, monitorname) + method = getattr(service, "remote_getDeviceConfigs", None) + if method is None: + log.warn( + "config service does not have required API " + "device=%s collector=%s service=%s submitted=%f", + key.device, + key.monitor, + key.service, + submitted, + ) + # Services without a remote_getDeviceConfigs method can't create + # device configs, so delete the config that may exist. + _delete_config(key, store, log) + return + result = service.remote_getDeviceConfigs((deviceid,)) config = result[0] if result else None diff --git a/Products/ZenEvents/EventManagerBase.py b/Products/ZenEvents/EventManagerBase.py index 10bf8e8e0e..0ce83972fb 100644 --- a/Products/ZenEvents/EventManagerBase.py +++ b/Products/ZenEvents/EventManagerBase.py @@ -180,7 +180,7 @@ class EventManagerBase(ZenModelRM): "expr": "^(?P[^\[]+)\[(?PADTRAN)\]:(?P[^\|]+\|\d+\|\d+)\|(?P.*)", "keep": True },{ - "description": "", + "description": "fortigate devices", "expr": "^date=.+ (?Pdevname=.+ log_id=(?P\d+) type=(?P\S+).+)", "keep": True },{ diff --git a/Products/ZenEvents/SyslogMsgFilter.py b/Products/ZenEvents/SyslogMsgFilter.py deleted file mode 100644 index 8885a8caac..0000000000 --- a/Products/ZenEvents/SyslogMsgFilter.py +++ /dev/null @@ -1,132 +0,0 @@ -############################################################################## -# -# Copyright (C) Zenoss, Inc. 2023, all rights reserved. -# -# This content is made available according to terms specified in -# License.zenoss under the directory where your Zenoss product is installed. -# -############################################################################## - - - -__doc__ = """zensyslog - -Filters Syslog Messages. -""" - -import sys -import logging -import os.path -import re - -import zope.interface -import zope.component - -from zope.interface import implements - -from Products.ZenCollector.interfaces import ICollector -from Products.ZenHub.interfaces import ICollectorEventTransformer, \ - IEventService, \ - TRANSFORM_CONTINUE, \ - TRANSFORM_DROP -from Products.ZenUtils.Utils import unused, zenPath - -log = logging.getLogger("zen.zensyslog.filter") - -class SyslogMsgFilter(object): - implements(ICollectorEventTransformer) - """ - Interface used to perform filtering of events at the collector. This could be - used to drop events, transform event content, etc. - - These transformers are run sequentially before a fingerprint is generated for - the event, so they can set fields which are used by an ICollectorEventFingerprintGenerator. - - The priority of the event transformer (the transformers are executed in - ascending order using the weight of each filter). - """ - weight = 1 - def __init__(self): - self._daemon = None - self._eventService = None - self._initialized = False - self._ruleSet = {} - - def initialize(self): - self._daemon = zope.component.getUtility(ICollector) - self._eventService = zope.component.queryUtility(IEventService) - self._initialized = True - - def syslogMsgFilterErrorEvent(self, **kwargs): - """ - Build an Event dict from parameters.n - """ - eventDict = { - 'device': '127.0.0.1', - 'eventClass': '/App/Zenoss', - 'severity': 4, - 'eventClassKey': '', - 'summary': 'Syslog Message Filter processing issue', - 'component': 'zensyslog' - } - if kwargs: - eventDict.update(kwargs) - self._eventService.sendEvent(eventDict) - - def updateRuleSet(self, rules): - processedRuleSet = {} - for evtFieldName, evtFieldRules in rules.iteritems(): - if evtFieldName not in processedRuleSet: - processedRuleSet[evtFieldName] = [] - for i, evtFieldRule in enumerate(evtFieldRules): - try: - compiledRule = re.compile(evtFieldRule, re.DOTALL) - except Exception as ex: - msg = 'Syslog Message Filter configuration for the ' \ - '{!r} event field could not compile rule #{!r}' \ - ' with the expression of {!r}. Error {!r}'.format( - evtFieldName, - i, - evtFieldRule, - ex) - log.warn(msg) - self.syslogMsgFilterErrorEvent( - message=msg, - eventKey="SyslogMessageFilter.{}.{}".format(evtFieldName, i)) - else: - processedRuleSet[evtFieldName].append(compiledRule) - self._ruleSet = processedRuleSet - - def transform(self, event): - """ - Performs any transforms of the specified event at the collector. - - @param event: The event to transform. - @type event: dict - @return: Returns TRANSFORM_CONTINUE if this event should be forwarded on - to the next transformer in the sequence, TRANSFORM_STOP if no - further transformers should be performed on this event, and - TRANSFORM_DROP if the event should be dropped. - @rtype: int - """ - result = TRANSFORM_CONTINUE - - if self._daemon and self._ruleSet: - for evtFieldName, evtFieldRules in self._ruleSet.iteritems(): - if evtFieldName in event: - for i, compiledRule in enumerate(evtFieldRules): - m = compiledRule.search(event[evtFieldName]) - if not m: - continue - else: - log.debug( - 'Syslog Message Filter match! EventFieldName:%r ' - 'EventFieldValue:%r FilterRuleNumber:%s ' - 'FilterRuleExpression:%r', - evtFieldName, - event[evtFieldName], - i, - compiledRule.pattern) - self._daemon.counters["eventFilterDroppedCount"] += 1 - return TRANSFORM_DROP - return result diff --git a/Products/ZenEvents/SyslogProcessing.py b/Products/ZenEvents/SyslogProcessing.py deleted file mode 100644 index 166633a9a9..0000000000 --- a/Products/ZenEvents/SyslogProcessing.py +++ /dev/null @@ -1,292 +0,0 @@ -############################################################################## -# -# Copyright (C) Zenoss, Inc. 2007, 2023 all rights reserved. -# -# This content is made available according to terms specified in -# License.zenoss under the directory where your Zenoss product is installed. -# -############################################################################## - - -__doc__ = """SyslogProcessing -Class for turning syslog events into Zenoss Events -""" - -import re -import logging -slog = logging.getLogger("zen.Syslog") -import socket - -from copy import deepcopy -from Products.ZenEvents.syslog_h import * -from Products.ZenUtils.IpUtil import isip - - -class SyslogProcessor(object): - """ - Class to process syslog messages and convert them into events viewable - in the Zenoss event console. - """ - - def __init__(self,sendEvent,minpriority,parsehost,monitor,defaultPriority,syslogParsers,syslogSummaryToMessage): - """ - Initializer - - @param sendEvent: message from a remote host - @type sendEvent: string - @param minpriority: ignore anything under this priority - @type minpriority: integer - @param parsehost: hostname where this parser is running - @type parsehost: string - @param monitor: name of the distributed collector monitor - @type monitor: string - @param defaultPriority: priority to use if it can't be understood from the received packet - @type defaultPriority: integer - @param syslogParsers: configureable syslog parsers - @type defaultPriority: list - """ - self.minpriority = minpriority - self.parsehost = parsehost - self.sendEvent = sendEvent - self.monitor = monitor - self.defaultPriority = defaultPriority - self.compiledParsers = [] - self.updateParsers(syslogParsers) - self.syslogSummaryToMessage = syslogSummaryToMessage - - def updateParsers(self, parsers): - self.compiledParsers = deepcopy(parsers) - for i, parserCfg in enumerate(self.compiledParsers): - if 'expr' not in parserCfg: - msg = 'Parser configuration #{} missing a "expr" attribute'.format(i) - slog.warn(msg) - self.syslogParserErrorEvent(message=msg) - continue - try: - parserCfg['expr'] = re.compile(parserCfg['expr'], re.DOTALL) - except Exception as ex: - msg = 'Parser configuration #{} Could not compile expression "{!r}", {!r}'.format(i, parserCfg['expr'], ex) - slog.warn(msg) - self.syslogParserErrorEvent(message=msg) - pass - - def syslogParserErrorEvent(self, **kwargs): - """ - Build an Event dict from parameters.n - """ - eventDict = { - 'device': '127.0.0.1', - 'eventClass': '/App/Zenoss', - 'severity': 4, - 'eventClassKey': '', - 'summary': 'Syslog Parser processing issue', - 'component': 'zensyslog' - } - if kwargs: - eventDict.update(kwargs) - self.sendEvent(eventDict) - - def process(self, msg, ipaddr, host, rtime): - """ - Process an event from syslog and convert to a Zenoss event - - @param msg: message from a remote host - @type msg: string - @param ipaddr: IP address of the remote host - @type ipaddr: string - @param host: remote host's name - @type host: string - @param rtime: time as reported by the remote host - @type rtime: string - """ - evt = dict(device=host, - ipAddress=ipaddr, - firstTime=rtime, - lastTime=rtime, - eventGroup='syslog') - slog.debug("host=%s, ip=%s", host, ipaddr) - slog.debug(msg) - - evt, msg = self.parsePRI(evt, msg) - if evt['priority'] > self.minpriority: return - - evt, msg = self.parseHEADER(evt, msg) - evt = self.parseTag(evt, msg) - if evt == "ParserDropped": - return evt - elif evt: - # Cisco standard msg includes the severity in the tag - if 'overwriteSeverity' in evt.keys(): - old_severity = evt['severity'] - new_severity = self.defaultSeverityMap(int(evt['overwriteSeverity'])) - evt['severity'] = new_severity - slog.debug('Severity overwritten in message tag. Previous:%s Current:%s', old_severity, new_severity) - #rest of msg now in summary of event - evt = self.buildEventClassKey(evt) - evt['monitor'] = self.monitor - if 'message' not in evt: - evt['message'] = msg - # Convert strings to unicode, previous code converted 'summary' & - # 'message' fields. With parsing group name matching, good idea to - # convert all fields. - evt.update({k: unicode(v) for k,v in evt.iteritems() if isinstance(v, str)}) - self.sendEvent(evt) - return "EventSent" - else: - return None - - - def parsePRI(self, evt, msg): - """ - Parse RFC-3164 PRI part of syslog message to get facility and priority. - - @param evt: dictionary of event properties - @type evt: dictionary - @param msg: message from host - @type msg: string - @return: tuple of dictionary of event properties and the message - @type: (dictionary, string) - """ - pri = self.defaultPriority - fac = None - if msg[:1] == '<': - pos = msg.find('>') - fac, pri = LOG_UNPACK(int(msg[1:pos])) - msg = msg[pos+1:] - elif msg and msg[0] < ' ': - fac, pri = LOG_KERN, ord(msg[0]) - msg = msg[1:] - evt['facility'] = fac - evt['priority'] = pri - evt['severity'] = self.defaultSeverityMap(pri) - slog.debug("fac=%s pri=%s", fac, pri) - slog.debug("facility=%s severity=%s", evt['facility'], evt['severity']) - return evt, msg - - - def defaultSeverityMap(self, pri): - """ - Default mapping from syslog priority to severity. - - @param pri: syslog priority from host - @type pri: integer - @return: numeric severity - @type: integer - """ - sev = 1 - if pri < 3: sev = 5 - elif pri == 3: sev = 4 - elif pri == 4: sev = 3 - elif pri == 5 or pri == 6: sev = 2 - return sev - - - timeParse = \ - re.compile("^(\S{3} [\d ]{2} [\d ]{2}:[\d ]{2}:[\d ]{2}(?:\.\d{1,3})?) (.*)", re.DOTALL).search - notHostSearch = re.compile("[\[:]").search - def parseHEADER(self, evt, msg): - """ - Parse RFC-3164 HEADER part of syslog message. TIMESTAMP format is: - MMM HH:MM:SS and host is next token without the characters '[' or ':'. - - @param evt: dictionary of event properties - @type evt: dictionary - @param msg: message from host - @type msg: string - @return: tuple of dictionary of event properties and the message - @type: (dictionary, string) - """ - slog.debug(msg) - m = re.sub("Kiwi_Syslog_Daemon \d+: \d+: " - "\S{3} [\d ]{2} [\d ]{2}:[\d ]{2}:[^:]+: ", "", msg) - m = self.timeParse(msg) - if m: - slog.debug("parseHEADER timestamp=%s", m.group(1)) - evt['originalTime'] = m.group(1) - msg = m.group(2).strip() - msglist = msg.split() - if self.parsehost and not self.notHostSearch(msglist[0]): - device = msglist[0] - if device.find('@') >= 0: - device = device.split('@', 1)[1] - slog.debug("parseHEADER hostname=%s", evt['device']) - msg = " ".join(msglist[1:]) - evt['device'] = device - if isip(device): - evt['ipAddress'] = device - else: - if 'ipAddress' in evt: - del(evt['ipAddress']) - return evt, msg - - - def parseTag(self, evt, msg): - """ - Parse the RFC-3164 tag of the syslog message using the regex defined - at the top of this module. - - @param evt: dictionary of event properties - @type evt: dictionary - @param msg: message from host - @type msg: string - @return: dictionary of event properties - @type: dictionary - """ - slog.debug(msg) - for i, parserCfg in enumerate(self.compiledParsers): - slog.debug("parserCfg[%s] regex: %s", i, parserCfg['expr'].pattern) - m = parserCfg['expr'].search(msg) - if not m: - continue - elif not parserCfg['keep']: - slog.debug("parserCfg[%s] matched but DROPPED due to parserCfg. msg:%r, pattern:%r, parsedGroups:%r", - i, - msg, - parserCfg['expr'].pattern, - m.groupdict()) - return "ParserDropped" - slog.debug("parserCfg[%s] matched. msg:%r, pattern:%r, parsedGroups:%r", - i, - msg, - parserCfg['expr'].pattern, - m.groupdict()) - evt.update(m.groupdict()) - evt['parserRuleMatched'] = i - break - else: - slog.debug("No matching parser: %r", msg) - evt['summary'] = msg - if self.syslogSummaryToMessage: - # In case the parsed event doesn't have a summary we set an empty string to the message key - evt['message'] = evt.get("summary", "") - evt['unparsedMessage'] = msg - return evt - - - def buildEventClassKey(self, evt): - """ - Build the key used to find an events dictionary record. If eventClass - is defined it is used. For NT events "Source_Evid" is used. For other - syslog events we use the summary of the event to perform a full text - or'ed search. - - @param evt: dictionary of event properties - @type evt: dictionary - @return: dictionary of event properties - @type: dictionary - """ - if 'eventClassKey' in evt or 'eventClass' in evt: - return evt - elif 'ntevid' in evt: - evt['eventClassKey'] = "%s_%s" % (evt['component'],evt['ntevid']) - elif 'component' in evt: - evt['eventClassKey'] = evt['component'] - if 'eventClassKey' in evt: - slog.debug("eventClassKey=%s", evt['eventClassKey']) - try: - evt['eventClassKey'] = evt['eventClassKey'].decode('latin-1') - except Exception: - evt['eventClassKey'] = evt['eventClassKey'].decode('utf-8') - else: - slog.debug("No eventClassKey assigned") - return evt diff --git a/Products/ZenEvents/syslog_h.py b/Products/ZenEvents/syslog_h.py deleted file mode 100644 index 4dd1a7973f..0000000000 --- a/Products/ZenEvents/syslog_h.py +++ /dev/null @@ -1,70 +0,0 @@ -############################################################################## -# -# Copyright (C) Zenoss, Inc. 2007, all rights reserved. -# -# This content is made available according to terms specified in -# License.zenoss under the directory where your Zenoss product is installed. -# -############################################################################## - - -# constants from syslog.h -LOG_EMERGENCY = 0 -LOG_ALERT = 1 -LOG_CRITICAL = 2 -LOG_ERRROR = 3 -LOG_WARNING = 4 -LOG_NOTICE = 5 -LOG_INFO = 6 -LOG_DEBUG = 7 - -LOG_PRIMASK = 0x07 - -def LOG_PRI(p): return p & LOG_PRIMASK -def LOG_MAKEPRI(fac, pri): return fac << 3 | pri - -LOG_KERN = 0 << 3 -LOG_USER = 1 << 3 -LOG_MAIL = 2 << 3 -LOG_DAEMON = 3 << 3 -LOG_AUTH = 4 << 3 -LOG_SYSLOG = 5 << 3 -LOG_LPR = 6 << 3 -LOG_NEWS = 7 << 3 -LOG_UUCP = 8 << 3 -LOG_CRON = 9 << 3 -LOG_AUTHPRIV = 10 << 3 -LOG_FTP = 11 << 3 -LOG_LOCAL0 = 16 << 3 -LOG_LOCAL1 = 17 << 3 -LOG_LOCAL2 = 18 << 3 -LOG_LOCAL3 = 19 << 3 -LOG_LOCAL4 = 20 << 3 -LOG_LOCAL5 = 21 << 3 -LOG_LOCAL6 = 22 << 3 -LOG_LOCAL7 = 23 << 3 - -LOG_NFACILITIES = 24 -LOG_FACMASK = 0x03F8 -def LOG_FAC(p): return (p & LOG_FACMASK) >> 3 - -def LOG_MASK(pri): return 1 << pri -def LOG_UPTO(pri): return (1 << pri + 1) - 1 -# end syslog.h - -def LOG_UNPACK(p): return (LOG_FAC(p), LOG_PRI(p)) - -fac_values = {} # mapping of facility constants to their values -fac_names = {} # mapping of values to names -pri_values = {} -pri_names = {} -for i, j in globals().items(): - if i[:4] == 'LOG_' and isinstance(j, int): - if j > LOG_PRIMASK or i == 'LOG_KERN': - n, v = fac_names, fac_values - else: - n, v = pri_names, pri_values - i = i[4:].lower() - v[i] = j - n[j] = i -del i, j, n, v diff --git a/Products/ZenEvents/tests/testEventMigrate.py b/Products/ZenEvents/tests/testEventMigrate.py deleted file mode 100644 index c1016e7578..0000000000 --- a/Products/ZenEvents/tests/testEventMigrate.py +++ /dev/null @@ -1,455 +0,0 @@ -############################################################################## -# -# Copyright (C) Zenoss, Inc. 2011, all rights reserved. -# -# This content is made available according to terms specified in -# License.zenoss under the directory where your Zenoss product is installed. -# -############################################################################## - - -from Products.ZenTestCase.BaseTestCase import BaseTestCase -from zope.interface import implements -from Products.ZenEvents.events2.processing import AddDeviceContextAndTagsPipe -from Products.ZenEvents.events2.proxy import EventProxy -from Products.ZenMessaging.queuemessaging.interfaces import IQueuePublisher -from Products.ZenEvents.zeneventmigrate import ZenEventMigrate -from Products.ZenUtils.guid.interfaces import IGlobalIdentifier -from ConfigParser import ConfigParser -from datetime import datetime -from itertools import repeat -import re - -import logging -log = logging.getLogger('zen.testEventMigrate') - -#lookup -from Zope2.App import zcml -import Products.ZenossStartup -zcml.load_site() - - -class MockChannel(object): - """ - Mocks out an AMQP channel. - """ - def tx_select(self): - pass - - def tx_commit(self): - pass - -class MockPublisher(object): - """ - Mocks out an IQueuePublisher which saves published events for verification. - """ - implements(IQueuePublisher) - - def __init__(self): - self.msgs = [] - self.channel = MockChannel() - - def publish(self, exchange, routing_key, message, createQueues=None, mandatory=False): - self.msgs.append(message) - -class MockCursor(object): - """ - Mocks out a SQL cursor object. - """ - def __init__(self, conn): - self.conn = conn - self.next_result = None - - def execute(self, sql, args=None): - self.next_result = self.conn.resultForQuery(sql, args) - - def fetchall(self): - return self.next_result - - def fetchone(self): - return self.next_result[0] - - def close(self): - pass - -class MockConnection(object): - """ - Mocks out a SQL connection. - """ - def __init__(self, queries): - self.queries = queries - - def cursor(self): - return MockCursor(self) - - def resultForQuery(self, sql, args=None): - for query, result in self.queries.iteritems(): - if re.search(query, sql): - try: - return result.next() - except StopIteration: - return [] - raise Exception('Unsupported query: %s' % sql) - -class testEventMigrate(BaseTestCase): - - def afterSetUp(self): - super(testEventMigrate, self).afterSetUp() - - self.zeneventmigrate = ZenEventMigrate(app=self.app, connect=True) - - # Initialize config - self.zeneventmigrate.config = ConfigParser() - self.zeneventmigrate.config.add_section(self.zeneventmigrate.config_section) - - # Don't save state to disk - self.zeneventmigrate._storeConfig = lambda *x: None - - # Don't show progress messages - self.zeneventmigrate._progress = lambda *x: None - - def testMigrateSameDeviceClass(self): - """ - Tests that an event sent when a device belongs to a new device class is tagged with the original device class - from the migrated event. - """ - devices = self.dmd.Devices - - original = devices.createOrganizer("/Server/Solaris") - original_guid = IGlobalIdentifier(original).getGUID() - - updated = devices.createOrganizer("/Server/SSH/Solaris") - updated_guid = IGlobalIdentifier(updated).getGUID() - - updated.createInstance('test-solaris10.zenoss.loc') - - evt = { - 'dedupid': "test-solaris10.zenoss.loc|SUNWxscreensaver-hacks|/Change/Set||2|calling function " - "'setProductKey' with 'SUNWxscreensaver-hacks' on object SUNWxscreensaver-hacks", - 'evid': "0002aaaf-e10f-4348-a7b8-ae12573e560a", - 'device': "test-solaris10.zenoss.loc", - 'component': "SUNWxscreensaver-hacks", - 'eventClass': "/Change/Set", - 'eventKey': "", - 'summary': "calling function 'setProductKey' with 'SUNWxscreensaver-hacks' on object SUNWxscreensaver-hacks", - 'message': "calling function 'setProductKey' with 'SUNWxscreensaver-hacks' on object SUNWxscreensaver-hacks", - 'severity': 2, - 'eventState': 0, - 'eventClassKey': "", - 'eventGroup': "", - 'stateChange': datetime(2011, 6, 8, 13, 24, 20), - 'firstTime': 1307557460.044, - 'lastTime': 1307557460.044, - 'count': 1, - 'prodState': 1000, - 'suppid': '', - 'manager': '', - 'agent': 'ApplyDataMap', - 'DeviceClass': '/Server/Solaris', - 'Location': '', - 'Systems': '|', - 'DeviceGroups': '|', - 'ipAddress': '10.175.211.23', - 'facility': 'unknown', - 'priority': -1, - 'ntevid': 0, - 'ownerid': '', - 'deletedTime': datetime(2011, 6, 8, 13, 24, 20), - 'clearid': None, - 'DevicePriority': 3, - 'eventClassMapping': '', - 'monitor': '', - } - - events = [evt] - queries = { - r'^SELECT COUNT\(\*\) AS num_rows FROM status': repeat([{ 'num_rows': len(events) }]), - r'^SELECT \* FROM status': [events].__iter__(), - r'^SELECT evid, name, value FROM detail': repeat([]), - r'^SELECT \* FROM log WHERE evid IN': repeat([]), - } - conn = MockConnection(queries) - mock_publisher = MockPublisher() - self.zeneventmigrate._migrate_events(conn, mock_publisher, True) - self.assertEquals(1, len(mock_publisher.msgs)) - event_summary = mock_publisher.msgs[0] - event_occurrence = event_summary.occurrence[0] - for d in event_occurrence.details: - if d.name == EventProxy.DEVICE_CLASS_DETAIL_KEY: - self.assertEquals([original.getOrganizerName()], d.value) - - device_class_tags = set() - for t in event_occurrence.tags: - if t.type == AddDeviceContextAndTagsPipe.DEVICE_DEVICECLASS_TAG_KEY: - device_class_tags.update(t.uuid) - - self.assertTrue(original_guid in device_class_tags, msg="Event wasn't tagged with original device class") - self.assertFalse(updated_guid in device_class_tags, msg="Event was tagged with new device class") - - def testMigrateSameLocation(self): - """ - Tests that an event sent when a device belongs to a new location is tagged with the original location - from the migrated event. - """ - devices = self.dmd.Devices - locations = self.dmd.Locations - - original = locations.createOrganizer("/Austin") - original_guid = IGlobalIdentifier(original).getGUID() - - updated = locations.createOrganizer("/Annapolis") - updated_guid = IGlobalIdentifier(updated).getGUID() - - device_class = devices.createOrganizer("/Server/Windows/WMI/Active Directory/2008") - device = device_class.createInstance('test-win2008-ad.zenoss.loc') - device.setLocation(updated.getOrganizerName()) - - evt = { - 'dedupid': "test-win2008-ad.zenoss.loc|zeneventlog|/Status/Wmi||4|\n Could not read the Windows" - " event log (ExecNotificationQuery on test-win2008-ad.zenoss.loc (DOS code 0x800700a4)). C", - 'evid': "00049aee-b0bc-4621-8393-9b0cf831afc4", - 'device': "test-win2008-ad.zenoss.loc", - 'component': "zeneventlog", - 'eventClass': "/Status/Wmi", - 'eventKey': "", - 'summary': "Could not read the Windows event log (ExecNotificationQuery on test-win2008-ad.zenoss.loc (DOS" - " code 0x800700a4)). C", - 'message': "Could not read the Windows event log (ExecNotificationQuery on test-win2008-ad.zenoss.loc (DOS" - " code 0x800700a4)). Check your username/password settings and verify network connectivity.", - 'severity': 4, - 'eventState': 0, - 'eventClassKey': "", - 'eventGroup': "", - 'stateChange': datetime(2011, 6, 9, 22, 39, 48), - 'firstTime': 1307677188.839, - 'lastTime': 1307677188.839, - 'count': 1, - 'prodState': 1000, - 'suppid': '', - 'manager': 'pwarren-dev.zenoss.loc', - 'agent': 'zeneventlog', - 'DeviceClass': '/Server/Windows/WMI/Active Directory/2008', - 'Location': '/Austin', - 'Systems': '|', - 'DeviceGroups': '|', - 'ipAddress': '10.175.211.197', - 'facility': 'unknown', - 'priority': -1, - 'ntevid': 0, - 'ownerid': '', - 'deletedTime': datetime(2011, 6, 9, 22, 39, 48), - 'clearid': '947d299f-cc25-4250-a8de-b8fd8bc2b06d', - 'DevicePriority': 3, - 'eventClassMapping': '', - 'monitor': 'localhost', - } - - events = [evt] - queries = { - r'^SELECT COUNT\(\*\) AS num_rows FROM status': repeat([{ 'num_rows': len(events) }]), - r'^SELECT \* FROM status': [events].__iter__(), - r'^SELECT evid, name, value FROM detail': repeat([]), - r'^SELECT \* FROM log WHERE evid IN': repeat([]), - } - conn = MockConnection(queries) - mock_publisher = MockPublisher() - self.zeneventmigrate._migrate_events(conn, mock_publisher, True) - self.assertEquals(1, len(mock_publisher.msgs)) - event_summary = mock_publisher.msgs[0] - event_occurrence = event_summary.occurrence[0] - for d in event_occurrence.details: - if d.name == EventProxy.DEVICE_LOCATION_DETAIL_KEY: - self.assertEquals([original.getOrganizerName()], d.value) - - device_location_tags = set() - for t in event_occurrence.tags: - if t.type == AddDeviceContextAndTagsPipe.DEVICE_LOCATION_TAG_KEY: - device_location_tags.update(t.uuid) - - self.assertTrue(original_guid in device_location_tags, msg="Event wasn't tagged with original location") - self.assertFalse(updated_guid in device_location_tags, msg="Event was tagged with new location") - - def testMigrateSameGroups(self): - """ - Tests that an event sent when a device belongs to new device groups is tagged with the original device groups - from the migrated event. - """ - devices = self.dmd.Devices - groups = self.dmd.Groups - - group_first = groups.createOrganizer('/First') - group_second = groups.createOrganizer('/Second') - group_third = groups.createOrganizer('/Third') - group_first_nested = groups.createOrganizer('/First/FirstNested') - - group_fourth = groups.createOrganizer('/Fourth') - group_fifth = groups.createOrganizer('/Fifth') - - device_class = devices.createOrganizer("/Server/Linux") - device = device_class.createInstance('pwarren-dev.zenoss.loc') - device.setGroups([group_fourth.getOrganizerName(), group_fifth.getOrganizerName()]) - - evt = { - 'dedupid': "pwarren-dev.zenoss.loc|snmpd|||2|Received SNMP packet(s) from UDP: [10.175.210.74]:48219", - 'evid': "0015e762-1983-40ad-a966-d2a66ee40fd9", - 'device': "pwarren-dev.zenoss.loc", - 'component': "snmpd", - 'eventClass': "/Unknown", - 'eventKey': "", - 'summary': "Received SNMP packet(s) from UDP: [10.175.210.74]:48219", - 'message': "Received SNMP packet(s) from UDP: [10.175.210.74]:48219", - 'severity': 2, - 'eventState': 0, - 'eventClassKey': "snmpd", - 'eventGroup': "syslog", - 'stateChange': datetime(2011, 6, 13, 3, 10, 13), - 'firstTime': 1307952609.997, - 'lastTime': 1307952609.997, - 'count': 1, - 'prodState': 1000, - 'suppid': '', - 'manager': 'pwarren-dev.zenoss.loc', - 'agent': 'zensyslog', - 'DeviceClass': '/Server/Linux', - 'Location': '/Austin', - 'Systems': '|/Production|/Development', - 'DeviceGroups': '|/First|/Second|/Third|/First/FirstNested', - 'ipAddress': '10.175.210.74', - 'facility': 'nfacilit', - 'priority': 6, - 'ntevid': 0, - 'ownerid': '', - 'deletedTime': datetime(2011, 6, 13, 7, 11, 8), - 'clearid': None, - 'DevicePriority': 3, - 'eventClassMapping': '', - 'monitor': 'localhost', - } - - events = [evt] - queries = { - r'^SELECT COUNT\(\*\) AS num_rows FROM status': repeat([{ 'num_rows': len(events) }]), - r'^SELECT \* FROM status': [events].__iter__(), - r'^SELECT evid, name, value FROM detail': repeat([]), - r'^SELECT \* FROM log WHERE evid IN': repeat([]), - } - conn = MockConnection(queries) - mock_publisher = MockPublisher() - self.zeneventmigrate._migrate_events(conn, mock_publisher, True) - self.assertEquals(1, len(mock_publisher.msgs)) - event_summary = mock_publisher.msgs[0] - event_occurrence = event_summary.occurrence[0] - - expected_group_names = set([g.getOrganizerName() for g in [group_first, group_second, group_third, - group_first_nested]]) - found_group_names = set() - - for d in event_occurrence.details: - if d.name == EventProxy.DEVICE_GROUPS_DETAIL_KEY: - found_group_names.update(d.value) - diff_names = expected_group_names - found_group_names - self.assertEquals(0, len(diff_names)) - - expected_group_tags = set([IGlobalIdentifier(g).getGUID() for g in [group_first, group_second, group_third, - group_first_nested]]) - found_group_tags = set() - for t in event_occurrence.tags: - if t.type == AddDeviceContextAndTagsPipe.DEVICE_GROUPS_TAG_KEY: - found_group_tags.update(t.uuid) - - diff_tags = expected_group_tags - found_group_tags - self.assertEquals(0, len(diff_tags)) - - def testMigrateSameSystems(self): - """ - Tests that an event sent when a device belongs to new systems is tagged with the original systems - from the migrated event. - """ - devices = self.dmd.Devices - groups = self.dmd.Systems - - system_production = groups.createOrganizer('/Production') - system_development = groups.createOrganizer('/Development') - - system_additional = groups.createOrganizer('/Additional') - system_preprod = groups.createOrganizer('/PreProduction') - - device_class = devices.createOrganizer("/Server/Linux") - device = device_class.createInstance('pwarren-dev.zenoss.loc') - device.setSystems([system_additional.getOrganizerName(), system_preprod.getOrganizerName()]) - - evt = { - 'dedupid': "pwarren-dev.zenoss.loc|snmpd|||2|Received SNMP packet(s) from UDP: [10.175.210.74]:48219", - 'evid': "0015e762-1983-40ad-a966-d2a66ee40fd9", - 'device': "pwarren-dev.zenoss.loc", - 'component': "snmpd", - 'eventClass': "/Unknown", - 'eventKey': "", - 'summary': "Received SNMP packet(s) from UDP: [10.175.210.74]:48219", - 'message': "Received SNMP packet(s) from UDP: [10.175.210.74]:48219", - 'severity': 2, - 'eventState': 0, - 'eventClassKey': "snmpd", - 'eventGroup': "syslog", - 'stateChange': datetime(2011, 6, 13, 3, 10, 13), - 'firstTime': 1307952609.997, - 'lastTime': 1307952609.997, - 'count': 1, - 'prodState': 1000, - 'suppid': '', - 'manager': 'pwarren-dev.zenoss.loc', - 'agent': 'zensyslog', - 'DeviceClass': '/Server/Linux', - 'Location': '/Austin', - 'Systems': '|/Production|/Development', - 'DeviceGroups': '|/First|/Second|/Third|/First/FirstNested', - 'ipAddress': '10.175.210.74', - 'facility': 'nfacilit', - 'priority': 6, - 'ntevid': 0, - 'ownerid': '', - 'deletedTime': datetime(2011, 6, 13, 7, 11, 8), - 'clearid': None, - 'DevicePriority': 3, - 'eventClassMapping': '', - 'monitor': 'localhost', - } - - events = [evt] - queries = { - r'^SELECT COUNT\(\*\) AS num_rows FROM status': repeat([{ 'num_rows': len(events) }]), - r'^SELECT \* FROM status': [events].__iter__(), - r'^SELECT evid, name, value FROM detail': repeat([]), - r'^SELECT \* FROM log WHERE evid IN': repeat([]), - } - conn = MockConnection(queries) - mock_publisher = MockPublisher() - self.zeneventmigrate._migrate_events(conn, mock_publisher, True) - self.assertEquals(1, len(mock_publisher.msgs)) - event_summary = mock_publisher.msgs[0] - event_occurrence = event_summary.occurrence[0] - - expected_system_names = set([s.getOrganizerName() for s in [system_development, system_production]]) - found_system_names = set() - - for d in event_occurrence.details: - if d.name == EventProxy.DEVICE_SYSTEMS_DETAIL_KEY: - found_system_names.update(d.value) - diff_names = expected_system_names - found_system_names - self.assertEquals(0, len(diff_names)) - - expected_system_tags = set([IGlobalIdentifier(s).getGUID() for s in [system_development, system_production]]) - found_system_tags = set() - for t in event_occurrence.tags: - if t.type == AddDeviceContextAndTagsPipe.DEVICE_SYSTEMS_TAG_KEY: - found_system_tags.update(t.uuid) - - diff_tags = expected_system_tags - found_system_tags - self.assertEquals(0, len(diff_tags)) - - -def test_suite(): - from unittest import TestSuite, makeSuite - suite = TestSuite() - suite.addTest(makeSuite(testEventMigrate)) - return suite diff --git a/Products/ZenEvents/tests/testSyslogMsgFilter.py b/Products/ZenEvents/tests/testSyslogMsgFilter.py deleted file mode 100644 index 1cfb9a53ba..0000000000 --- a/Products/ZenEvents/tests/testSyslogMsgFilter.py +++ /dev/null @@ -1,86 +0,0 @@ -############################################################################## -# -# Copyright (C) Zenoss, Inc. 2023, all rights reserved. -# -# This content is made available according to terms specified in -# License.zenoss under the directory where your Zenoss product is installed. -# -############################################################################## - -from mock import Mock - -from Products.ZenHub.interfaces import ICollectorEventTransformer, \ - TRANSFORM_CONTINUE, \ - TRANSFORM_DROP - -from Products.ZenEvents.EventManagerBase import EventManagerBase -from Products.ZenEvents.SyslogMsgFilter import SyslogMsgFilter -from Products.ZenTestCase.BaseTestCase import BaseTestCase - - -class SyslogMsgFilterTest(BaseTestCase): - - def testSyslogMsgDefaultFilter(self): - # Currently there are not any default filters defined - msgFilter = SyslogMsgFilter() - msgFilter._daemon = Mock() - msgFilter._eventService = Mock() - msgFilter._initialized = True - msgFilter.updateRuleSet(EventManagerBase.syslogMsgEvtFieldFilterRules) - self.assertEquals(msgFilter._eventService.sendEvent.called, False) - - def testSyslogMsgBadCfg(self): - filterCfg = { - "eventClassKey": [ - "(BadBad" - ] - } - msgFilter = SyslogMsgFilter() - msgFilter._daemon = Mock() - msgFilter._eventService = Mock() - msgFilter._initialized = True - msgFilter.updateRuleSet(filterCfg) - self.assertEquals(msgFilter._eventService.sendEvent.called, True) - self.assertEquals(msgFilter._eventService.sendEvent.call_count, 1) - evtFields = msgFilter._eventService.sendEvent.mock_calls[0][1][0] - self.assertEquals( - evtFields['message'], - "Syslog Message Filter configuration for the 'eventClassKey' event field could not compile rule #0 with the expression of '(BadBad'. Error error('unbalanced parenthesis',)" - ) - - def testSyslogMsgFilterMatch(self): - filterCfg = { - "eventClassKey": [ - "MARK" - ] - } - event = { - 'severity': 4, - 'eventClassKey': 'MARK', - 'component': 'zensyslog', - 'summary': 'test message', - 'eventKey': 'SyslogMessageFilter.eventClassKey.0', - 'device': '127.0.0.1', - 'eventClass': '/App/Zenoss', - 'message': 'test test 123' - } - msgFilter = SyslogMsgFilter() - msgFilter._daemon = Mock() - msgFilter._daemon.counters = { - 'eventCount': 0, - 'eventFilterDroppedCount': 0} - msgFilter._eventService = Mock() - msgFilter._initialized = True - msgFilter.updateRuleSet(filterCfg) - self.assertEquals(msgFilter._eventService.sendEvent.called, False) - transformResult = msgFilter.transform(event) - self.assertEquals(transformResult, TRANSFORM_DROP) - event['eventClassKey'] = "NotMark" - transformResult = msgFilter.transform(event) - self.assertEquals(transformResult, TRANSFORM_CONTINUE) - -def test_suite(): - from unittest import TestSuite, makeSuite - suite = TestSuite() - suite.addTest(makeSuite(SyslogMsgFilterTest)) - return suite \ No newline at end of file diff --git a/Products/ZenEvents/tests/testSyslogProcessing.py b/Products/ZenEvents/tests/testSyslogProcessing.py deleted file mode 100644 index f0f520f986..0000000000 --- a/Products/ZenEvents/tests/testSyslogProcessing.py +++ /dev/null @@ -1,158 +0,0 @@ -############################################################################## -# -# Copyright (C) Zenoss, Inc. 2008, all rights reserved. -# -# This content is made available according to terms specified in -# License.zenoss under the directory where your Zenoss product is installed. -# -############################################################################## - - -from Products.ZenEvents.SyslogProcessing import SyslogProcessor -from Products.ZenEvents.EventManagerBase import EventManagerBase -from Products.ZenTestCase.BaseTestCase import BaseTestCase - -class SyslogProcessingTest(BaseTestCase): - - def sendEvent(self, evt): - "Fakeout sendEvent() method" - self.sent = evt - - def testBuildEventClassKey(self): - "Simple, brain-dead testing of SyslogProcessor" - base = dict(device='localhost', component='component', severity=3) - s = SyslogProcessor( - self.sendEvent, 6, False, 'localhost', 3, - EventManagerBase.syslogParsers, EventManagerBase.syslogSummaryToMessage) - self.assert_(s.buildEventClassKey({}) == {}) - evt = dict(eventClassKey='akey', **base) - self.assert_(s.buildEventClassKey(evt.copy()) == evt) - evt = dict(eventClassKey='akey', ntevid='1234', **base) - self.assert_(s.buildEventClassKey(evt.copy()) == evt) - evt = dict(ntevid='1234', **base) - self.assert_(s.buildEventClassKey(evt)['eventClassKey'] == - 'component_1234') - evt = dict(**base) - self.assert_(s.buildEventClassKey(evt)['eventClassKey'] == 'component') - - def testProcess(self): - long_text_message = "long text message " * 20 - msg = "2016-08-08T11:07:33.660820-04:00 devname=localhost log_id=98765434 type=component {}".format(long_text_message) - ipaddr = "127.0.0.1" - host = "8080" - rtime = "1416111" - s = SyslogProcessor( - self.sendEvent, 6, False, 'localhost', 3, - EventManagerBase.syslogParsers, EventManagerBase.syslogSummaryToMessage) - s.process(msg, ipaddr, host, rtime) - evt = self.sent - self.assertEquals(evt.get('device'), host) - self.assertEquals(evt.get('ipAddress'), ipaddr) - self.assertEquals(evt.get('firstTime'), rtime) - self.assertEquals(evt.get('lastTime'), rtime) - self.assertEquals(evt.get('eventGroup'), 'syslog') - self.assertEquals(evt.get('message'), unicode(msg)) - self.assertEquals(evt.get('summary'), unicode(msg)) - - def testCheckFortigate(self): - """ - Test of Fortigate syslog message parsing - """ - msg = "date=xxxx devname=blue log_id=987654321 type=myComponent blah blah blah" - s = SyslogProcessor( - self.sendEvent, 6, False, 'localhost', 3, - EventManagerBase.syslogParsers, EventManagerBase.syslogSummaryToMessage) - evt = s.parseTag( {}, msg ) - - self.assertEquals( evt.get('eventClassKey'), '987654321' ) - self.assertEquals( evt.get('component'), 'myComponent' ) - self.assertEquals( evt.get('summary'), 'devname=blue log_id=987654321 type=myComponent blah blah blah' ) - - def testCheckCiscoPortStatus(self): - """ - Test of Cisco port status syslog message parsing - """ - msg = "Process 10532, Nbr 192.168.10.13 on GigabitEthernet2/15 from LOADING to FULL, Loading Done" - s = SyslogProcessor( - self.sendEvent, 6, False, 'localhost', 3, - EventManagerBase.syslogParsers, EventManagerBase.syslogSummaryToMessage) - evt = s.parseTag( {}, msg ) - - self.assertEquals( evt.get('device'), '192.168.10.13' ) - self.assertEquals( evt.get('process_id'), '10532' ) - self.assertEquals( evt.get('interface'), 'GigabitEthernet2/15' ) - self.assertEquals( evt.get('start_state'), 'LOADING' ) - self.assertEquals( evt.get('end_state'), 'FULL' ) - self.assertEquals( evt.get('summary'), 'Loading Done') - - def testCiscoVpnConcentrator(self): - """ - Test of Cisco VPN Concentrator syslog message parsing - """ - msg = "54884 05/25/2009 13:41:14.060 SEV=3 HTTP/42 RPT=4623 Error on socket accept." - s = SyslogProcessor( - self.sendEvent, 6, False, 'localhost', 3, - EventManagerBase.syslogParsers, EventManagerBase.syslogSummaryToMessage) - evt = s.parseTag( {}, msg ) - - self.assertEquals( evt.get('eventClassKey'), 'HTTP/42' ) - self.assertEquals( evt.get('summary'), 'Error on socket accept.' ) - - def testCiscoStandardMessageSeverity(self): - """ - Test that the event severity is correctly extracted from the - Cisco standard message body - """ - msg = '2014 Jan 31 19:45:51 R2-N6K1-2010-P1 %ETH_PORT_CHANNEL-5-CREATED: port-channel1 created' - s = SyslogProcessor( - self.sendEvent, 6, False, 'localhost', 3, - EventManagerBase.syslogParsers, EventManagerBase.syslogSummaryToMessage) - evt = s.parseTag( {}, msg ) - self.assertEquals( evt.get('overwriteSeverity'), '5' ) - - def testDellSyslog(self): - """ - Test dell stuf - """ - msg = ("1-Oct-2009 23:00:00.383809:snapshotDelete.cc:290:INFO:8.2.5:Successfully deleted snapshot 'UNVSQLCLUSTERTEMPDB-2009-09-30-23:00:14.11563'.") - s = SyslogProcessor( - self.sendEvent, 6, False, 'localhost', 3, - EventManagerBase.syslogParsers, EventManagerBase.syslogSummaryToMessage) - evt = s.parseTag( {}, msg ) - - self.assertEquals( evt.get('eventClassKey'), '8.2.5' ) - self.assertEquals( evt.get('summary'), - "Successfully deleted snapshot 'UNVSQLCLUSTERTEMPDB-2009-09-30-23:00:14.11563'.") - - def testDellSyslog2(self): - """ - Test dell stuf - """ - msg = ("2626:48:VolExec:27-Aug-2009 13:15:58.072049:VE_VolSetWorker.hh:75:WARNING:43.3.2:Volume volumeName has reached 96 percent of its reported size and is currently using 492690MB.") - s = SyslogProcessor( - self.sendEvent, 6, False, 'localhost', 3, - EventManagerBase.syslogParsers, EventManagerBase.syslogSummaryToMessage) - evt = s.parseTag( {}, msg ) - - self.assertEquals( evt.get('eventClassKey'), '43.3.2' ) - self.assertEquals( evt.get('summary'), - "Volume volumeName has reached 96 percent of its reported size and is currently using 492690MB.") - - def testNetAppSyslogParser(self): - """ - Test NetApp syslog parser. - """ - msg = '[deviceName: 10/100/1000/e1a:warning]: Client 10.0.0.101 (xid 4251521131) is trying to access an unexported mount (fileid 64, snapid 0, generation 6111516 and flags 0x0 on volume 0xc97d89a [No volume name available])' - s = SyslogProcessor( - self.sendEvent, 6, False, 'localhost', 3, - EventManagerBase.syslogParsers, EventManagerBase.syslogSummaryToMessage) - evt = s.parseTag({}, msg) - self.assertEquals(evt.get('component'), '10/100/1000/e1a') - self.assertEquals(evt.get('summary'), 'Client 10.0.0.101 (xid 4251521131) is trying to access an unexported mount (fileid 64, snapid 0, generation 6111516 and flags 0x0 on volume 0xc97d89a [No volume name available])') - - -def test_suite(): - from unittest import TestSuite, makeSuite - suite = TestSuite() - suite.addTest(makeSuite(SyslogProcessingTest)) - return suite diff --git a/Products/ZenEvents/zeneventmigrate.py b/Products/ZenEvents/zeneventmigrate.py deleted file mode 100644 index 095a647c04..0000000000 --- a/Products/ZenEvents/zeneventmigrate.py +++ /dev/null @@ -1,693 +0,0 @@ -############################################################################## -# -# Copyright (C) Zenoss, Inc. 2011, all rights reserved. -# -# This content is made available according to terms specified in -# License.zenoss under the directory where your Zenoss product is installed. -# -############################################################################## - - -""" -Script used to migrate events from a Zenoss 3.1.x events database into the -new ZEP event schema. All properties of the events are mapped to the new -property values, and zeneventd identification/tagging is performed to ensure -that events will be associated with the correct entities in Zenoss. - -The migration script assumes that the old MySQL database no longer accepts -events, and saves the last event (per table) inside of the -zeneventmigrate.conf file. Note that only the MySQL server is required to be -available, and not all of Zenoss. - -On the 3x source system (assuming separate 3x and 4x systems), ensure that -the 4x system (in this example: 10.87.207.181) can acces the database. Create -a new user to access the events. - -mysql> grant SELECT on *.* to 'event_migrate'@'10.87.207.181' identified by 'password'; -mysql> flush privileges; - -From the remote machine, test the access to the 3x database, which (in this example -resides on 10.87.207.80. - -zends -uevent_migrate -ppassword -h 10.87.207.80 --port 3306 -D events - -From the 4.x system, you can then start the migration: - -zeneventmigrate --evthost=10.87.207.80 --evtport=3306 \ - --evtuser=event_migrate --evtpass=password --dont-fetch-args -""" - -import logging -import os -import sys -from time import mktime -from ConfigParser import ConfigParser, NoOptionError -from copy import deepcopy -from itertools import imap -from uuid import uuid4 -from signal import signal, siginterrupt, SIGTERM, SIGINT -from time import sleep - - -from Products.ZenUtils.mysql import MySQLdb -from MySQLdb import connect, escape_string -from MySQLdb.cursors import DictCursor - -from zenoss.protocols.protobufs.zep_pb2 import (EventSummary, ZepRawEvent, STATUS_NEW, STATUS_ACKNOWLEDGED, - STATUS_SUPPRESSED, STATUS_CLOSED, STATUS_CLEARED, - SYSLOG_PRIORITY_EMERG, SYSLOG_PRIORITY_DEBUG) -from zenoss.protocols.protobufs.model_pb2 import DEVICE, COMPONENT -from Products.ZenEvents.syslog_h import fac_values, LOG_FAC -from Products.ZenUtils.AmqpDataManager import AmqpTransaction -from Products.ZenUtils.ZenScriptBase import ZenScriptBase -from Products.ZenUtils.Utils import zenPath -from Products.ZenUtils.guid.interfaces import IGlobalIdentifier -from zope.component import getUtility -from Products.ZenMessaging.queuemessaging.interfaces import IQueuePublisher -from Products.ZenMessaging.queuemessaging.adapters import EventProtobufSeverityMapper -from Products.ZenEvents.events2.processing import EventProxy -from Products.ZenEvents.events2.processing import (Manager, EventContext, IdentifierPipe, AddDeviceContextAndTagsPipe, - AssignDefaultEventClassAndTagPipe) -from Products.ZenModel.DeviceClass import DeviceClass -from Products.ZenModel.DeviceGroup import DeviceGroup -from Products.ZenModel.Location import Location -from Products.ZenModel.System import System - -log = logging.getLogger('zen.EventMigrate') - -class MappingEventContext(object): - """ - Contains the event summary information to be published to the migrated - events queue. - """ - def __init__(self, event_dict): - self._event_dict = event_dict - self._summary = EventSummary() - self._occurrence = self._summary.occurrence.add() - self._actor = self._occurrence.actor - - @property - def event_dict(self): - return self._event_dict - - @property - def summary(self): - return self._summary - - @property - def occurrence(self): - return self._occurrence - - @property - def actor(self): - return self._actor - - def __str__(self): - return str(self._summary) - -def _user_uuid(dmd, userName): - # We have to call _getOb instead of getUserSettings here because the - # latter will create a new user settings object even if the user is - # not known. - try: - user = dmd.ZenUsers._getOb(userName) - return IGlobalIdentifier(user).getGUID() - except Exception: - if log.isEnabledFor(logging.DEBUG): - log.exception("Failed to look up user UUID for %s", userName) - -def _convert_summary(new_name, conversion_fcn = None): - """ - Returns a function to convert a value from a previous event into - its equivalent value in the EventSummary. - """ - def _convert_summary_internal(value, event_ctx): - if conversion_fcn: - value = conversion_fcn(value) - if value is not None: - setattr(event_ctx.summary, new_name, value) - return _convert_summary_internal - -def _convert_occurrence(new_name, conversion_fcn = None): - """ - Returns a function to convert a value from a previous event into - its equivalent value in the Event occurrence. - """ - def _convert_occurrence_internal(value, event_ctx): - if conversion_fcn: - value = conversion_fcn(value) - if value is not None: - setattr(event_ctx.occurrence, new_name, value) - return _convert_occurrence_internal - -def _add_detail(new_name, conversion_fcn = None): - """ - Returns a function to convert a value from a previous event into - its equivalent EventDetail within the event occurrence. - """ - def _add_detail_internal(value, event_ctx): - if conversion_fcn: - value = conversion_fcn(value) - if value is not None: - detail = event_ctx.occurrence.details.add() - detail.name = new_name - if not hasattr(value, '__iter__'): - value = (str(value),) - else: - value = map(str, value) - detail.value.extend(value) - return _add_detail_internal - -def _add_details(value, event_ctx): - """ - Converts event details from the detail table to EventDetail objects - on the event occurrence. - """ - for detail_name, detail_value in value.iteritems(): - detail = event_ctx.occurrence.details.add() - detail.name = detail_name - detail.value.append(detail_value) - -_AUDIT_LOG_CONVERSIONS = { - 'event state changed to acknowledged': STATUS_ACKNOWLEDGED, - 'deleted by user': STATUS_CLOSED, -} - -def _add_logs(dmd): - """ - Converts event logs from the log table to either AuditLog or - EventNote objects on the event summary depending on whether - the log message matches system generated values. - """ - def _add_logs_internal(value, event_ctx): - for log_row in value: - username = log_row['userName'] - useruuid = _user_uuid(dmd, username) - text = log_row['text'] - ctime = _convert_ts_to_millis(log_row['ctime']) - - audit_state = _AUDIT_LOG_CONVERSIONS.get(text.lower()) - if audit_state: - log = event_ctx.summary.audit_log.add(timestamp=ctime, - new_status=audit_state, - user_name=username) - if useruuid: - log.user_uuid = useruuid - else: - note = event_ctx.summary.notes.add(uuid=str(uuid4()), - user_name=username, - created_time=ctime, - message=text) - if useruuid: - note.user_uuid = useruuid - - return _add_logs_internal - -def _convert_actor(sub_type): - """ - Returns a function to convert a value from a previous event into - its equivalent value in the EventActor within the event occurrence. - """ - def _convert_actor_internal(value, event_ctx): - if value: - actor = event_ctx.actor - if not sub_type: - actor.element_type_id = DEVICE - actor.element_identifier = value - else: - actor.element_sub_type_id = COMPONENT - actor.element_sub_identifier = value - return _convert_actor_internal - -def _convert_severity(value): - return EventProtobufSeverityMapper.SEVERITIES[str(value).upper()] - -def _convert_pipe_delimited(value): - if value: - values = [val for val in value.split('|') if val] - return values if values else None - -_STATE_CONVERSIONS = { - 0: STATUS_NEW, - 1: STATUS_ACKNOWLEDGED, - 2: STATUS_SUPPRESSED, -} - -def _convert_state(status): - """ - Converts an event state from a previous event into the equivalent new - state. Events migrated from history get a status of STATUS_CLOSED or - STATUS_CLEARED depending on the presence of the clearid field. - """ - def _convert_state_internal(value, event_ctx): - if status: - event_ctx.summary.status = _STATE_CONVERSIONS.get(value, STATUS_NEW) - else: - event_ctx.summary.status = STATUS_CLEARED if event_ctx.event_dict.get('clearid','') else STATUS_CLOSED - - return _convert_state_internal - -def _convert_ts_to_millis(value): - return int(mktime(value.timetuple()) * 1000) - -def _convert_double_to_millis(value): - return int(value * 1000) - -def _drop_empty(value): - return value if value else None - -_FACILITY_CONVERSIONS = dict((k,LOG_FAC(v)) for k, v in fac_values.iteritems() if k not in ('facmask','nfacilities')) - -def _convert_facility(value): - """ - Converts a syslog facility from the old string format to the new - numeric format. This was changed because all systems don't use the - same mapping for syslog facilities and using a numeric facility - ensures we don't lose data from the original syslog event. - """ - if value and value in _FACILITY_CONVERSIONS: - return _FACILITY_CONVERSIONS[value] - -def _convert_priority(value): - if value >= SYSLOG_PRIORITY_EMERG and value <= SYSLOG_PRIORITY_DEBUG: - return value - -def _convert_event_class_mapping_uuid(dmd): - """ - Converts an event class mapping to the UUID of the event class - mapping. - """ - failed_mappings = set() - - def _convert_event_class_mapping_uuid_internal(value): - if value: - try: - value = value.encode('ascii') - components = value.split('/') - components.insert(-1, 'instances') - eventClass = dmd.unrestrictedTraverse('/zport/dmd/Events' + '/'.join(components)) - return IGlobalIdentifier(eventClass).getGUID() - except Exception: - if value not in failed_mappings: - failed_mappings.add(value) - if log.isEnabledFor(logging.DEBUG): - log.exception("Failed to resolve event class mapping: %s", value) - else: - log.warning('Failed to resolve event class mapping: %s', value) - return _convert_event_class_mapping_uuid_internal - -def _convert_ownerid(dmd): - def _convert_ownerid_internal(value, event_ctx): - if value: - event_ctx.summary.current_user_name = value - useruuid = _user_uuid(dmd, value) - if useruuid: - event_ctx.summary.current_user_uuid = useruuid - - return _convert_ownerid_internal - -class EventConverter(object): - """ - Utility class used to convert an old-style event from the status or - history table into the equivalent EventSummary protobuf. Other mappers - exist for converting old style events to event occurrences, but this - needs to preserve information that is stored in the event summary (i.e. - count, event notes, audit logs). - """ - - _FIELD_MAPPERS = { - 'evid': _convert_summary('uuid'), - 'dedupid': _convert_occurrence('fingerprint'), - 'device': _convert_actor(False), - 'component': _convert_actor(True), - 'eventClass': _convert_occurrence('event_class'), - 'eventKey': _convert_occurrence('event_key', _drop_empty), - 'summary': _convert_occurrence('summary'), - 'message': _convert_occurrence('message'), - 'severity': _convert_occurrence('severity', _convert_severity), - 'eventClassKey': _convert_occurrence('event_class_key', _drop_empty), - 'eventGroup': _convert_occurrence('event_group', _drop_empty), - 'stateChange': _convert_summary('status_change_time', _convert_ts_to_millis), - 'firstTime': _convert_summary('first_seen_time', _convert_double_to_millis), - 'lastTime': _convert_summary('last_seen_time', _convert_double_to_millis), - 'count': _convert_summary('count'), - 'prodState': _add_detail(EventProxy.PRODUCTION_STATE_DETAIL_KEY), - # This doesn't have an equivalent value in new schema - just add as detail - 'suppid': _add_detail('suppid', _drop_empty), - # Deprecated - 'manager': _add_detail('manager', _drop_empty), - 'agent': _convert_occurrence('agent', _drop_empty), - 'DeviceClass': _add_detail(EventProxy.DEVICE_CLASS_DETAIL_KEY, _drop_empty), - 'Location': _add_detail(EventProxy.DEVICE_LOCATION_DETAIL_KEY, _drop_empty), - 'Systems': _add_detail(EventProxy.DEVICE_SYSTEMS_DETAIL_KEY, _convert_pipe_delimited), - 'DeviceGroups': _add_detail(EventProxy.DEVICE_GROUPS_DETAIL_KEY, _convert_pipe_delimited), - 'ipAddress': _add_detail(EventProxy.DEVICE_IP_ADDRESS_DETAIL_KEY, _drop_empty), - 'facility': _convert_occurrence('syslog_facility', _convert_facility), - 'priority': _convert_occurrence('syslog_priority', _convert_priority), - 'ntevid': _convert_occurrence('nt_event_code', _drop_empty), - 'clearid': _convert_summary('cleared_by_event_uuid', _drop_empty), - 'DevicePriority': _add_detail(EventProxy.DEVICE_PRIORITY_DETAIL_KEY), - 'monitor': _convert_occurrence('monitor', _drop_empty), - 'deletedTime': _convert_summary('status_change_time', _convert_ts_to_millis), - 'details': _add_details, - } - - def __init__(self, dmd, status): - self.dmd = dmd - self.status = status - # Most of these can be shared above - a few require DMD access - self.field_mappers = dict(EventConverter._FIELD_MAPPERS) - self.field_mappers['ownerid'] = _convert_ownerid(dmd) - self.field_mappers['eventState'] = _convert_state(status) - self.field_mappers['eventClassMapping'] = _convert_occurrence('event_class_mapping_uuid', - _convert_event_class_mapping_uuid(dmd)) - self.field_mappers['logs'] = _add_logs(dmd) - - def convert(self, event_dict): - event_ctx = MappingEventContext(event_dict) - for name, value in event_dict.iteritems(): - if name in self.field_mappers: - self.field_mappers[name](value, event_ctx) - else: - _add_detail(name)(value, event_ctx) - return event_ctx - -_IN_CLAUSE = lambda evids: ','.join("'%s'" % evid for evid in evids) - -class ShutdownException(Exception): - pass - -class ZenEventMigrate(ZenScriptBase): - def __init__(self, noopts=0, app=None, connect=True): - super(ZenEventMigrate, self).__init__(noopts=noopts, app=app, connect=connect) - self.config_filename = zenPath('etc/zeneventmigrate.conf') - self.config_section = 'zeneventmigrate' - self._shutdown = False - - def buildOptions(self): - super(ZenEventMigrate, self).buildOptions() - self.parser.add_option('--dont-fetch-args', dest='fetchArgs', default=True, action='store_false', - help='By default MySQL connection information' - ' is retrieved from Zenoss if not' - ' specified and if Zenoss is available.' - ' This disables fetching of these values' - ' from Zenoss.') - self.parser.add_option('--evthost', dest='evthost', default='127.0.0.1', - help='Events database hostname (Default: %default)') - self.parser.add_option('--evtport', dest='evtport', action='store', type='int', default=3306, - help='Port used to connect to the events database (Default: %default)') - self.parser.add_option('--evtuser', dest='evtuser', default=None, - help='Username used to connect to the events database') - self.parser.add_option('--evtpass', dest='evtpass', default=None, - help='Password used to connect to the events database') - self.parser.add_option('--evtdb', dest='evtdb', default='events', - help='Name of events database (Default: %default)') - self.parser.add_option('--batchsize', dest='batchsize', action='store', type='int', default=100, - help='Number of events to process in one batch (Default: %default)') - self.parser.add_option('--sleep', dest='sleep', action='store', type='int', default=0, - help='Number of seconds to wait after migrating a batch of events (Default: %default)') - self.parser.add_option('--restart', dest='restart', action='store_true', default=False, - help='Use this flag to start a new migration process (disables resuming a previous ' - 'migration).') - - def _output(self, message): - if sys.stdout.isatty(): - print message - else: - log.info(message) - - def _progress(self, message): - if sys.stdout.isatty(): - sys.stdout.write("\r" + message) - sys.stdout.flush() - else: - log.info(message) - - def _loadConfig(self): - self.config = ConfigParser() - self.config.read(self.config_filename) - if not self.config.has_section(self.config_section): - self.config.add_section(self.config_section) - - def _storeConfig(self): - with open(self.config_filename, 'wb') as configfile: - self.config.write(configfile) - - def _getConfig(self, option, default=None): - try: - return self.config.get(self.config_section, option) - except NoOptionError: - return default - - def _setConfig(self, option, value): - self.config.set(self.config_section, option, value) - - def _execQuery(self, conn, sql, args=None): - cursor = None - try: - cursor = conn.cursor() - cursor.execute(sql, args) - rows = cursor.fetchall() - return rows - finally: - if cursor: - cursor.close() - - def _countQuery(self, conn, sql, args=None): - cursor = None - try: - cursor = conn.cursor() - cursor.execute(sql, args) - rows = cursor.fetchall() - cursor.execute("SELECT FOUND_ROWS() AS num_rows") - count = cursor.fetchone()['num_rows'] - return rows, count - finally: - if cursor: - cursor.close() - - def _add_details(self, conn, evids, events_by_evid): - """ - Queries the database for event details for all of the events with the specified - event ids. Each returned detail is added to the event dictionary for the event - in events_by_evid. - """ - query = "SELECT evid, name, value FROM detail WHERE evid IN (%s)" % _IN_CLAUSE(evids) - rows = self._execQuery(conn, query) - for row in rows: - evid = row['evid'] - event = events_by_evid[evid] - if not 'details' in event: - event['details'] = {} - event['details'][row['name']] = row['value'] - - def _add_logs(self, conn, evids, events_by_evid): - """ - Queries the database for event logs for all of the events with the specified - event ids. Each returned log is added to the event dictionary for the event - in events_by_evid. - """ - query = "SELECT * FROM log WHERE evid IN (%s)" % _IN_CLAUSE(evids) - rows = self._execQuery(conn, query) - for row in rows: - evid = row.pop('evid') - event = events_by_evid[evid] - if not 'logs' in event: - event['logs'] = [] - event['logs'].append(row) - - def _page_rows(self, conn, status=True): - """ - Pages through rows in the database in either the status or history - table. After returning a batch of rows, the location of the last - processed event is persisted to disk to ensure we resume from the - right location in case the process is aborted for any reason. - """ - table = 'status' if status else 'history' - - offset = 0 - last_evid = self._getConfig('%s_last_evid' % table) - where = "WHERE evid > '%s'" % escape_string(last_evid) if last_evid else '' - - if last_evid: - num_rows_query = "SELECT SQL_CALC_FOUND_ROWS evid FROM %s %s LIMIT 0" % (table, where) - num_rows = self._countQuery(conn, num_rows_query)[1] - else: - num_rows_query = "SELECT COUNT(*) AS num_rows FROM %s" % table - num_rows = self._execQuery(conn, num_rows_query)[0]['num_rows'] - - if not num_rows: - self._output("No events to migrate from %s" % table) - return - - query = "SELECT * FROM %s %s ORDER BY evid LIMIT %%s OFFSET %%s" % (table, where) - rows = self._execQuery(conn, query, (self.options.batchsize, offset)) - while not self._shutdown and rows: - self._progress("Processing events in %s: [%d/%d]" % (table, offset, num_rows)) - evids = [] - events_by_evid = {} - for row in rows: - evid = row['evid'] - events_by_evid[evid] = row - evids.append(evid) - self._add_details(conn, evids, events_by_evid) - self._add_logs(conn, evids, events_by_evid) - yield rows - self._setConfig('%s_last_evid' % table, rows[-1]['evid']) - self._storeConfig() - if self.options.sleep: - log.debug("Pausing event migration for %s seconds", self.options.sleep) - sleep(self.options.sleep) - offset += self.options.batchsize - rows = self._execQuery(conn, query, (self.options.batchsize, offset)) - - if not self._shutdown: - self._progress("Processing events in %s: [%d/%d]\n" % (table, num_rows, num_rows)) - - - def _event_to_zep_raw_event(self, event): - """ - Converts an event occurrence into a ZepRawEvent (required for running through - zeneventd pipes). - """ - zepRawEvent = ZepRawEvent() - zepRawEvent.event.CopyFrom(event) - return zepRawEvent - - def _merge_tags(self, zep_raw_event, event): - """ - Merges results from the identification and tagging pipes into the event - occurrence to be published. This will take the element_uuid, element_sub_uuid, titles - and tags from the ZEP raw event and copy them to the appropriate place on - the event occurrence. - """ - raw_actor = zep_raw_event.event.actor - event_actor = event.actor - for field in ('element_uuid', 'element_sub_uuid', 'element_title', 'element_sub_title'): - if raw_actor.HasField(field): - setattr(event_actor, field, getattr(raw_actor, field)) - event.tags.extend(imap(deepcopy, zep_raw_event.event.tags)) - - def _migrate_events(self, conn, publisher, status): - converter = EventConverter(self.dmd, status) - manager = Manager(self.dmd) - pipes = (IdentifierPipe(manager), AddDeviceContextAndTagsPipe(manager), - AssignDefaultEventClassAndTagPipe(manager)) - routing_key = 'zenoss.events.summary' if status else 'zenoss.events.archive' - - taggers = { - EventProxy.DEVICE_CLASS_DETAIL_KEY: (self.dmd.Devices, DeviceClass), - EventProxy.DEVICE_GROUPS_DETAIL_KEY: (self.dmd.Groups, DeviceGroup), - EventProxy.DEVICE_LOCATION_DETAIL_KEY: (self.dmd.Locations, Location), - EventProxy.DEVICE_SYSTEMS_DETAIL_KEY: (self.dmd.Systems, System), - } - - try: - for event_rows in self._page_rows(conn, status): - with AmqpTransaction(publisher.channel): - for mapping_event_context in imap(converter.convert, event_rows): - if self._shutdown: - raise ShutdownException() - occurrence = mapping_event_context.occurrence - zep_raw_event = self._event_to_zep_raw_event(occurrence) - event_ctx = EventContext(log, zep_raw_event) - for pipe in pipes: - pipe(event_ctx) - - # Clear tags for device class, location, systems, groups from current device - event_ctx.eventProxy.tags.clearType(AddDeviceContextAndTagsPipe.DEVICE_TAGGERS.keys()) - - # Resolve tags from original fields in the event - for detail in occurrence.details: - if detail.name in taggers: - organizer_root, organizer_cls = taggers[detail.name] - tags = set() - for val in detail.value: - try: - obj = organizer_root.unrestrictedTraverse(str(val[1:])) - if isinstance(obj, organizer_cls): - tags.update(manager.getUuidsOfPath(obj)) - except Exception: - if log.isEnabledFor(logging.DEBUG): - log.debug("Unable to resolve UUID for %s", val) - if tags: - event_tag = occurrence.tags.add() - event_tag.type = detail.name - event_tag.uuid.extend(tags) - - self._merge_tags(zep_raw_event, occurrence) - if log.isEnabledFor(logging.DEBUG): - log.debug("Migrated event: %s", mapping_event_context.summary) - - publisher.publish("$MigratedEvents", routing_key, mapping_event_context.summary, - createQueues=("$ZepMigratedEventSummary","$ZepMigratedEventArchive")) - except ShutdownException: - pass - - def _sigterm(self, signum=None, frame=None): - log.debug('SIGTERM signal caught') - self._shutdown = True - self._output('\nShutting down...') - - def run(self): - signal(SIGTERM, self._sigterm) - signal(SIGINT, self._sigterm) - # Try to avoid stacktraces from interrupted signal calls - siginterrupt(SIGTERM, False) - siginterrupt(SIGINT, False) - - if self.options.restart: - if os.path.exists(self.config_filename): - os.remove(self.config_filename) - - self._loadConfig() - if self.options.batchsize <= 0: - self.parser.error('Invalid argument for --batchsize parameter - must be positive') - if self.options.sleep < 0: - self.parser.error('Invalid argument for --sleep parameter') - - if not self.options.fetchArgs: - if not self.options.evtuser or self.options.evtpass is None: - self.parser.error('Required arguments --evtuser and --evtpass must be provided when using ' - '--dont-fetch-args') - else: - zem = self.dmd.ZenEventManager - self.options.evthost = zem.host - self.options.evtport = zem.port - self.options.evtuser = zem.username - self.options.evtpass = zem.password - self.options.evtdb = zem.database - conn = None - publisher = None - try: - conn = connect(host=self.options.evthost, - user=self.options.evtuser, - passwd=self.options.evtpass, - db=self.options.evtdb, - port=self.options.evtport, - cursorclass=DictCursor, - use_unicode=True) - conn.autocommit(1) - - publisher = getUtility(IQueuePublisher) - - # Migrate status - self._migrate_events(conn, publisher, True) - - # Migrate history - self._migrate_events(conn, publisher, False) - - except Exception as e: - if log.isEnabledFor(logging.DEBUG): - log.exception('Error migrating events') - print >>sys.stderr, "Failed to migrate events: %s" % e - finally: - if publisher: - publisher.close() - if conn: - conn.close() - - -if __name__ == '__main__': - migrate = ZenEventMigrate() - migrate.run() diff --git a/Products/ZenEvents/zensyslog.py b/Products/ZenEvents/zensyslog.py deleted file mode 100644 index de1e6b1359..0000000000 --- a/Products/ZenEvents/zensyslog.py +++ /dev/null @@ -1,493 +0,0 @@ -#! /usr/bin/env python -############################################################################## -# -# Copyright (C) Zenoss, Inc. 2008, 2011, 2023, all rights reserved. -# -# This content is made available according to terms specified in -# License.zenoss under the directory where your Zenoss product is installed. -# -############################################################################## - - -__doc__ = """zensyslog - -Turn syslog messages into events. - -""" - -import time -import socket -import os -import logging - -from twisted.internet.protocol import DatagramProtocol -from twisted.internet import reactor, defer, udp -from twisted.python import failure - -import zope.interface -import zope.component - - -from Products.ZenCollector.daemon import CollectorDaemon -from Products.ZenCollector.interfaces import ICollector, ICollectorPreferences,\ - IEventService, \ - IScheduledTask, IStatisticsService -from Products.ZenCollector.tasks import SimpleTaskFactory,\ - SimpleTaskSplitter,\ - BaseTask, TaskStates -from Products.ZenUtils.observable import ObservableMixin - -from Products.ZenEvents.SyslogProcessing import SyslogProcessor - -from Products.ZenUtils.Utils import zenPath -from Products.ZenUtils.IpUtil import asyncNameLookup - -from Products.ZenEvents.EventServer import Stats -from Products.ZenEvents.SyslogMsgFilter import SyslogMsgFilter -from Products.ZenEvents.ZenEventClasses import Clear, Info, Critical -from Products.ZenHub.interfaces import ICollectorEventTransformer -from Products.ZenUtils.Utils import unused -from Products.ZenHub.PBDaemon import HubDown -from Products.ZenCollector.services.config import DeviceProxy -unused(DeviceProxy) - -COLLECTOR_NAME = 'zensyslog' -DYNAMIC_CONFIGS = ("defaultPriority", "syslogParsers", "syslogSummaryToMessage", "syslogMsgEvtFieldFilterRules") -log = logging.getLogger("zen.%s" % COLLECTOR_NAME) - - -class SyslogPreferences(object): - zope.interface.implements(ICollectorPreferences) - - def __init__(self): - """ - Constructs a new PingCollectionPreferences instance and - provides default values for needed attributes. - """ - self.collectorName = COLLECTOR_NAME - self.configCycleInterval = 20 # minutes - self.cycleInterval = 5 * 60 # seconds - - # The configurationService attribute is the fully qualified class-name - # of our configuration service that runs within ZenHub - self.configurationService = 'Products.ZenHub.services.SyslogConfig' - - # Will be filled in based on buildOptions - self.options = None - - self.configCycleInterval = 20*60 - - def postStartupTasks(self): - dynamicConfTask = DynamicConfigLoader(taskName=COLLECTOR_NAME + "DynamicConf", configId=COLLECTOR_NAME) - task = SyslogTask(COLLECTOR_NAME, configId=COLLECTOR_NAME) - for confName in DYNAMIC_CONFIGS: - dynamicConfTask.attachAttributeObserver(confName, task.syslogProcessorConfChangeListener) - yield dynamicConfTask - yield task - - def buildOptions(self, parser): - """ - Command-line options to be supported - """ - SYSLOG_PORT = 514 - try: - SYSLOG_PORT = socket.getservbyname('syslog', 'udp') - except socket.error: - pass - - parser.add_option('--parsehost', dest='parsehost', - action='store_true', default=False, - help='Try to parse the hostname part of a syslog HEADER' - ) - parser.add_option('--stats', dest='stats', - action='store_true', default=False, - help='Print statistics to log every 2 secs') - parser.add_option('--logorig', dest='logorig', - action='store_true', default=False, - help='Log the original message') - parser.add_option('--logformat', dest='logformat', - default='human', - help='Human-readable (/var/log/messages) or raw (wire)' - ) - parser.add_option('--minpriority', dest='minpriority', - default=6, type='int', - help='Minimum priority message that zensyslog will accept' - ) - parser.add_option('--syslogport', dest='syslogport', - default=SYSLOG_PORT, type='int', - help='Port number to use for syslog events' - ) - parser.add_option('--listenip', dest='listenip', - default='0.0.0.0', - help='IP address to listen on. Default is %default' - ) - parser.add_option('--useFileDescriptor', - dest='useFileDescriptor', type='int', - help='Read from an existing connection rather opening a new port.' - , default=None) - parser.add_option('--noreverseLookup', dest='noreverseLookup', - action='store_true', default=False, - help="Don't convert the remote device's IP address to a hostname." - ) - - def postStartup(self): - daemon = zope.component.getUtility(ICollector) - daemon.defaultPriority = 1 - daemon.syslogParsers = [] - daemon.syslogSummaryToMessage = None - - # add our collector's custom statistics - statService = zope.component.queryUtility(IStatisticsService) - statService.addStatistic("events", "COUNTER") - - -@zope.interface.implementer(IScheduledTask) -class DynamicConfigLoader(BaseTask): - """Handles retrieving additional dynamic configs for daemon from ZODB""" - - def __init__(self, taskName, configId, scheduleIntervalSeconds=3*60, taskConfig=None): - BaseTask.__init__(self, taskName, configId, scheduleIntervalSeconds, taskConfig) - self.log = log - # Needed for interface - self.name = taskName - self.configId = configId - self.state = TaskStates.STATE_IDLE - self.interval = scheduleIntervalSeconds - self._daemon = zope.component.getUtility(ICollector) - self._preferences = self._daemon - for confName in DYNAMIC_CONFIGS: - setattr(self, confName, None) - setattr(self, confName + "CheckSum", None) - - @defer.inlineCallbacks - def doTask(self): - """ - Contact zenhub and gather configuration data. - e.g. remote_getSyslogMsgEvtFieldFilterRules - """ - log.debug("%s gathering dynamic config changes", self.name) - try: - remoteProxy = self._daemon.getRemoteConfigServiceProxy() - - for confName in DYNAMIC_CONFIGS: - remoteMethod = "get" + confName[0].upper() + confName[1:] - checkSum, retConf = yield remoteProxy.callRemote(remoteMethod, getattr(self, confName + "CheckSum")) - if checkSum and retConf: - setattr(self, confName + "CheckSum", checkSum) - setattr(self, confName, retConf) - log.debug("%s new %s changes applied to %s", retConf, confName, self.name) - except Exception as ex: - log.exception("task '%s' failed", self.name) - - if isinstance(ex, HubDown): - # Allow the loader to be reaped and re-added - self.state = TaskStates.STATE_COMPLETED - - def cleanup(self): - pass # Required by interface - - -class SyslogTask(BaseTask, DatagramProtocol): - """ - Listen for syslog messages and turn them into events - Connects to the TrapService service in zenhub. - """ - zope.interface.implements(IScheduledTask) - - SYSLOG_DATE_FORMAT = '%b %d %H:%M:%S' - SAMPLE_DATE = 'Apr 10 15:19:22' - - def __init__(self, taskName, configId, - scheduleIntervalSeconds=3600, taskConfig=None): - BaseTask.__init__(self, taskName, configId, - scheduleIntervalSeconds, taskConfig) - self.log = log - - # Needed for interface - self.name = taskName - self.configId = configId - self.state = TaskStates.STATE_IDLE - self.interval = scheduleIntervalSeconds - self._preferences = taskConfig - self._daemon = zope.component.getUtility(ICollector) - self._eventService = zope.component.queryUtility(IEventService) - self._statService = zope.component.queryUtility(IStatisticsService) - self._preferences = self._daemon - - self.options = self._daemon.options - - self.stats = Stats() - - self._daemon.processor = SyslogProcessor(self._eventService.sendEvent, - self._daemon.options.minpriority, self._daemon.options.parsehost, - self._daemon.options.monitor, self._preferences.defaultPriority, - self._preferences.syslogParsers, self._preferences.syslogSummaryToMessage) - - if not self.options.useFileDescriptor\ - and self.options.syslogport < 1024: - self._daemon.openPrivilegedPort('--listen', '--proto=udp', - '--port=%s:%d' - % (self.options.listenip, - self.options.syslogport)) - self._daemon.changeUser() - self.minpriority = self.options.minpriority - - if self.options.logorig: - self.olog = logging.getLogger('origsyslog') - self.olog.setLevel(20) - self.olog.propagate = False - lname = zenPath('log/origsyslog.log') - hdlr = logging.FileHandler(lname) - hdlr.setFormatter(logging.Formatter('%(message)s')) - self.olog.addHandler(hdlr) - - if self.options.useFileDescriptor is not None: - self.useUdpFileDescriptor(int(self.options.useFileDescriptor)) - else: - reactor.listenUDP(self.options.syslogport, self, - interface=self.options.listenip) - - # yield self.model().callRemote('getDefaultPriority') - - def doTask(self): - """ - This is a wait-around task since we really are called - asynchronously. - """ - return defer.succeed("Waiting for syslog messages...") - - def useUdpFileDescriptor(self, fd): - s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_DGRAM) - os.close(fd) - port = s.getsockname()[1] - transport = udp.Port(port, self) - s.setblocking(0) - transport.socket = s - transport.fileno = s.fileno - transport.connected = 1 - transport._realPortNumber = port - self.transport = transport - # hack around startListening not being called - self.numPorts = 1 - transport.startReading() - - def expand(self, msg, client_address): - """ - Expands a syslog message into a string format suitable for writing - to the filesystem such that it appears the same as it would - had the message been logged by the syslog daemon. - - @param msg: syslog message - @type msg: string - @param client_address: IP info of the remote device (ipaddr, port) - @type client_address: tuple of (string, number) - @return: message - @rtype: string - """ - # pri := facility * severity - stop = msg.find('>') - - # check for a datestamp. default to right now if date not present - start = stop + 1 - stop = start + len(SyslogTask.SAMPLE_DATE) - dateField = msg[start:stop] - try: - date = time.strptime(dateField, - SyslogTask.SYSLOG_DATE_FORMAT) - year = time.localtime()[0] - date = (year, ) + date[1:] - start = stop + 1 - except ValueError: - - # date not present, so use today's date - date = time.localtime() - - # check for a hostname. default to localhost if not present - stop = msg.find(' ', start) - if msg[stop - 1] == ':': - hostname = client_address[0] - else: - hostname = msg[start:stop] - start = stop + 1 - - # the message content - body = msg[start:] - - # assemble the message - prettyTime = time.strftime(SyslogTask.SYSLOG_DATE_FORMAT, date) - message = '%s %s %s' % (prettyTime, hostname, body) - return message - - def syslogProcessorConfChangeListener(self, observable, attrName, oldValue, newValue): - self.log.debug("Task %s changed %s. Updating it for task %s", observable.name, attrName, self.name) - if attrName == "syslogParsers": - self._daemon.processor.updateParsers(newValue) - elif attrName == "syslogMsgEvtFieldFilterRules": - self._daemon._syslogMsgFilter.updateRuleSet(newValue) - else: - setattr(self._daemon.processor, attrName, newValue) - - def datagramReceived(self, msg, client_address): - """ - Consume the network packet - - @param msg: syslog message - @type msg: string - @param client_address: IP info of the remote device (ipaddr, port) - @type client_address: tuple of (string, number) - """ - if msg == "": - self.log.debug("Received empty datagram. Discarding.") - return - (ipaddr, port) = client_address - if self.options.logorig: - if self.options.logformat == 'human': - message = self.expand(msg, client_address) - else: - message = msg - self.olog.info(message) - - if self.options.noreverseLookup: - d = defer.succeed(ipaddr) - else: - d = asyncNameLookup(ipaddr) - d.addBoth(self.gotHostname, (msg, ipaddr, time.time())) - - def gotHostname(self, response, data): - """ - Send the resolved address, if possible, and the event via the thread - - @param response: Twisted response - @type response: Twisted response - @param data: (msg, ipaddr, rtime) - @type data: tuple of (string, string, datetime object) - """ - (msg, ipaddr, rtime) = data - if isinstance(response, failure.Failure): - host = ipaddr - else: - host = response - - if self._daemon.processor: - processResult = self._daemon.processor.process(msg, ipaddr, host, rtime) - if processResult == "EventSent": - totalTime, totalEvents, maxTime = self.stats.report() - stat = self._statService.getStatistic("events") - stat.value = totalEvents - elif processResult == "ParserDropped": - self._daemon.counters["eventParserDroppedCount"] += 1 - - def displayStatistics(self): - totalTime, totalEvents, maxTime = self.stats.report() - display = "%d events processed in %.2f seconds" % ( - totalEvents, - totalTime) - if totalEvents > 0: - display += """ -%.5f average seconds per event -Maximum processing time for one event was %.5f""" % ( - (totalTime / totalEvents), maxTime) - return display - - def cleanup(self): - status = self.displayStatistics() - self.log.info(status) - - -class SyslogConfigTask(ObservableMixin): - """ - Receive a configuration object containing the default priority - """ - zope.interface.implements(IScheduledTask) - - def __init__(self, taskName, configId, - scheduleIntervalSeconds=3600, taskConfig=None): - super(SyslogConfigTask, self).__init__() - - # Needed for ZCA interface contract - self.name = taskName - self.configId = configId - self.state = TaskStates.STATE_IDLE - self.interval = scheduleIntervalSeconds - self._preferences = taskConfig - self._daemon = zope.component.getUtility(ICollector) - - def doTask(self): - return defer.succeed("Already updated default syslog priority...") - - def cleanup(self): - pass - - -class SyslogDaemon(CollectorDaemon): - - _frameworkFactoryName = "nosip" - - def __init__(self, *args, **kwargs): - self._syslogMsgFilter = SyslogMsgFilter() - zope.component.provideUtility(self._syslogMsgFilter, ICollectorEventTransformer) - kwargs["initializationCallback"] = self._initializeSyslogMsgFilter - super(SyslogDaemon, self).__init__(*args, **kwargs) - - def _initializeSyslogMsgFilter(self): - try: - self._syslogMsgFilter.initialize() - initializationSucceededEvent = { - 'component': 'zensyslog', - 'device': self.options.monitor, - 'eventClass': "/Status", - 'eventKey': "SyslogMessageFilterInit", - 'summary': 'initialized', - 'severity': Clear, - } - self.sendEvent(initializationSucceededEvent) - except Exception as e: - initializationFailedEvent = { - 'component': 'zensyslog', - 'device': self.options.monitor, - 'eventClass': "/Status", - 'eventKey': "SyslogMessageFilterInit", - 'summary': 'initialization failed', - 'message': e.message, - 'severity': Critical, - } - log.error("Failed to initialize syslog message filter: %s", e.message) - self.sendEvent(initializationFailedEvent) - self.setExitCode(1) - self.stop() - - def _updateConfig(self, cfg): - result = super(SyslogDaemon, self)._updateConfig(cfg) - if result: - self._syslogMsgFilter.updateRuleSet(cfg.syslogMsgEvtFieldFilterRules) - return result - - def _displayStatistics(self, verbose=False): - super(SyslogDaemon, self)._displayStatistics(verbose) - sendEventsOnCounters = ['eventFilterDroppedCount', 'eventParserDroppedCount'] - if not hasattr(self, 'lastCounterEventTime'): - self.lastCounterEventTime = time.time() - # Send an update event every hour - if self.lastCounterEventTime < (time.time() - 3600): - for counterName in sendEventsOnCounters: - counterEvent = { - 'component': 'zensyslog', - 'device': self.options.monitor, - 'eventClass': "/App/Zenoss", - 'eventKey': "zensyslog.{}".format(counterName), - 'summary': '{}: {}'.format( - counterName, - self.counters[counterName]), - 'severity': Info, - } - self.sendEvent(counterEvent) - self.lastCounterEventTime = time.time() - - - -if __name__=='__main__': - myPreferences = SyslogPreferences() - myTaskFactory = SimpleTaskFactory(SyslogConfigTask) - myTaskSplitter = SimpleTaskSplitter(myTaskFactory) - daemon = SyslogDaemon(myPreferences, myTaskSplitter) - daemon.run() diff --git a/bin/zeneventmigrate b/Products/ZenEvents/zensyslog/__init__.py old mode 100755 new mode 100644 similarity index 61% rename from bin/zeneventmigrate rename to Products/ZenEvents/zensyslog/__init__.py index 21eb6b0652..654b7f5ef6 --- a/bin/zeneventmigrate +++ b/Products/ZenEvents/zensyslog/__init__.py @@ -1,13 +1,15 @@ -#! /usr/bin/env bash ############################################################################## -# -# Copyright (C) Zenoss, Inc. 2011, all rights reserved. -# +# +# Copyright (C) Zenoss, Inc. 2024, all rights reserved. +# # This content is made available according to terms specified in # License.zenoss under the directory where your Zenoss product is installed. -# +# ############################################################################## +from __future__ import absolute_import, print_function -. $ZENHOME/bin/zenfunctions -$PYTHON $ZENHOME/Products/ZenEvents/zeneventmigrate.py "$CMD" "$@" + +def main(): + from .daemon import SyslogDaemon + SyslogDaemon().run() diff --git a/Products/ZenEvents/zensyslog/__main__.py b/Products/ZenEvents/zensyslog/__main__.py new file mode 100644 index 0000000000..8ff54492bd --- /dev/null +++ b/Products/ZenEvents/zensyslog/__main__.py @@ -0,0 +1,15 @@ +############################################################################## +# +# Copyright (C) Zenoss, Inc. 2024, all rights reserved. +# +# This content is made available according to terms specified in +# License.zenoss under the directory where your Zenoss product is installed. +# +############################################################################## + +from __future__ import absolute_import, print_function + +if __name__ == "__main__": + from Products.ZenEvents.zensyslog import main + + main() diff --git a/Products/ZenEvents/zensyslog/config.py b/Products/ZenEvents/zensyslog/config.py new file mode 100644 index 0000000000..36505f58eb --- /dev/null +++ b/Products/ZenEvents/zensyslog/config.py @@ -0,0 +1,63 @@ +############################################################################## +# +# Copyright (C) Zenoss, Inc. 2008, 2011, 2023, all rights reserved. +# +# This content is made available according to terms specified in +# License.zenoss under the directory where your Zenoss product is installed. +# +############################################################################## + +from __future__ import absolute_import, print_function + +from twisted.spread import pb + + +class ConfigChecksums(pb.Copyable, pb.RemoteCopy): + """ + Object for requesting zensyslog config data. + + Each field is a token returned from zenhub. For the first request, + the fields should be None. + """ + + __slots__ = ("priority", "parsers", "use_summary", "rules") + + def __init__( + self, priority=None, parsers=None, use_summary=None, rules=None + ): + self.priority = priority + self.parsers = parsers + self.use_summary = use_summary + self.rules = rules + + def __repr__(self): + return "{}({})".format( + self.__class__.__name__, + ", ".join( + "{}={}".format(name, getattr(self, name)) + for name in self.__slots__ + ), + ) + + +pb.setUnjellyableForClass(ConfigChecksums, ConfigChecksums) + + +class ConfigUpdates(pb.Copyable, pb.RemoteCopy): + """ + Configuration for zensyslog. + """ + + __slots__ = ("priority", "parsers", "use_summary", "rules", "checksums") + + def __init__( + self, priority=None, parsers=None, use_summary=None, rules=None + ): + self.priority = priority + self.parsers = parsers + self.use_summary = use_summary + self.rules = rules + self.checksums = ConfigChecksums() + + +pb.setUnjellyableForClass(ConfigUpdates, ConfigUpdates) diff --git a/Products/ZenEvents/zensyslog/daemon.py b/Products/ZenEvents/zensyslog/daemon.py new file mode 100644 index 0000000000..e853c8f11b --- /dev/null +++ b/Products/ZenEvents/zensyslog/daemon.py @@ -0,0 +1,314 @@ +############################################################################## +# +# Copyright (C) Zenoss, Inc. 2008, 2011, 2023, all rights reserved. +# +# This content is made available according to terms specified in +# License.zenoss under the directory where your Zenoss product is installed. +# +############################################################################## + +from __future__ import absolute_import, print_function + +import socket +import sys + +from twisted.internet import defer, reactor +from twisted.internet.task import LoopingCall +from zope.component import provideUtility + +from Products.ZenCollector.utils.maintenance import ZenHubHeartbeatSender +from Products.ZenEvents.ZenEventClasses import Info +from Products.ZenHub.interfaces import ICollectorEventTransformer +from Products.ZenHub.PBDaemon import PBDaemon + +from .loader import ConfigLoader +from .loggers import DropLogger, MessageLogger, RawFormatter, HumanFormatter +from .processor import Parsers, SyslogProcessor +from .protocol import SyslogProtocol +from .receiver import AdoptPort, CreatePort, Receiver +from .transformer import FilterRules, SyslogMsgFilter + +_dropped_counter_names = ("eventFilterDroppedCount", "eventParserDroppedCount") +_drop_events_task_interval = 3600 + + +class SyslogDaemon(PBDaemon): + """ + Daemon for receiving SysLog events and recording them as Zenoss events. + """ + + mname = name = "zensyslog" + + _configservice = "Products.ZenHub.services.SyslogConfig" + initialServices = PBDaemon.initialServices + [_configservice] + + def __init__(self, *args, **kwargs): + super(SyslogDaemon, self).__init__(*args, **kwargs) + + self.configCycleInterval = 2 * 60 # seconds + self.cycleInterval = 5 * 60 # seconds + + self._rules = FilterRules(self) + self._event_filter = SyslogMsgFilter(self._rules, self.counters) + provideUtility(self._event_filter, ICollectorEventTransformer) + + self._heartbeat_sender = ZenHubHeartbeatSender( + self.options.monitor, + self.name, + self.options.heartbeatTimeout, + ) + self._heartbeat_task = None + + self._parsers = Parsers(self) + self._processor = SyslogProcessor( + self.sendEvent, + self.options.minpriority, + self.options.parsehost, + self.options.monitor, + self._parsers, + ) + self._loader = ConfigLoader( + self.getRemoteConfigServiceProxy, + self._parsers, + self._processor, + self._rules, + ) + self._loader_task = None + + self._drop_events_task = None + + self._receiver = None + + def buildOptions(self): + super(SyslogDaemon, self).buildOptions() + try: + SYSLOG_PORT = socket.getservbyname("syslog", "udp") + except socket.error: + SYSLOG_PORT = 514 + self.parser.add_option( + "--parsehost", + dest="parsehost", + action="store_true", + default=False, + help="Try to parse the hostname part of a syslog HEADER", + ) + self.parser.add_option( + "--stats", + dest="stats", + action="store_true", + default=False, + help="Print statistics to log every 2 secs", + ) + self.parser.add_option( + "--logorig", + dest="logorig", + action="store_true", + default=False, + help="Log the original message", + ) + self.parser.add_option( + "--logformat", + dest="logformat", + default="human", + help="Human-readable (/var/log/messages) or raw (wire)", + ) + self.parser.add_option( + "--minpriority", + dest="minpriority", + default=6, + type="int", + help="Minimum priority message that zensyslog will accept", + ) + self.parser.add_option( + "--syslogport", + dest="syslogport", + default=SYSLOG_PORT, + type="int", + help="Port number to use for syslog events", + ) + self.parser.add_option( + "--listenip", + dest="listenip", + default="0.0.0.0", # noqa: S104 + help="IP address to listen on. Default is %default", + ) + self.parser.add_option( + "--useFileDescriptor", + dest="useFileDescriptor", + type="int", + help="Read from an existing connection rather opening a new port.", + default=None, + ) + self.parser.add_option( + "--noreverseLookup", + dest="noreverseLookup", + action="store_true", + default=False, + help="Don't convert the remote device's IP address to a hostname.", + ) + + # @override + def run(self): + if ( + not self.options.useFileDescriptor + and self.options.syslogport < 1024 + ): + self.log.info( + "opening privileged port %s", self.options.syslogport + ) + # Makes a call to zensocket here, + # which performs an exec* so it never returns. + self.openPrivilegedPort( + "--listen", + "--proto=udp", + "--port=%s:%d" + % (self.options.listenip, self.options.syslogport), + ) + self.log.error("Failed to open privileged port") + sys.exit(1) + super(SyslogDaemon, self).run() + + # @override + @defer.inlineCallbacks + def connected(self): + try: + # initial config load + yield self._loader.task() + + self._start_heartbeat_task() + self._start_loader_task() + self._start_drop_events_task() + self._start_receiver() + except Exception: + self.log.exception("BOOM!") + + # @override + def postStatisticsImpl(self): + if self._receiver is None: + return + totalTime, totalEvents, maxTime = self._processor.stats.report() + self.rrdStats.counter("events", totalEvents) + + @defer.inlineCallbacks + def getRemoteConfigServiceProxy(self): + """Return the remote configuration service proxy.""" + proxy = yield self.getService(self._configservice) + defer.returnValue(proxy) + + def _start_heartbeat_task(self): + self._heartbeat_task = LoopingCall(self._heartbeat_sender.heartbeat) + self._heartbeat_task.start(self.cycleInterval) + reactor.addSystemEventTrigger( + "before", "shutdown", self._stop_heartbeat_task + ) + self.log.info("started task for sending heartbeats") + + def _stop_heartbeat_task(self): + if self._heartbeat_task is None: + return + self._heartbeat_task.stop() + self._heartbeat_task = None + self.log.info("stopped task for sending heartbeats") + + def _start_loader_task(self): + self._loader_task = LoopingCall(self._loader.task) + self._loader_task.start(self.cycleInterval) + reactor.addSystemEventTrigger( + "before", "shutdown", self._stop_loader_task + ) + self.log.info("started task to retrieve configuration data") + + def _stop_loader_task(self): + if self._loader_task is None: + return + self._loader_task.stop() + self._loader_task = None + self.log.info("stopped task to retrieve configuration data") + + def _start_drop_events_task(self): + self._drop_events_task = LoopingCall(self._send_drop_events) + self._drop_events_task.start(_drop_events_task_interval) + reactor.addSystemEventTrigger( + "before", "shutdown", self._stop_drop_events_task + ) + self.log.info( + "started task to send events with the count of dropped events" + ) + + def _stop_drop_events_task(self): + if self._drop_events_task is None: + return + self._drop_events_task.stop() + self._drop_events_task = None + self.log.info( + "stopped task to send events with the count of dropped events" + ) + + def _start_receiver(self): + protocol = self._build_protocol() + portfactory = self._build_port_factory() + self._receiver = Receiver(protocol, portfactory) + self._receiver.start() + reactor.addSystemEventTrigger( + "before", "shutdown", self._stop_receiver + ) + reactor.addSystemEventTrigger( + "after", "shutdown", self._displayStatistics + ) + self.log.info("started receiving syslog messages") + + def _stop_receiver(self): + if self._receiver is None: + return + self._receiver.stop() + self._receiver = None + self.log.info("stopped receiving syslog messages") + + def _build_protocol(self): + if self.options.logorig: + if self.options.logformat == "human": + formatter = HumanFormatter() + else: + formatter = RawFormatter() + logger = MessageLogger(formatter) + else: + logger = DropLogger() + + return SyslogProtocol( + self._processor, + logger, + self.counters, + self.options.noreverseLookup, + ) + + def _build_port_factory(self): + if self.options.useFileDescriptor is not None: + fd = int(self.options.useFileDescriptor) + return AdoptPort(fd) + return CreatePort(self.options.syslogport, self.options.listenip) + + def _send_drop_events(self): + for name in _dropped_counter_names: + count = self.counters[name] + event = { + "component": self.name, + "device": self.options.monitor, + "eventClass": "/App/Zenoss", + "eventKey": "zensyslog.{}".format(name), + "summary": "{}: {}".format(name, count), + "severity": Info, + } + self.sendEvent(event) + + def _displayStatistics(self): + totalTime, totalEvents, maxTime = self._processor.stats.report() + display = "%d events processed in %.2f seconds" % ( + totalEvents, + totalTime, + ) + if totalEvents > 0: + display += ( + "\n%.5f average seconds per event\n" + "Maximum processing time for one event was %.5f\n" + ) % ((totalTime / totalEvents), maxTime) + return display diff --git a/Products/ZenEvents/zensyslog/loader.py b/Products/ZenEvents/zensyslog/loader.py new file mode 100644 index 0000000000..58cf0153f7 --- /dev/null +++ b/Products/ZenEvents/zensyslog/loader.py @@ -0,0 +1,80 @@ +############################################################################## +# +# Copyright (C) Zenoss, Inc. 2008, 2011, 2023, all rights reserved. +# +# This content is made available according to terms specified in +# License.zenoss under the directory where your Zenoss product is installed. +# +############################################################################## + +from __future__ import absolute_import, print_function + +import logging + +from twisted.internet import defer + +from .config import ConfigChecksums + +log = logging.getLogger("zen.zensyslog.configloader") + + +class ConfigLoader(object): + """Handles retrieving additional dynamic configs for daemon from ZODB""" + + def __init__(self, servicefactory, parsers, processor, rules): + self._servicefactory = servicefactory + self._parsers = parsers + self._processor = processor + self._rules = rules + self._checksums = ConfigChecksums() + + @defer.inlineCallbacks + def task(self): + """ + Contact zenhub and gather configuration data. + """ + log.debug("retrieving zensyslog configuration") + try: + service = yield self._servicefactory() + updates = yield service.callRemote("getConfig", self._checksums) + except Exception: + log.exception("failed to retrieve syslog configuration") + else: + log.debug("zensyslog configuration retrieved") + self._process_priorty(updates) + self._process_parsers(updates) + self._process_use_summary(updates) + self._process_rules(updates) + log.debug("applied zensyslog configuration changes") + + def _process_priorty(self, updates): + if updates.checksums.priority is None: + return + state = "updated" if self._checksums.priority else "initial" + log.info("received %s default event priority", state) + self._checksums.priority = updates.checksums.priority + self._processor.priority = updates.priority + + def _process_use_summary(self, updates): + if updates.checksums.use_summary is None: + return + state = "disable" if not updates.use_summary else "enable" + log.info("%s using syslog event summary as event message ", state) + self._checksums.use_summary = updates.checksums.use_summary + self._processor.use_summary = updates.use_summary + + def _process_parsers(self, updates): + if updates.checksums.parsers is None: + return + state = "updated" if self._checksums.parsers else "initial" + log.info("received %s syslog event parsers", state) + self._checksums.parsers = updates.checksums.parsers + self._parsers.update(updates.parsers) + + def _process_rules(self, updates): + if updates.checksums.rules is None: + return + state = "updated" if self._checksums.rules else "initial" + log.info("received %s event field filter rules", state) + self._checksums.rules = updates.checksums.rules + self._rules.update(updates.rules) diff --git a/Products/ZenEvents/zensyslog/loggers.py b/Products/ZenEvents/zensyslog/loggers.py new file mode 100644 index 0000000000..5124e5427e --- /dev/null +++ b/Products/ZenEvents/zensyslog/loggers.py @@ -0,0 +1,105 @@ +############################################################################## +# +# Copyright (C) Zenoss, Inc. 2024, all rights reserved. +# +# This content is made available according to terms specified in +# License.zenoss under the directory where your Zenoss product is installed. +# +############################################################################## + +from __future__ import absolute_import, print_function + +import logging +import time + +from Products.ZenUtils.Utils import zenPath + +SYSLOG_DATE_FORMAT = "%b %d %H:%M:%S" +SAMPLE_DATE = "Apr 10 15:19:22" + + +class DropLogger(object): + """ + Messages are not written anywhere. + """ + + def log(self, message, address): + pass + + +class MessageLogger(object): + """ + Writes syslog messages to a log file. + """ + + def __init__(self, formatter): + self._formatter = formatter + self._log = _get_logger() + + def log(self, data, address): + message = self._formatter(data, address) + self._log.info(message) + + +def _get_logger(self): + log = logging.getLogger("origsyslog") + log.setLevel(logging.INFO) + log.propagate = False + filepath = zenPath("log/origsyslog.log") + handler = logging.FileHandler(filepath) + handler.setFormatter(logging.Formatter("%(message)s")) + log.addHandler(handler) + return log + + +class RawFormatter(object): + def __call__(self, data, address): + return data + + +class HumanFormatter(object): + """ + Expands a syslog message into a string format suitable for writing + to the filesystem such that it appears the same as it would + had the message been logged by the syslog daemon. + + @param msg: syslog message + @type msg: string + @param client_address: IP info of the remote device (ipaddr, port) + @type client_address: tuple of (string, number) + @return: message + @rtype: string + """ + + def __call__(self, data, address): + # pri := (facility * 8) + severity + stop = data.find(">") + + # check for a datestamp. default to right now if date not present + start = stop + 1 + stop = start + len(SAMPLE_DATE) + dateField = data[start:stop] + try: + date = time.strptime(dateField, SYSLOG_DATE_FORMAT) + year = time.localtime()[0] + date = (year,) + date[1:] + start = stop + 1 + except ValueError: + # date not present, so use today's date + date = time.localtime() + + # check for a hostname. default to localhost if not present + stop = data.find(" ", start) + if data[stop - 1] == ":": + hostname = address[0] + else: + hostname = data[start:stop] + start = stop + 1 + + # the message content + body = data[start:] + + # assemble the message + prettyTime = time.strftime(SYSLOG_DATE_FORMAT, date) + message = "%s %s %s" % (prettyTime, hostname, body) + return message diff --git a/Products/ZenEvents/zensyslog/processor.py b/Products/ZenEvents/zensyslog/processor.py new file mode 100644 index 0000000000..bae6490685 --- /dev/null +++ b/Products/ZenEvents/zensyslog/processor.py @@ -0,0 +1,347 @@ +############################################################################## +# +# Copyright (C) Zenoss, Inc. 2007, 2023 all rights reserved. +# +# This content is made available according to terms specified in +# License.zenoss under the directory where your Zenoss product is installed. +# +############################################################################## + +from __future__ import absolute_import, print_function + +import logging +import re +import time + +from collections import Sequence + +import six + +from Products.ZenEvents.EventServer import Stats +from Products.ZenEvents.ZenEventClasses import Error +from Products.ZenUtils.IpUtil import isip + +from . import rfc3164 + +log = logging.getLogger("zen.zensyslog.processor") + + +class SyslogProcessor(object): + """ + Class to process syslog messages and convert them into events viewable + in the Zenoss event console. + """ + + def __init__( + self, + sendEvent, + minpriority, + parsehost, + monitor, + parsers, + ): + """ + Initialize a SyslogProcessor instance. + + @param sendEvent: message from a remote host + @type sendEvent: string + @param minpriority: ignore anything under this priority + @type minpriority: integer + @param parsehost: hostname where this parser is running + @type parsehost: string + @param monitor: name of the distributed collector monitor + @type monitor: string + @param defaultPriority: priority to use if it can't be understood + from the received packet + @type defaultPriority: integer + @param syslogParsers: configureable syslog parsers + @type defaultPriority: list + """ + self.minpriority = minpriority + self.parsehost = parsehost + self.sendEvent = sendEvent + self.monitor = monitor + self.parsers = parsers + + # These are set as found on the EventManagerBase class. + self.use_summary = False + self._severity = rfc3164.Severity.Error + + self.stats = Stats() + + @property + def priority(self): + """Return the default syslog severity value.""" + return self._severity.value + + @priority.setter + def priority(self, value): + self._severity = rfc3164.Severity(value) + + def process(self, msg, ipaddr, host, rtime): + """ + Process an event from syslog and convert to a Zenoss event + + Returns either "EventSent" or "ParserDropped" + + @param msg: message from a remote host + @type msg: string + @param ipaddr: IP address of the remote host + @type ipaddr: string + @param host: remote host's name + @type host: string + @param rtime: time as reported by the remote host + @type rtime: string + """ + try: + fac, sev, dt, hostname, mesg = self._parse_message(msg) + except rfc3164.SyslogMessageError as ex: + log.error("bad syslog message: %s", ex) + return + + # Lower values mean higher severity/priority + if sev.value > self.minpriority: + log.debug("syslog severity below minimum value=%s", sev.value) + return + + event, drop = self._build_event(mesg, host, ipaddr, rtime, fac, sev) + if drop: + return drop + + self._maybe_add_originalTime(event, dt) + self._maybe_add_device(event, hostname) + self._maybe_use_summary_for_message(event, mesg) + self._maybe_overwrite_severity(event) + self._maybe_add_eventclasskey_value(event) + self._maybe_add_message(event, mesg) + + self._convert_to_unicode(event) + + self.sendEvent(event) + self.stats.add(time.time() - rtime) + return "EventSent" + + def _parse_message(self, message): + fac, sev, dt, hostname, mesg = rfc3164.parse(message) + + # Use default severity if a severity was not found in message + sev = sev if sev else self._severity + + return (fac, sev, dt, hostname, mesg) + + def _build_event(self, mesg, host, ipaddr, rtime, fac, sev): + fields, index, drop = parse_MSG(mesg, self.parsers) + if drop: + return (None, "ParserDropped") + + event = { + "device": host, + "monitor": self.monitor, + "ipAddress": ipaddr, + "firstTime": rtime, + "lastTime": rtime, + "eventGroup": "syslog", + "facility": fac.value if fac else None, + "priority": sev.value, + "severity": sev.as_event_severity(), + "parserRuleMatched": index, + } + event.update(fields) + return (event, None) + + def _maybe_add_originalTime(self, event, dt): + if dt: + event["originalTime"] = dt.strftime("%b %d %H:%M:%S") + + def _maybe_add_device(self, event, hostname): + if self.parsehost and hostname: + event["device"] = hostname + if isip(hostname): + event["ipAddress"] = hostname + else: + del event["ipAddress"] + + def _maybe_use_summary_for_message(self, event, mesg): + if self.use_summary: + event["message"] = event.get("summary", "") + event["unparsedMessage"] = mesg + + def _maybe_overwrite_severity(event): + if "overwriteSeverity" not in event: + return + overwrite_v = int(event["overwriteSeverity"]) + overwrite = rfc3164.Severity(overwrite_v) + old_severity = event["severity"] + new_severity = overwrite.as_event_severity() + log.debug( + "Severity overwritten in message tag. Previous:%s Current:%s", + old_severity, + new_severity, + ) + event["severity"] = new_severity + + def _maybe_add_eventclasskey_value(self, event): + value = getEventClassKeyValue(event) + if value: + event["eventClassKey"] = value + + def _maybe_add_message(self, event, mesg): + if "message" not in event: + event["message"] = mesg + + def _convert_to_unicode(self, event): + # Convert strings to unicode, previous code converted 'summary' & + # 'message' fields. With parsing group name matching, good idea to + # convert all fields. + event.update( + { + k: six.text_type(v) + for k, v in event.iteritems() + if isinstance(v, six.binary_type) + } + ) + + +def parse_MSG(msg, parsers): + """ + Parse the RFC-3164 tag of the syslog message using the regex defined + at the top of this module. + + @param msg: message from host + @type msg: string + @return: dictionary of event properties + @type: dictionary + """ + log.debug("[parsed_Tag] message=%s", msg) + fields = {} + for i, parser in enumerate(parsers): + log.debug("parser[%s] regex: %s", i, parser.pattern) + result = parser.parse(msg) + if result is None: + continue + if not parser.keep: + log.debug( + "parser[%s] matched but DROPPED due to parser. " + "msg:%r, pattern:%r, parsedGroups:%r", + i, + msg, + parser.pattern, + result, + ) + return None, -1, True + log.debug( + "parser[%s] matched. msg:%r, pattern:%r, parsedGroups:%r", + i, + msg, + parser.pattern, + result, + ) + return result, i, False + else: + log.debug("No matching parser: %r", msg) + fields["summary"] = msg + return fields, -1, False + + +def getEventClassKeyValue(evt): + """ + Build the key used to find an events dictionary record. If eventClass + is defined it is used. For NT events "Source_Evid" is used. For other + syslog events we use the summary of the event to perform a full text + or'ed search. + + @param evt: dictionary of event properties + @type evt: dictionary + @return: dictionary of event properties + @type: dictionary + """ + if "eventClassKey" in evt or "eventClass" in evt: + return None + + if "ntevid" in evt: + value = "{component}_{ntevid}".format(**evt) + elif "component" in evt: + value = evt["component"] + else: + value = None + + if value: + try: + value = value.decode("latin-1") + except Exception: + value = value.decode("utf-8") + + return value + + +_parser_error_event = { + "device": "127.0.0.1", + "eventClass": "/App/Zenoss", + "severity": Error, + "eventClassKey": "", + "summary": "Syslog Parser processing issue", + "component": "zensyslog", +} + + +class _Parser(object): + __slots__ = ("_matcher", "keep") + + def __init__(self, matcher, keep): + self._matcher = matcher + self.keep = keep + + @property + def pattern(self): + return self._matcher.pattern + + def parse(self, text): + m = self._matcher.search(text) + return m.groupdict() if m else None + + +class Parsers(Sequence): + def __init__(self, sendevent): + self._sendevent = sendevent + self._parsers = [] + + def __getitem__(self, offset): + return self._parsers[offset] + + def __len__(self): + return len(self._parsers) + + def update(self, source): + parsers = [] + for idx, spec in enumerate(source): + if "expr" not in spec: + msg = ( + 'Parser configuration #{} missing a "expr" attribute' + ).format(idx) + log.warn(msg) + self._send_error_event(message=msg) + continue + try: + matcher = re.compile(spec["expr"], re.DOTALL) + parser = _Parser(matcher, spec["keep"]) + except Exception as ex: + msg = ( + "Parser configuration #{} Could not compile expression " + '"{!r}", {!r}' + ).format(idx, spec["expr"], ex) + log.warn(msg) + self._send_error_event(message=msg) + else: + parsers.append(parser) + self._parsers[:] = parsers + + def _send_error_event(self, **kwargs): + """ + Build an Event dict from parameters.n + """ + if kwargs: + event = _parser_error_event.copy() + event.update(kwargs) + else: + event = _parser_error_event + self._sendevent(event) diff --git a/Products/ZenEvents/zensyslog/protocol.py b/Products/ZenEvents/zensyslog/protocol.py new file mode 100644 index 0000000000..3530973d04 --- /dev/null +++ b/Products/ZenEvents/zensyslog/protocol.py @@ -0,0 +1,76 @@ +############################################################################## +# +# Copyright (C) Zenoss, Inc. 2024, all rights reserved. +# +# This content is made available according to terms specified in +# License.zenoss under the directory where your Zenoss product is installed. +# +############################################################################## + +from __future__ import absolute_import, print_function + +import logging +import time + +from twisted.internet import defer +from twisted.internet.protocol import DatagramProtocol + +from Products.ZenUtils.IpUtil import asyncNameLookup + +log = logging.getLogger("zen.zensyslog.protocol") + + +class SyslogProtocol(DatagramProtocol): + """ + Implementation to listen for syslog messages. + """ + + def __init__(self, processor, messagelogger, counters, noreverselookup): + self._processor = processor + self._messagelogger = messagelogger + self._counters = counters + self._gethostname = ( + defer.succeed if noreverselookup else asyncNameLookup + ) + + def datagramReceived(self, packet, address): + """ + Consume the network packet + + @param data: syslog message + @type data: string + @param address: IP info of the remote device (ipaddr, port) + @type address: tuple of (string, number) + """ + if packet == "": + log.debug("received empty datagram. Discarding.") + return + log.debug("received packet from %s -> %s", address, packet) + self._messagelogger.log(packet, address) + + (ipaddr, port) = address + d = self._gethostname(ipaddr) + data = (packet, ipaddr, time.time()) + d.addCallback(self._handle_message, data) + d.addErrback(self._convert_error, data) + + def doStop(self): + log.info("stop receiving syslog messages") + + def _convert_error(self, error, data): + # On failure, use the ip address as the hostname. + self._handle_message(data[1], data) + + def _handle_message(self, hostname, data): + """ + Send the resolved address, if possible, and the event via the thread + + @param response: Twisted response + @type response: Twisted response + @param data: (msg, ipaddr, rtime) + @type data: tuple of (string, string, datetime object) + """ + (packet, ipaddr, rtime) = data + result = self._processor.process(packet, ipaddr, hostname, rtime) + if result == "ParserDropped": + self._counters["eventParserDroppedCount"] += 1 diff --git a/Products/ZenEvents/zensyslog/receiver.py b/Products/ZenEvents/zensyslog/receiver.py new file mode 100644 index 0000000000..100d0cda16 --- /dev/null +++ b/Products/ZenEvents/zensyslog/receiver.py @@ -0,0 +1,75 @@ +############################################################################## +# +# Copyright (C) Zenoss, Inc. 2008, 2011, 2023, all rights reserved. +# +# This content is made available according to terms specified in +# License.zenoss under the directory where your Zenoss product is installed. +# +############################################################################## + +from __future__ import absolute_import, print_function + +import logging +import os +import socket + +from twisted.internet import reactor + +log = logging.getLogger("zen.zensyslog.receiver") + + +class Receiver(object): + """ + Listens for syslog messages and turns them into Zenoss events. + """ + + def __init__(self, protocol, portfactory): + self._protocol = protocol + self._portfactory = portfactory + self._port = None + + def start(self): + self._port = self._portfactory(self._protocol) + + def stop(self): + if self._port is None: + return + self._port.stopListening() + self._port = None + + +class CreatePort(object): + def __init__(self, port, interface): + self._port = port + self._interface = interface + + def __call__(self, protocol): + return reactor.listenUDP( + self._port, protocol, interface=self._interface + ) + + +class AdoptPort(object): + def __init__(self, fd): + self._fd = fd + + def __call__(self, protocol): + # Create a datagram socket from the specific file descriptor + sock = socket.fromfd(self._fd, socket.AF_INET, socket.SOCK_DGRAM) + + # No longer need the file descriptor; `fromfd` created a duplicate. + os.close(self._fd) + del self._fd + + # Set the socket non-blocking + sock.setblocking(False) + + try: + # Adopt the socket and keep a reference to the IListeningPort. + return reactor.adoptDatagramPort( + sock.fileno(), socket.AF_INET, protocol + ) + finally: + # No longer need the socket; + # `adoptDatagramPort` created a duplicate. + sock.close() diff --git a/Products/ZenEvents/zensyslog/rfc3164/__init__.py b/Products/ZenEvents/zensyslog/rfc3164/__init__.py new file mode 100644 index 0000000000..83363a2019 --- /dev/null +++ b/Products/ZenEvents/zensyslog/rfc3164/__init__.py @@ -0,0 +1,15 @@ +############################################################################## +# +# Copyright (C) Zenoss, Inc. 2024 all rights reserved. +# +# This content is made available according to terms specified in +# License.zenoss under the directory where your Zenoss product is installed. +# +############################################################################## + +from __future__ import absolute_import + +from .parser import parse, SyslogMessageError +from .severity import Severity + +__all__ = ("parse", "Severity", "SyslogMessageError") diff --git a/Products/ZenEvents/zensyslog/rfc3164/facility.py b/Products/ZenEvents/zensyslog/rfc3164/facility.py new file mode 100644 index 0000000000..5a9217c0ec --- /dev/null +++ b/Products/ZenEvents/zensyslog/rfc3164/facility.py @@ -0,0 +1,71 @@ +############################################################################## +# +# Copyright (C) Zenoss, Inc. 2024 all rights reserved. +# +# This content is made available according to terms specified in +# License.zenoss under the directory where your Zenoss product is installed. +# +############################################################################## + +from __future__ import absolute_import + +from enum import IntEnum + + +class Facility(IntEnum): + kernel = 0 + user = 1 + mail = 2 + system = 3 + security4 = 4 + syslogd = 5 + printer = 6 + network_news = 7 + uucp = 8 + clock9 = 9 + security10 = 10 + ftp = 11 + ntp = 12 + log_audit = 13 + log_alert = 14 + clock15 = 15 + local0 = 16 + local1 = 17 + local2 = 18 + local3 = 19 + local4 = 20 + local5 = 21 + local6 = 22 + local7 = 23 + + @property + def description(self): + return _descriptions.get(self.value) + + +_descriptions = { + 0: "kernel messages", + 1: "user-level messages", + 2: "mail system", + 3: "system daemons", + 4: "security/authorization messages", + 5: "messages generated internally by syslogd", + 6: "line printer subsystem", + 7: "network news subsystem", + 8: "UUCP subsystem", + 9: "clock daemon", + 10: "security/authorization messages", + 11: "FTP daemon", + 12: "NTP subsystem", + 13: "log audit", + 14: "log alert", + 15: "clock daemon", + 16: "local use 0 (local0)", + 17: "local use 1 (local1)", + 18: "local use 2 (local2)", + 19: "local use 3 (local3)", + 20: "local use 4 (local4)", + 21: "local use 5 (local5)", + 22: "local use 6 (local6)", + 23: "local use 7 (local7)", +} diff --git a/Products/ZenEvents/zensyslog/rfc3164/parser.py b/Products/ZenEvents/zensyslog/rfc3164/parser.py new file mode 100644 index 0000000000..6fe55fea84 --- /dev/null +++ b/Products/ZenEvents/zensyslog/rfc3164/parser.py @@ -0,0 +1,121 @@ +############################################################################## +# +# Copyright (C) Zenoss, Inc. 2024 all rights reserved. +# +# This content is made available according to terms specified in +# License.zenoss under the directory where your Zenoss product is installed. +# +############################################################################## + +from __future__ import absolute_import + +import logging +import re + +import dateutil.parser + +from .facility import Facility +from .severity import Severity + +log = logging.getLogger("zen.zensyslog.parser") + + +class SyslogMessageError(ValueError): + """Raised when the syslog message has bad values""" + + +def parse(message): + """ + Return a parsed syslog (RFC 3164) message. + + Return a tuple having four elements: + + (facility, severity, datetime, hostname, message) + + The 'message' is the remaining content of the original message + minus the 'facility', 'severity', 'datetime', and 'hostname' parts. + """ + start = 0 + start, facility, severity = _extract_pri(start, message) + start, dt = _extract_timestamp(start, message) + start, hostname = _extract_hostname(start, message) + return (facility, severity, dt, hostname, message[start:].strip()) + + +def _extract_pri(start, mesg): + """ + Parse RFC-3164 PRI part of syslog message to get facility and priority. + + Returns a tuple containing a dict of the parsed fields and unparsed + portion of the syslog message string. + + @param msg: message from host + @type msg: string + @return: tuple of dictionary of event properties and the message + @type: (dictionary, string) + """ + if mesg[start:1] == "<": + posn = mesg.find(">") + pvalue = mesg[start + 1 : posn] + try: + pvalue = int(pvalue) + except ValueError: + raise SyslogMessageError( + "Found '{}' instead of a number for priority".format(pvalue) + ) + fac, sev = divmod(pvalue, 8) + try: + facility = Facility(fac) + except ValueError: + raise SyslogMessageError("Invalid facility value '{}'".format(fac)) + try: + severity = Severity(sev) + except ValueError: + raise SyslogMessageError("Invalid severity value '{}'".format(sev)) + return (posn + 1, facility, severity) + + if mesg and mesg[start] < " ": + sev = ord(mesg[start]) + try: + severity = Severity(sev) + except ValueError: + raise SyslogMessageError("Invalid severity value '{}'".format(sev)) + return (start + 1, Facility.kernel, severity) + + log.debug("no priority found in message") + return (start, None, None) + + +_match_timestamp = re.compile( + "^(\S{3} [\d ]{2} [\d ]{2}:[\d ]{2}:[\d ]{2}(?:\.\d{1,3})?)", re.DOTALL +).search + + +def _extract_timestamp(start, mesg): + m = _match_timestamp(mesg[start:]) + if not m: + log.debug("no timestamp found in message") + return (start, None) + ts = m.group(0) + try: + dt = dateutil.parser.parse(ts) + except ValueError: + raise SyslogMessageError("Invalid timestamp '{}'".format(ts)) + else: + return (start + len(ts) + 1, dt) + + +_not_hostname = re.compile(r"[\[:]").search + + +def _extract_hostname(start, mesg): + offset = mesg[start:].find(" ") + if offset < 0: + log.debug("unexpected end of message") + return start, None + hostname = mesg[start : start + offset] + if _not_hostname(hostname): + log.debug("no hostname found in message") + return start, None + hostname = hostname.split("@", 1)[-1] + return (start + offset), hostname diff --git a/Products/ZenEvents/zensyslog/rfc3164/severity.py b/Products/ZenEvents/zensyslog/rfc3164/severity.py new file mode 100644 index 0000000000..8ea453f17e --- /dev/null +++ b/Products/ZenEvents/zensyslog/rfc3164/severity.py @@ -0,0 +1,55 @@ +############################################################################## +# +# Copyright (C) Zenoss, Inc. 2024 all rights reserved. +# +# This content is made available according to terms specified in +# License.zenoss under the directory where your Zenoss product is installed. +# +############################################################################## + +from __future__ import absolute_import + +from enum import IntEnum + +from Products.ZenEvents import ZenEventClasses as _zec + + +class Severity(IntEnum): + Emergency = 0 + Alert = 1 + Critical = 2 + Error = 3 + Warning = 4 + Notice = 5 + Informational = 6 + Debug = 7 + + @property + def description(self): + return _descriptions.get(self.value) + + def as_event_severity(self): + return _syslog_to_zenoss.get(self.value) + + +_descriptions = { + 0: "system is unusable", + 1: "action must be taken immediately", + 2: "critical conditions", + 3: "error conditions", + 4: "warning conditions", + 5: "normal but significant condition", + 6: "informational messages", + 7: "debug-level messages", +} + +_syslog_to_zenoss = { + 0: _zec.Critical, + 1: _zec.Critical, + 2: _zec.Critical, + 3: _zec.Error, + 4: _zec.Warning, + 5: _zec.Info, + 6: _zec.Info, + 7: _zec.Debug, +} diff --git a/Products/ZenEvents/zensyslog/tests/__init__.py b/Products/ZenEvents/zensyslog/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/Products/ZenEvents/zensyslog/tests/test_processor.py b/Products/ZenEvents/zensyslog/tests/test_processor.py new file mode 100644 index 0000000000..d3d63740d3 --- /dev/null +++ b/Products/ZenEvents/zensyslog/tests/test_processor.py @@ -0,0 +1,208 @@ +############################################################################## +# +# Copyright (C) Zenoss, Inc. 2008, all rights reserved. +# +# This content is made available according to terms specified in +# License.zenoss under the directory where your Zenoss product is installed. +# +############################################################################## + +from __future__ import absolute_import, print_function + +import logging + +from unittest import TestCase + +import six + +from Products.ZenEvents.zensyslog.processor import ( + getEventClassKeyValue, + Parsers, + parse_MSG, +) +from Products.ZenEvents.EventManagerBase import EventManagerBase + + +class TestGetEventClassKeyValue(TestCase): + base = {"device": "localhost", "component": "component", "severity": 3} + + def setUp(t): + logging.getLogger().setLevel(logging.CRITICAL + 10) + + def tearDown(t): + logging.getLogger().setLevel(logging.NOTSET) + + def test_empty(t): + empty = {} + result = getEventClassKeyValue(empty.copy()) + t.assertIsNone(result) + + def test_eventClassKey(t): + evt = dict(eventClassKey="akey", **t.base) + result = getEventClassKeyValue(evt.copy()) + t.assertIsNone(result) + + def test_eventClassKey_and_ntevid(t): + evt = dict(eventClassKey="akey", ntevid="1234", **t.base) + result = getEventClassKeyValue(evt.copy()) + t.assertIsNone(result) + + def test_ntevid(t): + evt = dict(ntevid="1234", **t.base) + result = getEventClassKeyValue(evt.copy()) + t.assertEqual(result, "component_1234") + + def test_default(t): + evt = dict(**t.base) + result = getEventClassKeyValue(evt.copy()) + t.assertEqual(result, "component") + + +class TestParseMSG(TestCase): + def setUp(t): + logging.getLogger().setLevel(logging.CRITICAL + 10) + t.parsers = Parsers(t.sendEvent) + t.parsers.update(EventManagerBase.syslogParsers) + + def tearDown(t): + del t.parsers + logging.getLogger().setLevel(logging.NOTSET) + + def sendEvent(t, evt): + "Fakeout sendEvent() method" + t.sent = evt + + def test_msg_content(t): + long_text_message = ("long text message " * 20).strip() + msg = ( + "2016-08-08T11:07:33.660820-04:00 devname=localhost " + "log_id=98765434 type=component {}" + ).format(long_text_message) + fields, index, drop = parse_MSG(msg, t.parsers) + t.assertFalse(drop) + t.assertEqual(index, -1) + t.assertDictEqual(fields, {"summary": six.text_type(msg)}) + + def testCheckFortigate(t): + """ + Test of Fortigate syslog message parsing + """ + key = "987654321" + comp = "myComponent" + msg = ( + "date=xxxx devname=blue log_id={} type={} " "blah blah blah" + ).format(key, comp) + fields, index, drop = parse_MSG(msg, t.parsers) + t.assertFalse(drop) + t.assertTrue(index >= 0) + t.assertEqual(fields.get("eventClassKey"), key) + t.assertEqual(fields.get("component"), comp) + t.assertEqual( + fields.get("summary"), + "devname=blue log_id=987654321 type=myComponent blah blah blah", + ) + + def testCheckCiscoPortStatus(t): + """ + Test of Cisco port status syslog message parsing + """ + msg = ( + "Process 10532, Nbr 192.168.10.13 on GigabitEthernet2/15 " + "from LOADING to FULL, Loading Done" + ) + fields, index, drop = parse_MSG(msg, t.parsers) + t.assertFalse(drop) + t.assertTrue(index >= 0) + t.assertEqual(fields.get("process_id"), "10532") + t.assertEqual(fields.get("interface"), "GigabitEthernet2/15") + t.assertEqual(fields.get("start_state"), "LOADING") + t.assertEqual(fields.get("end_state"), "FULL") + t.assertEqual(fields.get("summary"), "Loading Done") + + def testCiscoVpnConcentrator(t): + """ + Test of Cisco VPN Concentrator syslog message parsing + """ + msg = ( + "54884 05/25/2009 13:41:14.060 SEV=3 HTTP/42 RPT=4623 " + "Error on socket accept." + ) + fields, index, drop = parse_MSG(msg, t.parsers) + t.assertFalse(drop) + t.assertTrue(index >= 0) + t.assertEqual(fields.get("eventClassKey"), "HTTP/42") + t.assertEqual(fields.get("summary"), "Error on socket accept.") + + def testCiscoStandardMessageSeverity(t): + """ + Test that the event severity is correctly extracted from the + Cisco standard message body + """ + msg = ( + "2014 Jan 31 19:45:51 R2-N6K1-2010-P1 " + "%ETH_PORT_CHANNEL-5-CREATED: port-channel1 created" + ) + fields, index, drop = parse_MSG(msg, t.parsers) + t.assertFalse(drop) + t.assertTrue(index >= 0) + t.assertEqual(fields.get("overwriteSeverity"), "5") + + def testDellSyslog(t): + """ + Test dell stuf + """ + msg = ( + "1-Oct-2009 23:00:00.383809:snapshotDelete.cc:290:INFO:8.2.5:" + "Successfully deleted snapshot " + "'UNVSQLCLUSTERTEMPDB-2009-09-30-23:00:14.11563'." + ) + fields, index, drop = parse_MSG(msg, t.parsers) + t.assertFalse(drop) + t.assertTrue(index >= 0) + t.assertEqual(fields.get("eventClassKey"), "8.2.5") + t.assertEqual( + fields.get("summary"), + "Successfully deleted snapshot " + "'UNVSQLCLUSTERTEMPDB-2009-09-30-23:00:14.11563'.", + ) + + def testDellSyslog2(t): + """ + Test dell stuf + """ + msg = ( + "2626:48:VolExec:27-Aug-2009 " + "13:15:58.072049:VE_VolSetWorker.hh:75:WARNING:43.3.2:Volume " + "volumeName has reached 96 percent of its reported size and " + "is currently using 492690MB." + ) + fields, index, drop = parse_MSG(msg, t.parsers) + t.assertFalse(drop) + t.assertTrue(index >= 0) + t.assertEqual(fields.get("eventClassKey"), "43.3.2") + t.assertEqual( + fields.get("summary"), + "Volume volumeName has reached 96 percent of its reported size " + "and is currently using 492690MB.", + ) + + def testNetAppSyslogParser(t): + """ + Test NetApp syslog parser. + """ + msg = ( + "[deviceName: 10/100/1000/e1a:warning]: Client 10.0.0.101 " + "(xid 4251521131) is trying to access an unexported mount " + "(fileid 64, snapid 0, generation 6111516 and flags 0x0 on " + "volume 0xc97d89a [No volume name available])" + ) + fields, index, drop = parse_MSG(msg, t.parsers) + t.assertFalse(drop) + t.assertTrue(index >= 0) + t.assertEqual(fields.get("component"), "10/100/1000/e1a") + t.assertEqual( + fields.get("summary"), + "Client 10.0.0.101 (xid 4251521131) is trying to access an " + "unexported mount (fileid 64, snapid 0, generation 6111516 " + "and flags 0x0 on volume 0xc97d89a [No volume name available])", + ) diff --git a/Products/ZenEvents/zensyslog/tests/test_transformer.py b/Products/ZenEvents/zensyslog/tests/test_transformer.py new file mode 100644 index 0000000000..81b925e31d --- /dev/null +++ b/Products/ZenEvents/zensyslog/tests/test_transformer.py @@ -0,0 +1,78 @@ +############################################################################## +# +# Copyright (C) Zenoss, Inc. 2023, all rights reserved. +# +# This content is made available according to terms specified in +# License.zenoss under the directory where your Zenoss product is installed. +# +############################################################################## + +import logging +import collections + +from unittest import TestCase +from mock import Mock + +from Products.ZenHub.interfaces import TRANSFORM_CONTINUE, TRANSFORM_DROP + +from Products.ZenEvents.EventManagerBase import EventManagerBase +from Products.ZenEvents.zensyslog.transformer import ( + FilterRules, + SyslogMsgFilter, +) + + +class SyslogMsgFilterTest(TestCase): + def setUp(t): + logging.getLogger().setLevel(logging.CRITICAL + 10) + + def tearDown(t): + logging.getLogger().setLevel(logging.NOTSET) + + def testDefaultFilterRules(self): + app = Mock() + rules = FilterRules(app) + rules.update(EventManagerBase.syslogMsgEvtFieldFilterRules) + self.assertEquals(app.sendEvent.called, False) + + def testBadFilter(self): + filterCfg = {"eventClassKey": ["(BadBad"]} + app = Mock() + rules = FilterRules(app) + rules.update(filterCfg) + self.assertEqual(len(rules), 0) + self.assertTrue(app.sendEvent.called) + self.assertEquals(app.sendEvent.call_count, 1) + evtFields = app.sendEvent.mock_calls[0][1][0] + self.assertEquals( + evtFields["message"], + "Syslog Message Filter configuration for the 'eventClassKey' " + "event field could not compile rule #0 with the expression " + "of '(BadBad'. Error error('unbalanced parenthesis',)", + ) + + def testSyslogMsgFilterMatch(self): + filterCfg = {"eventClassKey": ["MARK"]} + event = { + "severity": 4, + "eventClassKey": "MARK", + "component": "zensyslog", + "summary": "test message", + "eventKey": "SyslogMessageFilter.eventClassKey.0", + "device": "127.0.0.1", + "eventClass": "/App/Zenoss", + "message": "test test 123", + } + app = Mock() + rules = FilterRules(app) + counters = collections.Counter() + counters["eventCount"] = 0 + counters["eventFilterDroppedCount"] = 0 + transformer = SyslogMsgFilter(rules, counters) + rules.update(filterCfg) + self.assertFalse(app.sendEvent.called) + result = transformer.transform(event) + self.assertEquals(result, TRANSFORM_DROP) + event["eventClassKey"] = "NotMark" + result = transformer.transform(event) + self.assertEquals(result, TRANSFORM_CONTINUE) diff --git a/Products/ZenEvents/zensyslog/transformer.py b/Products/ZenEvents/zensyslog/transformer.py new file mode 100644 index 0000000000..35097eb876 --- /dev/null +++ b/Products/ZenEvents/zensyslog/transformer.py @@ -0,0 +1,146 @@ +############################################################################## +# +# Copyright (C) Zenoss, Inc. 2023, all rights reserved. +# +# This content is made available according to terms specified in +# License.zenoss under the directory where your Zenoss product is installed. +# +############################################################################## + +from __future__ import absolute_import, print_function + +import logging +import re + +from collections import Mapping + +from zope.interface import implementer + +from Products.ZenEvents.ZenEventClasses import Error +from Products.ZenHub.interfaces import ( + ICollectorEventTransformer, + TRANSFORM_CONTINUE, + TRANSFORM_DROP, +) + +log = logging.getLogger("zen.zensyslog.transformer") + +_rule_error_event = { + "device": "127.0.0.1", + "eventClass": "/App/Zenoss", + "severity": Error, + "eventClassKey": "", + "summary": "Syslog Message Filter processing issue", + "component": "zensyslog", +} + + +@implementer(ICollectorEventTransformer) +class SyslogMsgFilter(object): + """ + Interface used to perform filtering of events at the collector. + This could be used to drop events, transform event content, etc. + + These transformers are run sequentially before a fingerprint is generated + for the event, so they can set fields which are used by an + ICollectorEventFingerprintGenerator. + + The priority of the event transformer (the transformers are executed in + ascending order using the weight of each filter). + """ + + weight = 1 + + def __init__(self, rules, counters): + self._rules = rules + self._counters = counters + + def transform(self, event): + """ + Performs any transforms of the specified event at the collector. + + @param event: The event to transform. + @type event: dict + @return: Returns TRANSFORM_CONTINUE if this event should be forwarded + on to the next transformer in the sequence, TRANSFORM_STOP if no + further transformers should be performed on this event, and + TRANSFORM_DROP if the event should be dropped. + @rtype: int + """ + relevant_rules = ( + (k, v) for k, v in self._rules.iteritems() if k in event + ) + for name, matchers in relevant_rules: + value = event.get(name) + for idx, matcher in enumerate(matchers): + matched = matcher.search(value) + if not matched: + continue + log.debug( + "drop syslog message! " + "EventFieldName:%r " + "EventFieldValue:%r " + "FilterRuleNumber:%s " + "FilterRuleExpression:%r", + name, + value, + idx, + matcher.pattern, + ) + self._counters["eventFilterDroppedCount"] += 1 + return TRANSFORM_DROP + else: + return TRANSFORM_CONTINUE + + +class FilterRules(Mapping): + """ + Rules for syslog message filtering. + """ + + def __init__(self, app): + self._app = app + self._rules = {} + + def __getitem__(self, key): + return self._rules[key] + + def __iter__(self): + return iter(self._rules) + + def __len__(self): + return len(self._rules) + + def update(self, source): + rules = {} + for name, ruledefs in source.iteritems(): + for idx, ruledef in enumerate(ruledefs): + try: + compiledRule = re.compile(ruledef, re.DOTALL) + except Exception as ex: + msg = ( + "Syslog Message Filter configuration for the " + "{!r} event field could not compile rule #{!r}" + " with the expression of {!r}. Error {!r}".format( + name, idx, ruledef, ex + ) + ) + log.warn(msg) + self._send_error_event( + message=msg, + eventKey="SyslogMessageFilter.{}.{}".format(name, idx), + ) + else: + rules.setdefault(name, []).append(compiledRule) + self._rules = rules + + def _send_error_event(self, **kwargs): + """ + Build an Event dict from parameters.n + """ + if kwargs: + event = _rule_error_event.copy() + event.update(kwargs) + else: + event = _rule_error_event + self._app.sendEvent(event) diff --git a/Products/ZenHub/services/SyslogConfig.py b/Products/ZenHub/services/SyslogConfig.py index f5a79ff6d8..80b5e1dd23 100644 --- a/Products/ZenHub/services/SyslogConfig.py +++ b/Products/ZenHub/services/SyslogConfig.py @@ -7,69 +7,53 @@ # ############################################################################## -from __future__ import print_function - """SyslogConfig Provides configuration for syslog message to Zenoss event conversions. """ -import logging -from hashlib import md5 - -from Products.ZenCollector.services.config import CollectorConfigService - -log = logging.getLogger("zen.HubService.SyslogConfig") - +from __future__ import absolute_import, print_function -class FakeDevice(object): - id = "Syslog payload" - - -class SyslogConfig(CollectorConfigService): - - def _filterDevice(self, device): - return device.id == FakeDevice.id - - def _filterDevices(self, deviceList): - return [FakeDevice()] +import logging - def _createDeviceProxy(self, device): - proxy = CollectorConfigService._createDeviceProxy(self, device) - proxy.configCycleInterval = 3600 - proxy.name = "Syslog Configuration" - proxy.device = device.id +from hashlib import md5 - proxy.defaultPriority = self.zem.defaultPriority - proxy.syslogParsers = self.zem.syslogParsers - proxy.syslogSummaryToMessage = self.zem.syslogSummaryToMessage - proxy.syslogMsgEvtFieldFilterRules = self.zem.syslogMsgEvtFieldFilterRules +from Products.ZenHub.HubService import HubService +from Products.ZenEvents.zensyslog.config import ConfigUpdates - return proxy +log = logging.getLogger("zen.hub.services.syslogconfig") - def __checkSumRetConf(self, remoteCheckSum, confName): - currentCheckSum = md5(str(getattr(self.zem, confName))).hexdigest() - return (None, None) if currentCheckSum == remoteCheckSum else (currentCheckSum, getattr(self.zem, confName)) - def remote_getDefaultPriority(self, remoteCheckSum): - return self.__checkSumRetConf(remoteCheckSum, "defaultPriority") +class SyslogConfig(HubService): + def remote_getConfig(self, checksums): + result = ConfigUpdates() - def remote_getSyslogParsers(self, remoteCheckSum): - return self.__checkSumRetConf(remoteCheckSum, "syslogParsers") + priority = self.zem.defaultPriority + priority_checksum = _checksum(priority) + if checksums.priority != priority_checksum: + result.priority = priority + result.checksums.priority = priority_checksum - def remote_getSyslogSummaryToMessage(self, remoteCheckSum): - return self.__checkSumRetConf(remoteCheckSum, "syslogSummaryToMessage") + use_summary = self.zem.syslogSummaryToMessage + use_summary_checksum = _checksum(use_summary) + if checksums.use_summary != use_summary_checksum: + result.use_summary = use_summary + result.checksums.use_summary = use_summary_checksum - def remote_getSyslogMsgEvtFieldFilterRules(self, remoteCheckSum): - return self.__checkSumRetConf(remoteCheckSum, "syslogMsgEvtFieldFilterRules") + parsers = self.zem.syslogParsers + parsers_checksum = _checksum(parsers) + if checksums.parsers != parsers_checksum: + result.parsers = parsers + result.checksums.parsers = parsers_checksum -if __name__ == "__main__": - from Products.ZenHub.ServiceTester import ServiceTester + rules = self.zem.syslogMsgEvtFieldFilterRules + rules_checksum = _checksum(rules) + if checksums.rules != rules_checksum: + result.rules = rules + result.checksums.rules = rules_checksum - tester = ServiceTester(SyslogConfig) + return result - def printer(config): - print("Default syslog priority = ", config.defaultPriority) - tester.printDeviceProxy = printer - tester.showDeviceInfo() +def _checksum(value): + return md5(str(value)).hexdigest() # noqa: S324 diff --git a/bin/zensyslog b/bin/zensyslog index dcd1922ec5..6fcfcc6d61 100755 --- a/bin/zensyslog +++ b/bin/zensyslog @@ -12,7 +12,7 @@ . $ZENHOME/bin/zenfunctions PRGHOME=$ZENHOME/Products/ZenEvents -PRGNAME=zensyslog.py +PRGNAME=zensyslog CFGFILE=$CFGDIR/zensyslog.conf generic "$@"