From 632c68b55bfa98f7c404aab4404049522aaa5c18 Mon Sep 17 00:00:00 2001 From: Anze Date: Sat, 14 Mar 2020 20:43:44 +0100 Subject: [PATCH] Allow jobs time to be aligned to specified parameter (start_ts) --- grafoleancollector/collector.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/grafoleancollector/collector.py b/grafoleancollector/collector.py index 9e4d730..555209c 100644 --- a/grafoleancollector/collector.py +++ b/grafoleancollector/collector.py @@ -25,16 +25,21 @@ class MultipleIntervalsTrigger(BaseTrigger): - multiple intervals, when aligned, cause only a single job invocation - remembers which intervals have caused the invocation; the list is cleared after `forget_affecting_after` seconds + - if start_ts is specified, it allows aligning jobs' start time; start_ts should be in the past """ __slots__ = 'intervals', 'start_ts', 'affecting_intervals', 'forget_affecting_after' - def __init__(self, intervals, forget_affecting_after=300): + def __init__(self, intervals, forget_affecting_after=300, start_ts=None): if not intervals: raise Exception("At least one interval must be specified") # we only operate in whole seconds, and only care about unique values: self.intervals = list(set([int(i) for i in intervals])) self.forget_affecting_after = forget_affecting_after - self.start_ts = int(time.time()) + now = int(time.time()) + self.start_ts = now if start_ts is None else int(start_ts) + if self.start_ts > now: + logging.warning("Job aligning with start_ts failed! Parameter start_ts must be in the past, never in the future.") + self.start_ts = now self.affecting_intervals = {} def get_next_fire_time(self, previous_fire_time, now): @@ -308,14 +313,22 @@ def fetch_job_configs(self, protocol): def refresh_jobs(self): wanted_jobs = set() - for job_id, intervals, job_func, job_data in self.jobs(): + for next_job in self.jobs(): + # We didn't anticipate that we might need another parameter (start_ts)... since this is a library, we can't simply + # change it now until all bots change this too. But we need another parameter and it would be ugly to put it elsewhere, + # so we detect different number of returned parameters and act accordingly: + if len(next_job) == 4: + job_id, intervals, job_func, job_data = next_job + start_ts = None + else: + job_id, intervals, job_func, job_data, start_ts = next_job wanted_jobs.add(job_id) # if the existing job's configuration is the same, leave it alone, otherwise the trigger will be reset: if self.known_jobs.get(job_id) == job_data: continue self.known_jobs[job_id] = job_data - trigger = MultipleIntervalsTrigger(intervals) + trigger = MultipleIntervalsTrigger(intervals, start_ts=start_ts) logging.info(f"Adding job: {job_id}") self.scheduler.add_job(job_func, id=job_id, trigger=trigger, executor='iaexecutor', kwargs=job_data, replace_existing=True)