diff --git a/.travis.yml b/.travis.yml index 7ef95d7a..77337e2b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,8 +6,12 @@ cache: - pip services: - rabbitmq -install: pip install -Ur requirements.txt -script: coverage run --source=hirefire -m pytest +- redis-server +env: + - TOXENV = redis + - TOXENV = rabbitmq +install: pip install -U tox codecov +script: tox -e "$TOXENV" branches: only: - master diff --git a/hirefire/procs/celery.py b/hirefire/procs/celery.py index d43336c5..13b56c8f 100644 --- a/hirefire/procs/celery.py +++ b/hirefire/procs/celery.py @@ -5,6 +5,15 @@ from celery.app import app_or_default +try: + from librabbitmq import ChannelError +except ImportError: + try: + from amqp.exceptions import ChannelError + except ImportError: + # No RabbitMQ API wrapper installed, different celery broker used + ChannelError = Exception + from ..utils import KeyDefaultDict from . import Proc @@ -233,35 +242,35 @@ def __init__(self, app=None, *args, **kwargs): if app is not None: self.app = app self.app = app_or_default(self.app) - self.connection = self.app.connection() - self.channel = self.connection.channel() + + @staticmethod + def _get_redis_task_count(channel, queue): + return channel.client.llen(queue) + + @staticmethod + def _get_rabbitmq_task_count(channel, queue): + try: + return channel.queue_declare(queue=queue, passive=True).message_count + except ChannelError: + logger.warning("The requested queue %s has not been created yet", queue) + return 0 def quantity(self, cache=None, **kwargs): """ Returns the aggregated number of tasks of the proc queues. """ - if hasattr(self.channel, '_size'): - # Redis - return sum(self.channel._size(queue) for queue in self.queues) - # AMQP - try: - from librabbitmq import ChannelError - except ImportError: - from amqp.exceptions import ChannelError - count = 0 - for queue in self.queues: - try: - queue = self.channel.queue_declare(queue, passive=True) - except ChannelError: - # The requested queue has not been created yet - pass - else: - count += queue.message_count + with self.app.connection_or_acquire() as connection: + channel = connection.channel() - if cache is not None and self.inspect_statuses: - count += self.inspect_count(cache) - - return count + # Redis + if hasattr(channel, '_size'): + return sum(self._get_redis_task_count(channel, queue) for queue in self.queues) + + # RabbitMQ + count = sum(self._get_rabbitmq_task_count(channel, queue) for queue in self.queues) + if cache is not None and self.inspect_statuses: + count += self.inspect_count(cache) + return count def inspect_count(self, cache): """Use Celery's inspect() methods to see tasks on workers.""" diff --git a/tests/contrib/django/testapp/__init__.py b/tests/contrib/django/testapp/__init__.py index e69de29b..a78d7811 100644 --- a/tests/contrib/django/testapp/__init__.py +++ b/tests/contrib/django/testapp/__init__.py @@ -0,0 +1,17 @@ +from __future__ import absolute_import, unicode_literals +import os +from celery import Celery + +# set the default Django settings module for the 'celery' program. +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings') + +app = Celery('proj') + +# Using a string here means the worker doesn't have to serialize +# the configuration object to child processes. +# - namespace='CELERY' means all celery-related configuration keys +# should have a `CELERY_` prefix. +app.config_from_object('django.conf:settings', namespace='CELERY') + +# Load task modules from all registered Django app configs. +app.autodiscover_tasks() diff --git a/tests/contrib/django/testapp/procs.py b/tests/contrib/django/testapp/procs.py index 4239e850..934cc307 100644 --- a/tests/contrib/django/testapp/procs.py +++ b/tests/contrib/django/testapp/procs.py @@ -4,4 +4,4 @@ class WorkerProc(CeleryProc): name = 'worker' queues = ['celery'] - inspect_statuses = [] + inspect_statuses = ['active', 'reserved', 'scheduled'] diff --git a/tests/contrib/django/testapp/settings.py b/tests/contrib/django/testapp/settings.py index 35987cd3..c1eee272 100644 --- a/tests/contrib/django/testapp/settings.py +++ b/tests/contrib/django/testapp/settings.py @@ -125,3 +125,8 @@ # https://docs.djangoproject.com/en/2.1/howto/static-files/ STATIC_URL = '/static/' + + +# Celery + +CELERY_BROKER_URL = os.getenv('CELERY_BROKER_URL', 'amqp://') diff --git a/tox.ini b/tox.ini new file mode 100644 index 00000000..a32cf0f5 --- /dev/null +++ b/tox.ini @@ -0,0 +1,14 @@ +[[tox] +envlist = redis,rabbitmq + +[testenv] +deps = -rrequirements.txt +commands = coverage run --append --source=hirefire -m pytest + +[testenv:redis] +setenv = + CELERY_BROKER_URL = amqp:// + +[testenv:rabbitmq] +setenv = + CELERY_BROKER_URL = redis://