Raise default threads for BAM parsing 8->32, BLAS 8->16 #292

Draft: wants to merge 1 commit into base: master

22 changes: 13 additions & 9 deletions vamb/__main__.py
@@ -22,13 +22,13 @@
 import pandas as pd
 
 _ncpu = os.cpu_count()
-DEFAULT_THREADS = 8 if _ncpu is None else min(_ncpu, 8)
+DEFAULT_BLAS_THREADS = 16 if _ncpu is None else min(_ncpu, 16)
 
 # These MUST be set before importing numpy
 # I know this is a shitty hack, see https://github.com/numpy/numpy/issues/11826
-os.environ["MKL_NUM_THREADS"] = str(DEFAULT_THREADS)
-os.environ["NUMEXPR_NUM_THREADS"] = str(DEFAULT_THREADS)
-os.environ["OMP_NUM_THREADS"] = str(DEFAULT_THREADS)
+os.environ["MKL_NUM_THREADS"] = str(DEFAULT_BLAS_THREADS)
+os.environ["NUMEXPR_NUM_THREADS"] = str(DEFAULT_BLAS_THREADS)
+os.environ["OMP_NUM_THREADS"] = str(DEFAULT_BLAS_THREADS)
 
 # Append vamb to sys.path to allow vamb import even if vamb was not installed
 # using pip
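The ordering constraint in the comment above is real: MKL, OpenMP, and numexpr read their *_NUM_THREADS variables once, when the native libraries are first loaded, so assigning os.environ after numpy has already been imported has no effect. A minimal standalone sketch of the pattern (not part of this PR; threadpoolctl is a third-party package used here only to verify the resulting caps):

    import os

    # Must run before numpy is first imported; the BLAS/OpenMP runtimes
    # read these variables at library load time, not at call time.
    os.environ["MKL_NUM_THREADS"] = "16"
    os.environ["OMP_NUM_THREADS"] = "16"

    import numpy as np  # noqa: E402 -- importing numpy loads the BLAS, snapshotting the caps
    from threadpoolctl import threadpool_info  # pip install threadpoolctl

    # Each entry reports the thread count a loaded BLAS/OpenMP pool will use.
    for pool in threadpool_info():
        print(pool["internal_api"], pool["num_threads"])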
@@ -771,9 +771,11 @@ def cluster_and_write_files(
         print(
             str(i + 1),
             None if cluster.radius is None else round(cluster.radius, 3),
-            None
-            if cluster.observed_pvr is None
-            else round(cluster.observed_pvr, 2),
+            (
+                None
+                if cluster.observed_pvr is None
+                else round(cluster.observed_pvr, 2)
+            ),
             cluster.kind_str,
             sum(sequence_lens[i] for i in cluster.members),
             len(cluster.members),
@@ -1686,9 +1688,11 @@ def add_input_output_arguments(subparser):
         dest="nthreads",
         metavar="",
         type=int,
-        default=DEFAULT_THREADS,
+        default=vamb.parsebam.DEFAULT_BAM_THREADS,
         help=(
-            "number of threads to use " "[min(" + str(DEFAULT_THREADS) + ", nbamfiles)]"
+            "number of threads to read BAM files [min("
+            + str(vamb.parsebam.DEFAULT_BAM_THREADS)
+            + ", nbamfiles)]"
         ),
     )
     inputos.add_argument(
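For reference, a self-contained sketch of how the new default surfaces in the CLI. Only dest, type, default, and the help text are taken from the hunk above; the option flag itself is outside the visible context, so --threads below is an assumed stand-in:

    import argparse

    DEFAULT_BAM_THREADS = 32  # stand-in for vamb.parsebam.DEFAULT_BAM_THREADS

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--threads",  # assumed flag name; the real option string is not shown above
        dest="nthreads",
        metavar="",
        type=int,
        default=DEFAULT_BAM_THREADS,
        help="number of threads to read BAM files [min("
        + str(DEFAULT_BAM_THREADS)
        + ", nbamfiles)]",
    )

    print(parser.parse_args([]).nthreads)  # -> 32 when the flag is omitted

Note that the old help string relied on implicit string-literal concatenation ("number of threads to use " "[min(" ...), which was easy to misread; the rewrite joins the pieces with explicit + operators.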
1 change: 0 additions & 1 deletion vamb/aamb_encode.py
@@ -1,6 +1,5 @@
"""Adversarial autoencoders (AAE) for metagenomics binning, this files contains the implementation of the AAE"""


import numpy as np
from math import log, isfinite
import time
15 changes: 8 additions & 7 deletions vamb/parsebam.py
@@ -14,12 +14,13 @@
 from typing import Optional, TypeVar, Union, IO, Sequence, Iterable
 from pathlib import Path
 import shutil
-
-_ncpu = _os.cpu_count()
-DEFAULT_THREADS = 8 if _ncpu is None else _ncpu
+import os
 
 A = TypeVar("A", bound="Abundance")
 
+_ncpu = os.cpu_count()
+DEFAULT_BAM_THREADS = 32 if _ncpu is None else min(_ncpu, 32)
+
 
 class Abundance:
     "Object representing contig abundance. Contains a matrix and refhash."
@@ -115,10 +116,10 @@ def from_files(

         chunksize = min(nthreads, len(paths))
 
-        # We cap it to 16 threads, max. This will prevent pycoverm from consuming a huge amount
+        # We cap it to DEFAULT_BAM_THREADS threads, max. This will prevent pycoverm from consuming a huge amount
         # of memory if given a crapload of threads, and most programs will probably be IO bound
-        # when reading 16 files at a time.
-        chunksize = min(chunksize, 16)
+        # when reading DEFAULT_BAM_THREADS files at a time.
+        chunksize = min(chunksize, DEFAULT_BAM_THREADS)
 
         # If it can be done in memory, do so
         if chunksize >= len(paths):
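Taken together with the constant defined at the top of the file, the capping arithmetic works out as in this small sketch (effective_chunksize is an illustrative name, not a function in the codebase):

    DEFAULT_BAM_THREADS = 32  # i.e. min(os.cpu_count(), 32), as defined above

    def effective_chunksize(nthreads: int, n_bam_files: int) -> int:
        # Never use more reader threads than there are BAM files...
        chunksize = min(nthreads, n_bam_files)
        # ...and never let pycoverm open more than DEFAULT_BAM_THREADS files at once.
        return min(chunksize, DEFAULT_BAM_THREADS)

    # 40 requested threads, 50 BAM files: 32 files are read at a time, and
    # because 32 < 50 the cache_directory branch in the next hunk must be taken.
    assert effective_chunksize(40, 50) == 32
    # 8 threads, 4 files: one in-memory pass, no cache needed.
    assert effective_chunksize(8, 4) == 4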
@@ -134,7 +135,7 @@
         else:
             if cache_directory is None:
                 raise ValueError(
-                    "If min(16, nthreads) < len(paths), cache_directory must not be None"
+                    "If min(DEFAULT_BAM_THREADS, nthreads) < len(paths), cache_directory must not be None"
                 )
             return cls.chunkwise_loading(
                 paths,
1 change: 0 additions & 1 deletion vamb/semisupervised_encode.py
@@ -1,6 +1,5 @@
"""Semisupervised multimodal VAEs for metagenomics binning, this files contains the implementation of the VAEVAE for MMSEQ predictions"""


__cmd_doc__ = """Encode depths and TNF using a VAE to latent representation"""

import numpy as _np
1 change: 0 additions & 1 deletion vamb/taxvamb_encode.py
@@ -1,6 +1,5 @@
"""Hierarchical loss for the labels suggested in https://arxiv.org/abs/2210.10929"""


__cmd_doc__ = """Hierarchical loss for the labels"""


12 changes: 6 additions & 6 deletions workflow_avamb/src/rip_bins.py
@@ -183,9 +183,9 @@ def remove_meaningless_edges_from_pairs(
             contig_length,
         )
         print("Cluster ripped because of a meaningless edge ", cluster_updated)
-        clusters_changed_but_not_intersecting_contigs[
-            cluster_updated
-        ] = cluster_contigs[cluster_updated]
+        clusters_changed_but_not_intersecting_contigs[cluster_updated] = (
+            cluster_contigs[cluster_updated]
+        )
 
     components: list[set[str]] = list()
     for component in nx.connected_components(graph_clusters):
@@ -295,9 +295,9 @@ def make_all_components_pair(
             contig_length,
         )
         print("Cluster ripped because of a pairing component ", cluster_updated)
-        clusters_changed_but_not_intersecting_contigs[
-            cluster_updated
-        ] = cluster_contigs[cluster_updated]
+        clusters_changed_but_not_intersecting_contigs[cluster_updated] = (
+            cluster_contigs[cluster_updated]
+        )
         component_len = max(
             [
                 len(nx.node_connected_component(graph_clusters, node_i))