Skip to content

Commit

Permalink
Make pynbody server work asynchronously so it doesn't block the serve…
Browse files Browse the repository at this point in the history
…r from processing other quick messages
  • Loading branch information
apontzen committed Jan 12, 2024
1 parent 58fb151 commit e33a838
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 17 deletions.
6 changes: 3 additions & 3 deletions tangos/parallel_tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ class MessageExit(message.Message):


def _server_thread():
# Sit idle until request for a job comes in, then assign first
# available job and move on. Jobs are labelled through the
# provided iterator

from .async_message import init_async_processing_thread
init_async_processing_thread() # uses on_exit_parallelism to ensure thread is cleared up

alive = [True for i in range(backend.size())]

Expand Down
34 changes: 34 additions & 0 deletions tangos/parallel_tasks/async_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import queue
from threading import Thread

from . import on_exit_parallelism
from .message import Message


class AsyncProcessedMessage(Message):
_async_task_queue = queue.Queue()
def process_async(self):
"""Override to provide the processing/response mechanism, that will be performed in a separate thread"""
raise NotImplementedError()

def process(self):
self._async_task_queue.put(self)

def init_async_processing_thread():
def async_processing_thread():
while True:
msg = AsyncProcessedMessage._async_task_queue.get()
if msg is None:
break
msg.process_async()

t = Thread(target=async_processing_thread)
t.daemon = True
t.start()

def exit_async_processing_thread():
AsyncProcessedMessage._async_task_queue.empty()
AsyncProcessedMessage._async_task_queue.put(None)
t.join()

on_exit_parallelism(exit_async_processing_thread)
25 changes: 15 additions & 10 deletions tangos/parallel_tasks/backends/multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

_print_exceptions = True

send_lock = threading.Lock() # a lock to make sure if multiple threads are running, only one can send/receive at a time
receive_lock = threading.Lock()

# Compatibility fix for python >=3.8 on MacOS, where the default process start
# method changed:
if sys.version_info[:2]>=(3,3):
Expand All @@ -30,22 +33,24 @@ class NoMatchingItem(Exception):


def send(data, destination, tag=0):
_pipe.send((data, destination, tag))
with send_lock:
_pipe.send((data, destination, tag))

def receive_any(source=None):
return receive(source,None,True)


def receive(source=None, tag=0, return_tag=False):
while True:
try:
item = _pop_first_match_from_reception_buffer(source, tag)
if return_tag:
return item
else:
return item[0]
except NoMatchingItem:
_receive_item_into_buffer()
with receive_lock:
while True:
try:
item = _pop_first_match_from_reception_buffer(source, tag)
if return_tag:
return item
else:
return item[0]
except NoMatchingItem:
_receive_item_into_buffer()


NUMPY_SPECIAL_TAG = 1515
Expand Down
9 changes: 5 additions & 4 deletions tangos/parallel_tasks/pynbody_server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import tangos.parallel_tasks.pynbody_server.snapshot_queue

from .. import log, remote_import
from ..async_message import AsyncProcessedMessage
from ..message import ExceptionMessage, Message
from . import snapshot_queue, transfer_array
from .snapshot_queue import (ConfirmLoadPynbodySnapshot,
Expand Down Expand Up @@ -55,7 +56,7 @@ def send(self, destination):
# send contents
transfer_array.send_array(self.contents, destination, use_shared_memory=self.shared_mem)

class RequestPynbodyArray(Message):
class RequestPynbodyArray(AsyncProcessedMessage):
_time_to_start_processing = []

def __init__(self, filter_or_object_spec, array, fam=None, request_sent_time=None):
Expand Down Expand Up @@ -102,7 +103,7 @@ def get_num_requests(cls):
def reset_performance_stats(cls):
cls._time_to_start_processing = []

def process(self):
def process_async(self):
start_time = time.time()
self._time_to_start_processing.append(start_time - self.request_sent_time)

Expand Down Expand Up @@ -153,7 +154,7 @@ def deserialize(cls, source, message):



class RequestPynbodySubsnapInfo(Message):
class RequestPynbodySubsnapInfo(AsyncProcessedMessage):
def __init__(self, filename, filter_):
super().__init__()
self.filename = filename
Expand All @@ -168,7 +169,7 @@ def deserialize(cls, source, message):
def serialize(self):
return (self.filename, self.filter_or_object_spec)

def process(self):
def process_async(self):
start_time = time.time()
assert(_server_queue.current_timestep == self.filename)
if self.filter_or_object_spec is not None:
Expand Down
34 changes: 34 additions & 0 deletions tests/test_parallel_tasks_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import time

from tangos import parallel_tasks as pt
from tangos.parallel_tasks import async_message, message, testing


class Response(message.Message):
pass


class SlowProcessingMessage(async_message.AsyncProcessedMessage):
def process_async(self):
time.sleep(0.1)
Response("slow").send(self.source)

class FastProcessingMessage(message.Message):
def process(self):
Response("fast").send(self.source)

def _test_async_message():
SlowProcessingMessage().send(0)
FastProcessingMessage().send(0)
msg = Response.receive(0)
# fast message response should overtake slow message response
assert msg.contents == "fast"

msg = Response.receive(0)
assert msg.contents == "slow"



def test_async_message():
pt.use('multiprocessing-2')
pt.launch(_test_async_message)

0 comments on commit e33a838

Please sign in to comment.