diff --git a/.github/workflows/cli_vamb.yml b/.github/workflows/cli_vamb.yml
index 1f74ae2a..70164add 100644
--- a/.github/workflows/cli_vamb.yml
+++ b/.github/workflows/cli_vamb.yml
@@ -42,7 +42,7 @@ jobs:
         cat outdir_vamb/log.txt
     - name: Run TaxVAMB
       run: |
-        vamb bin taxvamb --outdir outdir_taxvamb --fasta catalogue_mock.fna.gz --abundance abundance_mock.npz --taxonomy taxonomy_mock.tsv -pe 10 -pt 10 -e 10 -q -t 10 -o C --minfasta 200000
+        vamb bin taxvamb --outdir outdir_taxvamb --fasta catalogue_mock.fna.gz --abundance abundance_mock.npz --taxonomy taxonomy_mock.tsv -pe 10 -pt 10 -e 10 -q -pq -t 10 -o C --minfasta 200000
         ls -la outdir_taxvamb
         cat outdir_taxvamb/log.txt
         vamb bin taxvamb --outdir outdir_taxvamb_no_predict --no_predictor --fasta catalogue_mock.fna.gz --abundance abundance_mock.npz --taxonomy taxonomy_mock.tsv -e 10 -q -t 10 -o C --minfasta 200000
@@ -53,10 +53,10 @@ jobs:
         cat outdir_taxvamb_preds/log.txt
     - name: Run Taxometer
      run: |
-        vamb taxometer --outdir outdir_taxometer --fasta catalogue_mock.fna.gz --abundance abundance_mock.npz --taxonomy taxonomy_mock.tsv -pe 10 -pt 10
+        vamb taxometer --outdir outdir_taxometer --fasta catalogue_mock.fna.gz --abundance abundance_mock.npz --taxonomy taxonomy_mock.tsv -pe 10 -pq -pt 10
         ls -la outdir_taxometer
         cat outdir_taxometer/log.txt
-        vamb taxometer --outdir outdir_taxometer_pred --fasta catalogue_mock.fna.gz --abundance abundance_mock.npz --taxonomy outdir_taxometer/results_taxometer.tsv -pe 10 -pt 10
+        vamb taxometer --outdir outdir_taxometer_pred --fasta catalogue_mock.fna.gz --abundance abundance_mock.npz --taxonomy outdir_taxometer/results_taxometer.tsv -pe 10 -pq -pt 10
         ls -la outdir_taxometer_pred
         cat outdir_taxometer/log.txt
     - name: Run k-means reclustering
diff --git a/vamb/__main__.py b/vamb/__main__.py
index 7c515873..ff5bf8d7 100755
--- a/vamb/__main__.py
+++ b/vamb/__main__.py
@@ -9,15 +9,15 @@
 import argparse
 import torch
 import time
-import random
 import pycoverm
 import itertools
 from math import isfinite
-from typing import Optional, Tuple, Union, cast
+from typing import Optional, Tuple, Union, cast, Callable, Literal, NamedTuple
 from pathlib import Path
 from collections.abc import Sequence
 from collections import defaultdict
 from torch.utils.data import DataLoader
+from functools import partial
 from loguru import logger
 
 _ncpu = os.cpu_count()
@@ -41,6 +41,12 @@
 sys.path.append(parentdir)
 
 
+def check_existing_file(path: Path) -> Path:
+    if not path.is_file():
+        raise FileNotFoundError(path)
+    return path
+
+
 def format_log(record) -> str:
     colors = {"WARNING": "red", "INFO": "green", "DEBUG": "blue", "ERROR": "red"}
     L = colors.get(record["level"].name, "blue")
@@ -51,6 +57,11 @@ def format_log(record) -> str:
     return f"{time} | {level} | {message}\n{{exception}}"
 
 
+def typeasserted(x, Ty: Union[type, tuple[type, ...]]):
+    assert isinstance(x, Ty)
+    return x
+
+
 def try_make_dir(name: Union[Path, str]):
     try:
         os.mkdir(name)
@@ -58,34 +69,35 @@ def try_make_dir(name: Union[Path, str]):
         pass
 
 
-class FASTAPath(type(Path())):
-    __slots__ = []
-    pass
+class FASTAPath:
+    def __init__(self, path: Path):
+        self.path = check_existing_file(path)
 
 
-class CompositionPath(type(Path())):
-    __slots__ = []
-    pass
+class CompositionPath:
+    def __init__(self, path: Path):
+        self.path = check_existing_file(path)
 
 
 class CompositionOptions:
-    __slots__ = ["path", "min_contig_length"]
+    __slots__ = ["path"]
+
+    @staticmethod
+    def are_args_present(args: argparse.Namespace) -> bool:
+        return isinstance(args.fasta, Path) or isinstance(args.composition, Path)
+
+    @classmethod
+    def from_args(cls, args: argparse.Namespace):
+        return cls(
+            typeasserted(args.fasta, (Path, type(None))),
+            typeasserted(args.composition, (Path, type(None))),
+        )
 
     def __init__(
         self,
         fastapath: Optional[Path],
         npzpath: Optional[Path],
-        min_contig_length: int,
     ):
-        assert isinstance(fastapath, (Path, type(None)))
-        assert isinstance(npzpath, (Path, type(None)))
-        assert isinstance(min_contig_length, int)
-
-        if min_contig_length < 250:
-            raise argparse.ArgumentTypeError(
-                "Minimum contig length must be at least 250"
-            )
-
         if not (fastapath is None) ^ (npzpath is None):
             raise argparse.ArgumentTypeError(
                 "Must specify either FASTA or composition path"
@@ -100,23 +112,83 @@ def __init__(
         else:
             assert npzpath is not None
             self.path = CompositionPath(npzpath)
-        self.min_contig_length = min_contig_length
 
 
-class AbundancePath(type(Path())):
-    pass
+class AbundancePath:
+    def __init__(self, abundance: Path):
+        self.abundance = check_existing_file(abundance)
+
+
+class BAMPaths:
+    @classmethod
+    def from_dir(cls, dir: Path, min_alignment_id: Optional[float]):
+        if not dir.is_dir():
+            raise NotADirectoryError(dir)
+        paths = [b for b in dir.iterdir() if b.is_file() and b.suffix == ".bam"]
+        return cls(paths, min_alignment_id)
+
+    def __init__(self, paths: list[Path], min_alignment_id: Optional[float]):
+        if len(paths) == 0:
+            raise ValueError(
+                "No BAM files passed. If `--bamdir` was used, check the directory is not empty, "
+                "and all BAM files end with the extension `.bam`"
+            )
+        for path in paths:
+            check_existing_file(path)
+            if not pycoverm.is_bam_sorted(str(path)):
+                raise ValueError(f"Path {path} is not sorted by reference.")
 
-class BAMPaths(list):
-    pass
+        if min_alignment_id is not None:
+            if (
+                not isfinite(min_alignment_id)
+                or min_alignment_id < 0.0
+                or min_alignment_id > 1.0
+            ):
+                raise argparse.ArgumentTypeError(
+                    "Minimum nucleotide ID must be in [0,1]"
+                )
+            self.min_alignment_id = min_alignment_id
+        else:
+            self.min_alignment_id = 0.0
+        self.paths = paths
 
 
 class AEMBPaths(list):
-    pass
+    __slots__ = ["paths"]
+
+    def __init__(self, dir: Path):
+        if not dir.is_dir():
+            raise NotADirectoryError(dir)
+        paths = [b for b in dir.iterdir() if b.is_file() and b.suffix == ".tsv"]
+        if len(paths) == 0:
+            raise ValueError(
+                f'No `.tsv` files found in --aemb argument "{dir}". '
+                "Make sure all TSV files in the directory end with `.tsv`."
+            )
+        self.paths = paths
 
 
 class AbundanceOptions:
-    __slots__ = ["path", "min_alignment_id", "refcheck"]
+    __slots__ = ["paths"]
+
+    @staticmethod
+    def are_args_present(args: argparse.Namespace) -> bool:
+        return (
+            isinstance(args.bampaths, list)
+            or isinstance(args.bamdir, Path)
+            or isinstance(args.aemb, Path)
+            or isinstance(args.abundancepath, Path)
+        )
+
+    @classmethod
+    def from_args(cls, args: argparse.Namespace):
+        return cls(
+            typeasserted(args.bampaths, (list, type(None))),
+            typeasserted(args.bamdir, (Path, type(None))),
+            typeasserted(args.aemb, (Path, type(None))),
+            typeasserted(args.abundancepath, (Path, type(None))),
+            typeasserted(args.min_alignment_id, (float, type(None))),
+        )
 
     def __init__(
         self,
@@ -125,14 +197,12 @@ def __init__(
         aemb: Optional[Path],
         abundancepath: Optional[Path],
         min_alignment_id: Optional[float],
-        refcheck: bool,
     ):
         assert isinstance(bampaths, (list, type(None)))
         assert isinstance(bamdir, (Path, type(None)))
         assert isinstance(aemb, (Path, type(None)))
         assert isinstance(abundancepath, (Path, type(None)))
         assert isinstance(min_alignment_id, (float, type(None)))
-        assert isinstance(refcheck, bool)
 
         # Make sure only one abundance input is there
         if (
@@ -146,89 +216,95 @@
         )
 
         if abundancepath is not None:
-            if not abundancepath.is_file():
-                raise FileNotFoundError(
-                    f'Not an existing non-directory file: "{str(abundancepath)}"'
-                )
-            self.path = AbundancePath(abundancepath)
+            self.paths = AbundancePath(abundancepath)
         elif bamdir is not None:
-            if not bamdir.is_dir():
-                raise NotADirectoryError(
-                    f"Directory '{bamdir}' passed with --bamdir is not an existing directory."
-                )
-            paths = [p for p in bamdir.iterdir() if p.is_file() and p.suffix == ".bam"]
-            if len(paths) == 0:
-                raise ValueError(
-                    f"No `.bam` files found in --bamdir argument {bamdir}. "
-                    "Make sure all BAM files in the directory ends with `.bam`."
-                )
-            self.add_existing_bam_paths(paths)
+            self.paths = BAMPaths.from_dir(bamdir, min_alignment_id)
         elif bampaths is not None:
             logger.warning(
                 "The --bamfiles argument is deprecated. It works, but might be removed in future versions of Vamb. Please use --bamdir instead"
             )
-            for bampath in bampaths:
-                if not bampath.is_file():
-                    raise FileNotFoundError(
-                        f'Not an existing non-directory file: "{str(bampath)}"'
-                    )
-            self.add_existing_bam_paths(bampaths)
+            self.paths = BAMPaths(bampaths, min_alignment_id)
         elif aemb is not None:
-            if not aemb.is_dir():
-                raise NotADirectoryError(
-                    f"Directory '{aemb}' passed with --aemb is not an existing directory."
-                )
-            paths = [p for p in aemb.iterdir() if p.is_file() and p.suffix == ".tsv"]
-            if len(paths) == 0:
-                raise ValueError(
-                    f"No `.tsv` files found in --aemb argument {aemb}. "
-                    "Make sure all TSV files in the directory ends with `.tsv`."
-                )
-            self.path = AEMBPaths(paths)
+            self.paths = AEMBPaths(aemb)
 
-        if min_alignment_id is not None:
-            if bampaths is None:
-                raise argparse.ArgumentTypeError(
-                    "If minid is set, abundances must be passed as bam files"
-                )
-            if (
-                not isfinite(min_alignment_id)
-                or min_alignment_id < 0.0
-                or min_alignment_id > 1.0
-            ):
-                raise argparse.ArgumentTypeError(
-                    "Minimum nucleotide ID must be in [0,1]"
-                )
-            self.min_alignment_id = min_alignment_id
-        else:
-            self.min_alignment_id = 0.0
-        self.refcheck = refcheck
 
+class BasicTrainingOptions:
+    __slots__ = ["num_epochs", "starting_batch_size", "batch_steps"]
 
-    def add_existing_bam_paths(self, paths: list[Path]):
-        for bampath in paths:
-            if not pycoverm.is_bam_sorted(str(bampath)):
-                raise ValueError(f"Path {bampath} is not sorted by reference.")
-        self.path = BAMPaths(paths)
+    @classmethod
+    def from_args_vae(cls, args: argparse.Namespace):
+        return cls(
+            typeasserted(args.nepochs, int),
+            typeasserted(args.batchsize, int),
+            typeasserted(args.batchsteps, list),
+        )
+
+    @classmethod
+    def from_args_aae(cls, args: argparse.Namespace):
+        return cls(
+            typeasserted(args.nepochs_aae, int),
+            typeasserted(args.batchsize_aae, int),
+            typeasserted(args.batchsteps_aae, list),
+        )
+
+    @classmethod
+    def from_args_taxometer(cls, args: argparse.Namespace):
+        return cls(
+            typeasserted(args.pred_nepochs, int),
+            typeasserted(args.pred_batchsize, int),
+            typeasserted(args.pred_batchsteps, list),
+        )
+
+    def __init__(
+        self, num_epochs: int, starting_batch_size: int, batch_steps: list[int]
+    ):
+        if num_epochs < 1:
+            raise argparse.ArgumentTypeError(f"Minimum 1 epoch, not {num_epochs}")
+        self.num_epochs = num_epochs
+
+        if starting_batch_size < 1:
+            raise argparse.ArgumentTypeError(
+                f"Minimum batchsize of 1, not {starting_batch_size}"
+            )
+        self.starting_batch_size = starting_batch_size
+
+        batch_steps = sorted(set(batch_steps))
+        if max(batch_steps, default=0) >= self.num_epochs:
+            raise argparse.ArgumentTypeError("All batchsteps must be less than nepochs")
+
+        if min(batch_steps, default=1) < 1:
+            raise argparse.ArgumentTypeError("All batchsteps must be 1 or higher")
+        self.batch_steps = batch_steps
 
 
 class VAEOptions:
-    __slots__ = ["nhiddens", "nlatent", "beta", "dropout"]
+    __slots__ = ["nhiddens", "nlatent", "alpha", "beta", "dropout", "basic_options"]
+
+    @classmethod
+    def from_args(cls, basic: BasicTrainingOptions, args: argparse.Namespace):
+        return cls(
+            basic,
+            typeasserted(args.nhiddens, (list, type(None))),
+            typeasserted(args.nlatent, int),
+            typeasserted(args.alpha, (float, type(None))),
+            typeasserted(args.beta, float),
+            typeasserted(args.dropout, (float, type(None))),
+        )
 
     def __init__(
         self,
+        basic_options: BasicTrainingOptions,
         nhiddens: Optional[list[int]],
         nlatent: int,
+        alpha: Optional[float],
         beta: float,
         dropout: Optional[float],
     ):
-        assert isinstance(nhiddens, (list, type(None)))
-        assert isinstance(nlatent, int)
-
-        assert isinstance(beta, float)
-        assert isinstance(dropout, (float, type(None)))
+        self.basic_options = basic_options
 
-        if nhiddens is not None and any(i < 1 for i in nhiddens):
+        if nhiddens is not None and (
+            len(nhiddens) == 0 or any(i < 1 for i in nhiddens)
+        ):
             raise argparse.ArgumentTypeError(
                 f"Minimum 1 neuron per layer, not {min(nhiddens)}"
             )
@@ -238,6 +314,10 @@ def __init__(
             raise argparse.ArgumentTypeError(f"Minimum 1 latent neuron, not {nlatent}")
         self.nlatent = nlatent
 
+        if alpha is not None and ((not isfinite(alpha)) or alpha <= 0 or alpha >= 1):
+            raise argparse.ArgumentTypeError("alpha must be above 0 and below 1")
+        self.alpha = alpha
+
         if beta <= 0 or not isfinite(beta):
             raise argparse.ArgumentTypeError("beta cannot be negative or zero")
         self.beta = beta
@@ -247,141 +327,98 @@ def __init__(
         self.dropout = dropout
 
 
-class AAEOptions:
-    def __init__(
-        self,
-        nhiddens: int,
-        nlatent_z: int,
-        nlatent_y: int,
-        sl: float,
-        slr: float,
-    ):
-        assert isinstance(nhiddens, int)
-        assert isinstance(nlatent_z, int)
-        assert isinstance(nlatent_y, int)
-        assert isinstance(sl, float)
-        assert isinstance(slr, float)
-
-        self.nhiddens = nhiddens
-        self.nlatent_z = nlatent_z
-        self.nlatent_y = nlatent_y
-        self.sl = sl
-        self.slr = slr
-
-
-class TaxonomyOptions:
+class GeneralOptions:
     __slots__ = [
-        "no_predictor",
-        "taxonomy_path",
+        "out_dir",
+        "min_contig_length",
+        "n_threads",
+        "refcheck",
+        "seed",
+        "cuda",
     ]
 
-    def __init__(
-        self,
-        no_predictor: bool,
-        taxonomy_path: Path,
-    ):
-        assert isinstance(taxonomy_path, Path)
-        assert isinstance(no_predictor, bool)
-
-        if taxonomy_path is not None and not taxonomy_path.is_file():
-            raise FileNotFoundError(taxonomy_path)
-
-        self.taxonomy_path = taxonomy_path
-        self.no_predictor = no_predictor
-
-
-class ReclusteringOptions:
-    __slots__ = [
-        "latent_path",
-        "clusters_path",
-        "hmmout_path",
-        "binsplitter",
-        "algorithm",
-    ]
+    @classmethod
+    def from_args(cls, args: argparse.Namespace):
+        return cls(
+            typeasserted(args.outdir, Path),
+            typeasserted(args.minlength, int),
+            typeasserted(args.nthreads, int),
+            not typeasserted(args.norefcheck, bool),
+            typeasserted(args.seed, int),
+            typeasserted(args.cuda, bool),
+        )
 
     def __init__(
         self,
-        latent_path: Path,
-        clusters_path: Path,
-        hmmout_path: Path,
-        binsplit_separator: Optional[str],
-        algorithm: str,
+        out_dir: Path,
+        min_contig_length: int,
+        n_threads: int,
+        refcheck: bool,
+        seed: int,
+        cuda: bool,
     ):
-        assert isinstance(latent_path, Path)
-        assert isinstance(clusters_path, Path)
-        assert isinstance(hmmout_path, Path)
-
-        for path in [latent_path, clusters_path, hmmout_path]:
-            if not path.is_file():
-                raise FileNotFoundError(path)
-
-        if algorithm not in ("kmeans", "dbscan"):
-            raise ValueError("Algortihm must be 'kmeans' or 'dbscan'")
-
-        self.latent_path = latent_path
-        self.clusters_path = clusters_path
-        self.hmmout_path = hmmout_path
-        self.algorithm = algorithm
-        self.binsplitter = vamb.vambtools.BinSplitter(binsplit_separator)
-
-
-class EncoderOptions:
-    __slots__ = ["vae_options", "aae_options", "alpha"]
+        # Outdir does not exist
+        if out_dir.exists():
+            raise FileExistsError(out_dir)
 
-    def __init__(
-        self,
-        vae_options: Optional[VAEOptions],
-        aae_options: Optional[AAEOptions],
-        alpha: Optional[float],
-    ):
-        assert isinstance(alpha, (float, type(None)))
-        assert isinstance(vae_options, (VAEOptions, type(None)))
-        assert isinstance(aae_options, (AAEOptions, type(None)))
+        # Outdir is in an existing parent dir
+        parent_dir = out_dir.parent
+        if not parent_dir.is_dir():
+            raise NotADirectoryError(parent_dir)
+        self.out_dir = out_dir
 
-        if alpha is not None and (alpha <= 0 or alpha) >= 1:
-            raise argparse.ArgumentTypeError("alpha must be above 0 and below 1")
-        self.alpha = alpha
+        if n_threads < 1:
+            raise ValueError(f"Must pass at least 1 thread, not {n_threads}")
+        self.n_threads = n_threads
 
-        if vae_options is None and aae_options is None:
+        if min_contig_length < 250:
             raise argparse.ArgumentTypeError(
-                "You must specify either VAE or AAE or both as encoders, cannot run with no encoders."
+                "Minimum contig length must be at least 250"
             )
+        self.min_contig_length = min_contig_length
 
-        self.vae_options = vae_options
-        self.aae_options = aae_options
-
-
-class VAETrainingOptions:
-    def __init__(self, nepochs: int, batchsize: int, batchsteps: list[int]):
-        assert isinstance(nepochs, int)
-        assert isinstance(batchsize, int)
-        assert isinstance(batchsteps, list)
+        if cuda and not torch.cuda.is_available():
+            raise ModuleNotFoundError(
+                "CUDA is not available on your PyTorch installation"
+            )
+        self.seed = seed
+        self.cuda = cuda
+        self.refcheck = refcheck
 
-        if nepochs < 1:
-            raise argparse.ArgumentTypeError(f"Minimum 1 epoch, not {nepochs}")
-        self.nepochs = nepochs
 
-        if batchsize < 1:
-            raise argparse.ArgumentTypeError(f"Minimum batchsize of 1, not {batchsize}")
-        self.batchsize = batchsize
+class TaxonomyPath:
+    @classmethod
+    def from_args(cls, args: argparse.Namespace):
+        return cls(typeasserted(args.taxonomy, Path))
 
-        batchsteps = sorted(set(batchsteps))
-        if max(batchsteps, default=0) >= self.nepochs:
-            raise argparse.ArgumentTypeError("All batchsteps must be less than nepochs")
+    def __init__(self, path: Path):
+        self.path = check_existing_file(path)
 
-        if min(batchsteps, default=1) < 1:
-            raise argparse.ArgumentTypeError("All batchsteps must be 1 or higher")
-        self.batchsteps = batchsteps
 
+class TaxometerOptions:
+    @classmethod
+    def from_args(cls, args: argparse.Namespace):
+        return cls(
+            GeneralOptions.from_args(args),
+            CompositionOptions.from_args(args),
+            AbundanceOptions.from_args(args),
+            TaxonomyPath.from_args(args),
+            BasicTrainingOptions.from_args_taxometer(args),
+            typeasserted(args.pred_softmax_threshold, float),
+            args.ploss,
+        )
 
-class PredictorTrainingOptions(VAETrainingOptions):
     def __init__(
         self,
-        nepochs: int,
-        batchsize: int,
-        batchsteps: list[int],
+        general: GeneralOptions,
+        composition: CompositionOptions,
+        abundance: AbundanceOptions,
+        taxonomy: TaxonomyPath,
+        basic: BasicTrainingOptions,
         softmax_threshold: float,
-        ploss: str,
+        ploss: Union[
+            Literal["flat_softmax"], Literal["cond_softmax"], Literal["soft_margin"]
+        ],
     ):
         losses = ["flat_softmax", "cond_softmax", "soft_margin"]
         if (softmax_threshold > 1) or (softmax_threshold < 0):
@@ -392,67 +429,83 @@ def __init__(
             raise argparse.ArgumentTypeError(
                 f"Predictor loss needs to be one of {losses}"
             )
-        self.softmax_threshold = softmax_threshold
+        self.general = general
+        self.taxonomy = taxonomy
         self.ploss = ploss
-        super(PredictorTrainingOptions, self).__init__(
-            nepochs,
-            batchsize,
-            batchsteps,
-        )
+        self.softmax_threshold = softmax_threshold
+        self.composition = composition
+        self.abundance = abundance
+        self.basic = basic
 
 
-class AAETrainingOptions:
-    def __init__(
-        self,
-        nepochs: int,
-        batchsize: int,
-        batchsteps: list[int],
-        temp: Optional[float],
-    ):
-        assert isinstance(nepochs, int)
-        assert isinstance(batchsize, int)
-        assert isinstance(batchsteps, list)
+class RefinableTaxonomyOptions:
+    __slots__ = ["path_or_tax_options"]
 
-        if nepochs < 1:
-            raise argparse.ArgumentTypeError(f"Minimum 1 epoch, not {nepochs}")
-        self.nepochs = nepochs
+    @classmethod
+    def from_args(cls, args: argparse.Namespace):
+        predict = not typeasserted(args.no_predictor, bool)
 
-        if batchsize < 1:
-            raise argparse.ArgumentTypeError(f"Minimum batchsize of 1, not {batchsize}")
-        self.batchsize = batchsize
+        # TaxometerOptions has more options, but only the composition and the abundance
+        # can be omitted, so we only check those here.
+        if predict:
+            if not CompositionOptions.are_args_present(
+                args
+            ) or not AbundanceOptions.are_args_present(args):
+                raise ValueError(
+                    "If `--no_predictor` is not passed, Taxometer is run to refine taxonomy, "
+                    "and this requires composition input and abundance input to be passed in"
+                )
+            return cls(TaxometerOptions.from_args(args))
+        else:
+            return cls(TaxonomyPath.from_args(args))
 
-        batchsteps = sorted(set(batchsteps))
-        if max(batchsteps, default=0) >= self.nepochs:
-            raise argparse.ArgumentTypeError("All batchsteps must be less than nepochs")
+    def __init__(self, path_or_tax_options: Union[TaxonomyPath, TaxometerOptions]):
+        self.path_or_tax_options = path_or_tax_options
 
-        if min(batchsteps, default=1) < 1:
-            raise argparse.ArgumentTypeError("All batchsteps must be 1 or higher")
-        self.batchsteps = batchsteps
 
-        assert isinstance(temp, (float, type(None)))
-        self.temp = temp
+class MarkerPath:
+    def __init__(self, path: Path):
+        self.path = check_existing_file(path)
 
 
-class TrainingOptions:
-    def __init__(
-        self,
-        encoder_options: EncoderOptions,
-        vae_options: Optional[VAETrainingOptions],
-        aae_options: Optional[AAETrainingOptions],
-        lrate: Optional[float],
-    ):
-        assert isinstance(lrate, (type(None), float))
+class PredictMarkers:
+    def __init__(self, hmm: Path, comp: CompositionOptions):
+        self.hmm = check_existing_file(hmm)
+        if not isinstance(comp.path, FASTAPath):
+            raise ValueError(
+                "If markers are predicted via --hmmout_path, composition must be passed with --fasta"
+            )
+        self.fasta = comp.path
 
-        assert (encoder_options.vae_options is None) == (vae_options is None)
-        assert (encoder_options.aae_options is None) == (aae_options is None)
 
-        if lrate is not None:
-            logger.warning(
-                "The --lrate argument is deprecated, and has no effect in Vamb 5 onwards"
+class MarkerOptions:
+    @classmethod
+    def from_args(cls, comp: CompositionOptions, args):
+        markers = typeasserted(args.markers, (Path, type(None)))
+        hmm = typeasserted(args.hmmout_path, (Path, type(None)))
+        if not (markers is None) ^ (hmm is None):
+            raise ValueError(
+                "Exactly one of --markers or --hmmout_path must be set to input marker information"
             )
+        if markers is not None:
+            return cls(MarkerPath(markers))
+        else:
+            assert isinstance(hmm, Path)  # we just checked this above
+            return cls(PredictMarkers(hmm, comp))
+
+    def __init__(self, markers: Union[MarkerPath, PredictMarkers]):
+        self.content = markers
+
+
+class KmeansOptions:
+    def __init__(self, clusters: Path):
+        self.clusters = check_existing_file(clusters)
 
-        self.vae_options = vae_options
-        self.aae_options = aae_options
+
+class DBScanOptions:
+    def __init__(self, taxonomy_options: RefinableTaxonomyOptions, n_processes: int):
+        self.taxonomy_options = taxonomy_options
+        self.n_processes = n_processes
 
 
 class ClusterOptions:
@@ -460,21 +513,22 @@ class ClusterOptions:
         "window_size",
         "min_successes",
         "max_clusters",
-        "binsplitter",
     ]
 
+    @classmethod
+    def from_args(cls, args: argparse.Namespace):
+        return cls(
+            typeasserted(args.window_size, int),
+            typeasserted(args.min_successes, int),
+            typeasserted(args.max_clusters, (int, type(None))),
+        )
+
     def __init__(
         self,
         window_size: int,
         min_successes: int,
         max_clusters: Optional[int],
-        binsplit_separator: Optional[str],
     ):
-        assert isinstance(window_size, int)
-        assert isinstance(min_successes, int)
-        assert isinstance(max_clusters, (int, type(None)))
-        assert isinstance(binsplit_separator, (str, type(None)))
-
         if window_size < 1:
             raise argparse.ArgumentTypeError("Window size must be at least 1")
         self.window_size = window_size
@@ -488,51 +542,97 @@ def __init__(
         if max_clusters is not None and max_clusters < 1:
             raise argparse.ArgumentTypeError("Max clusters must be at least 1")
         self.max_clusters = max_clusters
-        self.binsplitter = vamb.vambtools.BinSplitter(binsplit_separator)
 
 
-class VambOptions:
-    __slots__ = [
-        "out_dir",
-        "n_threads",
-        "comp_options",
-        "min_fasta_output_size",
-        "seed",
-        "cuda",
-    ]
+class AAEOptions:
+    @classmethod
+    def from_args(
+        cls,
+        basic: BasicTrainingOptions,
+        clustering: ClusterOptions,
+        args: argparse.Namespace,
+    ):
+        return cls(
+            basic,
+            clustering,
+            typeasserted(args.nhiddens_aae, int),
+            typeasserted(args.nlatent_aae_z, int),
+            typeasserted(args.nlatent_aae_y, int),
+            typeasserted(args.sl, float),
+            typeasserted(args.slr, float),
+            typeasserted(args.temp, float),
+        )
 
     def __init__(
         self,
-        out_dir: Path,
-        n_threads: int,
-        comp_options: CompositionOptions,
-        min_fasta_output_size: Optional[int],
-        seed: int,
-        cuda: bool,
+        basic: BasicTrainingOptions,
+        clustering: ClusterOptions,
+        nhiddens: int,
+        nlatent_z: int,
+        nlatent_y: int,
+        sl: float,
+        slr: float,
+        temp: float,
     ):
-        assert isinstance(out_dir, Path)
-        assert isinstance(n_threads, int)
-        assert isinstance(comp_options, CompositionOptions)
-        assert isinstance(min_fasta_output_size, (int, type(None)))
-        assert isinstance(seed, int)
-        assert isinstance(cuda, bool)
+        # For the VAE-AAE workflow, we must cluster the full set of sequences,
+        # else the cluster dereplication step makes no sense, as the different
+        # clusterings will contain different sets of clusters.
+        # So, enforce this here
+        if clustering.max_clusters is not None:
+            raise ValueError(
+                "When using the VAE-AAE model, `max_clusters` (option `-c`) "
+                "must not be set."
+            )
 
-        # Outdir does not exist
-        if out_dir.exists():
-            raise FileExistsError(out_dir)
+        self.basic = basic
+        if nhiddens < 1:
+            raise argparse.ArgumentTypeError(
+                f"Minimum 1 neuron in the hidden layers, not {nhiddens}"
+            )
 
-        # Outdir is in an existing parent dir
-        parent_dir = out_dir.parent
-        if not parent_dir.is_dir():
-            raise NotADirectoryError(parent_dir)
-        self.out_dir = out_dir
+        self.nhiddens = nhiddens
 
-        if n_threads < 1:
-            raise ValueError(f"Must pass at least 1 thread, not {n_threads}")
-        self.n_threads = n_threads
+        if nlatent_z < 1:
+            raise argparse.ArgumentTypeError(
+                f"Minimum 1 latent neuron, not {nlatent_z}"
+            )
+        self.nlatent_z = nlatent_z
+
+        if nlatent_y < 1:
+            raise argparse.ArgumentTypeError(
+                f"Minimum 1 latent neuron, not {nlatent_y}"
+            )
+        self.nlatent_y = nlatent_y
+        self.temp = temp
+        self.sl = sl
+        self.slr = slr
+
+
+class BinOutputOptions:
+    __slots__ = ["binsplitter", "min_fasta_output_size"]
+
+    # We take a composition as an argument, because if min_fasta_output_size is set,
+    # we need to guarantee that the composition is passed as fasta.
+    # Similarly, we currently don't have any workflows that can use BinOutputOptions
+    # without using composition.
+
+    @classmethod
+    def from_args(cls, comp: CompositionOptions, args: argparse.Namespace):
+        return cls(
+            comp,
+            typeasserted(args.binsplit_separator, (str, type(None))),
+            typeasserted(args.min_fasta_output_size, (int, type(None))),
+        )
 
     def __init__(
         self,
+        composition: CompositionOptions,
+        binsplit_separator: Optional[str],
+        min_fasta_output_size: Optional[int],
+    ):
+        self.binsplitter = vamb.vambtools.BinSplitter(binsplit_separator)
         if min_fasta_output_size is not None:
-            if not isinstance(comp_options.path, FASTAPath):
+            if not isinstance(composition.path, FASTAPath):
                 raise argparse.ArgumentTypeError(
                     "If minfasta is not None, "
                    "input fasta file must be given explicitly"
@@ -543,35 +643,192 @@ def __init__(
                )
            self.min_fasta_output_size = min_fasta_output_size
 
-        if cuda and not torch.cuda.is_available():
-            raise ModuleNotFoundError(
-                "Cuda is not available on your PyTorch installation"
-            )
-        self.seed = seed
-        self.cuda = cuda
 
+@logger.catch(reraise=True)
+def run(
+    runner: Callable[[], None],
+    general: GeneralOptions,
+):
+    torch.set_num_threads(general.n_threads)
+    try_make_dir(general.out_dir)
+    logger.add(general.out_dir.joinpath("log.txt"), format=format_log)
+    begintime = time.time()
+    logger.info("Starting Vamb version " + vamb.__version_str__)
+    logger.info("Random seed is " + str(general.seed))
+    logger.info(f"Invoked with CLI args: '{' '.join(sys.argv)}'")
+    runner()
+    logger.info(f"Completed Vamb in {round(time.time() - begintime, 2)} seconds.")
+
+
+class BinnerCommonOptions:
+    @classmethod
+    def from_args(cls, args: argparse.Namespace):
+        comp = CompositionOptions.from_args(args)
+        return cls(
+            GeneralOptions.from_args(args),
+            comp,
+            AbundanceOptions.from_args(args),
+            ClusterOptions.from_args(args),
+            BinOutputOptions.from_args(comp, args),
+        )
+
+    # We do not have BasicTrainingOptions because that is model-specific
+    def __init__(
+        self,
+        general: GeneralOptions,
+        comp: CompositionOptions,
+        abundance: AbundanceOptions,
+        clustering: ClusterOptions,
+        output: BinOutputOptions,
+    ):
+        self.general = general
+        self.comp = comp
+        self.abundance = abundance
+        self.clustering = clustering
+        self.output = output
+
+
+class BinDefaultOptions:
+    @classmethod
+    def from_args(cls, args: argparse.Namespace):
+        common = BinnerCommonOptions.from_args(args)
+        basic = BasicTrainingOptions.from_args_vae(args)
+        return cls(
+            common,
+            VAEOptions.from_args(basic, args),
+        )
+
+    def __init__(
+        self,
+        common: BinnerCommonOptions,
+        vae: VAEOptions,
+    ):
+        self.common = common
+        self.vae = vae
+
+
+class BinTaxVambOptions:
+    @classmethod
+    def from_args(cls, args: argparse.Namespace):
+        common = BinnerCommonOptions.from_args(args)
+        basic = BasicTrainingOptions.from_args_vae(args)
+        vae = VAEOptions.from_args(basic, args)
+        taxonomy = RefinableTaxonomyOptions.from_args(args)
+        return cls(common, vae, taxonomy)
+
+    # The VAEVAE models share the same settings as the VAE model, so we just use VAEOptions
+    def __init__(
+        self,
+        common: BinnerCommonOptions,
+        vae: VAEOptions,
+        taxonomy: RefinableTaxonomyOptions,
+    ):
+        self.common = common
+        self.vae = vae
+        self.taxonomy = taxonomy
+
+
+class BinAvambOptions:
+    @classmethod
+    def from_args(cls, args: argparse.Namespace):
+        common = BinnerCommonOptions.from_args(args)
+        basic_vae = BasicTrainingOptions.from_args_vae(args)
+        basic_aae = BasicTrainingOptions.from_args_aae(args)
+        clustering = ClusterOptions.from_args(args)
+        return cls(
+            common,
+            VAEOptions.from_args(basic_vae, args),
+            AAEOptions.from_args(basic_aae, clustering, args),
+        )
+
+    def __init__(
+        self,
+        common: BinnerCommonOptions,
+        vae: VAEOptions,
+        aae: AAEOptions,
+    ):
+        self.common = common
+        self.vae = vae
+        self.aae = aae
+
+
+class ReclusteringOptions:
+    @classmethod
+    def from_args(cls, args: argparse.Namespace):
+        general = GeneralOptions.from_args(args)
+        latent = args.latent_path
+        if latent is None:
+            raise ValueError("--latent_path must be set")
+        if args.algorithm == "kmeans":
+            clusters = args.clusters_path
+            if clusters is None:
+                raise ValueError(
+                    "If --algorithm is set to 'kmeans', --clusters_path must be set"
+                )
+            algorithm = KmeansOptions(clusters)
+        elif args.algorithm == "dbscan":
+            tax = RefinableTaxonomyOptions.from_args(args)
+            algorithm = DBScanOptions(tax, general.n_threads)
+        else:
+            assert False  # no more algorithms
+
+        if isinstance(algorithm, DBScanOptions) and isinstance(
+            algorithm.taxonomy_options.path_or_tax_options, TaxometerOptions
+        ):
+            comp = algorithm.taxonomy_options.path_or_tax_options.composition
+        else:
+            comp = CompositionOptions.from_args(args)
+
+        markers = MarkerOptions.from_args(comp, args)
+        output = BinOutputOptions.from_args(comp, args)
+        return cls(
+            latent,
+            comp,
+            markers,
+            general,
+            output,
+            algorithm,
+        )
+
+    def __init__(
+        self,
+        latent_path: Path,
+        comp: CompositionOptions,
+        markers: MarkerOptions,
+        general: GeneralOptions,
+        output: BinOutputOptions,
+        algorithm: Union[KmeansOptions, DBScanOptions],
+    ):
+        self.latent_path = check_existing_file(latent_path)
+        self.composition = comp
+        self.markers = markers
+        self.general = general
+        self.output = output
+        self.algorithm = algorithm
 
 
 def calc_tnf(
     options: CompositionOptions,
+    min_contig_length: int,
     outdir: Path,
     binsplitter: vamb.vambtools.BinSplitter,
 ) -> vamb.parsecontigs.Composition:
     begintime = time.time()
     logger.info("Loading TNF")
-    logger.info(f"\tMinimum sequence length: {options.min_contig_length}")
+    logger.info(f"\tMinimum sequence length: {min_contig_length}")
 
     path = options.path
 
     if isinstance(path, CompositionPath):
-        logger.info(f"\tLoading composition from npz at: {path}")
-        composition = vamb.parsecontigs.Composition.load(str(path))
-        composition.filter_min_length(options.min_contig_length)
+        logger.info(f"\tLoading composition from npz at: {path.path}")
+        composition = vamb.parsecontigs.Composition.load(path.path)
+        composition.filter_min_length(min_contig_length)
     else:
         assert isinstance(path, FASTAPath)
-        logger.info(f"\tLoading data from FASTA file {path}")
-        with vamb.vambtools.Reader(str(path)) as file:
+        logger.info(f"\tLoading data from FASTA file {path.path}")
+        with vamb.vambtools.Reader(path.path) as file:
             composition = vamb.parsecontigs.Composition.from_file(
-                file, minlength=options.min_contig_length
+                file, minlength=min_contig_length
             )
         composition.save(outdir.joinpath("composition.npz"))
 
@@ -592,7 +849,7 @@ def calc_tnf(
     if not np.all(composition.metadata.mask):
         n_removed = len(composition.metadata.mask) - np.sum(composition.metadata.mask)
         message = (
-            f"The minimum sequence length has been set to {options.min_contig_length}, "
+            f"The minimum sequence length has been set to {min_contig_length}, "
             f"but {n_removed} sequences fell below this threshold and was filtered away."
             "\nBetter results are obtained if the sequence file is filtered to the minimum "
             "sequence length before mapping.\n"
@@ -612,41 +869,43 @@
 def calc_abundance(
     abundance_options: AbundanceOptions,
     outdir: Path,
+    refcheck: bool,
     comp_metadata: vamb.parsecontigs.CompositionMetaData,
     nthreads: int,
 ) -> vamb.parsebam.Abundance:
     begintime = time.time()
     logger.info("Loading depths")
     logger.info(
-        f'\tReference hash: {comp_metadata.refhash.hex() if abundance_options.refcheck else "None"}'
+        f'\tReference hash: {comp_metadata.refhash.hex() if refcheck else "None"}'
     )
 
-    path = abundance_options.path
-    if isinstance(path, AbundancePath):
-        logger.info(f"\tLoading depths from npz at: {str(path)}")
+    paths = abundance_options.paths
+    if isinstance(paths, AbundancePath):
+        logger.info(f"\tLoading depths from npz at: {str(paths.abundance)}")
 
         abundance = vamb.parsebam.Abundance.load(
-            path, comp_metadata.refhash if abundance_options.refcheck else None
+            paths.abundance,
+            comp_metadata.refhash if refcheck else None,
         )
         # I don't want this check in any constructors of abundance, since the constructors
         # should be able to skip this check in case comp and abundance are independent.
         # But when running the main Vamb workflow, we need to assert this.
         if abundance.nseqs != comp_metadata.nseqs:
-            assert not abundance_options.refcheck
+            assert not refcheck
             raise ValueError(
                 f"Loaded abundance has {abundance.nseqs} sequences, "
                 f"but composition has {comp_metadata.nseqs}."
             )
-    elif isinstance(path, BAMPaths):
-        logger.info(f"\tParsing {len(path)} BAM files with {nthreads} threads")
-        logger.info(f"\tMin identity: {abundance_options.min_alignment_id}")
+    elif isinstance(paths, BAMPaths):
+        logger.info(f"\tParsing {len(paths.paths)} BAM files with {nthreads} threads")
+        logger.info(f"\tMin identity: {paths.min_alignment_id}")
 
         abundance = vamb.parsebam.Abundance.from_files(
-            list(path),
+            list(paths.paths),
             outdir.joinpath("tmp").joinpath("pycoverm"),
             comp_metadata,
-            abundance_options.refcheck,
-            abundance_options.min_alignment_id,
+            refcheck,
+            paths.min_alignment_id,
             nthreads,
         )
         abundance.save(outdir.joinpath("abundance.npz"))
@@ -654,8 +913,8 @@ def calc_abundance(
         logger.info("\tOrder of columns is:")
         for i, samplename in enumerate(abundance.samplenames):
             logger.info(f"\t{i:>6}: {samplename}")
-    elif isinstance(path, AEMBPaths):
-        paths = list(path)
+    elif isinstance(paths, AEMBPaths):
+        paths = list(paths.paths)
         logger.info(f"\tParsing {len(paths)} AEMB TSV files")
         abundance = vamb.parsebam.Abundance.from_aemb(
             paths,
@@ -675,11 +934,28 @@ def calc_abundance(
     return abundance
 
 
+def load_composition_and_abundance(
+    vamb_options: GeneralOptions,
+    comp_options: CompositionOptions,
+    abundance_options: AbundanceOptions,
+    binsplitter: vamb.vambtools.BinSplitter,
+) -> Tuple[vamb.parsecontigs.Composition, vamb.parsebam.Abundance]:
+    composition = calc_tnf(
+        comp_options, vamb_options.min_contig_length, vamb_options.out_dir, binsplitter
+    )
+    abundance = calc_abundance(
+        abundance_options,
+        vamb_options.out_dir,
+        vamb_options.refcheck,
+        composition.metadata,
+        vamb_options.n_threads,
+    )
+    return (composition, abundance)
+
+
 def trainvae(
     vae_options: VAEOptions,
-    training_options: VAETrainingOptions,
-    vamb_options: VambOptions,
-    alpha: Optional[float],
+    vamb_options: GeneralOptions,
     data_loader: DataLoader,
 ) -> np.ndarray:
     begintime = time.time()
@@ -690,7 +966,7 @@ def trainvae(
         nsamples,
         nhiddens=vae_options.nhiddens,
         nlatent=vae_options.nlatent,
-        alpha=alpha,
+        alpha=vae_options.alpha,
         beta=vae_options.beta,
         dropout=vae_options.dropout,
         cuda=vamb_options.cuda,
@@ -700,9 +976,11 @@ def trainvae(
     logger.info("\tCreated VAE")
     modelpath = vamb_options.out_dir.joinpath("model.pt")
     vae.trainmodel(
-        vamb.encode.set_batchsize(data_loader, training_options.batchsize),
-        nepochs=training_options.nepochs,
-        batchsteps=training_options.batchsteps,
+        vamb.encode.set_batchsize(
+            data_loader, vae_options.basic_options.starting_batch_size
+        ),
+        nepochs=vae_options.basic_options.num_epochs,
+        batchsteps=vae_options.basic_options.batch_steps,
         modelfile=modelpath,
     )
 
@@ -720,8 +998,7 @@ def trainvae(
 def trainaae(
     data_loader: DataLoader,
     aae_options: AAEOptions,
-    training_options: AAETrainingOptions,
-    vamb_options: VambOptions,
+    vamb_options: GeneralOptions,
     alpha: Optional[float],  # set automatically if None
     contignames: Sequence[str],
 ) -> tuple[np.ndarray, dict[str, set[str]]]:
@@ -744,10 +1021,10 @@ def trainaae(
     logger.info("\tCreated AAE")
     modelpath = os.path.join(vamb_options.out_dir, "aae_model.pt")
     aae.trainmodel(
-        vamb.encode.set_batchsize(data_loader, training_options.batchsize),
-        training_options.nepochs,
-        training_options.batchsteps,
-        training_options.temp,
+        vamb.encode.set_batchsize(data_loader, aae_options.basic.starting_batch_size),
+        aae_options.basic.num_epochs,
+        aae_options.basic.batch_steps,
+        aae_options.temp,
         modelpath,
     )
 
@@ -765,15 +1042,35 @@ def trainaae(
     return latent, clusters_y_dict
 
 
+class FastaOutput(NamedTuple):
+    existing_fasta_path: FASTAPath
+    bins_dir_to_populate: Path  # (or to create, if not existing)
+    min_fasta_size: int
+
+    @classmethod
+    def try_from_common(cls, common: BinnerCommonOptions):
+        if common.output.min_fasta_output_size is not None:
+            # We have checked this in the BinOutputOptions constructor
+            assert isinstance(common.comp.path, FASTAPath)
+            return cls(
+                common.comp.path,
+                common.general.out_dir.joinpath("bins"),
+                common.output.min_fasta_output_size,
+            )
+        else:
+            return None
+
+
 def cluster_and_write_files(
-    vamb_options: VambOptions,
     cluster_options: ClusterOptions,
-    base_clusters_name: str,  # e.g. /foo/bar/vae -> /foo/bar/vae_unsplit.tsv
-    bins_dir: Path,
-    fasta_catalogue: Path,
+    binsplitter: vamb.vambtools.BinSplitter,
     latent: np.ndarray,
     sequence_names: Sequence[str],
-    sequence_lens: Sequence[int],
+    sequence_lens: np.ndarray,
+    seed: int,
+    cuda: bool,
+    base_clusters_name: str,  # e.g. /foo/bar/vae -> /foo/bar/vae_unsplit.tsv
+    fasta_output: Optional[FastaOutput],
 ):
     begintime = time.time()
     # Create cluser iterator
@@ -783,18 +1080,18 @@ def cluster_and_write_files(
         f"\tWindows size: {cluster_options.window_size}",
         f"\tMin successful thresholds detected: {cluster_options.min_successes}",
     )
     logger.info(f"\tMax clusters: {cluster_options.max_clusters}")
-    logger.info(f"\tUse CUDA for clustering: {vamb_options.cuda}")
-    logger.info(f"\tBinsplitter: {cluster_options.binsplitter.log_string()}")
+    logger.info(f"\tUse CUDA for clustering: {cuda}")
+    logger.info(f"\tBinsplitter: {binsplitter.log_string()}")
 
     cluster_generator = vamb.cluster.ClusterGenerator(
         latent,
-        sequence_lens,  # type:ignore
+        sequence_lens,
         windowsize=cluster_options.window_size,
         minsuccesses=cluster_options.min_successes,
         destroy=True,
         normalized=False,
-        cuda=vamb_options.cuda,
-        rng_seed=vamb_options.seed,
+        cuda=cuda,
+        rng_seed=seed,
     )
     # This also works correctly when max_clusters is None
     clusters = itertools.islice(cluster_generator, cluster_options.max_clusters)
@@ -825,24 +1122,20 @@ def cluster_and_write_files(
     elapsed = round(time.time() - begintime, 2)
 
     write_clusters_and_bins(
-        vamb_options,
-        cluster_options.binsplitter,
+        fasta_output,
+        binsplitter,
         base_clusters_name,
-        bins_dir,
-        fasta_catalogue,
         cluster_dict,
         sequence_names,
-        sequence_lens,
+        cast(Sequence[int], sequence_lens),
     )
     logger.info(f"\tClustered contigs in {elapsed} seconds.\n")
 
 
 def write_clusters_and_bins(
-    vamb_options: VambOptions,
+    fasta_output: Optional[FastaOutput],
     binsplitter: vamb.vambtools.BinSplitter,
     base_clusters_name: str,  # e.g. /foo/bar/vae -> /foo/bar/vae_unsplit.tsv
-    bins_dir: Path,
-    fasta_catalogue: Path,
     clusters: dict[str, set[str]],
     sequence_names: Sequence[str],
     sequence_lens: Sequence[int],
@@ -872,18 +1165,18 @@ def write_clusters_and_bins(
     logger.info(f"\tWrote cluster file(s) in {elapsed} seconds.")
 
     # Write bins, if necessary
-    if vamb_options.min_fasta_output_size is not None:
+    if fasta_output is not None:
         starttime = time.time()
         filtered_clusters: dict[str, set[str]] = dict()
         assert len(sequence_lens) == len(sequence_names)
         sizeof = dict(zip(sequence_names, sequence_lens))
         for binname, contigs in clusters.items():
-            if sum(sizeof[c] for c in contigs) >= vamb_options.min_fasta_output_size:
+            if sum(sizeof[c] for c in contigs) >= fasta_output.min_fasta_size:
                 filtered_clusters[binname] = contigs
-        with vamb.vambtools.Reader(fasta_catalogue) as file:
+        with vamb.vambtools.Reader(fasta_output.existing_fasta_path.path) as file:
             vamb.vambtools.write_bins(
-                bins_dir,
+                fasta_output.bins_dir_to_populate,
                 filtered_clusters,
                 file,
                 None,
@@ -896,40 +1189,51 @@ def write_clusters_and_bins(
     )
 
 
-def load_composition_and_abundance(
-    vamb_options: VambOptions,
-    comp_options: CompositionOptions,
-    abundance_options: AbundanceOptions,
-    binsplitter: vamb.vambtools.BinSplitter,
-) -> Tuple[vamb.parsecontigs.Composition, vamb.parsebam.Abundance]:
-    composition = calc_tnf(comp_options, vamb_options.out_dir, binsplitter)
-    abundance = calc_abundance(
-        abundance_options,
-        vamb_options.out_dir,
-        composition.metadata,
-        vamb_options.n_threads,
+def run_bin_default(opt: BinDefaultOptions):
+    composition, abundance = load_composition_and_abundance(
+        vamb_options=opt.common.general,
+        comp_options=opt.common.comp,
+        abundance_options=opt.common.abundance,
+        binsplitter=opt.common.output.binsplitter,
     )
-    return (composition, abundance)
+    data_loader = vamb.encode.make_dataloader(
+        abundance.matrix,
+        composition.matrix,
+        composition.metadata.lengths,
+        256,  # dummy value - we change this before using the actual loader
+        destroy=True,
+        cuda=opt.common.general.cuda,
+    )
+    latent = trainvae(
+        vae_options=opt.vae,
+        vamb_options=opt.common.general,
+        data_loader=data_loader,
+    )
+    # Free up memory
+    comp_metadata = composition.metadata
+    del composition, abundance
+    assert comp_metadata.nseqs == len(latent)
+    cluster_and_write_files(
+        opt.common.clustering,
+        opt.common.output.binsplitter,
+        latent,
+        cast(Sequence[str], comp_metadata.identifiers),
+        comp_metadata.lengths,
+        opt.common.general.seed,
+        opt.common.general.cuda,
+        str(opt.common.general.out_dir.joinpath("vae_clusters")),
+        FastaOutput.try_from_common(opt.common),
+    )
+    del latent
 
 
-def run(
-    vamb_options: VambOptions,
-    comp_options: CompositionOptions,
-    abundance_options: AbundanceOptions,
-    encoder_options: EncoderOptions,
-    training_options: TrainingOptions,
-    cluster_options: ClusterOptions,
-):
-    vae_options = encoder_options.vae_options
-    aae_options = encoder_options.aae_options
-    vae_training_options = training_options.vae_options
-    aae_training_options = training_options.aae_options
+def run_bin_aae(opt: BinAvambOptions):
     composition, abundance = load_composition_and_abundance(
-        vamb_options=vamb_options,
-        comp_options=comp_options,
-        abundance_options=abundance_options,
-        binsplitter=cluster_options.binsplitter,
+        vamb_options=opt.common.general,
+        comp_options=opt.common.comp,
+        abundance_options=opt.common.abundance,
+        binsplitter=opt.common.output.binsplitter,
     )
     data_loader = vamb.encode.make_dataloader(
         abundance.matrix,
@@ -937,81 +1241,42 @@ def run(
         composition.metadata.lengths,
         256,  # dummy value - we change this before using the actual loader
         destroy=True,
-        cuda=vamb_options.cuda,
+        cuda=opt.common.general.cuda,
+    )
+    latent_z, clusters_y_dict = trainaae(
+        data_loader=data_loader,
+        aae_options=opt.aae,
+        vamb_options=opt.common.general,
+        alpha=opt.vae.alpha,
+        contignames=cast(Sequence[str], composition.metadata.identifiers),
     )
-
-    latent = None
-    if vae_options is not None:
-        assert vae_training_options is not None
-        # Train, save model
-        latent = trainvae(
-            vae_options=vae_options,
-            training_options=vae_training_options,
-            vamb_options=vamb_options,
-            alpha=encoder_options.alpha,
-            data_loader=data_loader,
-        )
-
-    clusters_y_dict = None
-    latent_z = None
-    if aae_options is not None:
-        assert aae_training_options is not None
-        # Train, save model
-        latent_z, clusters_y_dict = trainaae(
-            data_loader=data_loader,
-            aae_options=aae_options,
-            vamb_options=vamb_options,
-            training_options=aae_training_options,
-            alpha=encoder_options.alpha,
-            contignames=composition.metadata.identifiers,  # type:ignore
-        )
-
     # Free up memory
     comp_metadata = composition.metadata
     del composition, abundance
-
-    if vae_options is not None:
-        assert latent is not None
-        assert comp_metadata.nseqs == len(latent)
-        cluster_and_write_files(
-            vamb_options,
-            cluster_options,
-            str(vamb_options.out_dir.joinpath("vae_clusters")),
-            vamb_options.out_dir.joinpath("bins"),
-            comp_options.path,
-            latent,
-            comp_metadata.identifiers,  # type:ignore
-            comp_metadata.lengths,  # type:ignore
-        )
-        del latent
-
-    if aae_options is not None:
-        assert latent_z is not None
-        assert clusters_y_dict is not None
-        assert comp_metadata.nseqs == len(latent_z)
-        cluster_and_write_files(
-            vamb_options,
-            cluster_options,
-            str(vamb_options.out_dir.joinpath("aae_z_clusters")),
-            vamb_options.out_dir.joinpath("bins"),
-            comp_options.path,
-            latent_z,
-            comp_metadata.identifiers,  # type:ignore
-            comp_metadata.lengths,  # type:ignore
-        )
-        del latent_z
-        # We enforce this in the VAEAAEOptions constructor, see comment there
-        assert cluster_options.max_clusters is None
-        write_clusters_and_bins(
-            vamb_options,
-            cluster_options.binsplitter,
-            str(vamb_options.out_dir.joinpath("aae_y_clusters")),
-            vamb_options.out_dir.joinpath("bins"),
-            comp_options.path,
-            clusters_y_dict,
-            comp_metadata.identifiers,  # type:ignore
-            comp_metadata.lengths,  # type:ignore
-        )
+    assert comp_metadata.nseqs == len(latent_z)
+    cluster_and_write_files(
+        opt.common.clustering,
+        opt.common.output.binsplitter,
+        latent_z,
+        cast(Sequence[str], comp_metadata.identifiers),
+        comp_metadata.lengths,
+        opt.common.general.seed,
+        opt.common.general.cuda,
+        str(opt.common.general.out_dir.joinpath("aae_z_clusters")),
+        FastaOutput.try_from_common(opt.common),
+    )
+    del latent_z
+
+    # We enforce this in the VAEAAEOptions constructor, see comment there
+    assert opt.common.clustering.max_clusters is None
+    write_clusters_and_bins(
+        FastaOutput.try_from_common(opt.common),
+        binsplitter=opt.common.output.binsplitter,
+        base_clusters_name=str(opt.common.general.out_dir.joinpath("aae_y_clusters")),
+        clusters=clusters_y_dict,
+        sequence_names=cast(Sequence[str], comp_metadata.identifiers),
+        sequence_lens=cast(Sequence[int], comp_metadata.lengths),
+    )
 
 
 def predict_taxonomy(
@@ -1021,14 +1286,14 @@ def predict_taxonomy(
     contignames: np.ndarray,
     taxonomy_path: Path,
     out_dir: Path,
-    predictor_training_options: PredictorTrainingOptions,
+    taxonomy_options: TaxometerOptions,
     cuda: bool,
 ):
     begintime = time.time()
 
     taxonomies = vamb.vambtools.parse_taxonomy(
         input_file=taxonomy_path,
-        contignames=contignames,  # type:ignore
+        contignames=cast(Sequence[str], contignames),
        is_canonical=False,
     )
     nodes, ind_nodes, table_parent = vamb.taxvamb_encode.make_graph(taxonomies)
@@ -1047,7 +1312,7 @@ def predict_taxonomy(
         nodes,
         table_parent,
         nhiddens=[512, 512, 512, 512],
-        hier_loss=predictor_training_options.ploss,
+        hier_loss=taxonomy_options.ploss,
         cuda=cuda,
     )
 
@@ -1055,7 +1320,7 @@ def predict_taxonomy(
         abundance,
         tnfs,
         lengths,
-        batchsize=predictor_training_options.batchsize,
+        batchsize=taxonomy_options.basic.starting_batch_size,
     )
     dataloader_joint = vamb.taxvamb_encode.make_dataloader_concat_hloss(
         abundance,
@@ -1064,22 +1329,22 @@ def predict_taxonomy(
         targets,
         len(nodes),
         table_parent,
-        batchsize=predictor_training_options.batchsize,
+        batchsize=taxonomy_options.basic.starting_batch_size,
     )
 
     logger.info("\tCreated dataloader")
 
     predictortime = time.time()
     logger.info("Starting training the taxonomy predictor")
-    logger.info(f"Using threshold {predictor_training_options.softmax_threshold}")
+    logger.info(f"Using threshold {taxonomy_options.softmax_threshold}")
 
     model_path = out_dir.joinpath("predictor_model.pt")
     with open(model_path, "wb") as modelfile:
         model.trainmodel(
             dataloader_joint,
-            nepochs=predictor_training_options.nepochs,
+            nepochs=taxonomy_options.basic.num_epochs,
             modelfile=modelfile,
-            batchsteps=predictor_training_options.batchsteps,
+            batchsteps=taxonomy_options.basic.batch_steps,
         )
     logger.info(
         f"Finished training the taxonomy predictor in {round(time.time() - predictortime, 2)} seconds."
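Between these hunks it is worth spelling out the pattern the whole refactor leans on: each options class validates its fields eagerly in `__init__` and is built from the parsed CLI namespace by a `from_args` classmethod, with `typeasserted` narrowing whatever `argparse` hands over. A minimal, self-contained sketch of that pattern follows; the `Opts` class and the `--nepochs` flag here are illustrative stand-ins, not names from this patch:

    import argparse
    from typing import Union

    def typeasserted(x, Ty: Union[type, tuple[type, ...]]):
        # Fail fast if argparse handed us an unexpected type.
        assert isinstance(x, Ty)
        return x

    class Opts:
        # Hypothetical options class mirroring the from_args pattern above.
        @classmethod
        def from_args(cls, args: argparse.Namespace) -> "Opts":
            return cls(typeasserted(args.nepochs, int))

        def __init__(self, nepochs: int):
            if nepochs < 1:
                raise argparse.ArgumentTypeError(f"Minimum 1 epoch, not {nepochs}")
            self.nepochs = nepochs

    parser = argparse.ArgumentParser()
    parser.add_argument("--nepochs", type=int, default=10)
    print(Opts.from_args(parser.parse_args([])).nepochs)  # 10

The payoff is that each entry point (`run_bin_default`, `run_taxonomy_predictor`, `run_reclustering`, ...) receives one fully validated object instead of a loose bag of arguments.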
@@ -1089,7 +1354,7 @@
     predicted_path = out_dir.joinpath("results_taxometer.tsv")
     nodes_ar = np.array(nodes)
 
-    logger.info(f"Using threshold {predictor_training_options.softmax_threshold}")
+    logger.info(f"Using threshold {taxonomy_options.softmax_threshold}")
     row = 0
     output_exists = False
     for predicted_vector, predicted_labels in model.predict(dataloader_vamb):
@@ -1100,9 +1365,7 @@ def predict_taxonomy(
             label = predicted_labels[i]
             while table_parent[label] != -1:
                 label = table_parent[label]
-            threshold_mask = (
-                predicted_vector[i] > predictor_training_options.softmax_threshold
-            )
+            threshold_mask = predicted_vector[i] > taxonomy_options.softmax_threshold
             pred_line = ";".join(nodes_ar[threshold_mask][1:])
             predictions.append(pred_line)
             absolute_probs = predicted_vector[i][threshold_mask]
@@ -1124,18 +1387,12 @@ def predict_taxonomy(
     )
 
 
-def run_taxonomy_predictor(
-    vamb_options: VambOptions,
-    comp_options: CompositionOptions,
-    abundance_options: AbundanceOptions,
-    predictor_training_options: PredictorTrainingOptions,
-    taxonomy_path: Path,
-):
+def run_taxonomy_predictor(opt: TaxometerOptions):
     composition, abundance = load_composition_and_abundance(
-        vamb_options=vamb_options,
-        comp_options=comp_options,
-        abundance_options=abundance_options,
-        binsplitter=vamb.vambtools.BinSplitter.inert_splitter(),
+        opt.general,
+        opt.composition,
+        opt.abundance,
+        vamb.vambtools.BinSplitter.inert_splitter(),
     )
     abundance, tnfs, lengths, contignames = (
         abundance.matrix,
@@ -1148,28 +1405,19 @@ def run_taxonomy_predictor(
         tnfs=tnfs,
         lengths=lengths,
         contignames=contignames,
-        taxonomy_path=taxonomy_path,
-        out_dir=vamb_options.out_dir,
-        predictor_training_options=predictor_training_options,
-        cuda=vamb_options.cuda,
+        taxonomy_path=opt.taxonomy.path,
+        out_dir=opt.general.out_dir,
+        taxonomy_options=opt,
+        cuda=opt.general.cuda,
     )
 
 
-def run_vaevae(
-    vamb_options: VambOptions,
-    comp_options: CompositionOptions,
-    abundance_options: AbundanceOptions,
-    vae_training_options: VAETrainingOptions,
-    predictor_training_options: PredictorTrainingOptions,
-    taxonomy_options: TaxonomyOptions,
-    cluster_options: ClusterOptions,
-    encoder_options: EncoderOptions,
-):
-    vae_options = encoder_options.vae_options
+def run_vaevae(opt: BinTaxVambOptions):
+    vae_options = opt.vae
     composition, abundance = load_composition_and_abundance(
-        vamb_options=vamb_options,
-        comp_options=comp_options,
-        abundance_options=abundance_options,
+        vamb_options=opt.common.general,
+        comp_options=opt.common.comp,
+        abundance_options=opt.common.abundance,
         binsplitter=vamb.vambtools.BinSplitter.inert_splitter(),
     )
     abundance, tnfs, lengths, contignames = (
@@ -1178,40 +1426,30 @@ def run_vaevae(
         composition.metadata.lengths,
         composition.metadata.identifiers,
     )
-
-    if taxonomy_options.taxonomy_path is not None and not taxonomy_options.no_predictor:
+    if isinstance(opt.taxonomy.path_or_tax_options, TaxometerOptions):
         logger.info("Predicting missing values from taxonomy")
         predict_taxonomy(
             abundance=abundance,
             tnfs=tnfs,
             lengths=lengths,
             contignames=contignames,
-            taxonomy_path=taxonomy_options.taxonomy_path,
-            out_dir=vamb_options.out_dir,
-            predictor_training_options=predictor_training_options,
-            cuda=vamb_options.cuda,
+            taxonomy_path=opt.taxonomy.path_or_tax_options.taxonomy.path,
+            out_dir=opt.common.general.out_dir,
+            taxonomy_options=opt.taxonomy.path_or_tax_options,
+            cuda=opt.common.general.cuda,
         )
-        predictions_path = vamb_options.out_dir.joinpath("results_taxometer.tsv")
-    else:
-        logger.info("Not predicting the taxonomy")
-        predictions_path = None
-
-    if predictions_path is not None:
         contig_taxonomies = vamb.vambtools.parse_taxonomy(
-            input_file=predictions_path,
-            contignames=contignames,  # type:ignore
+            input_file=opt.common.general.out_dir.joinpath("results_taxometer.tsv"),
+            contignames=cast(Sequence[str], contignames),
             is_canonical=False,
         )
-    elif taxonomy_options.no_predictor:
-        logger.info("Using taxonomy for semisupervised learning")
-        assert taxonomy_options.taxonomy_path is not None
+    else:
+        logger.info("Not predicting the taxonomy")
         contig_taxonomies = vamb.vambtools.parse_taxonomy(
-            input_file=taxonomy_options.taxonomy_path,
-            contignames=contignames,  # type:ignore
+            input_file=opt.taxonomy.path_or_tax_options.path,
+            contignames=cast(Sequence[str], contignames),
             is_canonical=False,
         )
-    else:
-        raise argparse.ArgumentTypeError("One of the taxonomy arguments is missing")
 
     nodes, ind_nodes, table_parent = vamb.taxvamb_encode.make_graph(contig_taxonomies)
     classes_order: list[str] = []
@@ -1222,7 +1460,6 @@ def run_vaevae(
         classes_order.append(i.ranks[-1])
     targets = np.array([ind_nodes[i] for i in classes_order])
 
-    assert vae_options is not None
     vae = vamb.taxvamb_encode.VAEVAEHLoss(
         abundance.shape[1],
         len(nodes),
@@ -1230,18 +1467,18 @@ def run_vaevae(
         table_parent,
         nhiddens=vae_options.nhiddens,
         nlatent=vae_options.nlatent,
-        alpha=encoder_options.alpha,
+        alpha=vae_options.alpha,
         beta=vae_options.beta,
         dropout=vae_options.dropout,
-        cuda=vamb_options.cuda,
+        cuda=opt.common.general.cuda,
     )
 
     dataloader_vamb = vamb.encode.make_dataloader(
         abundance,
         tnfs,
         lengths,
-        batchsize=vae_training_options.batchsize,
-        cuda=vamb_options.cuda,
+        batchsize=vae_options.basic_options.starting_batch_size,
+        cuda=opt.common.general.cuda,
     )
     dataloader_joint = vamb.taxvamb_encode.make_dataloader_concat_hloss(
         abundance,
@@ -1250,8 +1487,8 @@ def run_vaevae(
         targets,
         len(nodes),
         table_parent,
-        batchsize=vae_training_options.batchsize,
-        cuda=vamb_options.cuda,
+        batchsize=vae_options.basic_options.starting_batch_size,
+        cuda=opt.common.general.cuda,
     )
     dataloader_labels = vamb.taxvamb_encode.make_dataloader_labels_hloss(
         abundance,
@@ -1260,8 +1497,8 @@ def run_vaevae(
         targets,
         len(nodes),
         table_parent,
-        batchsize=vae_training_options.batchsize,
-        cuda=vamb_options.cuda,
+        batchsize=vae_options.basic_options.starting_batch_size,
+        cuda=opt.common.general.cuda,
     )
 
     shapes = (abundance.shape[1], 103, 1, len(nodes))
@@ -1272,343 +1509,170 @@ def run_vaevae(
         len(nodes),
         table_parent,
         shapes,
-        vamb_options.seed,
-        batchsize=vae_training_options.batchsize,
-        cuda=vamb_options.cuda,
+        opt.common.general.seed,
+        batchsize=vae_options.basic_options.starting_batch_size,
+        cuda=opt.common.general.cuda,
     )
-    model_path = vamb_options.out_dir.joinpath("vaevae_model.pt")
+    model_path = opt.common.general.out_dir.joinpath("vaevae_model.pt")
     with open(model_path, "wb") as modelfile:
         vae.trainmodel(
             dataloader,
-            nepochs=vae_training_options.nepochs,
+            nepochs=vae_options.basic_options.num_epochs,
             modelfile=modelfile,
-            batchsteps=vae_training_options.batchsteps,
+            batchsteps=vae_options.basic_options.batch_steps,
         )
 
     latent_both = vae.VAEJoint.encode(dataloader_joint)
     logger.info(f"{latent_both.shape} embedding shape")
 
-    latent_path = vamb_options.out_dir.joinpath("vaevae_latent.npz")
+    latent_path = opt.common.general.out_dir.joinpath("vaevae_latent.npz")
     vamb.vambtools.write_npz(latent_path, latent_both)
 
     # Cluster, save tsv file
     cluster_and_write_files(
-        vamb_options,
-        cluster_options,
-        str(vamb_options.out_dir.joinpath("vaevae_clusters")),
-        vamb_options.out_dir.joinpath("bins"),
-        comp_options.path,
+        opt.common.clustering,
+        opt.common.output.binsplitter,
         latent_both,
-        contignames,  # type:ignore
-        lengths,  # type:ignore
+        cast(Sequence[str], contignames),
+        lengths,
+        opt.common.general.seed,
+        opt.common.general.cuda,
+        str(opt.common.general.out_dir.joinpath("vaevae_clusters")),
+        FastaOutput.try_from_common(opt.common),
     )
 
 
-def run_reclustering(
-    vamb_options: VambOptions,
-    comp_options: CompositionOptions,
-    abundance_options: AbundanceOptions,
-    reclustering_options: ReclusteringOptions,
-    taxonomy_options: TaxonomyOptions,
-):
-    composition, _ = load_composition_and_abundance(
-        vamb_options=vamb_options,
-        comp_options=comp_options,
-        abundance_options=abundance_options,
-        binsplitter=reclustering_options.binsplitter,
+def run_reclustering(opt: ReclusteringOptions):
    composition = calc_tnf(
+        opt.composition,
+        opt.general.min_contig_length,
+        opt.general.out_dir,
+        opt.output.binsplitter,
     )
-
     logger.info(f"{len(composition.metadata.identifiers)} contig names after filtering")
     logger.info("Taxonomy predictions are provided")
-    predictions_path = taxonomy_options.taxonomy_path
-
+    markers = opt.markers.content
+    assert isinstance(markers, PredictMarkers)
     reclustered = vamb.reclustering.recluster_bins(
-        reclustering_options.clusters_path,
-        reclustering_options.latent_path,
-        str(comp_options.path),
+        None if isinstance(opt.algorithm, DBScanOptions) else opt.algorithm.clusters,
+        opt.latent_path,
+        str(opt.composition.path.path),
         composition.metadata.identifiers,
-        out_dir=vamb_options.out_dir,
+        out_dir=opt.general.out_dir,
         minfasta=0,
         binned_length=1000,
         num_process=40,
-        random_seed=int(random.uniform(0, 2**32 - 1)),
-        hmmout_path=reclustering_options.hmmout_path,
-        algorithm=reclustering_options.algorithm,
-        predictions_path=predictions_path,
+        random_seed=opt.general.seed,
+        hmmout_path=markers.hmm,
+        algorithm="kmeans" if isinstance(opt.algorithm, KmeansOptions) else "dbscan",
+        predictions_path=None
+        if isinstance(opt.algorithm, KmeansOptions)
+        else (
+            opt.algorithm.taxonomy_options.path_or_tax_options.path
+            if isinstance(
+                opt.algorithm.taxonomy_options.path_or_tax_options, TaxonomyPath
+            )
+            else opt.algorithm.taxonomy_options.path_or_tax_options.taxonomy.path
+        ),
     )
-
     cluster_dict: defaultdict[str, set[str]] = defaultdict(set)
     for k, v in zip(reclustered, composition.metadata.identifiers):
-        cluster_dict[k].add(v)
+        cluster_dict[cast(str, k)].add(v)
+
+    if opt.output.min_fasta_output_size is None:
+        fasta_output = None
+    else:
+        assert isinstance(opt.composition.path, FASTAPath)
+        fasta_output = FastaOutput(
+            opt.composition.path,
+            opt.general.out_dir.joinpath("bins"),
+            opt.output.min_fasta_output_size,
+        )
 
     write_clusters_and_bins(
-        vamb_options,
-        reclustering_options.binsplitter,
-        str(vamb_options.out_dir.joinpath("clusters_reclustered")),
-        vamb_options.out_dir.joinpath("bins"),
-        comp_options.path,
+        fasta_output,
+        opt.output.binsplitter,
+        str(opt.general.out_dir.joinpath("clusters_reclustered")),
         cluster_dict,
-        composition.metadata.identifiers,  # type:ignore
-        composition.metadata.lengths,  # type:ignore
+        cast(Sequence[str], composition.metadata.identifiers),
+        cast(Sequence[int], composition.metadata.lengths),
     )
 
 
-class BasicArguments(object):
-    LOGS_PATH = "log.txt"
-
-    def __init__(self, args):
-        self.args = args
-        self.comp_options = CompositionOptions(
-            self.args.fasta,
-            self.args.composition,
-            self.args.minlength,
-        )
-        self.abundance_options = AbundanceOptions(
-            self.args.bampaths,
-            self.args.bamdir,
-            self.args.aemb,
-            self.args.abundancepath,
-            self.args.min_alignment_id,
-            not self.args.norefcheck,
-        )
-        self.vamb_options = VambOptions(
-            self.args.outdir,
-            self.args.nthreads,
-            self.comp_options,
-            self.args.min_fasta_output_size,
-            self.args.seed,
-            self.args.cuda,
-        )
-
-    def run_inner(self):
-        raise NotImplementedError
-
-    @logger.catch(reraise=True)
-    def run(self):
-        torch.set_num_threads(self.vamb_options.n_threads)
-        try_make_dir(self.vamb_options.out_dir)
-        logger.add(
-            self.vamb_options.out_dir.joinpath(self.LOGS_PATH), format=format_log
-        )
-        begintime = time.time()
-        logger.info("Starting Vamb version " + vamb.__version_str__)
-        logger.info("Random seed is " + str(self.vamb_options.seed))
-        logger.info(f"Invoked with CLI args: 'f{' '.join(sys.argv)}'")
-        self.run_inner()
-        logger.info(f"Completed Vamb in {round(time.time() - begintime, 2)} seconds.")
-
-
-class TaxometerArguments(BasicArguments):
-    def __init__(self, args):
-        super(TaxometerArguments, self).__init__(args)
-        self.taxonomy_options = TaxonomyOptions(
-            taxonomy_path=args.taxonomy,
-            no_predictor=False,
-        )
-        self.predictor_training_options = PredictorTrainingOptions(
-            nepochs=args.pred_nepochs,
-            batchsize=args.pred_batchsize,
-            batchsteps=[],
-            softmax_threshold=args.pred_softmax_threshold,
-            ploss=args.ploss,
-        )
-
-    def run_inner(self):
-        run_taxonomy_predictor(
-            vamb_options=self.vamb_options,
-            comp_options=self.comp_options,
-            abundance_options=self.abundance_options,
-            predictor_training_options=self.predictor_training_options,
-            taxonomy_path=self.taxonomy_options.taxonomy_path,
-        )
-
-
-class BinnerArguments(BasicArguments):
-    def __init__(self, args):
-        super(BinnerArguments, self).__init__(args)
-        self.vae_options = VAEOptions(
-            nhiddens=args.nhiddens,
-            nlatent=args.nlatent,
-            beta=args.beta,
-            dropout=args.dropout,
-        )
-        self.vae_training_options = VAETrainingOptions(
-            nepochs=args.nepochs,
-            batchsize=args.batchsize,
-            batchsteps=args.batchsteps,
-        )
-        self.aae_options = None
-        self.aae_training_options = None
-        self.cluster_options = ClusterOptions(
-            args.window_size,
-            args.min_successes,
-            args.max_clusters,
-            args.binsplit_separator,
-        )
-
-
-class VAEAAEArguments(BinnerArguments):
-    def __init__(self, args):
-        super(VAEAAEArguments, self).__init__(args)
-
-        # For the VAE-AAE workflow, we must cluster the full set of sequences,
-        # else the cluster dereplication step makes no sense, as the different
-        # clusterings will contain different sets of clusters.
-        # So, enforce this here
-        if self.cluster_options.max_clusters is not None:
-            raise ValueError(
-                "When using the VAE-AAE model, `max_clusters` (option `-c`) "
-                "must not be set."
-            )
-
-        self.aae_options = AAEOptions(
-            nhiddens=args.nhiddens_aae,
-            nlatent_z=args.nlatent_aae_z,
-            nlatent_y=args.nlatent_aae_y,
-            sl=args.sl,
-            slr=args.slr,
-        )
-        self.aae_training_options = AAETrainingOptions(
-            nepochs=args.nepochs_aae,
-            batchsize=args.batchsize_aae,
-            batchsteps=args.batchsteps_aae,
-            temp=args.temp,
-        )
-        self.init_encoder_and_training()
-
-    def init_encoder_and_training(self):
-        self.encoder_options = EncoderOptions(
-            vae_options=self.vae_options,
-            aae_options=self.aae_options,
-            alpha=self.args.alpha,
-        )
-        self.training_options = TrainingOptions(
-            encoder_options=self.encoder_options,
-            vae_options=self.vae_training_options,
-            aae_options=self.aae_training_options,
-            lrate=self.args.lrate,
-        )
-
-    def run_inner(self):
-        run(
-            vamb_options=self.vamb_options,
-            comp_options=self.comp_options,
-            abundance_options=self.abundance_options,
-            encoder_options=self.encoder_options,
-            training_options=self.training_options,
-            cluster_options=self.cluster_options,
-        )
-
-
-class VAEArguments(BinnerArguments):
-    def __init__(self, args):
-        super(VAEArguments, self).__init__(args)
-        self.encoder_options = EncoderOptions(
-            vae_options=self.vae_options,
-            aae_options=None,
-            alpha=self.args.alpha,
-        )
-        self.training_options = TrainingOptions(
-            encoder_options=self.encoder_options,
-            vae_options=self.vae_training_options,
-            aae_options=None,
-            lrate=self.args.lrate,
-        )
-
-    def run_inner(self):
-        run(
-            vamb_options=self.vamb_options,
-            comp_options=self.comp_options,
-            abundance_options=self.abundance_options,
-            encoder_options=self.encoder_options,
-            training_options=self.training_options,
-            cluster_options=self.cluster_options,
-        )
-
-
-class VAEVAEArguments(BinnerArguments):
-    def __init__(self, args):
-        super(VAEVAEArguments, self).__init__(args)
-        self.encoder_options = EncoderOptions(
-            vae_options=self.vae_options,
-            aae_options=None,
-            alpha=self.args.alpha,
-        )
-        self.training_options = TrainingOptions(
-            encoder_options=self.encoder_options,
-            vae_options=self.vae_training_options,
-            aae_options=None,
-            lrate=self.args.lrate,
-        )
-        self.taxonomy_options = TaxonomyOptions(
-            taxonomy_path=self.args.taxonomy,
-            no_predictor=self.args.no_predictor,
-        )
-        self.predictor_training_options = PredictorTrainingOptions(
-            nepochs=self.args.pred_nepochs,
-            batchsize=self.args.pred_batchsize,
-            batchsteps=[],
-            softmax_threshold=self.args.pred_softmax_threshold,
-            ploss=self.args.ploss,
-        )
+def add_help_arguments(parser: argparse.ArgumentParser):
+    helpos = parser.add_argument_group(title="Help and version", description=None)
+    helpos.add_argument("-h", "--help", help="print help and exit", action="help")
+    helpos.add_argument(
+        "--version",
+        action="version",
+        version=f"Vamb {vamb.__version_str__}",
+    )
 
-    def run_inner(self):
-        run_vaevae(
-            vamb_options=self.vamb_options,
-            comp_options=self.comp_options,
-            abundance_options=self.abundance_options,
-            taxonomy_options=self.taxonomy_options,
-            vae_training_options=self.vae_training_options,
-            predictor_training_options=self.predictor_training_options,
-            cluster_options=self.cluster_options,
-            encoder_options=self.encoder_options,
-        )
 
+def add_general_arguments(subparser: argparse.ArgumentParser):
+    add_help_arguments(subparser)
+    reqos = subparser.add_argument_group(title="Output (required)", description=None)
+    reqos.add_argument(
+        "--outdir",
+        metavar="",
+        type=Path,
+        # required=True, # Setting this flag breaks the argparse with positional arguments.
The existence of the folder is checked elsewhere + help="output directory to create", + required=True, + ) -class ReclusteringArguments(BasicArguments): - def __init__(self, args): - super(ReclusteringArguments, self).__init__(args) - self.reclustering_options = ReclusteringOptions( - latent_path=args.latent_path, - clusters_path=args.clusters_path, - binsplit_separator=args.binsplit_separator_recluster, - hmmout_path=args.hmmout_path, - algorithm=args.algorithm, - ) - if self.args.algorithm == "dbscan": - self.preds = self.args.taxonomy_predictions - else: - self.preds = self.args.clusters_path - self.taxonomy_options = TaxonomyOptions( - taxonomy_path=self.preds, - no_predictor=False, - ) + general = subparser.add_argument_group( + title="General optional arguments", description=None + ) + general.add_argument( + "-m", + dest="minlength", + metavar="", + type=int, + default=2000, + help="ignore contigs shorter than this [2000]", + ) - def run_inner(self): - run_reclustering( - vamb_options=self.vamb_options, - comp_options=self.comp_options, - abundance_options=self.abundance_options, - reclustering_options=self.reclustering_options, - taxonomy_options=self.taxonomy_options, - ) + general.add_argument( + "-p", + dest="nthreads", + metavar="", + type=int, + default=DEFAULT_THREADS, + help=( + "number of threads to use when parsing BAM" + "[min(" + str(DEFAULT_THREADS) + ", nbamfiles)]" + ), + ) + general.add_argument( + "--norefcheck", + help="skip reference name hashing check [False]", + action="store_true", + ) + general.add_argument( + "--cuda", help="use GPU to train & cluster [False]", action="store_true" + ) + general.add_argument( + "--seed", + metavar="", + type=int, + default=int.from_bytes(os.urandom(7), "little"), + help="Random seed (random determinism not guaranteed)", + ) + return subparser -def add_input_output_arguments(subparser): - """Add TNFs and abundance arguments to a subparser.""" - # TNF arguments - tnfos = subparser.add_argument_group( - title="TNF input (either fasta or all .npz files required)" - ) +def add_composition_arguments(subparser: argparse.ArgumentParser): + tnfos = subparser.add_argument_group(title="Composition input") tnfos.add_argument("--fasta", metavar="", type=Path, help="path to fasta file") tnfos.add_argument( "--composition", metavar="", type=Path, help="path to .npz of composition" ) + return subparser - # Abundance arguments - abundanceos = subparser.add_argument_group( - title="Abundance input (exactly one input required)" - ) + +def add_abundance_arguments(subparser: argparse.ArgumentParser): + abundanceos = subparser.add_argument_group(title="Abundance input") # Note: This argument is deprecated, but we'll keep supporting it for now. # Instead, use --bamdir. abundanceos.add_argument( @@ -1638,27 +1702,7 @@ def add_input_output_arguments(subparser): type=Path, help="path to .npz of abundances", ) - - reqos = subparser.add_argument_group(title="Output (required)", description=None) - reqos.add_argument( - "--outdir", - metavar="", - type=Path, - # required=True, # Setting this flag breaks the argparse with positional arguments. 
The existence of the folder is checked elsewhere - help="output directory to create", - ) - - # # Optional arguments - inputos = subparser.add_argument_group(title="IO options", description=None) - inputos.add_argument( - "-m", - dest="minlength", - metavar="", - type=int, - default=2000, - help="ignore contigs shorter than this [2000]", - ) - inputos.add_argument( + abundanceos.add_argument( "-z", dest="min_alignment_id", metavar="", @@ -1666,23 +1710,46 @@ def add_input_output_arguments(subparser): default=None, help=argparse.SUPPRESS, ) - inputos.add_argument( - "-p", - dest="nthreads", + return subparser + + +def add_taxonomy_arguments(subparser: argparse.ArgumentParser, taxonomy_only=False): + taxonomys = subparser.add_argument_group(title="Taxonomy input") + taxonomys.add_argument( + "--taxonomy", metavar="", - type=int, - default=DEFAULT_THREADS, - help=( - "number of threads to use when parsing BAM" - "[min(" + str(DEFAULT_THREADS) + ", nbamfiles)]" - ), + type=Path, + help="path to the taxonomy file", ) - inputos.add_argument( - "--norefcheck", - help="skip reference name hashing check [False]", - action="store_true", + if not taxonomy_only: + taxonomys.add_argument( + "--no_predictor", + help="do not complete input taxonomy with Taxometer [False]", + action="store_true", + ) + return subparser + + +def add_marker_arguments(subparser: argparse.ArgumentParser): + marker_s = subparser.add_argument_group(title="Marker gene input") + marker_s.add_argument( + "--markers", + metavar="", + type=Path, + help="path to the marker .npz file", + ) + marker_s.add_argument( + "--hmmout_path", + metavar="", + type=Path, + help="path to the .hmm file of marker gene profiles", ) - inputos.add_argument( + return subparser + + +def add_bin_output_arguments(subparser: argparse.ArgumentParser): + bin_os = subparser.add_argument_group(title="Bin output options", description=None) + bin_os.add_argument( "--minfasta", dest="min_fasta_output_size", metavar="", @@ -1690,18 +1757,17 @@ def add_input_output_arguments(subparser): default=None, help="minimum bin size to output as fasta [None = no files]", ) - inputos.add_argument( - "--cuda", help="use GPU to train & cluster [False]", action="store_true" - ) - - # Other arguments - otheros = subparser.add_argument_group(title="Other options", description=None) - otheros.add_argument( - "--seed", + bin_os.add_argument( + "-o", + dest="binsplit_separator", metavar="", - type=int, - default=int.from_bytes(os.urandom(7), "little"), - help="Random seed (random determinism not guaranteed)", + type=str, + # This means: None is not passed, "" if passed but empty, e.g. "-o -c 5" + # otherwise a string. 
+ default=None, + const="", + nargs="?", + help="binsplit separator [C if present] (pass empty string to disable)", ) return subparser @@ -1807,6 +1873,15 @@ def add_predictor_arguments(subparser): default=1024, help=argparse.SUPPRESS, ) + pred_trainos.add_argument( + "-pq", + dest="pred_batchsteps", + metavar="", + type=int, + nargs="*", + default=[25, 75, 150, 225], + help=argparse.SUPPRESS, + ) pred_trainos.add_argument( "-pthr", dest="pred_softmax_threshold", @@ -1820,6 +1895,7 @@ def add_predictor_arguments(subparser): dest="ploss", metavar="", type=str, + choices=["flat_softmax", "cond_softmax", "soft_margin"], default="flat_softmax", help=argparse.SUPPRESS, ) @@ -1853,18 +1929,7 @@ def add_clustering_arguments(subparser): default=None, help="stop after c clusters [None = infinite]", ) - clusto.add_argument( - "-o", - dest="binsplit_separator", - metavar="", - type=str, - # This means: None is not passed, "" if passed but empty, e.g. "-o -c 5" - # otherwise a string. - default=None, - const="", - nargs="?", - help="binsplit separator [C if present] (pass empty string to disable)", - ) + return subparser @@ -1944,11 +2009,7 @@ def add_aae_arguments(subparser): help=argparse.SUPPRESS, ) - aaetrainos = subparser.add_argument_group( - title="Training options AAE", description=None - ) - - aaetrainos.add_argument( + aaeos.add_argument( "--e_aae", dest="nepochs_aae", metavar="", @@ -1956,7 +2017,7 @@ def add_aae_arguments(subparser): default=70, help=argparse.SUPPRESS, ) - aaetrainos.add_argument( + aaeos.add_argument( "--t_aae", dest="batchsize_aae", metavar="", @@ -1964,7 +2025,7 @@ def add_aae_arguments(subparser): default=256, help=argparse.SUPPRESS, ) - aaetrainos.add_argument( + aaeos.add_argument( "--q_aae", dest="batchsteps_aae", metavar="", @@ -1976,24 +2037,6 @@ def add_aae_arguments(subparser): return subparser -def add_taxonomy_arguments(subparser, taxonomy_only=False): - # Taxonomy arguments - taxonomys = subparser.add_argument_group(title="Taxonomy") - taxonomys.add_argument( - "--taxonomy", - metavar="", - type=Path, - help="path to the taxonomy file", - ) - if not taxonomy_only: - taxonomys.add_argument( - "--no_predictor", - help="do not complete input taxonomy with Taxometer [False]", - action="store_true", - ) - return subparser - - def add_reclustering_arguments(subparser): # Reclustering arguments reclusters = subparser.add_argument_group(title="k-means reclustering arguments") @@ -2009,26 +2052,12 @@ def add_reclustering_arguments(subparser): type=Path, help="path to a cluster file corresponding to the latent space", ) - reclusters.add_argument( - "--hmmout_path", - metavar="", - type=Path, - default=None, - help="path to markers.hmmout path generated for the same set of contigs", - ) - reclusters.add_argument( - "-ro", - dest="binsplit_separator_recluster", - metavar="", - type=str, - default="C", - help="binsplit separator for reclustering [C]", - ) reclusters.add_argument( "--algorithm", metavar="", type=str, default="kmeans", + choices=["kmeans", "dbscan"], help="which reclustering algorithm to use ('kmeans', 'dbscan'). 
DBSCAN requires a taxonomy predictions file [kmeans]", ) return subparser @@ -2050,15 +2079,7 @@ def main(): formatter_class=argparse.RawDescriptionHelpFormatter, add_help=False, ) - - # Help - helpos = parser.add_argument_group(title="Help and version", description=None) - helpos.add_argument("-h", "--help", help="print help and exit", action="help") - helpos.add_argument( - "--version", - action="version", - version=f"Vamb {vamb.__version_str__}", - ) + add_help_arguments(parser) if len(sys.argv) == 1: parser.print_help() @@ -2078,7 +2099,9 @@ def main(): help=""" VAMB, TaxVAMB, AVAMB binners """, + add_help=False, ) + add_help_arguments(vaevae_parserbin_parser) subparsers_model = vaevae_parserbin_parser.add_subparsers(dest="model_subcommand") vae_parser = subparsers_model.add_parser( @@ -2088,8 +2111,13 @@ def main(): See the paper "Improved metagenome binning and assembly using deep variational autoencoders" (https://www.nature.com/articles/s41587-020-00777-4) """, + add_help=False, + usage="%(prog)s [options]", ) - add_input_output_arguments(vae_parser) + add_general_arguments(vae_parser) + add_composition_arguments(vae_parser) + add_abundance_arguments(vae_parser) + add_bin_output_arguments(vae_parser) add_vae_arguments(vae_parser) add_clustering_arguments(vae_parser) @@ -2100,12 +2128,17 @@ def main(): See the paper "TaxVAMB: taxonomic annotations improve metagenome binning" (link) """, + add_help=False, + usage="%(prog)s [options]", ) - add_input_output_arguments(vaevae_parser) + add_general_arguments(vaevae_parser) + add_composition_arguments(vaevae_parser) + add_abundance_arguments(vaevae_parser) + add_taxonomy_arguments(vaevae_parser) + add_bin_output_arguments(vaevae_parser) add_vae_arguments(vaevae_parser) add_clustering_arguments(vaevae_parser) add_predictor_arguments(vaevae_parser) - add_taxonomy_arguments(vaevae_parser) vaeaae_parser = subparsers_model.add_parser( AVAMB, @@ -2115,8 +2148,13 @@ def main(): (https://www.nature.com/articles/s42003-023-05452-3). 
WARNING: recommended use is through the Snakemake pipeline """, + add_help=False, + usage="%(prog)s [options]", ) - add_input_output_arguments(vaeaae_parser) + add_general_arguments(vaeaae_parser) + add_composition_arguments(vaeaae_parser) + add_abundance_arguments(vaeaae_parser) + add_bin_output_arguments(vaeaae_parser) add_vae_arguments(vaeaae_parser) add_aae_arguments(vaeaae_parser) add_clustering_arguments(vaeaae_parser) @@ -2128,8 +2166,12 @@ def main(): See the paper "Taxometer: deep learning improves taxonomic classification of contigs using binning features and a hierarchical loss" (link) """, + add_help=False, + usage="%(prog)s [options]", ) - add_input_output_arguments(predict_parser) + add_general_arguments(predict_parser) + add_composition_arguments(predict_parser) + add_abundance_arguments(predict_parser) add_taxonomy_arguments(predict_parser, taxonomy_only=True) add_predictor_arguments(predict_parser) @@ -2138,31 +2180,47 @@ def main(): help=""" reclustering using single-copy genes for the binning results of VAMB, TaxVAMB or AVAMB """, + add_help=False, + usage="%(prog)s [options]", ) - add_input_output_arguments(recluster_parser) + add_general_arguments(recluster_parser) + add_composition_arguments(recluster_parser) + add_abundance_arguments(recluster_parser) + add_marker_arguments(recluster_parser) + add_bin_output_arguments(recluster_parser) add_reclustering_arguments(recluster_parser) add_taxonomy_arguments(recluster_parser) args = parser.parse_args() if args.subcommand == TAXOMETER: - runner = TaxometerArguments(args) + opt = TaxometerOptions.from_args(args) + runner = partial(run_taxonomy_predictor, opt) + run(runner, opt.general) elif args.subcommand == BIN: - if args.model_subcommand is None: + model = args.model_subcommand + if model is None: vaevae_parserbin_parser.print_help() sys.exit(1) - classes_map = { - VAMB: VAEArguments, - AVAMB: VAEAAEArguments, - TAXVAMB: VAEVAEArguments, - } - runner = classes_map[args.model_subcommand](args) + if model == VAMB: + opt = BinDefaultOptions.from_args(args) + runner = partial(run_bin_default, opt) + run(runner, opt.common.general) + elif model == TAXVAMB: + opt = BinTaxVambOptions.from_args(args) + runner = partial(run_vaevae, opt) + run(runner, opt.common.general) + elif model == AVAMB: + opt = BinAvambOptions.from_args(args) + runner = partial(run_bin_aae, opt) + run(runner, opt.common.general) elif args.subcommand == RECLUSTER: - runner = ReclusteringArguments(args) + opt = ReclusteringOptions.from_args(args) + runner = partial(run_reclustering, opt) + run(runner, opt.general) else: # There are no more subcommands assert False - runner.run() if __name__ == "__main__": diff --git a/vamb/vambtools.py b/vamb/vambtools.py index 4c2c2afb..6ffba314 100644 --- a/vamb/vambtools.py +++ b/vamb/vambtools.py @@ -42,7 +42,7 @@ class BinSplitter: """ _DEFAULT_SPLITTER = "C" - __slots__ = ["is_default", "splitter"] + __slots__ = ["is_default", "splitter", "is_initialized"] def __init__(self, binsplitter: Optional[str]): if binsplitter is None: @@ -55,12 +55,17 @@ def __init__(self, binsplitter: Optional[str]): self.splitter = None else: self.splitter = binsplitter + + self.is_initialized = False @classmethod def inert_splitter(cls): return cls("") def initialize(self, identifiers: Iterable[str]): + if self.is_initialized: + return None + self.is_initialized = True separator = self.splitter if separator is None: return None
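A note on the relocated `-o` binsplit flag: moving it from the clustering group into the new bin-output group keeps its three-way semantics, which the inline comment only hints at. The snippet below is an illustrative, standalone argparse example (the parser and the `dest` name "sep" are made up for the demonstration, not taken from this patch) showing how `nargs="?"` plus `const=""` plus `default=None` distinguishes "flag absent", "flag passed empty", and "flag with a value". The hidden `-pq` predictor flag added above behaves analogously for lists: with `nargs="*"`, a bare `-pq` yields an empty list while `-pq 25 75` yields [25, 75].

    import argparse

    # Standalone demo of the `-o` pattern (names are illustrative):
    # `nargs="?"` makes the value optional, `const` applies when the flag
    # appears with no value, and `default` applies when the flag is absent.
    parser = argparse.ArgumentParser()
    parser.add_argument("-o", dest="sep", type=str, default=None, const="", nargs="?")

    assert parser.parse_args([]).sep is None          # absent: downstream code falls back to "C"
    assert parser.parse_args(["-o"]).sep == ""        # bare flag: binsplitting disabled
    assert parser.parse_args(["-o", "C"]).sep == "C"  # explicit separator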
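The dispatch at the bottom of `main` replaces the old `*Arguments` class hierarchy with per-subcommand `*Options.from_args` constructors plus `functools.partial`. Wrapping the workload in a `partial` turns each `run_*` function into a zero-argument callable, so one shared `run` helper can perform the setup common to all subcommands before invoking it, and option validation happens up front, before any work starts. A simplified, hypothetical sketch of that shape (the real `run` helper is defined elsewhere in this patch and is not shown here):

    from functools import partial

    # Hypothetical stand-ins illustrating the dispatch shape only.
    def run(runner, general_options):
        # shared setup (output directory, logging, thread count) would
        # happen here, before any subcommand-specific work runs
        runner()

    def run_taxonomy_predictor(opt):
        print(f"would run taxometer with options: {opt!r}")

    opt = {"taxonomy": "taxonomy.tsv"}  # stand-in for TaxometerOptions.from_args(args)
    run(partial(run_taxonomy_predictor, opt), general_options=None)

Because each options object is fully constructed and validated before `run` is called, malformed CLI input can fail fast rather than partway through a long job.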
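Finally, the `vamb/vambtools.py` hunk makes `BinSplitter.initialize` idempotent: an `is_initialized` flag (now also declared in `__slots__`) turns repeated calls into no-ops, so the per-identifier validation it performs runs at most once even if several code paths initialize the same splitter. The guard pattern in isolation (an illustrative class, not the real `BinSplitter`):

    class Once:
        __slots__ = ["is_initialized"]

        def __init__(self):
            self.is_initialized = False

        def initialize(self, identifiers):
            if self.is_initialized:
                return None  # repeat calls are no-ops
            self.is_initialized = True
            # ...expensive one-time validation of identifiers goes here...

    splitter = Once()
    splitter.initialize(["S1Ccontig1"])  # does the work
    splitter.initialize(["S1Ccontig1"])  # returns immediately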