From 2fe4280daf5649ec960b9334435a3d4977cabeb6 Mon Sep 17 00:00:00 2001 From: bhlieberman Date: Mon, 5 Aug 2024 21:00:56 +0000 Subject: [PATCH] fix logic error in queue pattern that was blocking progress --- stages/03_build.py | 43 +++++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/stages/03_build.py b/stages/03_build.py index 8f57ab4..249d1e3 100644 --- a/stages/03_build.py +++ b/stages/03_build.py @@ -1,5 +1,5 @@ -from concurrent.futures import ThreadPoolExecutor -from queue import Full, Queue, SimpleQueue +from concurrent.futures import as_completed, ThreadPoolExecutor +from queue import Full, Queue import logging from typing import Dict, List from lxml import etree @@ -25,8 +25,8 @@ def parse_element(element): @dataclass(frozen=True) class Pubtator: logger: logging.Logger = field(default=logging.getLogger(__name__)) - exec: ThreadPoolExecutor = field(default=ThreadPoolExecutor(max_workers=4)) - queue: Queue = field(default=SimpleQueue()) + exec: ThreadPoolExecutor = field(default=ThreadPoolExecutor(max_workers=6)) + xml_queue: Queue = field(default=Queue(maxsize=100)) def create_out_dir(self) -> Path: brick_dir = Path("brick/BioCXML") @@ -35,7 +35,7 @@ def create_out_dir(self) -> Path: return brick_dir def create_parquet(self, file, elems) -> None: - name = file.with_suffix(".parquet").stem + name = file.with_suffix(".parquet").name df = pd.DataFrame.from_records(elems) df.to_parquet(biocxml_out / name) @@ -51,13 +51,13 @@ def parse_xml(self, xml_file) -> List[Dict]: while element.getprevious() is not None: del element.getparent()[0] logging.info(f"finished parsing {xml_file}") - return parsed_elements + return parsed_elements, xml_file - def take_from_queue(self): - if not self.queue.empty(): - task, fname = self.queue.get(timeout=10.0) - logging.info(f"creating Parquet from {fname}") - self.create_parquet(fname, task.result()) + def take_from_xml_queue(self): + fut = self.xml_queue.get(timeout=10.0) + result, fname = fut.result() + logging.info(f"creating Parquet from {fname}") + self.create_parquet(fname, result) def config_logger(self): logging.basicConfig( @@ -68,23 +68,22 @@ def config_logger(self): def run(self): self.config_logger() - biocxml_out.mkdir(exist_ok=True) + self.create_out_dir() futures = [ - (self.exec.submit(self.parse_xml, file), file) - for file in biocxml_in.iterdir() + self.exec.submit(self.parse_xml, file) for file in biocxml_in.iterdir() ] - for fut in futures: - _, file = fut + for fut in as_completed(futures): try: - logging.info(f"putting future from {file} on queue") - self.queue.put(fut, timeout=20.0) + self.xml_queue.put(fut, timeout=20.0) except Full: print("Queue is full and timeout exceeded.") continue - while not self.queue.empty(): - self.take_from_queue() + if not self.xml_queue.empty(): + self.take_from_xml_queue() + -if __name__ == '__main__': + +if __name__ == "__main__": pubtator = Pubtator() - pubtator.run() \ No newline at end of file + pubtator.run()