forked from henriklied/dreque
-
Notifications
You must be signed in to change notification settings - Fork 1
/
tests.py
133 lines (112 loc) · 4.95 KB
/
tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import os
import tempfile
import time
import unittest
from dreque import Dreque, DrequeWorker
def test_func(tempfile, text, delay=0, fail=False):
if delay:
time.sleep(delay)
if fail:
with open(tempfile, "a") as fp:
fp.write(text)
raise Exception("")
with open(tempfile, "wb") as fp:
fp.write(text)
class TestDreque(unittest.TestCase):
db = None
def setUp(self):
import logging
logging.basicConfig(level=logging.DEBUG)
self.tempfile = tempfile.mkstemp()[1]
self.dreque = Dreque("127.0.0.1", db=self.db)
self.dreque.redis.flushdb()
self.queue = "test"
# self.dreque.remove_queue(self.queue)
def tearDown(self):
os.unlink(self.tempfile)
def _get_output(self):
with open(self.tempfile, "rb") as fp:
return fp.read()
def testSimple(self):
self.dreque.push("test", "foo")
self.failUnlessEqual(self.dreque.size("test"), 1)
self.failUnlessEqual(self.dreque.pop("test"), "foo")
self.failUnlessEqual(self.dreque.pop("test"), None)
self.failUnlessEqual(self.dreque.size("test"), 0)
def testFunction(self):
import tests
self.dreque.enqueue("test", tests.test_func, "positional")
self.failUnlessEqual(self.dreque.dequeue(["test"]), dict(queue="test", func="tests.test_func", args=["positional"], kwargs={}, retries_left=5))
self.dreque.enqueue("test", tests.test_func, keyword="argument")
self.failUnlessEqual(self.dreque.dequeue(["test"]), dict(queue="test", func="tests.test_func", args=[], kwargs={'keyword':"argument"}, retries_left=5))
def testPositionalWorker(self):
import tests
self.dreque.enqueue("test", tests.test_func, self.tempfile, "worker_test")
worker = DrequeWorker(["test"], "127.0.0.1", db=self.db)
worker.work(0)
self.failUnlessEqual(self._get_output(), "worker_test")
self.failUnlessEqual(self.dreque.stats.get("processed"), 1)
def testKeywordWorker(self):
import tests
self.dreque.enqueue("test", tests.test_func, tempfile=self.tempfile, text="worker_test")
worker = DrequeWorker(["test"], "127.0.0.1", db=self.db)
worker.work(0)
self.failUnlessEqual(self._get_output(), "worker_test")
def testDelayedJob(self):
import tests
self.dreque.enqueue("test", tests.test_func, val="worker_test", _delay=1)
self.failUnlessEqual(self.dreque.dequeue("test"), None)
time.sleep(1.5)
self.failUnlessEqual(self.dreque.dequeue(["test"]), dict(queue="test", func="tests.test_func", args=[], kwargs={'val':"worker_test"}, retries_left=5))
def testGracefulShutdown(self):
import tests
import signal
from multiprocessing import Process
def worker():
w = DrequeWorker(["test"], "127.0.0.1", db=self.db)
w.work(0)
self.dreque.enqueue("test", tests.test_func, tempfile=self.tempfile, text="graceful", delay=2)
worker_child = Process(target=worker, args=())
worker_child.start()
time.sleep(0.5) # Make sure the worker has spawned a child already
# worker_child.terminate()
os.kill(worker_child.pid, signal.SIGQUIT)
worker_child.join()
self.failUnlessEqual(worker_child.exitcode, 0)
self.failUnlessEqual(self._get_output(), "graceful")
def testForcedShutdown(self):
import tests
import signal
from multiprocessing import Process
def worker():
import logging
logging.getLogger("dreque.worker").setLevel(logging.CRITICAL)
w = DrequeWorker(["test"], "127.0.0.1", db=self.db)
w.work(0)
self.dreque.enqueue("test", tests.test_func, tempfile=self.tempfile, text="graceful", delay=2)
worker_child = Process(target=worker, args=())
worker_child.start()
time.sleep(0.5) # Make sure the worker has spawned a child already
worker_child.terminate()
worker_child.join()
self.failUnlessEqual(worker_child.exitcode, 0)
self.failUnlessEqual(self._get_output(), "")
def testRetries(self):
import tests
import signal
from multiprocessing import Process
def worker():
import logging
logging.getLogger("dreque.worker").setLevel(logging.CRITICAL)
w = DrequeWorker(["test"], "127.0.0.1", db=self.db)
w.work(0.1)
self.dreque.enqueue("test", tests.test_func, tempfile=self.tempfile, text=".", fail=True)
worker_child = Process(target=worker, args=())
worker_child.start()
time.sleep(3) # Give enough time for worker to grab and execute the job
os.kill(worker_child.pid, signal.SIGQUIT)
worker_child.join()
self.failUnlessEqual(worker_child.exitcode, 0)
self.failUnlessEqual(self._get_output(), "..")
if __name__ == '__main__':
unittest.main()