This repository has been archived by the owner on Jun 10, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
use high level interface kumbu to connect to multipul queues
- Loading branch information
Showing
10 changed files
with
207 additions
and
30 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
#!/usr/bin/env python | ||
# -*- encoding: utf-8 -*- | ||
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8: | ||
# Author: Binux<[email protected]> | ||
# http://binux.me | ||
# Created on 2015-05-22 20:54:01 | ||
|
||
import time | ||
import umsgpack | ||
from kombu import Connection, enable_insecure_serializers | ||
from kombu.serialization import register | ||
from kombu.exceptions import ChannelError | ||
from six.moves import queue as BaseQueue | ||
|
||
|
||
register('umsgpack', umsgpack.packb, umsgpack.unpackb, 'application/x-msgpack') | ||
enable_insecure_serializers(['umsgpack']) | ||
|
||
|
||
class KombuQueue(object): | ||
""" | ||
kombu is a high-level interface for multiple message queue backends. | ||
KombuQueue is built on top of kombu API. | ||
""" | ||
|
||
Empty = BaseQueue.Empty | ||
Full = BaseQueue.Full | ||
max_timeout = 0.3 | ||
|
||
def __init__(self, name, url="amqp://", maxsize=0, lazy_limit=True): | ||
""" | ||
Constructor for KombuQueue | ||
url: http://kombu.readthedocs.org/en/latest/userguide/connections.html#urls | ||
maxsize: an integer that sets the upperbound limit on the number of | ||
items that can be placed in the queue. | ||
""" | ||
self.name = name | ||
self.conn = Connection(url) | ||
self.queue = self.conn.SimpleQueue(self.name, no_ack=True, serializer='umsgpack') | ||
|
||
self.maxsize = maxsize | ||
self.lazy_limit = lazy_limit | ||
if self.lazy_limit and self.maxsize: | ||
self.qsize_diff_limit = int(self.maxsize * 0.1) | ||
else: | ||
self.qsize_diff_limit = 0 | ||
self.qsize_diff = 0 | ||
|
||
def qsize(self): | ||
try: | ||
return self.queue.qsize() | ||
except ChannelError: | ||
return 0 | ||
|
||
def empty(self): | ||
if self.qsize() == 0: | ||
return True | ||
else: | ||
return False | ||
|
||
def full(self): | ||
if self.maxsize and self.qsize() >= self.maxsize: | ||
return True | ||
else: | ||
return False | ||
|
||
def put(self, obj, block=True, timeout=None): | ||
if not block: | ||
return self.put_nowait() | ||
|
||
start_time = time.time() | ||
while True: | ||
try: | ||
return self.put_nowait(obj) | ||
except BaseQueue.Full: | ||
if timeout: | ||
lasted = time.time() - start_time | ||
if timeout > lasted: | ||
time.sleep(min(self.max_timeout, timeout - lasted)) | ||
else: | ||
raise | ||
else: | ||
time.sleep(self.max_timeout) | ||
|
||
def put_nowait(self, obj): | ||
if self.lazy_limit and self.qsize_diff < self.qsize_diff_limit: | ||
pass | ||
elif self.full(): | ||
raise BaseQueue.Full | ||
else: | ||
self.qsize_diff = 0 | ||
return self.queue.put(obj) | ||
|
||
def get(self, block=True, timeout=None): | ||
try: | ||
ret = self.queue.get(block, timeout) | ||
return ret.payload | ||
except self.queue.Empty: | ||
raise BaseQueue.Empty | ||
|
||
def get_nowait(self): | ||
try: | ||
ret = self.queue.get_nowait() | ||
return ret.payload | ||
except self.queue.Empty: | ||
raise BaseQueue.Empty | ||
|
||
def delete(self): | ||
self.queue.queue.delete() | ||
|
||
def __del__(self): | ||
self.queue.close() | ||
|
||
|
||
Queue = KombuQueue |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,3 +18,4 @@ SQLAlchemy>=0.9.7 | |
six | ||
amqp>=1.3.0 | ||
redis | ||
kombu |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -78,6 +78,7 @@ | |
'unittest2>=0.5.1', | ||
'SQLAlchemy>=0.9.7', | ||
'redis', | ||
'kombu', | ||
], | ||
}, | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters