diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py
index 5ea98e26a9..6ee6f3aebd 100644
--- a/queue_job/__manifest__.py
+++ b/queue_job/__manifest__.py
@@ -7,7 +7,7 @@
     "website": "https://github.com/OCA/queue",
     "license": "LGPL-3",
     "category": "Generic Modules",
-    "depends": ["mail", "base_sparse_field"],
+    "depends": ["mail", "base_sparse_field", "alfaleads_utils"],
     "external_dependencies": {"python": ["requests"]},
     "data": [
         "security/security.xml",
diff --git a/queue_job/data/queue_data.xml b/queue_job/data/queue_data.xml
index ca5a747746..5fb424fa66 100644
--- a/queue_job/data/queue_data.xml
+++ b/queue_job/data/queue_data.xml
@@ -28,6 +28,18 @@
             <field eval="False" name="doall" />
             <field name="state">code</field>
             <field name="code">model.autovacuum()</field>
         </record>
+        <record id="ir_cron_autovacuum_failed_queue_jobs" model="ir.cron">
+            <field name="name">AutoVacuum Failed Jobs</field>
+            <field name="model_id" ref="model_queue_job" />
+            <field name="user_id" ref="base.user_root" />
+            <field name="active" eval="True" />
+            <field name="interval_number">1</field>
+            <field name="interval_type">days</field>
+            <field name="numbercall">-1</field>
+            <field eval="False" name="doall" />
+            <field name="state">code</field>
+            <field name="code">model.autovacuum(vacuum_failed=True)</field>
+        </record>
     </data>
 </odoo>
diff --git a/queue_job/delay.py b/queue_job/delay.py
index 77c823c63c..fd1109452d 100644
--- a/queue_job/delay.py
+++ b/queue_job/delay.py
@@ -447,6 +447,7 @@ class Delayable:
         "description",
         "channel",
         "identity_key",
+        "retryable_exceptions",
     )
     __slots__ = _properties + (
         "recordset",
@@ -466,6 +467,7 @@ def __init__(
         description=None,
         channel=None,
         identity_key=None,
+        retryable_exceptions=None,
     ):
         self._graph = DelayableGraph()
         self._graph.add_vertex(self)
@@ -478,6 +480,7 @@
         self.description = description
         self.channel = channel
         self.identity_key = identity_key
+        self.retryable_exceptions = retryable_exceptions

         self._job_method = None
         self._job_args = ()
@@ -547,6 +550,7 @@ def _build_job(self):
             description=self.description,
             channel=self.channel,
             identity_key=self.identity_key,
+            retryable_exceptions=self.retryable_exceptions,
         )
         return self._generated_job
diff --git a/queue_job/exception.py b/queue_job/exception.py
index 093344ed3d..a2b1bccc3e 100644
--- a/queue_job/exception.py
+++ b/queue_job/exception.py
@@ -1,5 +1,14 @@
 # Copyright 2012-2016 Camptocamp
 # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
+from enum import Enum
+
+from odoo.exceptions import CacheMiss
+from psycopg2.errors import (
+    LockNotAvailable,
+    SerializationFailure,
+    UniqueViolation,
+)
+from requests.exceptions import ConnectionError


 class BaseQueueJobError(Exception):
@@ -41,3 +50,17 @@ class NothingToDoJob(JobError):

 class ChannelNotFound(BaseQueueJobError):
     """A channel could not be found"""
+
+
+class UnusedException(Exception):
+    """An exception class that is never raised by any code anywhere"""
+
+
+class StringifyExceptions(Enum):
+    UnusedException = UnusedException
+
+    UniqueViolation = UniqueViolation
+    LockNotAvailable = LockNotAvailable
+    SerializationFailure = SerializationFailure
+    CacheMiss = CacheMiss
+    ConnectionError = ConnectionError
diff --git a/queue_job/job.py b/queue_job/job.py
index f824020a27..9814079ee1 100644
--- a/queue_job/job.py
+++ b/queue_job/job.py
@@ -14,7 +14,12 @@

 import odoo

-from .exception import FailedJobError, NoSuchJobError, RetryableJobError
+from .exception import (
+    FailedJobError,
+    NoSuchJobError,
+    RetryableJobError,
+    StringifyExceptions,
+)

 WAIT_DEPENDENCIES = "wait_dependencies"
 PENDING = "pending"
