Skip to content

Commit

Permalink
fix logic error in queue pattern that was blocking progress
Browse files Browse the repository at this point in the history
  • Loading branch information
bhlieberman committed Aug 5, 2024
1 parent 5b44bf5 commit 2fe4280
Showing 1 changed file with 21 additions and 22 deletions.
43 changes: 21 additions & 22 deletions stages/03_build.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from concurrent.futures import ThreadPoolExecutor
from queue import Full, Queue, SimpleQueue
from concurrent.futures import as_completed, ThreadPoolExecutor
from queue import Full, Queue
import logging
from typing import Dict, List
from lxml import etree
Expand All @@ -25,8 +25,8 @@ def parse_element(element):
@dataclass(frozen=True)
class Pubtator:
logger: logging.Logger = field(default=logging.getLogger(__name__))
exec: ThreadPoolExecutor = field(default=ThreadPoolExecutor(max_workers=4))
queue: Queue = field(default=SimpleQueue())
exec: ThreadPoolExecutor = field(default=ThreadPoolExecutor(max_workers=6))
xml_queue: Queue = field(default=Queue(maxsize=100))

def create_out_dir(self) -> Path:
brick_dir = Path("brick/BioCXML")
Expand All @@ -35,7 +35,7 @@ def create_out_dir(self) -> Path:
return brick_dir

def create_parquet(self, file, elems) -> None:
name = file.with_suffix(".parquet").stem
name = file.with_suffix(".parquet").name
df = pd.DataFrame.from_records(elems)
df.to_parquet(biocxml_out / name)

Expand All @@ -51,13 +51,13 @@ def parse_xml(self, xml_file) -> List[Dict]:
while element.getprevious() is not None:
del element.getparent()[0]
logging.info(f"finished parsing {xml_file}")
return parsed_elements
return parsed_elements, xml_file

def take_from_queue(self):
if not self.queue.empty():
task, fname = self.queue.get(timeout=10.0)
logging.info(f"creating Parquet from {fname}")
self.create_parquet(fname, task.result())
def take_from_xml_queue(self):
fut = self.xml_queue.get(timeout=10.0)
result, fname = fut.result()
logging.info(f"creating Parquet from {fname}")
self.create_parquet(fname, result)

def config_logger(self):
logging.basicConfig(
Expand All @@ -68,23 +68,22 @@ def config_logger(self):

def run(self):
self.config_logger()
biocxml_out.mkdir(exist_ok=True)
self.create_out_dir()
futures = [
(self.exec.submit(self.parse_xml, file), file)
for file in biocxml_in.iterdir()
self.exec.submit(self.parse_xml, file) for file in biocxml_in.iterdir()
]
for fut in futures:
_, file = fut
for fut in as_completed(futures):
try:
logging.info(f"putting future from {file} on queue")
self.queue.put(fut, timeout=20.0)
self.xml_queue.put(fut, timeout=20.0)
except Full:
print("Queue is full and timeout exceeded.")
continue

while not self.queue.empty():
self.take_from_queue()
if not self.xml_queue.empty():
self.take_from_xml_queue()


if __name__ == '__main__':

if __name__ == "__main__":
pubtator = Pubtator()
pubtator.run()
pubtator.run()

0 comments on commit 2fe4280

Please sign in to comment.