-
Notifications
You must be signed in to change notification settings - Fork 103
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added a bounded Async writer that prevents oom errors #174
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,7 +7,7 @@ | |
from shutil import rmtree | ||
from six.moves.queue import Queue | ||
from tempfile import mkstemp | ||
from threading import Thread | ||
from threading import Thread, Lock | ||
import logging as lg | ||
import os | ||
import os.path as osp | ||
|
@@ -31,6 +31,18 @@ def __init__(self, message, *args, **kwargs): | |
self.exception = kwargs.get("exception") | ||
|
||
|
||
def wrapped_consumer(asyncWriter, data): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's make this private and use Python naming conventions (recommend shortening |
||
"""Wrapped consumer that lets us get a child's exception.""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This needs to be updated now that the function is top-level. |
||
try: | ||
_logger.debug('Starting consumer.') | ||
asyncWriter._consumer(data) | ||
except Exception as err: # pylint: disable=broad-except | ||
_logger.exception('Exception in child.') | ||
asyncWriter._err = err | ||
finally: | ||
_logger.debug('Finished consumer.') | ||
|
||
|
||
class AsyncWriter(object): | ||
|
||
"""Asynchronous publisher-consumer. | ||
|
@@ -69,17 +81,6 @@ def __enter__(self): | |
self._queue = Queue() | ||
self._err = None | ||
|
||
def consumer(data): | ||
"""Wrapped consumer that lets us get a child's exception.""" | ||
try: | ||
_logger.debug('Starting consumer.') | ||
self._consumer(data) | ||
except Exception as err: # pylint: disable=broad-except | ||
_logger.exception('Exception in child.') | ||
self._err = err | ||
finally: | ||
_logger.debug('Finished consumer.') | ||
|
||
def reader(queue): | ||
"""Generator read by the consumer.""" | ||
while True: | ||
|
@@ -88,7 +89,7 @@ def reader(queue): | |
break | ||
yield chunk | ||
|
||
self._reader = Thread(target=consumer, args=(reader(self._queue), )) | ||
self._reader = Thread(target=wrapped_consumer, args=(self, reader(self._queue), )) | ||
self._reader.start() | ||
_logger.debug('Started child thread.') | ||
return self | ||
|
@@ -136,6 +137,105 @@ def write(self, chunk): | |
self._queue.put(chunk) | ||
|
||
|
||
class BoundedAsyncWriter(AsyncWriter): | ||
|
||
"""A Bounded asynchronous publisher-consumer. | ||
|
||
:param consumer: Function which takes a single generator as argument. | ||
:param buffer_size: Number of entities that are buffered. When this number is exeeded, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please stick to 80 chars per line throughout. |
||
write will block untill some of the entities are consumed | ||
|
||
This class extends AsyncWriter with a fixed buffer size. If the buffer size is exeeded, | ||
writes will be blocked untill some of the buffer is consumed: | ||
|
||
""" | ||
|
||
# Expected by pandas to write csv files (https://github.com/mtth/hdfs/pull/130). | ||
__iter__ = None | ||
|
||
def __init__(self, consumer, buffer_size=1024): | ||
super().__init__(consumer) | ||
self._content_length = 0 | ||
self._content_max = buffer_size | ||
self._content_lock = Lock() | ||
|
||
@property | ||
def is_full(self): | ||
return self._content_lock.locked() | ||
|
||
def __enter__(self): | ||
|
||
if self._queue: | ||
raise ValueError('Cannot nest contexts.') | ||
|
||
self._queue = Queue() | ||
self._err = None | ||
|
||
self._content_length = 0 | ||
|
||
def reader(queue): | ||
"""Generator read by the consumer.""" | ||
while True: | ||
chunk = queue.get() | ||
if chunk is None: | ||
break | ||
|
||
self._content_length -= len(chunk) | ||
if self._content_lock.locked() and self._content_length < self._content_max: | ||
_logger.debug("releasing lock from reader") | ||
_logger.debug(f"Current buffer size: {self._content_length}") | ||
try: | ||
self._content_lock.release() | ||
except RuntimeError: | ||
pass | ||
Comment on lines
+189
to
+190
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a big code smell. Is there a way to avoid this? Locks are best used via |
||
|
||
yield chunk | ||
|
||
self._reader = Thread(target=wrapped_consumer, args=(self, reader(self._queue), )) | ||
self._reader.start() | ||
_logger.debug('Started child thread.') | ||
return self | ||
|
||
def __exit__(self, exc_type, exc_value, traceback): | ||
if exc_value: | ||
_logger.debug('Exception in parent.') | ||
if self._reader and self._reader.is_alive(): | ||
_logger.debug('Signaling child.') | ||
self._queue.put(None) | ||
self._reader.join() | ||
if self._err: | ||
raise self._err # pylint: disable=raising-bad-type | ||
else: | ||
_logger.debug('Child terminated without errors.') | ||
self._queue = None | ||
|
||
def write(self, chunk): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Much of the logic in this class is identical to the original writer, is there a way to consolidate? |
||
"""Stream data to the underlying consumer. | ||
|
||
:param chunk: Bytes to write. These will be buffered in memory until the | ||
consumer reads them. | ||
|
||
""" | ||
self._content_lock.acquire() | ||
|
||
_logger.debug(f"produce called with {chunk}") | ||
|
||
if chunk: | ||
# We skip empty chunks, otherwise they cause request to terminate the | ||
# response stream. Note that these chunks can be produced by valid | ||
# upstream encoders (e.g. bzip2). | ||
self._content_length += len(chunk) | ||
_logger.debug(f"Current buffer size: {self._content_length}") | ||
|
||
self._queue.put(chunk) | ||
|
||
if self._content_length < self._content_max and self._content_lock.locked(): | ||
_logger.debug("releasing lock from write") | ||
try: | ||
self._content_lock.release() | ||
except RuntimeError: | ||
pass | ||
|
||
@contextmanager | ||
def temppath(dpath=None): | ||
"""Create a temporary path. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
buffersize
is an HDFS argument, better not to overload its use here. Let's introduce a separate argument instead.