@@ -281,6 +286,7 @@ def _load_from_db_record(cls, job_db_record):
         job_.exc_info = stored.exc_info if stored.exc_info else None
         job_.retry = stored.retry
         job_.max_retries = stored.max_retries
+        job_.retryable_exceptions = stored.retryable_exceptions
         if stored.company_id:
             job_.company_id = stored.company_id.id
         job_.identity_key = stored.identity_key
@@ -390,6 +396,7 @@ def __init__(
         description=None,
         channel=None,
         identity_key=None,
+        retryable_exceptions=None,
     ):
         """Create a Job
@@ -466,6 +473,7 @@
         self.date_created = datetime.now()
         self._description = description
+        self.retryable_exceptions = retryable_exceptions

         if isinstance(identity_key, str):
             self._identity_key = identity_key
@@ -507,14 +515,33 @@ def add_depends(self, jobs):
         if any(j.state != DONE for j in jobs):
             self.state = WAIT_DEPENDENCIES

+    def failed_retries(self):
+        type_, value, traceback = sys.exc_info()
+        # change the exception type but keep the original
+        # traceback and message:
+        # http://blog.ianbicking.org/2007/09/12/re-raising-exceptions/
+        return FailedJobError(
+            "Max. retries (%d) reached: %s" % (self.max_retries, value or type_)
+        )
+
     def perform(self):
         """Execute the job.

         The job is executed with the user which has initiated it.
         """
         self.retry += 1
+        if self.retryable_exceptions:
+            expected_errors = tuple(
+                [StringifyExceptions[exc].value for exc in self.retryable_exceptions]
+            )
+        else:
+            expected_errors = (StringifyExceptions.UnusedException.value,)
         try:
             self.result = self.func(*tuple(self.args), **self.kwargs)
+        except expected_errors as err:
+            if self.max_retries and self.retry >= self.max_retries:
+                raise self.failed_retries()
+            raise RetryableJobError(msg="Postponed, %s" % str(err))
         except RetryableJobError as err:
             if err.ignore_retry:
                 self.retry -= 1
@@ -522,14 +549,7 @@
             elif not self.max_retries:  # infinite retries
                 raise
             elif self.retry >= self.max_retries:
-                type_, value, traceback = sys.exc_info()
-                # change the exception type but keep the original
-                # traceback and message:
-                # http://blog.ianbicking.org/2007/09/12/re-raising-exceptions/
-                new_exc = FailedJobError(
-                    "Max. retries (%d) reached: %s" % (self.max_retries, value or type_)
-                )
-                raise new_exc from err
+                raise self.failed_retries() from err
             raise

         return self.result
@@ -646,6 +666,7 @@ def _store_values(self, create=False):
                 "records": self.recordset,
                 "args": self.args,
                 "kwargs": self.kwargs,
+                "retryable_exceptions": self.retryable_exceptions,
             }
         )
diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py
index fb6e60e5be..2f239d7825 100644
--- a/queue_job/jobrunner/runner.py
+++ b/queue_job/jobrunner/runner.py
@@ -146,10 +146,12 @@
 import threading
 import time
 from contextlib import closing, contextmanager
+from urllib.parse import urlparse

 import psycopg2
 import requests
 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
+from psycopg2.errors import UndefinedTable

 import odoo
 from odoo.tools import config
@@ -365,13 +367,38 @@
         self._stop = False
         self._stop_pipe = os.pipe()

+    @classmethod
+    def get_web_base_url(cls):
+        scheme, hostname = None, None
+        for db_name in cls.get_db_names():
+            db = Database(db_name)
+            with closing(db.conn.cursor()) as cr:
+                try:
+                    cr.execute(
+                        "SELECT value FROM ir_config_parameter WHERE key='web.base.url' limit 1"
+                    )
+                    res = cr.fetchone()
+                    if res:
+                        url = urlparse(res[0])
+                        scheme, hostname = url.scheme, url.hostname
+                except UndefinedTable:
+                    _logger.warning(
+                        "No ir_config_parameter table - maybe this is the first run with -i option"
+                    )
+                except Exception:
+                    _logger.exception("Getting web.base.url failed")
+            db.close()
+        return scheme, hostname
+
     @classmethod
     def from_environ_or_config(cls):
-        scheme = os.environ.get("ODOO_QUEUE_JOB_SCHEME") or queue_job_config.get(
-            "scheme"
+        web_base_scheme, web_base_host = cls.get_web_base_url()
+        scheme = (
+            web_base_scheme
+            or os.environ.get("ODOO_QUEUE_JOB_SCHEME")
+            or queue_job_config.get("scheme")
         )
         host = (
os.environ.get("ODOO_QUEUE_JOB_HOST") + web_base_host + or os.environ.get("ODOO_QUEUE_JOB_HOST") or queue_job_config.get("host") or config["http_interface"] ) @@ -395,7 +422,8 @@ def from_environ_or_config(cls): ) return runner - def get_db_names(self): + @staticmethod + def get_db_names(): if config["db_name"]: db_names = config["db_name"].split(",") else: @@ -516,6 +544,8 @@ def run(self): except InterruptedError: # Interrupted system call, i.e. KeyboardInterrupt during select self.stop() + except OSError: + self.stop() except Exception: _logger.exception( "exception: sleeping %ds and retrying", ERROR_RECOVERY_DELAY diff --git a/queue_job/models/base.py b/queue_job/models/base.py index 16f106450a..5597bdb3fc 100644 --- a/queue_job/models/base.py +++ b/queue_job/models/base.py @@ -72,6 +72,7 @@ def delayable( description=None, channel=None, identity_key=None, + retryable_exceptions=None, ): """Return a ``Delayable`` @@ -143,6 +144,7 @@ def delayable( description=description, channel=channel, identity_key=identity_key, + retryable_exceptions=retryable_exceptions, ) def _patch_job_auto_delay(self, method_name, context_key=None): diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 164695ce64..d798478977 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -3,14 +3,13 @@ import logging import random -from datetime import datetime, timedelta +from datetime import timedelta from odoo import _, api, exceptions, fields, models +from odoo.addons.base_sparse_field.models.fields import Serialized from odoo.osv import expression from odoo.tools import config, html_escape -from odoo.addons.base_sparse_field.models.fields import Serialized - from ..delay import Graph from ..exception import JobError from ..fields import JobSerialized @@ -33,7 +32,11 @@ class QueueJob(models.Model): _name = "queue.job" _description = "Queue Job" - _inherit = ["mail.thread", "mail.activity.mixin"] + _inherit = [ + "mail.thread", + "mail.activity.mixin", + "alfaleads_utils.autovacuum_mixin", + ] _log_access = False _order = "date_created DESC, date_done DESC" @@ -129,6 +132,7 @@ class QueueJob(models.Model): identity_key = fields.Char(readonly=True) worker_pid = fields.Integer(readonly=True) + retryable_exceptions = JobSerialized(readonly=True, base_type=list) def init(self): self._cr.execute( @@ -390,32 +394,40 @@ def _needaction_domain_get(self): """ return [("state", "=", "failed")] - def autovacuum(self): + def autovacuum(self, batch_size=1000, limit_batches=0, vacuum_failed=False): """Delete all jobs done based on the removal interval defined on the channel Called from a cron. 
""" + for channel in self.env["queue.job.channel"].search([]): - deadline = datetime.now() - timedelta(days=int(channel.removal_interval)) - while True: - jobs = self.search( - [ - "|", - ("date_done", "<=", deadline), - ("date_cancelled", "<=", deadline), - ("channel", "=", channel.complete_name), - ], - limit=1000, - ) - if jobs: - jobs.unlink() - if not config["test_enable"]: - self.env.cr.commit() # pylint: disable=E8102 - else: - break + self._vacuum( + domain=self._get_vacuum_domain(vacuum_failed, channel), + batch_size=batch_size, + limit_batches=limit_batches, + ) return True + @staticmethod + def _get_vacuum_domain(vacuum_failed, channel): + deadline = fields.Datetime.now() - timedelta(days=int(channel.removal_interval)) + if vacuum_failed: + domain = [ + ("state", "=", FAILED), + ("date_created", "<=", deadline), + ("channel", "=", channel.complete_name), + ] + else: + domain = [ + "|", + ("date_done", "<=", deadline), + ("date_cancelled", "<=", deadline), + ("channel", "=", channel.complete_name), + ] + + return domain + def requeue_stuck_jobs(self, enqueued_delta=5, started_delta=0): """Fix jobs that are in a bad states