Raise default threads for BAM parsing 8->32, BLAS 8->16
It was originally capped at 8 under the belief that reading 8 BAM files in
parallel would saturate the disk, so there would be no benefit to going higher.
However, my laptop can read at 4 GB/s and decompresses BAM files perhaps 40 times
slower, so parsing is CPU-bottlenecked even with 32 threads.
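
A back-of-envelope check of that claim (the 4 GB/s and 40x figures come from
this message; the per-thread rate is derived from them, not measured):

# Rough estimate of why BAM parsing stays CPU-bound at 32 threads.
disk_read_gbps = 4.0                  # sequential read speed of the laptop SSD
decompress_slowdown = 40              # decompression ~40x slower than raw reading
per_thread_gbps = disk_read_gbps / decompress_slowdown  # ~0.1 GB/s per thread

threads = 32
aggregate_gbps = threads * per_thread_gbps  # ~3.2 GB/s
# 3.2 GB/s of decompression < 4 GB/s of disk bandwidth, so even 32 threads
# cannot saturate the disk, and raising the cap helps.
print(f"{aggregate_gbps:.1f} GB/s decompressed vs {disk_read_gbps:.1f} GB/s read")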

This change is significant because users have reported slow BAM file parsing.
However, it may as much as quadruple the memory usage of the BAM parsing step,
since up to four times as many files can be held in memory at once. It will be
benchmarked before merging.
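
The quadrupling presumably follows from memory scaling with the number of
concurrently parsed files (an assumption; actual peak usage also depends on
file sizes and pycoverm internals):

# Worst-case multiplier if parsing memory is proportional to open BAM files.
old_cap, new_cap = 8, 32
print(f"memory multiplier: {new_cap // old_cap}x")  # 4x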

The BLAS change is simply because I think 8 threads is too conservative.
jakobnissen committed Feb 15, 2024
1 parent f37b5fd commit 60c22a9
Showing 6 changed files with 27 additions and 25 deletions.
vamb/__main__.py: 13 additions & 9 deletions
@@ -22,13 +22,13 @@
 import pandas as pd
 
 _ncpu = os.cpu_count()
-DEFAULT_THREADS = 8 if _ncpu is None else min(_ncpu, 8)
+DEFAULT_BLAS_THREADS = 16 if _ncpu is None else min(_ncpu, 16)
 
 # These MUST be set before importing numpy
 # I know this is a shitty hack, see https://github.com/numpy/numpy/issues/11826
-os.environ["MKL_NUM_THREADS"] = str(DEFAULT_THREADS)
-os.environ["NUMEXPR_NUM_THREADS"] = str(DEFAULT_THREADS)
-os.environ["OMP_NUM_THREADS"] = str(DEFAULT_THREADS)
+os.environ["MKL_NUM_THREADS"] = str(DEFAULT_BLAS_THREADS)
+os.environ["NUMEXPR_NUM_THREADS"] = str(DEFAULT_BLAS_THREADS)
+os.environ["OMP_NUM_THREADS"] = str(DEFAULT_BLAS_THREADS)
 
 # Append vamb to sys.path to allow vamb import even if vamb was not installed
 # using pip
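
For context on the "MUST be set before importing numpy" comment above: the
BLAS/OpenMP runtimes that numpy links against read these variables once, when
they initialize during numpy's import, so setting them afterwards is silently
ignored. A minimal sketch of the required ordering (a standalone illustration
mirroring the diff, not code from this repo):

import os

# Thread-count env vars are read once when the BLAS/OpenMP runtime
# initializes, which happens as a side effect of importing numpy.
os.environ["MKL_NUM_THREADS"] = "16"
os.environ["NUMEXPR_NUM_THREADS"] = "16"
os.environ["OMP_NUM_THREADS"] = "16"

import numpy as np  # noqa: E402 -- deliberately after the env setup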
@@ -771,9 +771,11 @@ def cluster_and_write_files(
         print(
             str(i + 1),
             None if cluster.radius is None else round(cluster.radius, 3),
-            None
-            if cluster.observed_pvr is None
-            else round(cluster.observed_pvr, 2),
+            (
+                None
+                if cluster.observed_pvr is None
+                else round(cluster.observed_pvr, 2)
+            ),
             cluster.kind_str,
             sum(sequence_lens[i] for i in cluster.members),
             len(cluster.members),
@@ -1686,9 +1688,11 @@ def add_input_output_arguments(subparser):
         dest="nthreads",
         metavar="",
         type=int,
-        default=DEFAULT_THREADS,
+        default=vamb.parsebam.DEFAULT_BAM_THREADS,
         help=(
-            "number of threads to use " "[min(" + str(DEFAULT_THREADS) + ", nbamfiles)]"
+            "number of threads to read BAM files [min("
+            + str(vamb.parsebam.DEFAULT_BAM_THREADS)
+            + ", nbamfiles)]"
         ),
     )
     inputos.add_argument(
vamb/aamb_encode.py: 0 additions & 1 deletion
@@ -1,6 +1,5 @@
 """Adversarial autoencoders (AAE) for metagenomics binning, this files contains the implementation of the AAE"""
 
-
 import numpy as np
 from math import log, isfinite
 import time
vamb/parsebam.py: 8 additions & 7 deletions
@@ -14,12 +14,13 @@
 from typing import Optional, TypeVar, Union, IO, Sequence, Iterable
 from pathlib import Path
 import shutil
-
-_ncpu = _os.cpu_count()
-DEFAULT_THREADS = 8 if _ncpu is None else _ncpu
+import os
 
 A = TypeVar("A", bound="Abundance")
 
+_ncpu = os.cpu_count()
+DEFAULT_BAM_THREADS = 32 if _ncpu is None else min(_ncpu, 32)
+
 
 class Abundance:
     "Object representing contig abundance. Contains a matrix and refhash."
@@ -115,10 +116,10 @@ def from_files(
 
         chunksize = min(nthreads, len(paths))
 
-        # We cap it to 16 threads, max. This will prevent pycoverm from consuming a huge amount
+        # We cap it to DEFAULT_BAM_THREADS threads, max. This will prevent pycoverm from consuming a huge amount
         # of memory if given a crapload of threads, and most programs will probably be IO bound
-        # when reading 16 files at a time.
-        chunksize = min(chunksize, 16)
+        # when reading DEFAULT_BAM_THREADS files at a time.
+        chunksize = min(chunksize, DEFAULT_BAM_THREADS)
 
         # If it can be done in memory, do so
         if chunksize >= len(paths):
@@ -134,7 +135,7 @@
         else:
             if cache_directory is None:
                 raise ValueError(
-                    "If min(16, nthreads) < len(paths), cache_directory must not be None"
+                    "If min(DEFAULT_BAM_THREADS, nthreads) < len(paths), cache_directory must not be None"
                 )
             return cls.chunkwise_loading(
                 paths,
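
Taken together, the parsebam.py hunks implement one policy: parse at most
min(nthreads, DEFAULT_BAM_THREADS) BAM files concurrently, and fall back to
disk-backed chunks when that is fewer than the number of files. A rough sketch
of the control flow (the constant and the error message come from the diff;
the function name and elided bodies are hypothetical, not the file's real code):

import os

_ncpu = os.cpu_count()
DEFAULT_BAM_THREADS = 32 if _ncpu is None else min(_ncpu, 32)

def load_abundances(paths: list[str], nthreads: int, cache_directory=None):
    # Never use more threads than there are files, and cap the number of
    # concurrently open files so pycoverm's memory use stays bounded.
    chunksize = min(nthreads, len(paths), DEFAULT_BAM_THREADS)
    if chunksize >= len(paths):
        return ...  # everything fits in one pass: parse fully in memory
    if cache_directory is None:
        raise ValueError(
            "If min(DEFAULT_BAM_THREADS, nthreads) < len(paths), "
            "cache_directory must not be None"
        )
    return ...  # parse `chunksize` files at a time, spilling to cache_directory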
vamb/semisupervised_encode.py: 0 additions & 1 deletion
@@ -1,6 +1,5 @@
 """Semisupervised multimodal VAEs for metagenomics binning, this files contains the implementation of the VAEVAE for MMSEQ predictions"""
 
-
 __cmd_doc__ = """Encode depths and TNF using a VAE to latent representation"""
 
 import numpy as _np
vamb/taxvamb_encode.py: 0 additions & 1 deletion
@@ -1,6 +1,5 @@
 """Hierarchical loss for the labels suggested in https://arxiv.org/abs/2210.10929"""
 
-
 __cmd_doc__ = """Hierarchical loss for the labels"""
 
 
workflow_avamb/src/rip_bins.py: 6 additions & 6 deletions
@@ -183,9 +183,9 @@ def remove_meaningless_edges_from_pairs(
                 contig_length,
             )
             print("Cluster ripped because of a meaningless edge ", cluster_updated)
-            clusters_changed_but_not_intersecting_contigs[
-                cluster_updated
-            ] = cluster_contigs[cluster_updated]
+            clusters_changed_but_not_intersecting_contigs[cluster_updated] = (
+                cluster_contigs[cluster_updated]
+            )
 
     components: list[set[str]] = list()
     for component in nx.connected_components(graph_clusters):
@@ -295,9 +295,9 @@ def make_all_components_pair(
                 contig_length,
             )
             print("Cluster ripped because of a pairing component ", cluster_updated)
-            clusters_changed_but_not_intersecting_contigs[
-                cluster_updated
-            ] = cluster_contigs[cluster_updated]
+            clusters_changed_but_not_intersecting_contigs[cluster_updated] = (
+                cluster_contigs[cluster_updated]
+            )
             component_len = max(
                 [
                     len(nx.node_connected_component(graph_clusters, node_i))
