From c914bfcf32c2470ccc3093e23ff8957c120d2fb4 Mon Sep 17 00:00:00 2001 From: Jason Peacock Date: Fri, 8 Nov 2024 11:59:55 -0600 Subject: [PATCH] Add indices to zenjobs' jobs data in Redis. Add an index for the status and user IDs fields of job records. ZEN-34975 --- Products/Jobber/__init__.py | 2 +- Products/Jobber/model.py | 5 +- Products/Jobber/storage.py | 263 +++++++++++++++++++++----- Products/Jobber/tests/test_storage.py | 3 +- Products/Zuul/facades/jobsfacade.py | 12 +- Products/Zuul/routers/jobs.py | 5 +- 6 files changed, 232 insertions(+), 58 deletions(-) diff --git a/Products/Jobber/__init__.py b/Products/Jobber/__init__.py index e058f66a5d..4800902881 100644 --- a/Products/Jobber/__init__.py +++ b/Products/Jobber/__init__.py @@ -23,7 +23,7 @@ def _patchstate(): ) for attr in groupings: setattr(states, attr, frozenset({ABORTED} | getattr(states, attr))) - setattr(states, "ABORTED", ABORTED) + states.ABORTED = ABORTED # Update the PRECENDENCE stuff to account for ABORTED offset = states.PRECEDENCE.index(None) diff --git a/Products/Jobber/model.py b/Products/Jobber/model.py index b04caebda7..1d70faee7e 100644 --- a/Products/Jobber/model.py +++ b/Products/Jobber/model.py @@ -74,7 +74,7 @@ def __dir__(self): return sorted( set( tuple(dir(JobRecord)) - + tuple((getattr(self, "details") or {}).keys()) + + tuple((getattr(self, "details", None) or {}).keys()) ) ) @@ -89,7 +89,7 @@ def __dict__(self): for k in self.__slots__ + ("uuid", "duration", "complete") if k != "details" } - details = getattr(self, "details") or {} + details = getattr(self, "details", None) or {} base.update(**details) return base @@ -375,7 +375,6 @@ def save_jobrecord(log, body=None, headers=None, properties=None, **ignored): def _save_record(log, storage, record): - # Retrieve the job storage connection. jobid = record["jobid"] if "userid" not in record: log.warn("No user ID submitted with job %s", jobid) diff --git a/Products/Jobber/storage.py b/Products/Jobber/storage.py index 9607c96ed0..e073d7bf3b 100644 --- a/Products/Jobber/storage.py +++ b/Products/Jobber/storage.py @@ -12,6 +12,7 @@ import json import logging import re +import types from collections import Container, Iterable, Sized @@ -23,9 +24,15 @@ from .config import ZenCeleryConfig -_keybase = "zenjobs:job:" -_keypattern = _keybase + "*" -_keytemplate = "{}{{}}".format(_keybase) +_appkey = "zenjobs" + +_jobkey_template = "{}:job:{{}}".format(_appkey) +_statuskey_template = "{}:status:{{}}".format(_appkey) +_useridkey_template = "{}:userid:{{}}".format(_appkey) + +_alljobkeys = _jobkey_template.format("*") +_allstatuskeys = _statuskey_template.format("*") +_alluseridkeys = _useridkey_template.format("*") log = logging.getLogger("zen.zenjobs") @@ -38,7 +45,6 @@ def makeJobStore(): class _Converter(object): - __slots__ = ("dumps", "loads") def __init__(self, dumps, loads): @@ -170,35 +176,120 @@ def search(self, **fields): :rtype: Iterable[str] :raises TypError: if an unsupported value type is given for a field """ - field_names = fields.keys() - _verifyfields(field_names) + + # 'created', 'started', 'finished' fields are indexed by sorted-set + # keys of the same names. + # 'name', 'status', and 'userid' fields are indexed by set keys where + # each name, status, and userid value is its own key. + + _verifyfields(fields.keys()) + + statuses = fields.pop("status", ()) + if statuses: + if isinstance(statuses, six.string_types): + statuses = (statuses,) + elif isinstance(statuses, types.GeneratorType): + statuses = tuple(statuses) + + userids = fields.pop("userid", ()) + if userids: + if isinstance(userids, six.string_types): + userids = (userids,) + elif isinstance(userids, types.GeneratorType): + userids = tuple(userids) + + if not fields: + result = self._index_lookup(statuses, userids) + if result is not None: + return result + return self._index_and_record_lookup(statuses, userids, fields) + + def _index_lookup(self, statuses, userids): + if len(statuses) > 1 or len(userids) > 1: + return + status = statuses[0] if statuses else None + userid = userids[0] if userids else None + if status and not userid: + return ( + jobid + for jobid in self.__client.sscan_iter( + _statuskey(status), count=self.__scan_count + ) + ) + if userid and not status: + return ( + jobid + for jobid in self.__client.sscan_iter( + _useridkey(userid), count=self.__scan_count + ) + ) + return ( + jobid + for jobid in self.__client.sinter( + _statuskey(status), _useridkey(userid) + ) + ) + + def _index_and_record_lookup(self, statuses, userids, fields): + result = self._index_lookup(statuses, userids) + jobids = set() if result is None else set(result) + + if not jobids: + jobids.update(self._get_jobids_by_index(_statuskey, statuses)) + jobids.update(self._get_jobids_by_index(_useridkey, userids)) + + matchers = self._get_matchers(fields) + + if jobids: + if not fields: + return jobids + return ( + jobid + for jobid in jobids + if self._match_fields(matchers, _jobkey(jobid), fields.keys()) + ) + return ( + self.__client.hget(key, "jobid") + for key in self.__client.scan_iter( + match=_alljobkeys, count=self.__scan_count + ) + if self._match_fields(matchers, key, fields.keys()) + ) + + def _get_matchers(self, fields): matchers = {} - for name, match in fields.items(): - # Note: check for string first because strings are also - # iterable. - if isinstance(match, six.string_types): - matchers[name] = match - elif isinstance(match, Iterable): - matchers[name] = _Any(*match) - elif isinstance(match, re._pattern_type): - matchers[name] = _RegEx(match) - elif isinstance(match, float): - matchers[name] = "{:f}".format(match) + for name, value in fields.items(): + # Note: check for string first because strings are iterable. + if isinstance(value, six.string_types): + matchers[name] = value + elif isinstance(value, Iterable): + matchers[name] = _Any(*value) + elif isinstance(value, re._pattern_type): + matchers[name] = _RegEx(value) + elif isinstance(value, float): + matchers[name] = "{:f}".format(value) else: raise TypeError( "Type '%s' not supported for field '%s'" - % (type(match), name), + % (type(value), name), ) + return matchers - def get_fields(key): - return self.__client.hmget(key, *field_names) + def _match_fields(self, matchers, jobkey, fieldnames): + if not fieldnames: + return () + record = dict( + zip(fieldnames, self.__client.hmget(jobkey, *fieldnames)) + ) + return matchers == record + def _get_jobids_by_index(self, keyfactory, indexids): + if not indexids: + return () return ( - self.__client.hget(key, "jobid") - for key in self.__client.scan_iter( - match=_keypattern, count=self.__scan_count - ) - if matchers == dict(zip(field_names, get_fields(key))) + jobid + for key in (keyfactory(indexid) for indexid in indexids) + for jobid in self.__client.sscan_iter(key, count=self.__scan_count) ) def getfield(self, jobid, name, default=None): @@ -217,7 +308,7 @@ def getfield(self, jobid, name, default=None): """ if name not in Fields: raise AttributeError("Job record has no attribute '%s'" % name) - key = _key(jobid) + key = _jobkey(jobid) if not self.__client.exists(key): return default raw = self.__client.hget(key, name) @@ -246,18 +337,46 @@ def update(self, jobid, **fields): ", ".join("'%s'" % name for name in badfields), ), ) - key = _key(jobid) - if not self.__client.exists(key): + jobkey = _jobkey(jobid) + if not self.__client.exists(jobkey): raise KeyError("Job not found: %s" % jobid) + oldstatus, olduserid = self.__client.hmget( + jobkey, ("status", "userid") + ) deleted_fields = [k for k, v in fields.items() if v is None] if deleted_fields: - self.__client.hdel(key, *deleted_fields) + self.__client.hdel(jobkey, *deleted_fields) fields = { k: Fields[k].dumps(v) for k, v in fields.items() if v is not None } if fields: - self.__client.hmset(key, fields) - self.__expire_key_if_status_is_ready(key) + self.__client.hmset(jobkey, fields) + newstatus, newuserid = self.__client.hmget( + jobkey, ("status", "userid") + ) + self._update_status_index(jobid, oldstatus, newstatus) + self._update_userid_index(jobid, olduserid, newuserid) + self.__expire_key_if_status_is_ready(jobkey) + + def _update_status_index(self, jobid, oldstatus, newstatus): + if newstatus == oldstatus: + return + if oldstatus: + oldstatuskey = _statuskey(oldstatus) + self.__client.srem(oldstatuskey, jobid) + if newstatus: + newstatuskey = _statuskey(newstatus) + self.__client.srem(newstatuskey, jobid) + + def _update_userid_index(self, jobid, olduserid, newuserid): + if newuserid == olduserid: + return + if olduserid: + olduseridkey = _useridkey(olduserid) + self.__client.srem(olduseridkey, jobid) + if newuserid: + newuseridkey = _useridkey(newuserid) + self.__client.srem(newuseridkey, jobid) def keys(self): """Return all existing job IDs. @@ -267,7 +386,7 @@ def keys(self): return ( self.__client.hget(key, "jobid") for key in self.__client.scan_iter( - match=_keypattern, count=self.__scan_count + match=_alljobkeys, count=self.__scan_count ) ) @@ -305,7 +424,7 @@ def mget(self, *jobids): :param jobids: Iterable[str] :rtype: Iterator[Dict[str, Union[str, float]]] """ - keys = (_key(jobid) for jobid in jobids) + keys = (_jobkey(jobid) for jobid in jobids) raw = ( self.__client.hgetall(key) for key in keys @@ -325,7 +444,7 @@ def get(self, jobid, default=None): :type default: Any :rtype: Union[Dict[str, Union[str, float]], default] """ - key = _key(jobid) + key = _jobkey(jobid) if not self.__client.exists(key): return default item = self.__client.hgetall(key) @@ -340,7 +459,7 @@ def __getitem__(self, jobid): :rtype: Dict[str, Union[str, float]] :raises: KeyError """ - key = _key(jobid) + key = _jobkey(jobid) if not self.__client.exists(key): raise KeyError("Job not found: %s" % jobid) item = self.__client.hgetall(key) @@ -359,12 +478,25 @@ def __setitem__(self, jobid, data): data = { k: Fields[k].dumps(v) for k, v in data.items() if v is not None } - key = _key(jobid) + key = _jobkey(jobid) olddata = self.__client.hgetall(key) deleted_fields = set(olddata) - set(data) if deleted_fields: self.__client.hdel(key, *deleted_fields) self.__client.hmset(key, data) + + # Update userid index + userid = data.get("userid") + if userid: + userid_key = _useridkey(userid) + self.__client.sadd(userid_key, jobid) + + # Update status index + status = data.get("status") + if status: + status_key = _statuskey(status) + self.__client.sadd(status_key, jobid) + self.__expire_key_if_status_is_ready(key) def mdelete(self, *jobids): @@ -375,8 +507,16 @@ def mdelete(self, *jobids): """ if not jobids: return - jobids = (_key(jobid) for jobid in jobids) - self.__client.delete(*jobids) + jobkeys = (_jobkey(jobid) for jobid in jobids) + self.__client.delete(*jobkeys) + for statuskey in self.__client.scan_iter( + _allstatuskeys, count=self.__scan_count + ): + self.__client.srem(statuskey, *jobids) + for useridkey in self.__client.scan_iter( + _alluseridkeys, count=self.__scan_count + ): + self.__client.srem(useridkey, *jobids) def __delitem__(self, jobid): """Delete the job data associated with the given job ID. @@ -385,10 +525,29 @@ def __delitem__(self, jobid): :type jobid: str """ - key = _key(jobid) - if not self.__client.exists(key): + jobkey = _jobkey(jobid) + if not self.__client.exists(jobkey): + for statuskey in self.__client.scan_iter( + _allstatuskeys, count=self.__scan_count + ): + self.__client.srem(statuskey, jobid) + for useridkey in self.__client.scan_iter( + _alluseridkeys, count=self.__scan_count + ): + self.__client.srem(useridkey, jobid) raise KeyError("Job not found: %s" % jobid) - self.__client.delete(key) + + status = self.__client.hget(jobkey, "status") + if status: + statuskey = _statuskey(status) + self.__client.srem(statuskey, jobid) + + userid = self.__client.hget(jobkey, "userid") + if userid: + useridkey = _useridkey(userid) + self.__client.srem(useridkey, jobid) + + self.__client.delete(jobkey) def __contains__(self, jobid): """Return True if job data exists for the given job ID. @@ -396,13 +555,13 @@ def __contains__(self, jobid): :type jobid: str :rtype: boolean """ - return self.__client.exists(_key(jobid)) + return self.__client.exists(_jobkey(jobid)) def __len__(self): return sum( 1 for _ in self.__client.scan_iter( - match=_keypattern, count=self.__scan_count + match=_alljobkeys, count=self.__scan_count ) ) @@ -414,7 +573,7 @@ def __iter__(self): return self.keys() def ttl(self, jobid): - result = self.__client.ttl(_key(jobid)) + result = self.__client.ttl(_jobkey(jobid)) return result if result >= 0 else None def __expire_key_if_status_is_ready(self, key): @@ -423,9 +582,19 @@ def __expire_key_if_status_is_ready(self, key): self.__client.expire(key, self.__expires) -def _key(jobid): +def _jobkey(jobid): """Return the redis key for the given job ID.""" - return _keytemplate.format(jobid) + return _jobkey_template.format(jobid) + + +def _statuskey(status): + """Return the redis key for the given status.""" + return _statuskey_template.format(status) + + +def _useridkey(userid): + """Return the redis key for the given user ID.""" + return _useridkey_template.format(userid) def _iteritems(client, count): @@ -433,7 +602,7 @@ def _iteritems(client, count): Only (key, data) pairs where data is not None are returned. """ - keys = client.scan_iter(match=_keypattern, count=count) + keys = client.scan_iter(match=_alljobkeys, count=count) raw = ((key, client.hgetall(key)) for key in keys) return ((key, data) for key, data in raw if data) diff --git a/Products/Jobber/tests/test_storage.py b/Products/Jobber/tests/test_storage.py index d1aef54c66..584ac08db9 100644 --- a/Products/Jobber/tests/test_storage.py +++ b/Products/Jobber/tests/test_storage.py @@ -435,7 +435,8 @@ class PopulatedJobStoreTest(TestCase): def setUp(t): t.store = JobStore(t.layer.redis) for jobid, data in t.records.items(): - t.layer.redis.hmset("zenjobs:job:%s" % jobid, data) + t.store[jobid] = data + # t.layer.redis.hmset("zenjobs:job:%s" % jobid, data) def tearDown(t): del t.store diff --git a/Products/Zuul/facades/jobsfacade.py b/Products/Zuul/facades/jobsfacade.py index 8a9176003d..ec5d714a89 100644 --- a/Products/Zuul/facades/jobsfacade.py +++ b/Products/Zuul/facades/jobsfacade.py @@ -8,6 +8,9 @@ ############################################################################## import logging + +import six + from AccessControl import getSecurityManager from Products.Zuul.facades import ZuulFacade @@ -96,12 +99,15 @@ def getJobLog(self, jobid): None, ) - def getUserJobs(self): + def getUserJobs(self, statuses=None): """Returns the jobs associated with the current user. :rtype: Tuple[JobRecord] """ user = getSecurityManager().getUser() - if not isinstance(user, basestring): + if not isinstance(user, six.string_types): user = user.getId() - return self._dmd.JobManager.query(criteria={"userid": user})["jobs"] + criteria = {"userid": user} + if statuses: + criteria["status"] = statuses + return self._dmd.JobManager.query(criteria=criteria)["jobs"] diff --git a/Products/Zuul/routers/jobs.py b/Products/Zuul/routers/jobs.py index 4de8983534..4966bf2cb3 100644 --- a/Products/Zuul/routers/jobs.py +++ b/Products/Zuul/routers/jobs.py @@ -117,9 +117,8 @@ def userjobs(self): "PENDING": "created", "RETRY": "started", } - for job in self.api.getUserJobs(): - if job.status in validstates: - results[job.status].append(job) + for job in self.api.getUserJobs(statuses=validstates.keys()): + results[job.status].append(job) # Sort and slice appropriately -- most recent 10 items for status, jobs in results.iteritems(): try: