Skip to content

Commit

Permalink
make this multithreaded to speed up DVC run
Browse files Browse the repository at this point in the history
  • Loading branch information
bhlieberman committed Jul 10, 2024
1 parent a550872 commit 5b44bf5
Showing 1 changed file with 61 additions and 11 deletions.
72 changes: 61 additions & 11 deletions stages/03_build.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
from concurrent.futures import ThreadPoolExecutor
from queue import Full, Queue, SimpleQueue
import logging
from typing import Dict, List
from lxml import etree
import pandas as pd
from pathlib import Path
from dataclasses import dataclass, field

# Directory layout: raw BioC XML input lives under raw/output/BioCXML,
# Parquet output is written under brick/BioCXML.
raw_dir = Path("raw")
brick_dir = Path("brick")

biocxml_in = raw_dir / "output/BioCXML"
biocxml_out = brick_dir / "BioCXML"

# huge_tree relaxes lxml's default limits on document depth/size, which the
# very large PubTator BioC files can exceed.
# NOTE(review): this parser is not passed to the visible iterparse() call —
# it appears unused; confirm before removing.
parser = etree.XMLParser(huge_tree=True)


def parse_element(element):
element_dict = {
Expand All @@ -19,13 +22,26 @@ def parse_element(element):
return element_dict


if __name__ == "__main__":
@dataclass(frozen=True)
class Pubtator:
    """Parses PubTator BioC XML files on a thread pool and writes Parquet.

    Fields use ``default_factory`` so that every instance gets its own
    executor, queue, and logger: a plain ``default=`` would be evaluated
    once at class-definition time and shared by all instances.
    """

    logger: logging.Logger = field(
        default_factory=lambda: logging.getLogger(__name__)
    )
    exec: ThreadPoolExecutor = field(
        default_factory=lambda: ThreadPoolExecutor(max_workers=4)
    )
    # Annotated as SimpleQueue to match the actual default (SimpleQueue is
    # not a subclass of queue.Queue).
    queue: SimpleQueue = field(default_factory=SimpleQueue)

def create_out_dir(self) -> Path:
brick_dir = Path("brick/BioCXML")
if not brick_dir.exists():
brick_dir.mkdir(parents=True)
return brick_dir

if not biocxml_out.exists():
biocxml_out.mkdir(parents=True)
def create_parquet(self, file, elems) -> None:
name = file.with_suffix(".parquet").stem
df = pd.DataFrame.from_records(elems)
df.to_parquet(biocxml_out / name)

for f in biocxml_in.iterdir():
with open(f, "rb") as xml:
def parse_xml(self, xml_file) -> List[Dict]:
with open(xml_file, "rb") as xml:
logging.info(f"opening {xml_file}")
parsed_elements = []
for event, element in etree.iterparse(xml, events=("end",)):
if element.tag == "collection":
Expand All @@ -34,7 +50,41 @@ def parse_element(element):
element.clear()
while element.getprevious() is not None:
del element.getparent()[0]
df = pd.DataFrame.from_records(parsed_elements)
out_path = Path(f.name)
df.to_parquet(biocxml_out / out_path.with_suffix(".parquet"))
print(f"done parsing file: {f.name}")
logging.info(f"finished parsing {xml_file}")
return parsed_elements

def take_from_queue(self):
if not self.queue.empty():
task, fname = self.queue.get(timeout=10.0)
logging.info(f"creating Parquet from {fname}")
self.create_parquet(fname, task.result())

def config_logger(self):
logging.basicConfig(
filename="pubtator.log",
level=logging.INFO,
format="%(asctime)s:%(levelname)s:%(message)s",
)

def run(self):
self.config_logger()
biocxml_out.mkdir(exist_ok=True)
futures = [
(self.exec.submit(self.parse_xml, file), file)
for file in biocxml_in.iterdir()
]
for fut in futures:
_, file = fut
try:
logging.info(f"putting future from {file} on queue")
self.queue.put(fut, timeout=20.0)
except Full:
print("Queue is full and timeout exceeded.")
continue

while not self.queue.empty():
self.take_from_queue()

if __name__ == "__main__":
    # Script entry point: build the pipeline object and execute it.
    Pubtator().run()

0 comments on commit 5b44bf5

Please sign in to comment.