From b1f25cee2f920189a527f2edea69bd525ceaaaa0 Mon Sep 17 00:00:00 2001 From: Will Fondrie Date: Thu, 7 Mar 2024 13:22:52 -0800 Subject: [PATCH] Update API and add support for small molecules (#43) * Add small molecule support and update peptide transformers to analyte transformers * Various fixes and new precursor calculation * Finished tests and fixed some bugs * Revert spliting peptide datasets * Fix bugs and improve test coverage * Make missing residues an error * Add molecule tests * Added customizable start and stop tokens * Allow add stop and start even if no token exists * Update test * Add embed method * Start making Wout's edits * Most of Wout's edits done * Final fixes * Fix formatting errors * Bump pre-commit versions * Ruff format update --- .pre-commit-config.yaml | 2 +- depthcharge/__init__.py | 1 + depthcharge/constants.py | 1 + depthcharge/data/__init__.py | 3 +- ...eptide_datasets.py => analyte_datasets.py} | 19 +- depthcharge/data/arrow.py | 17 +- depthcharge/data/fields.py | 2 + depthcharge/data/parsers.py | 18 + depthcharge/data/preprocessing.py | 3 + depthcharge/data/spectrum_datasets.py | 12 + depthcharge/encoders/__init__.py | 1 + depthcharge/encoders/sinusoidal.py | 7 + depthcharge/feedforward.py | 3 + depthcharge/mixins.py | 41 ++ depthcharge/primitives.py | 11 + depthcharge/testing.py | 2 + depthcharge/tokenizers/__init__.py | 4 +- depthcharge/tokenizers/molecules.py | 139 +++++ depthcharge/tokenizers/peptides.py | 421 ++++++--------- depthcharge/tokenizers/tokenizer.py | 56 +- depthcharge/transformers/__init__.py | 7 +- depthcharge/transformers/analytes.py | 478 ++++++++++++++++++ depthcharge/transformers/peptides.py | 289 ----------- depthcharge/transformers/spectra.py | 96 ++-- depthcharge/utils.py | 14 + depthcharge/version.py | 1 + pyproject.toml | 2 +- tests/conftest.py | 4 + tests/unit_tests/test_data/test_arrow.py | 1 + tests/unit_tests/test_data/test_datasets.py | 33 +- tests/unit_tests/test_data/test_loaders.py | 9 +- tests/unit_tests/test_data/test_parsers.py | 1 + .../test_encoders/test_sinusoidal.py | 1 + tests/unit_tests/test_feedforward.py | 1 + tests/unit_tests/test_primitives.py | 1 + tests/unit_tests/test_testing.py | 1 + .../test_tokenizers/test_molecules.py | 69 +++ .../test_tokenizers/test_peptides.py | 134 ++--- ...ormers.py => test_analyte_transformers.py} | 43 +- .../test_spectrum_transformers.py | 15 +- tests/unit_tests/test_version.py | 1 + 41 files changed, 1173 insertions(+), 791 deletions(-) rename depthcharge/data/{peptide_datasets.py => analyte_datasets.py} (79%) create mode 100644 depthcharge/mixins.py create mode 100644 depthcharge/tokenizers/molecules.py create mode 100644 depthcharge/transformers/analytes.py delete mode 100644 depthcharge/transformers/peptides.py create mode 100644 tests/unit_tests/test_tokenizers/test_molecules.py rename tests/unit_tests/test_transformers/{test_peptide_transformers.py => test_analyte_transformers.py} (50%) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index db342af..f3f2761 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -8,7 +8,7 @@ repos: - id: trailing-whitespace - id: detect-private-key - repo: https://github.com/charliermarsh/ruff-pre-commit - rev: v0.1.7 + rev: v0.3.1 hooks: # Run the linter. 
- id: ruff diff --git a/depthcharge/__init__.py b/depthcharge/__init__.py index b7a1c97..08392af 100644 --- a/depthcharge/__init__.py +++ b/depthcharge/__init__.py @@ -1,4 +1,5 @@ """Initialize the depthcharge package.""" + # Ignore a bunch of pkg_resources warnings from dependencies: import warnings diff --git a/depthcharge/constants.py b/depthcharge/constants.py index d9b6cf9..55ad7cb 100644 --- a/depthcharge/constants.py +++ b/depthcharge/constants.py @@ -1,4 +1,5 @@ """Constants.""" + HYDROGEN = 1.007825035 OXYGEN = 15.99491463 H2O = 2 * HYDROGEN + OXYGEN diff --git a/depthcharge/data/__init__.py b/depthcharge/data/__init__.py index 14a189e..e6b8f3f 100644 --- a/depthcharge/data/__init__.py +++ b/depthcharge/data/__init__.py @@ -1,12 +1,13 @@ """The Pytorch Datasets.""" + from . import preprocessing +from .analyte_datasets import AnalyteDataset from .arrow import ( spectra_to_df, spectra_to_parquet, spectra_to_stream, ) from .fields import CustomField -from .peptide_datasets import PeptideDataset from .spectrum_datasets import ( AnnotatedSpectrumDataset, SpectrumDataset, diff --git a/depthcharge/data/peptide_datasets.py b/depthcharge/data/analyte_datasets.py similarity index 79% rename from depthcharge/data/peptide_datasets.py rename to depthcharge/data/analyte_datasets.py index 5fa476b..c626944 100644 --- a/depthcharge/data/peptide_datasets.py +++ b/depthcharge/data/analyte_datasets.py @@ -1,13 +1,14 @@ """Datasets for working with peptide sequences.""" + from collections.abc import Iterable import torch from torch.utils.data import DataLoader, TensorDataset -from ..tokenizers import PeptideTokenizer +from ..tokenizers import Tokenizer -class PeptideDataset(TensorDataset): +class AnalyteDataset(TensorDataset): """A dataset for peptide sequences. Parameters @@ -18,33 +19,26 @@ class PeptideDataset(TensorDataset): sequences : Iterable[str] The peptide sequences in a format compatible with your tokenizer. ProForma is preferred. - charges : torch.Tensor, - The charge state for each peptide. *args : torch.Tensor, optional Additional values to include during data loading. + """ def __init__( self, - tokenizer: PeptideTokenizer, + tokenizer: Tokenizer, sequences: Iterable[str], - charges: torch.Tensor, *args: torch.Tensor, ) -> None: """Initialize a PeptideDataset.""" tokens = tokenizer.tokenize(sequences) - super().__init__(tokens, charges, *args) + super().__init__(tokens, *args) @property def tokens(self) -> torch.Tensor: """The peptide sequence tokens.""" return self.tensors[0] - @property - def charges(self) -> torch.Tensor: - """The peptide charges.""" - return self.tensors[1] - def loader(self, *args: tuple, **kwargs: dict) -> DataLoader: """A PyTorch DataLoader for peptides. @@ -61,5 +55,6 @@ def loader(self, *args: tuple, **kwargs: dict) -> DataLoader: ------- torch.utils.data.DataLoader A DataLoader for the peptide. + """ return DataLoader(self, *args, **kwargs) diff --git a/depthcharge/data/arrow.py b/depthcharge/data/arrow.py index 456b024..81e9240 100644 --- a/depthcharge/data/arrow.py +++ b/depthcharge/data/arrow.py @@ -1,4 +1,5 @@ """Store spectrum data as Arrow tables.""" + from collections.abc import Callable, Generator, Iterable from os import PathLike from pathlib import Path @@ -82,6 +83,7 @@ def spectra_to_stream( ------- Generator of pyarrow.RecordBatch Batches of parsed spectra. + """ parser_args = { "ms_level": ms_level, @@ -195,6 +197,7 @@ def spectra_to_parquet( ------- Path The Parquet file that was written. 
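A minimal usage sketch of the renamed AnalyteDataset shown above. The peptide sequences and charge values are illustrative; note that charges are now passed as an ordinary extra tensor via *args rather than through a dedicated argument.

import torch
from depthcharge.data import AnalyteDataset
from depthcharge.tokenizers import PeptideTokenizer

tokenizer = PeptideTokenizer()
sequences = ["LESLIEK", "PEPTIDEK"]        # illustrative peptides
charges = torch.tensor([2, 3])             # any extra per-sequence tensor

dataset = AnalyteDataset(tokenizer, sequences, charges)
loader = dataset.loader(batch_size=2, shuffle=False)
tokens, batch_charges = next(iter(loader))  # padded token tensor + charges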
+ """ streamer = spectra_to_stream( peak_file=peak_file, @@ -210,12 +213,15 @@ def spectra_to_parquet( if parquet_file is None: parquet_file = Path(Path(peak_file).stem).with_suffix(".parquet") - writer = None - for batch in streamer: - if writer is None: - writer = pq.ParquetWriter(parquet_file, schema=batch.schema) + try: + writer = None + for batch in streamer: + if writer is None: + writer = pq.ParquetWriter(parquet_file, schema=batch.schema) - writer.write_batch(batch) + writer.write_batch(batch) + finally: + writer.close() return parquet_file @@ -287,6 +293,7 @@ def spectra_to_df( ------- Path The Parquet file that was written. + """ streamer = spectra_to_stream( peak_file=peak_file, diff --git a/depthcharge/data/fields.py b/depthcharge/data/fields.py index 7d77665..73343d5 100644 --- a/depthcharge/data/fields.py +++ b/depthcharge/data/fields.py @@ -1,4 +1,5 @@ """Custom fields for the Arrow Schema.""" + from collections.abc import Callable from dataclasses import dataclass @@ -24,6 +25,7 @@ class CustomField: each spectrum. dtype: pyarrow.DataType The expected Arrow data type for the column in the schema. + """ name: str diff --git a/depthcharge/data/parsers.py b/depthcharge/data/parsers.py index 139a24d..29c218b 100644 --- a/depthcharge/data/parsers.py +++ b/depthcharge/data/parsers.py @@ -1,4 +1,5 @@ """Mass spectrometry data parsers.""" + from __future__ import annotations import logging @@ -44,6 +45,7 @@ class BaseParser(ABC): Enable or disable the progress bar. id_type : str, optional The Hupo-PSI prefix for the spectrum identifier. + """ def __init__( @@ -111,6 +113,7 @@ def sniff(self) -> None: ------ IOError Raised if the file is not the expected format. + """ @abstractmethod @@ -130,6 +133,7 @@ def parse_spectrum(self, spectrum: dict) -> MassSpectrum | None: ------- MassSpectrum or None The parsed mass spectrum or None if it is skipped. + """ def parse_custom_fields(self, spectrum: dict) -> dict[str, Any]: @@ -144,6 +148,7 @@ def parse_custom_fields(self, spectrum: dict) -> dict[str, Any]: ------- dict The parsed value of each, whatever it may be. + """ out = {} if self.custom_fields is None: @@ -167,6 +172,7 @@ def iter_batches(self, batch_size: int | None) -> pa.RecordBatch: ------ RecordBatch A batch of spectra and their metadata. + """ batch_size = float("inf") if batch_size is None else batch_size pbar_args = { @@ -229,6 +235,7 @@ def _update_batch(self, entry: dict) -> None: ---------- entry : dict The elemtn to add. + """ if self._batch is None: self._batch = {k: [v] for k, v in entry.items()} @@ -264,6 +271,7 @@ class MzmlParser(BaseParser): spectrum from the corresponding Pyteomics parser. progress : bool, optional Enable or disable the progress bar. + """ def sniff(self) -> None: @@ -273,6 +281,7 @@ def sniff(self) -> None: ------ IOError Raised if the file is not the expected format. + """ with self.peak_file.open() as mzdat: next(mzdat) @@ -295,6 +304,7 @@ def parse_spectrum(self, spectrum: dict) -> MassSpectrum | None: ------- MassSpectrum or None The parsed mass spectrum or None if not at the correct MS level. + """ ms_level = spectrum["ms level"] if self.ms_level is not None and ms_level not in self.ms_level: @@ -363,6 +373,7 @@ class MzxmlParser(BaseParser): spectrum from the corresponding Pyteomics parser. progress : bool, optional Enable or disable the progress bar. + """ def sniff(self) -> None: @@ -372,6 +383,7 @@ def sniff(self) -> None: ------ IOError Raised if the file is not the expected format. 
+ """ scent = "http://sashimi.sourceforge.net/schema_revision/mzXML" with self.peak_file.open() as mzdat: @@ -395,6 +407,7 @@ def parse_spectrum(self, spectrum: dict) -> MassSpectrum | None: ------- MassSpectrum The parsed mass spectrum. + """ ms_level = spectrum["msLevel"] if self.ms_level is not None and ms_level not in self.ms_level: @@ -442,6 +455,7 @@ class MgfParser(BaseParser): spectrum from the corresponding Pyteomics parser. progress : bool, optional Enable or disable the progress bar. + """ def __init__( @@ -476,6 +490,7 @@ def sniff(self) -> None: ------ IOError Raised if the file is not the expected format. + """ with self.peak_file.open() as mzdat: if not next(mzdat).startswith("BEGIN IONS"): @@ -492,6 +507,7 @@ def parse_spectrum(self, spectrum: dict) -> MassSpectrum: ---------- spectrum : dict The dictionary defining the spectrum in MGF format. + """ self._counter += 1 if self.ms_level is not None and 1 not in self.ms_level: @@ -531,6 +547,7 @@ def _parse_scan_id(scan_str: str | int) -> int: ------- int The scan ID number. + """ try: return int(scan_str) @@ -565,6 +582,7 @@ def get_parser(cls, peak_file: PathLike, **kwargs: dict) -> BaseParser: The peak file to parse. kwargs : dict Keyword arguments to pass to the parser. + """ for parser in cls.parsers: try: diff --git a/depthcharge/data/preprocessing.py b/depthcharge/data/preprocessing.py index 8b5bb93..3dc6a29 100644 --- a/depthcharge/data/preprocessing.py +++ b/depthcharge/data/preprocessing.py @@ -39,6 +39,7 @@ def my_func(spec: MassSpectrum) -> MassSpectrum: ``` """ + from collections.abc import Callable from functools import wraps @@ -79,6 +80,7 @@ def wrapper( ------- Callable A valid deptcharge preprocessing function. + """ @wraps(wrapper) @@ -94,6 +96,7 @@ def preprocess(spec: MassSpectrum) -> MassSpectrum: ------- MassSpectrum The processed mass spectrum. + """ # Call the spectrum_utils method: getattr(spec, func)(*args, **kwargs) diff --git a/depthcharge/data/spectrum_datasets.py b/depthcharge/data/spectrum_datasets.py index 30f111e..cd41eb9 100644 --- a/depthcharge/data/spectrum_datasets.py +++ b/depthcharge/data/spectrum_datasets.py @@ -1,4 +1,5 @@ """Serve mass spectra to neural networks.""" + from __future__ import annotations import logging @@ -52,6 +53,7 @@ def collate_fn( dict of str, tensor or list A dictionary mapping the columns of the lance dataset to a PyTorch tensor or list of values. + """ mz_array = nn.utils.rnn.pad_sequence( [s.pop("mz_array") for s in batch], @@ -124,6 +126,7 @@ class SpectrumDataset(Dataset, CollateFnMixin): ---------- peak_files : list of str path : Path + """ def __init__( @@ -180,6 +183,7 @@ def add_spectra( Keyword arguments passed `depthcharge.spectra_to_stream()` for peak files that are provided. This argument has no affect for DataFrame or parquet file inputs. + """ spectra = utils.listify(spectra) batch = next(_get_records(spectra, **kwargs)) @@ -207,6 +211,7 @@ def __getitem__(self, idx: int) -> dict[str, Any]: key is a column and the value is the value for that row. List columns are automatically converted to PyTorch tensors if the nested data type is compatible. + """ return { k: _tensorize(v[0]) @@ -249,6 +254,7 @@ def from_lance(cls, path: PathLike, **kwargs: dict) -> SpectrumDataset: Keyword arguments passed `depthcharge.spectra_to_stream()` for peak files that are added. This argument has no affect for DataFrame or parquet file inputs. 
+ """ return cls(spectra=None, path=path, **kwargs) @@ -295,6 +301,7 @@ class AnnotatedSpectrumDataset(SpectrumDataset): The tokenizer for the annotations. annotations : str The annotation column in the dataset. + """ def __init__( @@ -333,6 +340,7 @@ def collate_fn( dict of str, tensor or list A dictionary mapping the columns of the lance dataset to a PyTorch tensor or list of values. + """ batch = super().collate_fn(batch) batch[self.annotations] = self.tokenizer.tokenize( @@ -363,6 +371,7 @@ def from_lance( Keyword arguments passed `depthcharge.spectra_to_stream()` for peak files that are added. This argument has no affect for DataFrame or parquet file inputs. + """ return cls( spectra=None, @@ -408,6 +417,7 @@ class StreamingSpectrumDataset(IterableDataset, CollateFnMixin): ---------- batch_size : int The batch size to use for loading mass spectra. + """ def __init__( @@ -451,6 +461,7 @@ def _get_records( The data to add. **kwargs : dict Keyword arguments for the parser. + """ for spectra in data: try: @@ -478,6 +489,7 @@ def _tensorize(obj: Any) -> Any: # noqa: ANN401 Any Whatever type the object is, unless its been transformed to a PyTorch tensor. + """ if not isinstance(obj, list): return obj diff --git a/depthcharge/encoders/__init__.py b/depthcharge/encoders/__init__.py index a1496dd..a2f6209 100644 --- a/depthcharge/encoders/__init__.py +++ b/depthcharge/encoders/__init__.py @@ -1,4 +1,5 @@ """Avalailable encoders.""" + from .sinusoidal import ( FloatEncoder, PeakEncoder, diff --git a/depthcharge/encoders/sinusoidal.py b/depthcharge/encoders/sinusoidal.py index 919ccd7..e5f38ed 100644 --- a/depthcharge/encoders/sinusoidal.py +++ b/depthcharge/encoders/sinusoidal.py @@ -1,4 +1,5 @@ """Simple encoders for input into Transformers and the like.""" + import math import einops @@ -20,6 +21,7 @@ class FloatEncoder(torch.nn.Module): learnable_wavelengths : bool, optional Allow the selected wavelengths to be fine-tuned by the model. + """ def __init__( @@ -71,6 +73,7 @@ def forward(self, X: torch.Tensor) -> torch.Tensor: ------- torch.Tensor of shape (batch_size, n_float, d_model) The encoded features for the floating point numbers. + """ sin_mz = torch.sin(X[:, :, None] / self.sin_term) cos_mz = torch.cos(X[:, :, None] / self.cos_term) @@ -97,6 +100,7 @@ class PeakEncoder(torch.nn.Module): learnable_wavelengths : bool, optional Allow the selected wavelengths to be fine-tuned by the model. + """ def __init__( @@ -146,6 +150,7 @@ def forward(self, X: torch.Tensor) -> torch.Tensor: ------- torch.Tensor of shape (n_spectra, n_peaks, d_model) The encoded features for the mass spectra. + """ encoded = torch.cat( [ @@ -169,6 +174,7 @@ class PositionalEncoder(FloatEncoder): The shortest wavelength in the geometric progression. max_wavelength : float, optional The longest wavelength in the geometric progression. + """ def __init__( @@ -198,6 +204,7 @@ def forward(self, X: torch.Tensor) -> torch.Tensor: ------- torch.Tensor of shape (batch_size, n_sequence, n_features) The encoded features for the mass spectra. 
+ """ pos = torch.arange(X.shape[1]).type_as(self.sin_term) pos = einops.repeat(pos, "n -> b n", b=X.shape[0]) diff --git a/depthcharge/feedforward.py b/depthcharge/feedforward.py index 7e207b6..9e5a3f9 100644 --- a/depthcharge/feedforward.py +++ b/depthcharge/feedforward.py @@ -1,4 +1,5 @@ """A flexible feed-forward neural network.""" + from collections.abc import Iterable import numpy as np @@ -25,6 +26,7 @@ class FeedForward(torch.nn.Module): The activation function to place between layers. append : torch.nn.Module or None, optional A final layer to append, such as a sigmoid or tanh. + """ def __init__( @@ -73,5 +75,6 @@ def forward(self, X: torch.Tensor) -> torch.Tensor: ------- torch.Tensor of shape (..., out_features) The output tensor. + """ return self.layers(X) diff --git a/depthcharge/mixins.py b/depthcharge/mixins.py new file mode 100644 index 0000000..f416878 --- /dev/null +++ b/depthcharge/mixins.py @@ -0,0 +1,41 @@ +"""Helpful Mixins.""" + +import torch + + +class ModelMixin: + """Add helpful properties for depthcharge models.""" + + @property + def device(self) -> torch.device: + """The current device for first parameter of the model.""" + return next(self.parameters()).device + + +class TransformerMixin: + """Properties shared by Transformer models.""" + + @property + def d_model(self) -> int: + """The latent dimensionality of the model.""" + return self._d_model + + @property + def nhead(self) -> int: + """The number of attention heads.""" + return self._nhead + + @property + def dim_feedforward(self) -> int: + """The dimensionality of the Transformer feedforward layers.""" + return self._dim_feedforward + + @property + def n_layers(self) -> int: + """The number of Transformer layers.""" + return self._n_layers + + @property + def dropout(self) -> float: + """The dropout for the transformer layers.""" + return self._dropout diff --git a/depthcharge/primitives.py b/depthcharge/primitives.py index 80430b6..b1d03f2 100644 --- a/depthcharge/primitives.py +++ b/depthcharge/primitives.py @@ -1,4 +1,5 @@ """Fundamental dataclasses for depthcharge.""" + from __future__ import annotations import re @@ -46,6 +47,7 @@ class Peptide: that no modifications are present. charge : int, optional The charge of the peptide. + """ sequence: str @@ -139,6 +141,7 @@ def from_proforma( ------- Peptide The parsed ProForma peptide. + """ pep, meta = proforma.parse(sequence) try: @@ -191,6 +194,7 @@ def from_massivekb( Peptide The parsed MassIVE peptide after conversion to a ProForma format. + """ sequence = cls.massivekb_to_proforma(sequence, charge) return cls.from_proforma(sequence) @@ -217,6 +221,7 @@ def massivekb_to_proforma( str The parsed MassIVE peptide after conversion to a ProForma format. + """ sequence = "".join( [ @@ -242,6 +247,7 @@ class PeptideIons: The monoisotopic m/z of the precursor ion. fragments : torch.Tensor[float] The generated fragment ions originated from the peptide. + """ tokens: list[str] @@ -274,6 +280,7 @@ class Molecule: A SMILES string defining the molecule. charge : int, optional The charge of the molecule. + """ smiles: str @@ -295,6 +302,7 @@ def show(self, **kwargs: dict) -> PngImageFile: ---------- **kwargs : dict Keyword arguments passed to ``rdkit.Chem.Draw.MolToImage`` + """ return Draw.MolToImage(self._mol, **kwargs) @@ -321,6 +329,7 @@ def from_selfies( ------- Molecule The parsed Molecule. + """ return cls(sf.decoder(selfies), charge) @@ -350,6 +359,7 @@ class MassSpectrum(MsmsSpectrum): A label for the mass spectrum. 
This is typically an annotation, such as the generating peptide sequence, but is distinct from spectrum_utils' annotation. + """ def __init__( @@ -421,5 +431,6 @@ def to_tensor(self) -> torch.tensor: ------- torch.tensor of shape (n_peaks, 2) The mass spectrum information. + """ return torch.tensor(np.vstack([self.mz, self.intensity]).T) diff --git a/depthcharge/testing.py b/depthcharge/testing.py index 353b7a0..e8a31fc 100644 --- a/depthcharge/testing.py +++ b/depthcharge/testing.py @@ -1,4 +1,5 @@ """Helper functions for testing.""" + from typing import Any import torch @@ -22,6 +23,7 @@ def assert_dicts_equal( ------ AssertionError Indicates that the two dictionaries are not equal. + """ bad_keys = [] assert set(dict1.keys()) == set(dict2.keys()) diff --git a/depthcharge/tokenizers/__init__.py b/depthcharge/tokenizers/__init__.py index 1a76264..e2e2916 100644 --- a/depthcharge/tokenizers/__init__.py +++ b/depthcharge/tokenizers/__init__.py @@ -1,3 +1,5 @@ -"""Deptcharge tokenizers.""" +"""Depthcharge tokenizers.""" + +from .molecules import MoleculeTokenizer from .peptides import PeptideTokenizer from .tokenizer import Tokenizer diff --git a/depthcharge/tokenizers/molecules.py b/depthcharge/tokenizers/molecules.py new file mode 100644 index 0000000..736365e --- /dev/null +++ b/depthcharge/tokenizers/molecules.py @@ -0,0 +1,139 @@ +"""Tokenizers for small molecules.""" + +from __future__ import annotations + +from collections.abc import Iterable + +import selfies as sf + +from .. import utils +from ..primitives import Molecule +from .tokenizer import Tokenizer + + +class MoleculeTokenizer(Tokenizer): + """A tokenizer for small molecules. + + Tokenize SMILES and SELFIES representations of small molecules. + SMILES are internally converted to SELFIES representations. + + Parameters + ---------- + selfies_vocab : Iterable[str] + The SELFIES tokens to be considered. + start_token : str, optional + The start token to use. + stop_token : str, optional + The stop token to use. + + Attributes + ---------- + index : SortedDict{str, int} + The mapping of residues and modifications to integer representations. + reverse_index : list[None | str] + The ordered residues and modifications where the list index is the + integer representation for a token. + start_token : str + The start token + stop_token : str + The stop token. + start_int : int + The integer representation of the start token + stop_int : int + The integer representation of the stop token. + padding_int : int + The integer used to represent padding. + + """ + + def __init__( + self, + selfies_vocab: Iterable[str] | None = None, + start_token: str | None = None, + stop_token: str | None = "$", + ) -> None: + """Initialize a MoleculeTokenizer.""" + if selfies_vocab is None: + selfies_vocab = sf.get_semantic_robust_alphabet() + + self.selfies_vocab = selfies_vocab + super().__init__(selfies_vocab, start_token, stop_token) + + def split(self, sequence: str) -> list[str]: + """Split a SMILES or SELFIES string into SELFIES tokens. + + Parameters + ---------- + sequence : str + The SMILES or SELFIES string representing a molecule. + + Returns + ------- + List[str] + The SELFIES tokens representing the molecule. 
+ + """ + try: + return list(sf.split_selfies(sf.encoder(sequence))) + except sf.EncoderError: + return list(sf.split_selfies(sequence)) + + @classmethod + def from_smiles( + cls, + smiles: Iterable[str] | str, + start_token: str | None = None, + stop_token: str | None = "$", + ) -> MoleculeTokenizer: + """Learn the vocabulary from SMILES strings. + + Parameters + ---------- + smiles : Iterable[str] | str + Create a vocabulary from all unique tokens in these SMILES strings. + start_token : str, optional + The start token to use. + stop_token : str, optional + The stop token to use. + + Returns + ------- + MoleculeTokenizer + The tokenizer restricted to the vocabulary present in the + input SMILES strings. + + """ + vocab = sf.get_alphabet_from_selfies( + Molecule(s).to_selfies() for s in utils.listify(smiles) + ) + + return cls(vocab, start_token, stop_token) + + @classmethod + def from_selfies( + cls, + selfies: Iterable[str] | str, + start_token: str | None = None, + stop_token: str | None = "$", + ) -> MoleculeTokenizer: + """Learn the vocabulary from SELFIES strings. + + Parameters + ---------- + selfies : Iterable[str] | str + Create a vocabulary from all unique tokens in these SELFIES + strings. + start_token : str, optional + The start token to use. + stop_token : str, optional + The stop token to use. + + Returns + ------- + MoleculeTokenizer + The tokenizer restricted to the vocabulary present in the + input SMILES strings. + + """ + vocab = sf.get_alphabet_from_selfies(utils.listify(selfies)) + return cls(vocab, start_token, stop_token) diff --git a/depthcharge/tokenizers/peptides.py b/depthcharge/tokenizers/peptides.py index bddad42..18254e3 100644 --- a/depthcharge/tokenizers/peptides.py +++ b/depthcharge/tokenizers/peptides.py @@ -1,40 +1,42 @@ """Tokenizers for peptides.""" + from __future__ import annotations import re from collections.abc import Iterable -import numba as nb -import numpy as np import torch from pyteomics.proforma import GenericModification, MassModification from .. import utils from ..constants import H2O, PROTON -from ..primitives import MSKB_TO_UNIMOD, Peptide, PeptideIons +from ..primitives import MSKB_TO_UNIMOD, Peptide from .tokenizer import Tokenizer class PeptideTokenizer(Tokenizer): """A tokenizer for ProForma peptide sequences. - Parse and tokenize ProForma-compliant peptide sequences. Additionally, - use this class to calculate fragment and precursor ion masses. + Parse and tokenize ProForma-compliant peptide sequences. Parameters ---------- residues : dict[str, float], optional - Residues and modifications to add to the vocabulary beyond the - standard 20 amino acids. - replace_isoleucine_with_leucine : bool - Replace I with L residues, because they are isobaric and often + Residues and modifications to add to the vocabulary beyond + the standard 20 amino acids. + replace_isoleucine_with_leucine : bool, optional + Replace I with L residues, because they are isomeric and often indistinguishable by mass spectrometry. - reverse : bool + reverse : bool, optional Reverse the sequence for tokenization, C-terminus to N-terminus. + start_token : str, optional + The start token to use. + stop_token : str, optional + The stop token to use. Attributes ---------- - residues : numba.typed.Dict[str, float] + residues : SortedDict[str, float] The residues and modifications and their associated masses. terminal modifcations are indicated by `-`. 
index : SortedDict{str, int} @@ -42,72 +44,101 @@ class PeptideTokenizer(Tokenizer): reverse_index : list[None | str] The ordered residues and modifications where the list index is the integer representation for a token. + start_token : str + The start token stop_token : str The stop token. + start_int : int + The integer representation of the start token + stop_int : int + The integer representation of the stop token. + padding_int : int + The integer used to represent padding. + """ - residues = nb.typed.Dict.empty( - nb.types.unicode_type, - nb.types.float64, - ) - residues.update( - G=57.021463735, - A=71.037113805, - S=87.032028435, - P=97.052763875, - V=99.068413945, - T=101.047678505, - C=103.009184505, - L=113.084064015, - I=113.084064015, - N=114.042927470, - D=115.026943065, - Q=128.058577540, - K=128.094963050, - E=129.042593135, - M=131.040484645, - H=137.058911875, - F=147.068413945, - R=156.101111050, - Y=163.063328575, - W=186.079312980, - ) + residues = { + "G": 57.021463735, + "A": 71.037113805, + "S": 87.032028435, + "P": 97.052763875, + "V": 99.068413945, + "T": 101.047678505, + "C": 103.009184505, + "L": 113.084064015, + "I": 113.084064015, + "N": 114.042927470, + "D": 115.026943065, + "Q": 128.058577540, + "K": 128.094963050, + "E": 129.042593135, + "M": 131.040484645, + "H": 137.058911875, + "F": 147.068413945, + "R": 156.101111050, + "Y": 163.063328575, + "W": 186.079312980, + } # The peptide parsing function: _parse_peptide = Peptide.from_proforma def __init__( self, - residues: dict[str, float] | None = None, + residues: Iterable[str] | None = None, replace_isoleucine_with_leucine: bool = False, reverse: bool = False, + start_token: str | None = None, + stop_token: str | None = "$", ) -> None: """Initialize a PeptideTokenizer.""" self.replace_isoleucine_with_leucine = replace_isoleucine_with_leucine self.reverse = reverse + + # Note that these also secretly work on dicts too ;) self.residues = self.residues.copy() if residues is not None: self.residues.update(residues) if self.replace_isoleucine_with_leucine: - del self.residues["I"] - - super().__init__(list(self.residues.keys())) - - def __getstate__(self) -> dict: - """How to pickle the object.""" - self.residues = dict(self.residues) - return self.__dict__ - - def __setstate__(self, state: dict) -> None: - """How to unpickle the object.""" - self.__dict__ = state - residues = self.residues - self.residues = nb.typed.Dict.empty( - nb.types.unicode_type, - nb.types.float64, + if "I" in self.residues: + del self.residues["I"] + + super().__init__(self.residues, start_token, stop_token) + self.masses = torch.tensor( + [self.residues.get(a, 0.0) for a in self.reverse_index] ) - self.residues.update(residues) + + def calculate_precursor_ions( + self, + tokens: torch.Tensor | Iterable[str], + charges: torch.Tensor, + ) -> torch.Tensor: + """Calculate the m/z for precursor ions. + + Parameters + ---------- + tokens : torch.Tensor of shape (n_sequences, len_seq) + The tokens corresponding to the peptide sequence. + charges : torch.Tensor of shape (n_sequences,) + The charge state for each peptide. + + Returns + ------- + torch.Tensor + The monoisotopic m/z for each charged peptide. 
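A small sketch of the new precursor m/z helper; the peptide and charge are illustrative.

import torch
from depthcharge.tokenizers import PeptideTokenizer

tokenizer = PeptideTokenizer()
mz = tokenizer.calculate_precursor_ions(["LESLIEK"], torch.tensor([2]))
# tensor with the monoisotopic m/z of LESLIEK at charge 2 (about 416.24)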
+ + """ + if isinstance(tokens[0], str): + tokens = self.tokenize(utils.listify(tokens)) + + if not isinstance(charges, torch.Tensor): + charges = torch.tensor(charges) + if not charges.shape: + charges = charges[None] + + masses = self.masses[tokens].sum(dim=1) + H2O + return (masses / charges) + PROTON def split(self, sequence: str) -> list[str]: """Split a ProForma peptide sequence. @@ -120,7 +151,8 @@ def split(self, sequence: str) -> list[str]: Returns ------- list[str] - The tokens that compprise the peptide sequence. + The tokens that comprise the peptide sequence. + """ pep = self._parse_peptide(sequence) if self.replace_isoleucine_with_leucine: @@ -132,101 +164,14 @@ def split(self, sequence: str) -> list[str]: return pep - def ions( # noqa: C901 - self, - sequences: Iterable[str], - precursor_charges: Iterable[int] | str, - max_fragment_charge: int | None = None, - ) -> tuple[torch.Tensor[float], list[torch.Tensor[float]]]: - """Calculate the m/z for the precursor and fragment ions. - - Currently depthcharge only support b and y ions. - - Parameters - ---------- - sequences : Iterable[str], - The peptide sequences. - precursor_charges : Iterable[int] or None, optional - The charge of each precursor ion. If ``None``, the charge state - is expected to be found in the peptide strings. - max_fragment_charge : int or None, optional - The maximum charge for fragment ions. The default is to consider - up to the ``max(precursor_charge - 1, 1)``. - - Returns - ------- - list of PeptideIons - The precursor and fragment ions generated by the peptide. - """ - sequences = utils.listify(sequences) - if max_fragment_charge is None: - max_fragment_charge = np.inf - - if precursor_charges is None: - precursor_charges = [None] * len(sequences) - else: - precursor_charges = utils.listify(precursor_charges) - - if len(sequences) != len(precursor_charges): - raise ValueError( - "The number of sequences and precursor charges did not match." - ) - - out = [] - for seq, charge in zip(sequences, precursor_charges): - if isinstance(seq, str): - if self.replace_isoleucine_with_leucine: - seq = seq.replace("I", "L") - - try: - pep = Peptide.from_proforma(seq) - except ValueError: - pep = Peptide.from_massivekb(seq) - - tokens = pep.split() - if charge is None: - charge = max(pep.charge - 1, 1) - else: - tokens = seq - - if charge is None: - raise ValueError( - f"No charge was provided for {seq}", - ) - - try: - prec = _calc_precursor_mass( - nb.typed.List(tokens), - charge, - self.residues, - ) - except KeyError as err: - raise ValueError( - f"Unrecognized token(s) in {''.join(tokens)}" - ) from err - - frags = _calc_fragment_masses( - nb.typed.List(tokens), - min(charge, max_fragment_charge), - self.residues, - ) - - ions = PeptideIons( - tokens=tokens, - precursor=prec, - fragments=torch.tensor(frags), - ) - - out.append(ions) - - return out - @classmethod def from_proforma( cls, sequences: Iterable[str], - replace_isoleucine_with_leucine: bool = True, + replace_isoleucine_with_leucine: bool = False, reverse: bool = True, + start_token: str | None = None, + stop_token: str | None = "$", ) -> PeptideTokenizer: """Create a tokenizer with the observed peptide modications. @@ -237,54 +182,65 @@ def from_proforma( ---------- sequences : Iterable[str] The peptides from which to parse modifications. - replace_isoleucine_with_leucine : bool + replace_isoleucine_with_leucine : bool, optional Replace I with L residues, because they are isobaric and often indistinguishable by mass spectrometry. 
- reverse : bool + reverse : bool, optional Reverse the sequence for tokenization, C-terminus to N-terminus. + start_token : str, optional + The start token to use. + stop_token : str, optional + The stop token to use. Returns ------- PeptideTokenizer A tokenizer for peptides with the observed modifications. + """ if isinstance(sequences, str): sequences = [sequences] # Parse modifications: - new_res = cls.residues.copy() + new_res = {} for peptide in sequences: parsed = Peptide.from_proforma(peptide).split() for token in parsed: - if token in new_res.keys(): - continue - - if token == "-": + if token in cls.residues: continue - match = re.search(r"(.*)\[(.*)\]", token) try: - res, mod = match.groups() - if res and res != "-": - res_mass = new_res[res] - else: - res_mass = 0 - except (AttributeError, KeyError) as err: - raise ValueError("Unrecognized token {token}.") from err + res, mod = re.search(r"(.*)\[(.*)\]", token).groups() + try: + mod_mass = MassModification(mod).mass + except ValueError: + mod_mass = GenericModification(mod).mass + except AttributeError as err: + raise KeyError(f"Unknown residue {token}") from err try: - mod = MassModification(mod) - except ValueError: - mod = GenericModification(mod) + res_mass = cls.residues.get(res, 0) + except KeyError as err: + raise ValueError(f"Unrecognized token {token}.") from err + except AttributeError: + res_mass = 0.0 # In case we don't care about ions. - new_res[token] = res_mass + mod.mass + new_res[token] = res_mass + mod_mass - return cls(new_res, replace_isoleucine_with_leucine, reverse) + return cls( + new_res, + replace_isoleucine_with_leucine, + reverse, + start_token, + stop_token, + ) @staticmethod def from_massivekb( - replace_isoleucine_with_leucine: bool = True, + replace_isoleucine_with_leucine: bool = False, reverse: bool = True, + start_token: str | None = None, + stop_token: str | None = "$", ) -> MskbPeptideTokenizer: """Create a tokenizer with the observed peptide modications. @@ -293,21 +249,28 @@ def from_massivekb( Parameters ---------- - replace_isoleucine_with_leucine : bool + replace_isoleucine_with_leucine : bool, optional Replace I with L residues, because they are isobaric and often indistinguishable by mass spectrometry. - reverse : bool + reverse : bool, optional Reverse the sequence for tokenization, C-terminus to N-terminus. + start_token : str, optional + The start token to use. + stop_token : str, optional + The stop token to use. Returns ------- MskbPeptideTokenizer A tokenizer for peptides with the observed modifications. + """ return MskbPeptideTokenizer.from_proforma( [f"{mod}A" for mod in MSKB_TO_UNIMOD.values()], replace_isoleucine_with_leucine, reverse, + start_token, + stop_token, ) @@ -325,6 +288,10 @@ class MskbPeptideTokenizer(PeptideTokenizer): indistinguishable by mass spectrometry. reverse : bool Reverse the sequence for tokenization, C-terminus to N-terminus. + start_token : str, optional + The start token to use. + stop_token : str, optional + The stop token to use. Attributes ---------- @@ -336,117 +303,13 @@ class MskbPeptideTokenizer(PeptideTokenizer): reverse_index : list[None | str] The ordered residues and modifications where the list index is the integer representation for a token. - stop_token : str - The stop token. + start_int : int + The integer representation of the start token + stop_int : int + The integer representation of the stop token. + padding_int : int + The integer used to represent padding. 
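A hedged sketch of learning modified residues from ProForma strings; the sequence and its numeric mass offset are illustrative.

from depthcharge.tokenizers import PeptideTokenizer

# Build a vocabulary containing the modifications seen in the input peptides:
tokenizer = PeptideTokenizer.from_proforma(["ACDEM[+15.9949]K"], reverse=False)

# The modified methionine is now its own token, so the same sequence
# tokenizes directly:
ids = tokenizer.tokenize(["ACDEM[+15.9949]K"], add_stop=True)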
""" _parse_peptide = Peptide.from_massivekb - - -@nb.njit -def _calc_precursor_mass( - tokens: list[str], - charge: int, - masses: nb.typed.Dict, -) -> float: - """Calculate the precursor mass of a peptide sequence. - - Parameters - ---------- - tokens : list of str - The tokenized peptide sequence. - charge : int - The charge state to consider. Use 'None' to get the neutral mass. - masses : nb.typed.Dict - The mass dictionary to use. - - Returns - ------- - float - The precurosr monoisotopic m/z. - """ - mass = sum([masses[t] for t in tokens]) + H2O - if charge is not None: - mass = _mass2mz(mass, charge) - - return mass - - -@nb.njit -def _calc_fragment_masses( - tokens: list[str], - charge: int, - masses: nb.typed.Dict, -) -> np.ndarray[float]: - """Calculate the b and y ions for a peptide sequence. - - Parameters - ---------- - tokens : list of str - The tokenized peptide sequence. - charge : int - The charge state to consider. Use 'None' to get the neutral mass. - masses : nb.typed.Dict - The mass dictionary to use. - - Returns - ------- - np.ndarray of shape (2, len(seq) - 1, charge) - The m/z of the predicted b and y ions. - """ - # Handle terminal mods: - seq = np.empty(len(tokens)) - n_mod = False - c_mod = False - for idx, token in enumerate(tokens): - if not idx and token.endswith("-"): - n_mod = True - - if idx == (len(tokens) - 1) and token.startswith("-"): - c_mod = True - - seq[idx] = masses[token] - - if n_mod: - seq[1] += seq[0] - seq = seq[1:] - - if c_mod: - seq[-2] += seq[-1] - seq = seq[:-1] - - # Calculate fragments: - max_charge = min(charge, 2) - n_ions = len(seq) - 1 - ions = np.empty((2, n_ions, max_charge)) - b_mass = 0 - y_mass = H2O - for idx in range(n_ions): - b_mass += seq[idx] - y_mass += seq[-(idx + 1)] - for cur_charge in range(1, max_charge + 1): - z_idx = cur_charge - 1 - ions[0, idx, z_idx] = _mass2mz(b_mass, cur_charge) - ions[1, idx, z_idx] = _mass2mz(y_mass, cur_charge) - - return ions - - -@nb.njit -def _mass2mz(mass: float, charge: int) -> float: - """Calculate the m/z. - - Parameters - ---------- - mass : float - The neutral mass. - charge : int - The charge. - - Returns - ------- - float - The m/z - """ - return (mass / charge) + PROTON diff --git a/depthcharge/tokenizers/tokenizer.py b/depthcharge/tokenizers/tokenizer.py index e232cb5..c07a0fa 100644 --- a/depthcharge/tokenizers/tokenizer.py +++ b/depthcharge/tokenizers/tokenizer.py @@ -1,4 +1,5 @@ """A base Tokenizer class.""" + from __future__ import annotations from abc import ABC, abstractmethod @@ -18,12 +19,21 @@ class Tokenizer(ABC): ---------- tokens : Sequence[str] The tokens to consider. + start_token : str, optional + The start token to use. stop_token : str, optional The stop token to use. 
+ """ - def __init__(self, tokens: Sequence[str], stop_token: str = "$") -> None: + def __init__( + self, + tokens: Sequence[str], + start_token: str | None = None, + stop_token: str | None = "$", + ) -> None: """Initialize a tokenizer.""" + self.start_token = start_token self.stop_token = stop_token tokens = SortedSet(tokens) @@ -32,10 +42,16 @@ def __init__(self, tokens: Sequence[str], stop_token: str = "$") -> None: f"Stop token {stop_token} already exists in tokens.", ) - tokens.add(self.stop_token) + if start_token is not None: + tokens.add(self.start_token) + if stop_token is not None: + tokens.add(self.stop_token) + self.index = SortedDict({k: i + 1 for i, k in enumerate(tokens)}) - self.reverse_index = [None] + list(tokens) - self.stop_int = self.index[self.stop_token] + self.reverse_index = [None] + list(tokens) # 0 is padding. + self.start_int = self.index.get(self.start_token, None) + self.stop_int = self.index.get(self.stop_token, None) + self.padding_int = 0 def __len__(self) -> int: """The number of tokens.""" @@ -47,33 +63,42 @@ def split(self, sequence: str) -> list[str]: def tokenize( self, - sequences: Iterable[str], - to_strings: bool = False, + sequences: Iterable[str] | str, + add_start: bool = False, add_stop: bool = False, - ) -> torch.Tensor | list[list[str]]: + to_strings: bool = False, + ) -> torch.tensor | list[list[str]]: """Tokenize the input sequences. Parameters ---------- - sequences : Iterable[str] + sequences : Iterable[str] or str The sequences to tokenize. + add_start : bool, optional + Prepend the start token to the beginning of the sequence. + add_stop : bool, optional + Append the stop token to the end of the sequence. to_strings : bool, optional Return each as a list of token strings rather than a tensor. This is useful for debugging. - add_stop : bool, optional - Append the stop token tothe end of the sequence. Returns ------- - torch.Tensor of shape (n_sequences, max_length) or list[list[str]] + torch.tensor of shape (n_sequences, max_length) or list[list[str]] Either a tensor containing the integer values for each token, padded with 0's, or the list of tokens comprising each sequence. + """ + add_start = add_start and self.start_token is not None + add_stop = add_stop and self.stop_token is not None try: out = [] for seq in utils.listify(sequences): tokens = self.split(seq) + if add_start and tokens[0] != self.start_token: + tokens.insert(0, self.start_token) + if add_stop and tokens[-1] != self.stop_token: tokens.append(self.stop_token) @@ -86,9 +111,6 @@ def tokenize( if to_strings: return out - if isinstance(sequences, str): - return out[0] - return nn.utils.rnn.pad_sequence(out, batch_first=True) except KeyError as err: raise ValueError("Unrecognized token") from err @@ -97,6 +119,7 @@ def detokenize( self, tokens: torch.Tensor, join: bool = True, + trim_start_token: bool = True, trim_stop_token: bool = True, ) -> list[str] | list[list[str]]: """Retreive sequences from tokens. @@ -107,6 +130,8 @@ def detokenize( The zero-padded tensor of integerized tokens to decode. join : bool, optional Join tokens into strings? + trim_start_token : bool, optional + Remove the start token from the beginning of a sequence. trim_stop_token : bool, optional Remove the stop token from the end of a sequence. @@ -114,6 +139,7 @@ def detokenize( ------- list[str] or list[list[str]] The decoded sequences each as a string or list or strings. 
+ """ decoded = [] for row in tokens: @@ -123,6 +149,8 @@ def detokenize( if self.reverse_index[i] is not None ] + if trim_start_token and seq[0] == self.start_token: + seq.pop(0) if trim_stop_token and seq[-1] == self.stop_token: seq.pop(-1) diff --git a/depthcharge/transformers/__init__.py b/depthcharge/transformers/__init__.py index 04b3b7d..0f5f741 100644 --- a/depthcharge/transformers/__init__.py +++ b/depthcharge/transformers/__init__.py @@ -1,6 +1,7 @@ """Transformer models.""" -from .peptides import ( - PeptideTransformerDecoder, - PeptideTransformerEncoder, + +from .analytes import ( + AnalyteTransformerDecoder, + AnalyteTransformerEncoder, ) from .spectra import SpectrumTransformerEncoder diff --git a/depthcharge/transformers/analytes.py b/depthcharge/transformers/analytes.py new file mode 100644 index 0000000..8d95253 --- /dev/null +++ b/depthcharge/transformers/analytes.py @@ -0,0 +1,478 @@ +"""Transformer models for peptides and small molecules.""" + +import warnings + +import torch + +from .. import utils +from ..encoders import PositionalEncoder +from ..mixins import ModelMixin, TransformerMixin +from ..tokenizers import Tokenizer + + +class _AnalyteTransformer(torch.nn.Module, ModelMixin, TransformerMixin): + """A transformer base class for analyte sequences. + + Parameters + ---------- + n_tokens : int or Tokenizer + The number of tokens used to tokenize molecular sequences. + d_model : int + The latent dimensionality to represent each element in the molecular + sequence. + nhead : int, optional + The number of attention heads in each layer. ``d_model`` must be + divisible by ``nhead``. + dim_feedforward : int, optional + The dimensionality of the fully connected layers in the Transformer + layers of the model. + n_layers : int, optional + The number of Transformer layers. + dropout : float, optional + The dropout probability for all layers. + positional_encoder : PositionalEncoder or bool, optional + The positional encodings to use for the elements of the sequence. If + ``True``, the default positional encoder is used. ``False`` disables + positional encodings, typically only for ablation tests. + padding_int : int, optional + The index that represents padding in the input sequence. Required + only if ``n_tokens`` was provided as an ``int``. + + """ + + def __init__( + self, + n_tokens: int | Tokenizer, + d_model: int, + nhead: int, + dim_feedforward: int, + n_layers: int, + dropout: float, + positional_encoder: PositionalEncoder | bool, + padding_int: int | None, + ) -> None: + """Initialize an AnalyteTransformer.""" + super().__init__() + self._d_model = d_model + self._nhead = nhead + self._dim_feedforward = dim_feedforward + self._n_layers = n_layers + self._dropout = dropout + + try: + self._n_tokens = len(n_tokens) + self._padding_int = n_tokens.padding_int + except TypeError: + self._n_tokens = n_tokens + self._padding_int = padding_int + + if padding_int is not None and padding_int != self._padding_int: + warnings.warn( + "The provided padding_int differs from the " + "Tokenizer.padding_int. The padding_int is being overridden." 
+ ) + elif padding_int is None and self._padding_int is None: + raise ValueError( + "padding_int must be specified when n_tokens is an int.", + ) + + if callable(positional_encoder): + self.positional_encoder = positional_encoder + elif positional_encoder: + self.positional_encoder = PositionalEncoder(d_model) + else: + self.positional_encoder = torch.nn.Identity() + + self.token_encoder = torch.nn.Embedding( + self._n_tokens + 1, + d_model, + padding_idx=self._padding_int, + ) + + def global_token_hook( + self, + tokens: torch.Tensor, + *args: torch.Tensor, + **kwargs: dict, + ) -> torch.Tensor: + """Define how additional information in the batch may be used. + + Overwrite this method to define custom functionality dependent on + information in the batch. Examples would be to incorporate any + combination of the mass, charge, retention time, or + ion mobility of an analyte. + + The representation returned by this method is preprended to the + peak representations that are fed into the Transformer and + ultimately contribute to the analyte representation that is the + first element of the sequence in the model output. + + By default, this method returns a tensor of zeros. + + Parameters + ---------- + tokens : list of str, torch.Tensor, or None + The partial molecular sequences for which to predict the next + token. Optionally, these may be the token indices instead + of a string. + *args : torch.Tensor + Additional data passed with the batch. + **kwargs : dict + Additional data passed with the batch. + + Returns + ------- + torch.Tensor of shape (batch_size, d_model) + The global token representations. + + """ + return torch.zeros((tokens.shape[0], self.d_model)).type_as( + self.token_encoder.weight + ) + + +class AnalyteTransformerEncoder(_AnalyteTransformer): + """A transformer encoder for peptide and small molecule analytes. + + Parameters + ---------- + n_tokens : int or Tokenizer + The number of tokens used to tokenize molecular sequences. + d_model : int + The latent dimensionality to represent each element in the molecular + sequence. + nhead : int, optional + The number of attention heads in each layer. ``d_model`` must be + divisible by ``nhead``. + dim_feedforward : int, optional + The dimensionality of the fully connected layers in the Transformer + layers of the model. + n_layers : int, optional + The number of Transformer layers. + dropout : float, optional + The dropout probability for all layers. + positional_encoder : PositionalEncoder or bool, optional + The positional encodings to use for the elements of the sequence. If + ``True``, the default positional encoder is used. ``False`` disables + positional encodings, typically only for ablation tests. + padding_int : int, optional + The index that represents padding in the input sequence. Required + only if ``n_tokens`` was provided as an ``int``. 
+ + """ + + def __init__( + self, + n_tokens: int | Tokenizer, + d_model: int = 128, + nhead: int = 8, + dim_feedforward: int = 1024, + n_layers: int = 1, + dropout: float = 0, + positional_encoder: PositionalEncoder | bool = True, + padding_int: int | None = None, + ) -> None: + """Initialize an AnalyteEncoder.""" + super().__init__( + n_tokens=n_tokens, + d_model=d_model, + nhead=nhead, + dim_feedforward=dim_feedforward, + n_layers=n_layers, + dropout=dropout, + positional_encoder=positional_encoder, + padding_int=padding_int, + ) + + # The Transformer layers: + layer = torch.nn.TransformerEncoderLayer( + d_model=self.d_model, + nhead=self.nhead, + dim_feedforward=self.dim_feedforward, + batch_first=True, + dropout=self.dropout, + ) + + self.transformer_encoder = torch.nn.TransformerEncoder( + layer, + num_layers=n_layers, + ) + + def forward( + self, + tokens: torch.Tensor, + *args: torch.Tensor, + mask: torch.Tensor = None, + **kwargs: dict, + ) -> tuple[torch.Tensor, torch.Tensor]: + """Encode a collection of sequences. + + Parameters + ---------- + tokens : torch.Tensor of size (batch_size, len_sequence) + The integer tokens describing each analyte sequence, padded + to the maximum analyte length in the batch with 0s. + *args : torch.Tensor, optional + Additional data. These may be used by overwriting the + `global_token_hook()` method in a subclass. + mask : torch.Tensor + Passed to `torch.nn.TransformerEncoder.forward()`. The mask + for the sequence. + **kwargs : dict + Additional data fields. These may be used by overwriting + the `global_token_hook()` method in a subclass. + + Returns + ------- + latent : torch.Tensor of shape (batch_size, len_sequence, d_model) + The latent representations for the spectrum and each of its + peaks. + mem_mask : torch.Tensor + The memory mask specifying which elements were padding in X. + + """ + # Encode everything: + encoded = self.token_encoder(tokens) + global_token = self.global_token_hook(tokens, *args, **kwargs) + encoded = torch.cat([global_token[:, None, :], encoded], dim=1) + + # Create mask + src_key_padding_mask = ~encoded.sum(dim=2).bool() + src_key_padding_mask[:, 0] = False + + # Add positional encodings + encoded = self.positional_encoder(encoded) + + # Run through the model: + latent = self.transformer_encoder( + encoded, + mask=mask, + src_key_padding_mask=src_key_padding_mask, + ) + return latent, src_key_padding_mask + + +class AnalyteTransformerDecoder(_AnalyteTransformer): + """A transformer decoder for peptide or small molecule sequences. + + Parameters + ---------- + n_tokens : int or Tokenizer + The number of tokens used to tokenize molecular sequences. + d_model : int, optional + The latent dimensionality to represent elements of the sequence. + nhead : int, optional + The number of attention heads in each layer. ``d_model`` must be + divisible by ``nhead``. + dim_feedforward : int, optional + The dimensionality of the fully connected layers in the Transformer + layers of the model. + n_layers : int, optional + The number of Transformer layers. + dropout : float, optional + The dropout probability for all layers. + positional_encoder : PositionalEncoder or bool, optional + The positional encodings to use for the molecular sequence. If + ``True``, the default positional encoder is used. ``False`` disables + positional encodings, typically only for ablation tests. + padding_int : int, optional + The index that represents padding in the input sequence. Required + only if ``n_tokens`` was provided as an ``int``. 
+ + """ + + def __init__( + self, + n_tokens: int | Tokenizer, + d_model: int = 128, + nhead: int = 8, + dim_feedforward: int = 1024, + n_layers: int = 1, + dropout: float = 0, + positional_encoder: PositionalEncoder | bool = True, + padding_int: int | None = None, + ) -> None: + """Initialize a AnalyteDecoder.""" + super().__init__( + n_tokens=n_tokens, + d_model=d_model, + nhead=nhead, + dim_feedforward=dim_feedforward, + n_layers=n_layers, + dropout=dropout, + positional_encoder=positional_encoder, + padding_int=padding_int, + ) + + # Additional model components + layer = torch.nn.TransformerDecoderLayer( + d_model=d_model, + nhead=nhead, + dim_feedforward=dim_feedforward, + batch_first=True, + dropout=dropout, + ) + + self.transformer_decoder = torch.nn.TransformerDecoder( + layer, + num_layers=n_layers, + ) + + self.final = torch.nn.Linear( + d_model, + self.token_encoder.num_embeddings - 1, + ) + + def embed( + self, + tokens: torch.Tensor | None, + *args: torch.Tensor, + memory: torch.Tensor | None, + memory_key_padding_mask: torch.Tensor | None = None, + memory_mask: torch.Tensor | None = None, + tgt_mask: torch.Tensor | None = None, + **kwargs: dict, + ) -> torch.Tensor: + """Embed a collection of sequences. + + Parameters + ---------- + tokens : list of str, torch.Tensor, or None + The partial molecular sequences for which to predict the next + token. Optionally, these may be the token indices instead + of a string. + *args : torch.Tensor, optional + Additional data. These may be used by overwriting the + `global_token_hook()` method in a subclass. + memory : torch.Tensor of shape (batch_size, len_seq, d_model) + The representations from a ``TransformerEncoder``, such as a + ``SpectrumTransformerEncoder``. + memory_key_padding_mask : torch.Tensor of shape (batch_size, len_seq) + Passed to `torch.nn.TransformerEncoder.forward()`. The mask that + indicates which elements of ``memory`` are padding. + memory_mask : torch.Tensor + Passed to `torch.nn.TransformerEncoder.forward()`. The mask + for the memory sequence. + tgt_mask : torch.Tensor or None + Passed to `torch.nn.TransformerEncoder.forward()`. The default + is a mask that is suitable for predicting the next element in + the sequence. + **kwargs : dict + Additional data fields. These may be used by overwriting + the `global_token_hook()` method in a subclass. + + Returns + ------- + embeddings : torch.Tensor of size (batch_size, len_sequence, d_model) + The output of the Transformer layer containing the embeddings + of the tokens in the sequence. These may be tranformed to yield + scores for token predictions using the `.score_embeddings()` + method. 
+ + """ + # Prepare sequences + if tokens is None: + tokens = torch.tensor([[]]).to(self.device) + + # Encode everything: + encoded = self.token_encoder(tokens) + + # Add the global token + global_token = self.global_token_hook(tokens, *args, **kwargs) + encoded = torch.cat([global_token[:, None, :], encoded], dim=1) + + # Create the padding mask: + tgt_key_padding_mask = encoded.sum(axis=2) == 0 + tgt_key_padding_mask[:, 0] = False + + # Feed through model: + encoded = self.positional_encoder(encoded) + + if tgt_mask is None: + tgt_mask = utils.generate_tgt_mask(encoded.shape[1]).to( + self.device + ) + + return self.transformer_decoder( + tgt=encoded, + memory=memory, + tgt_mask=tgt_mask, + tgt_key_padding_mask=tgt_key_padding_mask, + memory_key_padding_mask=memory_key_padding_mask, + memory_mask=memory_mask, + ) + + def score_embeddings(self, embeddings: torch.Tensor) -> torch.Tensor: + """Score the embeddings to find the most confident tokens. + + Parameters + ---------- + embeddings: torch.Tensor of shape (batch_size, len_seq, d_model) + The embeddings from the Transformer layer. + + Returns + ------- + scores : torch.Tensor of size (batch_size, len_sequence, n_tokens) + The raw output for the final linear layer. These can be Softmax + transformed to yield the probability of each token for the + prediction. + + """ + return self.final(embeddings) + + def forward( + self, + tokens: torch.Tensor | None, + *args: torch.Tensor, + memory: torch.Tensor | None, + memory_key_padding_mask: torch.Tensor | None = None, + memory_mask: torch.Tensor | None = None, + tgt_mask: torch.Tensor | None = None, + **kwargs: dict, + ) -> torch.Tensor: + """Decode a collection of sequences. + + Parameters + ---------- + tokens : list of str, torch.Tensor, or None + The partial molecular sequences for which to predict the next + token. Optionally, these may be the token indices instead + of a string. + *args : torch.Tensor, optional + Additional data. These may be used by overwriting the + `global_token_hook()` method in a subclass. + memory : torch.Tensor of shape (batch_size, len_seq, d_model) + The representations from a ``TransformerEncoder``, such as a + ``SpectrumTransformerEncoder``. + memory_key_padding_mask : torch.Tensor of shape (batch_size, len_seq) + Passed to `torch.nn.TransformerEncoder.forward()`. The mask that + indicates which elements of ``memory`` are padding. + memory_mask : torch.Tensor + Passed to `torch.nn.TransformerEncoder.forward()`. The mask + for the memory sequence. + tgt_mask : torch.Tensor or None + Passed to `torch.nn.TransformerEncoder.forward()`. The default + is a mask that is suitable for predicting the next element in + the sequence. + **kwargs : dict + Additional data fields. These may be used by overwriting + the `global_token_hook()` method in a subclass. + + Returns + ------- + scores : torch.Tensor of size (batch_size, len_sequence, n_tokens) + The raw output for the final linear layer. These can be Softmax + transformed to yield the probability of each token for the + prediction. 
+ + """ + emb = self.embed( + tokens, + *args, + memory=memory, + memory_key_padding_mask=memory_key_padding_mask, + memory_mask=memory_mask, + tgt_mask=tgt_mask, + **kwargs, + ) + return self.score_embeddings(emb) diff --git a/depthcharge/transformers/peptides.py b/depthcharge/transformers/peptides.py deleted file mode 100644 index 9bf2c7c..0000000 --- a/depthcharge/transformers/peptides.py +++ /dev/null @@ -1,289 +0,0 @@ -"""Transformer models for peptides.""" -import torch - -from ..encoders import FloatEncoder, PositionalEncoder -from ..tokenizers import PeptideTokenizer - - -class _PeptideTransformer(torch.nn.Module): - """A transformer base class for peptide sequences. - - Parameters - ---------- - n_tokens : int or PeptideTokenizer - The number of tokens used to tokenize peptide sequences. - d_model : int - The latent dimensionality to represent the amino acids in a peptide - sequence. - positional_encoder : PositionalEncoder or bool - The positional encodings to use for the amino acid sequence. If - ``True``, the default positional encoder is used. ``False`` disables - positional encodings, typically only for ablation tests. - max_charge : int - The maximum precursor charge to embed. - """ - - def __init__( - self, - n_tokens: int | PeptideTokenizer, - d_model: int, - positional_encoder: PositionalEncoder | bool, - max_charge: int, - ) -> None: - super().__init__() - try: - n_tokens = len(n_tokens) - except TypeError: - pass - - if callable(positional_encoder): - self.positional_encoder = positional_encoder - elif positional_encoder: - self.positional_encoder = PositionalEncoder(d_model) - else: - self.positional_encoder = torch.nn.Identity() - - self.charge_encoder = torch.nn.Embedding(max_charge + 1, d_model) - self.aa_encoder = torch.nn.Embedding( - n_tokens + 1, - d_model, - padding_idx=0, - ) - - @property - def device(self) -> torch.device: - """The current device for the model.""" - return next(self.parameters()).device - - -class PeptideTransformerEncoder(_PeptideTransformer): - """A transformer encoder for peptide sequences. - - Parameters - ---------- - n_tokens : int or PeptideTokenizer - The number of tokens used to tokenize peptide sequences. - d_model : int - The latent dimensionality to represent the amino acids in a peptide - sequence. - nhead : int, optional - The number of attention heads in each layer. ``d_model`` must be - divisible by ``nhead``. - dim_feedforward : int, optional - The dimensionality of the fully connected layers in the Transformer - layers of the model. - n_layers : int, optional - The number of Transformer layers. - dropout : float, optional - The dropout probability for all layers. - positional_encoder : PositionalEncoder or bool, optional - The positional encodings to use for the amino acid sequence. If - ``True``, the default positional encoder is used. ``False`` disables - positional encodings, typically only for ablation tests. - max_charge : int, optional - The maximum charge state for peptide sequences. 
- """ - - def __init__( - self, - n_tokens: int | PeptideTokenizer, - d_model: int = 128, - nhead: int = 8, - dim_feedforward: int = 1024, - n_layers: int = 1, - dropout: float = 0, - positional_encoder: PositionalEncoder | bool = True, - max_charge: int = 5, - ) -> None: - """Initialize a PeptideEncoder.""" - super().__init__( - n_tokens=n_tokens, - d_model=d_model, - positional_encoder=positional_encoder, - max_charge=max_charge, - ) - - # The Transformer layers: - layer = torch.nn.TransformerEncoderLayer( - d_model=d_model, - nhead=nhead, - dim_feedforward=dim_feedforward, - batch_first=True, - dropout=dropout, - ) - - self.transformer_encoder = torch.nn.TransformerEncoder( - layer, - num_layers=n_layers, - ) - - def forward( - self, - tokens: torch.Tensor, - charges: torch.Tensor, - ) -> tuple[torch.Tensor, torch.Tensor]: - """Predict the next amino acid for a collection of sequences. - - Parameters - ---------- - tokens : torch.Tensor of size (batch_size, peptide_length) - The integer tokens describing each peptide sequence, padded - to the maximum peptide length in the batch with 0s. - charges : torch.Tensor of size (batch_size,) - The charge state of each peptide. - - Returns - ------- - latent : torch.Tensor of shape (n_sequences, len_sequence, d_model) - The latent representations for the spectrum and each of its - peaks. - mem_mask : torch.Tensor - The memory mask specifying which elements were padding in X. - """ - # Encode everything: - encoded = self.aa_encoder(tokens) - charges = self.charge_encoder(charges)[:, None] - encoded = torch.cat([charges, encoded], dim=1) - - # Create mask - mask = ~encoded.sum(dim=2).bool() - - # Add positional encodings - encoded = self.positional_encoder(encoded) - - # Run through the model: - latent = self.transformer_encoder(encoded, src_key_padding_mask=mask) - return latent, mask - - -class PeptideTransformerDecoder(_PeptideTransformer): - """A transformer decoder for peptide sequences. - - Parameters - ---------- - n_tokens : int or PeptideTokenizer - The number of tokens used to tokenize peptide sequences. - d_model : int, optional - The latent dimensionality to represent peaks in the mass spectrum. - nhead : int, optional - The number of attention heads in each layer. ``d_model`` must be - divisible by ``nhead``. - dim_feedforward : int, optional - The dimensionality of the fully connected layers in the Transformer - layers of the model. - n_layers : int, optional - The number of Transformer layers. - dropout : float, optional - The dropout probability for all layers. - positional_encoder : PositionalEncoder or bool, optional - The positional encodings to use for the amino acid sequence. If - ``True``, the default positional encoder is used. ``False`` disables - positional encodings, typically only for ablation tests. - max_charge : int, optional - The maximum charge state for peptide sequences. 
- """ - - def __init__( - self, - n_tokens: int | PeptideTokenizer, - d_model: int = 128, - nhead: int = 8, - dim_feedforward: int = 1024, - n_layers: int = 1, - dropout: float = 0, - positional_encoder: PositionalEncoder | bool = True, - max_charge: int = 5, - ) -> None: - """Initialize a PeptideDecoder.""" - super().__init__( - n_tokens=n_tokens, - d_model=d_model, - positional_encoder=positional_encoder, - max_charge=max_charge, - ) - - # Additional model components - self.mass_encoder = FloatEncoder(d_model) - layer = torch.nn.TransformerDecoderLayer( - d_model=d_model, - nhead=nhead, - dim_feedforward=dim_feedforward, - batch_first=True, - dropout=dropout, - ) - - self.transformer_decoder = torch.nn.TransformerDecoder( - layer, - num_layers=n_layers, - ) - - self.final = torch.nn.Linear( - d_model, - self.aa_encoder.num_embeddings - 1, - ) - - def forward( - self, - tokens: torch.Tensor | None, - precursors: torch.Tensor, - memory: torch.Tensor, - memory_key_padding_mask: torch.Tensor, - ) -> tuple[torch.Tensor, torch.Tensor]: - """Predict the next amino acid for a collection of sequences. - - Parameters - ---------- - tokens : list of str, torch.Tensor, or None - The partial peptide sequences for which to predict the next - amino acid. Optionally, these may be the token indices instead - of a string. - precursors : torch.Tensor of size (batch_size, 2) - The precursor mass (axis 0) and charge (axis 1). - memory : torch.Tensor of shape (batch_size, n_peaks, d_model) - The representations from a ``TransformerEncoder``, such as a - ``SpectrumEncoder``. - memory_key_padding_mask : torch.Tensor of shape (batch_size, n_peaks) - The mask that indicates which elements of ``memory`` are padding. - - Returns - ------- - scores : torch.Tensor of size (batch_size, len_sequence, n_amino_acids) - The raw output for the final linear layer. These can be Softmax - transformed to yield the probability of each amino acid for the - prediction. - - """ - # Prepare sequences - if tokens is None: - tokens = torch.tensor([[]]).to(self.device) - - # Encode everything: - tokens = self.aa_encoder(tokens) - masses = self.mass_encoder(precursors[:, None, 0]) - charges = self.charge_encoder(precursors[:, 1].int() - 1) - precursors = masses + charges[:, None, :] - - # Feed through model: - tgt = torch.cat([precursors, tokens], dim=1) - tgt_key_padding_mask = tgt.sum(axis=2) == 0 - tgt = self.positional_encoder(tgt) - tgt_mask = generate_tgt_mask(tgt.shape[1]).to(self.device) - preds = self.transformer_decoder( - tgt=tgt, - memory=memory, - tgt_mask=tgt_mask, - tgt_key_padding_mask=tgt_key_padding_mask, - memory_key_padding_mask=memory_key_padding_mask.to(self.device), - ) - return self.final(preds) - - -def generate_tgt_mask(sz: int) -> torch.Tensor: - """Generate a square mask for the sequence. - - Parameters - ---------- - sz : int - The length of the target sequence. 
- """ - return ~torch.triu(torch.ones(sz, sz, dtype=torch.bool)).transpose(0, 1) diff --git a/depthcharge/transformers/spectra.py b/depthcharge/transformers/spectra.py index 3f5b826..3c4282e 100644 --- a/depthcharge/transformers/spectra.py +++ b/depthcharge/transformers/spectra.py @@ -1,19 +1,23 @@ """Tranformer models to handle mass spectra.""" + from collections.abc import Callable import torch from ..encoders import PeakEncoder +from ..mixins import ModelMixin, TransformerMixin -class SpectrumTransformerEncoder(torch.nn.Module): +class SpectrumTransformerEncoder( + torch.nn.Module, ModelMixin, TransformerMixin +): """A Transformer encoder for input mass spectra. Use this PyTorch module to embed mass spectra. By default, nothing other than the m/z and intensity arrays for each mass spectrum are considered. However, arbitrary information can be integrated into the spectrum representation by subclassing this class and overwriting the - `precursor_hook()` method. + `global_token_hook()` method. Parameters ---------- @@ -46,6 +50,7 @@ class SpectrumTransformerEncoder(torch.nn.Module): spectrum. transformer_encoder : torch.nn.TransformerEncoder The Transformer encoder layers. + """ def __init__( @@ -54,7 +59,7 @@ def __init__( nhead: int = 8, dim_feedforward: int = 1024, n_layers: int = 1, - dropout: float = 0, + dropout: float = 0.0, peak_encoder: PeakEncoder | Callable | bool = True, ) -> None: """Initialize a SpectrumEncoder.""" @@ -74,47 +79,24 @@ def __init__( # The Transformer layers: layer = torch.nn.TransformerEncoderLayer( - d_model=d_model, - nhead=nhead, - dim_feedforward=dim_feedforward, + d_model=self.d_model, + nhead=self.nhead, + dim_feedforward=self.dim_feedforward, batch_first=True, - dropout=dropout, + dropout=self.dropout, ) self.transformer_encoder = torch.nn.TransformerEncoder( layer, - num_layers=n_layers, + num_layers=self.n_layers, ) - @property - def d_model(self) -> int: - """The latent dimensionality of the model.""" - return self._d_model - - @property - def nhead(self) -> int: - """The number of attention heads.""" - return self._nhead - - @property - def dim_feedforward(self) -> int: - """The dimensionality of the Transformer feedforward layers.""" - return self._dim_feedforward - - @property - def n_layers(self) -> int: - """The number of Transformer layers.""" - return self._n_layers - - @property - def dropout(self) -> float: - """The dropout for the transformer layers.""" - return self._dropout - def forward( self, mz_array: torch.Tensor, intensity_array: torch.Tensor, + *args: torch.Tensor, + mask: torch.Tensor | None = None, **kwargs: dict, ) -> tuple[torch.Tensor, torch.Tensor]: """Embed a batch of mass spectra. @@ -125,9 +107,15 @@ def forward( The zero-padded m/z dimension for a batch of mass spectra. intensity_array : torch.Tensor of shape (n_spectra, n_peaks) The zero-padded intensity dimension for a batch of mass spctra. + *args : torch.Tensor + Additional data. These may be used by overwriting the + `global_token_hook()` method in a subclass. + mask : torch.Tensor + Passed to `torch.nn.TransformerEncoder.forward()`. The mask + for the sequence. **kwargs : dict - Additional fields provided by the data loader. These may be - used by overwriting the `precursor_hook()` method in a subclass. + Additional data fields. These may be used by overwriting + the `global_token_hook()` method in a subclass. Returns ------- @@ -136,29 +124,43 @@ def forward( peaks. mem_mask : torch.Tensor The memory mask specifying which elements were padding in X. 
+ """ spectra = torch.stack([mz_array, intensity_array], dim=2) - n_batch = spectra.shape[0] - zeros = ~spectra.sum(dim=2).bool() - mask = torch.cat( - [torch.tensor([[False]] * n_batch).type_as(zeros), zeros], dim=1 + + # Create the padding mask: + src_key_padding_mask = spectra.sum(dim=2) == 0 + global_token_mask = torch.tensor([[False]] * spectra.shape[0]).type_as( + src_key_padding_mask ) + src_key_padding_mask = torch.cat( + [global_token_mask, src_key_padding_mask], dim=1 + ) + + # Encode the peaks peaks = self.peak_encoder(spectra) # Add the precursor information: - latent_spectra = self.precursor_hook( + latent_spectra = self.global_token_hook( + *args, mz_array=mz_array, intensity_array=intensity_array, **kwargs, ) peaks = torch.cat([latent_spectra[:, None, :], peaks], dim=1) - return self.transformer_encoder(peaks, src_key_padding_mask=mask), mask + out = self.transformer_encoder( + peaks, + mask=mask, + src_key_padding_mask=src_key_padding_mask, + ) + return out, src_key_padding_mask - def precursor_hook( + def global_token_hook( self, mz_array: torch.Tensor, intensity_array: torch.Tensor, + *args: torch.Tensor, **kwargs: dict, ) -> torch.Tensor: """Define how additional information in the batch may be used. @@ -181,17 +183,15 @@ def precursor_hook( The zero-padded m/z dimension for a batch of mass spectra. intensity_array : torch.Tensor of shape (n_spectra, n_peaks) The zero-padded intensity dimension for a batch of mass spctra. + *args : torch.Tensor + Additional data passed with the batch. **kwargs : dict - The additional data passed with the batch. + Additional data passed with the batch. Returns ------- torch.Tensor of shape (batch_size, d_model) The precursor representations. + """ return torch.zeros((mz_array.shape[0], self.d_model)).type_as(mz_array) - - @property - def device(self) -> torch.device: - """The current device for the model.""" - return next(self.parameters()).device diff --git a/depthcharge/utils.py b/depthcharge/utils.py index 3b3255b..0f78e59 100644 --- a/depthcharge/utils.py +++ b/depthcharge/utils.py @@ -1,7 +1,9 @@ """Common utility functions.""" + from typing import Any import polars as pl +import torch def listify(obj: Any) -> list[Any]: # noqa: ANN401 @@ -16,3 +18,15 @@ def listify(obj: Any) -> list[Any]: # noqa: ANN401 obj = [obj] return list(obj) + + +def generate_tgt_mask(sz: int) -> torch.Tensor: + """Generate a square mask for the sequence. + + Parameters + ---------- + sz : int + The length of the target sequence. 
+ + """ + return ~torch.triu(torch.ones(sz, sz, dtype=torch.bool)).transpose(0, 1) diff --git a/depthcharge/version.py b/depthcharge/version.py index f8771ad..80e7222 100644 --- a/depthcharge/version.py +++ b/depthcharge/version.py @@ -1,4 +1,5 @@ """Get the version information.""" + from importlib.metadata import PackageNotFoundError, version diff --git a/pyproject.toml b/pyproject.toml index 07d7065..dbf3713 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -66,7 +66,7 @@ line-length = 79 target-version = "py310" [tool.ruff.lint] -select = ["E", "F", "W", "C", "I", "D", "UP", "N", "ANN", "T20"] +select = ["E", "F", "W", "C", "I", "D", "UP", "N", "T20"] # ANN101 Missing type annotation for `self` in method # D213 Multi-line docstring summary should start at the second lin diff --git a/tests/conftest.py b/tests/conftest.py index aa19076..07a51d6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ """Pytest fixtures.""" + from pathlib import Path import numpy as np @@ -55,6 +56,7 @@ def _create_mgf_entry(peptide, charge=2): ------- str The PSM entry in an MGF file format. + """ missing = not charge charge = 2 if not charge else charge @@ -108,6 +110,7 @@ def _create_mgf(peptides, mgf_file, add_problems=False, random_state=42): ------- PathLike The MGF file. + """ rng = np.random.default_rng(random_state) peptides = list(peptides) @@ -146,6 +149,7 @@ def _random_peptides(n_peptides, random_state=42): ------ str A peptide sequence + """ rng = np.random.default_rng(random_state) residues = "ACDEFGHIKLMNPQRSTUVWY" diff --git a/tests/unit_tests/test_data/test_arrow.py b/tests/unit_tests/test_data/test_arrow.py index 540d5f8..0c4f884 100644 --- a/tests/unit_tests/test_data/test_arrow.py +++ b/tests/unit_tests/test_data/test_arrow.py @@ -1,4 +1,5 @@ """Test the arrow functionality.""" + import polars as pl import pyarrow as pa import pytest diff --git a/tests/unit_tests/test_data/test_datasets.py b/tests/unit_tests/test_data/test_datasets.py index a2a7be8..b3435e7 100644 --- a/tests/unit_tests/test_data/test_datasets.py +++ b/tests/unit_tests/test_data/test_datasets.py @@ -1,4 +1,5 @@ """Test the datasets.""" + import pickle import shutil @@ -7,15 +8,15 @@ import torch from depthcharge.data import ( + AnalyteDataset, AnnotatedSpectrumDataset, CustomField, - PeptideDataset, SpectrumDataset, StreamingSpectrumDataset, arrow, ) from depthcharge.testing import assert_dicts_equal -from depthcharge.tokenizers import PeptideTokenizer +from depthcharge.tokenizers import MoleculeTokenizer, PeptideTokenizer @pytest.fixture(scope="module") @@ -135,24 +136,22 @@ def test_streaming_spectra(mgf_small): assert_dicts_equal(spec, expected) -def test_peptide_dataset(tokenizer): +def test_analyte_dataset(tokenizer): """Test the peptide dataset.""" seqs = ["LESLIEK", "EDITHR"] charges = torch.tensor([2, 3]) - dset = PeptideDataset(tokenizer, seqs, charges) - torch.testing.assert_close(dset[0][0], tokenizer.tokenize("LESLIEK")) - torch.testing.assert_close(dset[1][0][:6], tokenizer.tokenize("EDITHR")) - assert dset[0][1].item() == 2 - assert dset[1][1].item() == 3 + dset = AnalyteDataset(tokenizer, seqs) + torch.testing.assert_close(dset[0][0], tokenizer.tokenize("LESLIEK")[0]) + torch.testing.assert_close(dset[1][0][:6], tokenizer.tokenize("EDITHR")[0]) assert len(dset) == 2 seqs = ["LESLIEK", "EDITHR"] charges = torch.tensor([2, 3]) target = torch.tensor([1.1, 2.2]) other = torch.tensor([[1, 1], [2, 2]]) - dset = PeptideDataset(tokenizer, seqs, charges, target, other) - 
torch.testing.assert_close(dset[0][0], tokenizer.tokenize("LESLIEK")) - torch.testing.assert_close(dset[1][0][:6], tokenizer.tokenize("EDITHR")) + dset = AnalyteDataset(tokenizer, seqs, charges, target, other) + torch.testing.assert_close(dset[0][0], tokenizer.tokenize("LESLIEK")[0]) + torch.testing.assert_close(dset[1][0][:6], tokenizer.tokenize("EDITHR")[0]) assert dset[0][1].item() == 2 assert dset[1][1].item() == 3 torch.testing.assert_close(dset[0][2], torch.tensor(1.1)) @@ -160,7 +159,17 @@ def test_peptide_dataset(tokenizer): assert len(dset) == 2 torch.testing.assert_close(dset.tokens, tokenizer.tokenize(seqs)) - torch.testing.assert_close(dset.charges, charges) + torch.testing.assert_close(dset.tensors[1], charges) + + +def test_with_molecule_tokenizer(): + """Test analyte dataset with a molecule tokenizer.""" + tokenizer = MoleculeTokenizer() + smiles = ["Cn1cnc2c1c(=O)n(C)c(=O)n2C", "CC=CC(=O)C1=C(CCCC1(C)C)C"] + tokens = tokenizer.tokenize(smiles) + dset = AnalyteDataset(tokenizer, smiles) + + torch.testing.assert_close(dset.tokens, tokens) def test_pickle(tokenizer, tmp_path, mgf_small): diff --git a/tests/unit_tests/test_data/test_loaders.py b/tests/unit_tests/test_data/test_loaders.py index 18d9779..751f424 100644 --- a/tests/unit_tests/test_data/test_loaders.py +++ b/tests/unit_tests/test_data/test_loaders.py @@ -1,12 +1,13 @@ """Test PyTorch DataLoaders.""" + import pyarrow as pa import pytest import torch from depthcharge.data import ( + AnalyteDataset, AnnotatedSpectrumDataset, CustomField, - PeptideDataset, SpectrumDataset, StreamingSpectrumDataset, ) @@ -67,12 +68,12 @@ def test_ann_spectrum_loader(mgf_small): dset.loader(collate_fn=torch.utils.data.default_collate) -def test_peptide_loader(): +def test_analyte_loader(): """Test our peptid data loader.""" seqs = ["LESLIE", "EDITH", "PEPTIDE"] charges = torch.tensor([5, 3, 1]) tokenizer = PeptideTokenizer() - dset = PeptideDataset(tokenizer, seqs, charges) + dset = AnalyteDataset(tokenizer, seqs, charges) loader = dset.loader(batch_size=2, num_workers=0) batch = next(iter(loader)) @@ -84,7 +85,7 @@ def test_peptide_loader(): torch.testing.assert_close(batch[1], charges[:2]) args = (torch.tensor([1, 2, 3]), torch.tensor([[1, 1], [2, 2], [3, 3]])) - dset = PeptideDataset(tokenizer, seqs, charges, *args) + dset = AnalyteDataset(tokenizer, seqs, charges, *args) loader = dset.loader(batch_size=2, num_workers=0) batch = next(iter(loader)) diff --git a/tests/unit_tests/test_data/test_parsers.py b/tests/unit_tests/test_data/test_parsers.py index cfec339..49e6650 100644 --- a/tests/unit_tests/test_data/test_parsers.py +++ b/tests/unit_tests/test_data/test_parsers.py @@ -1,4 +1,5 @@ """Test that parsers work.""" + import polars as pl import pyarrow as pa import pytest diff --git a/tests/unit_tests/test_encoders/test_sinusoidal.py b/tests/unit_tests/test_encoders/test_sinusoidal.py index 0292bfa..bebdb63 100644 --- a/tests/unit_tests/test_encoders/test_sinusoidal.py +++ b/tests/unit_tests/test_encoders/test_sinusoidal.py @@ -1,4 +1,5 @@ """Test the encoders.""" + import numpy as np import pytest import torch diff --git a/tests/unit_tests/test_feedforward.py b/tests/unit_tests/test_feedforward.py index 51fd9d5..e55403a 100644 --- a/tests/unit_tests/test_feedforward.py +++ b/tests/unit_tests/test_feedforward.py @@ -1,4 +1,5 @@ """Test the feedforward model.""" + import torch from depthcharge.feedforward import FeedForward diff --git a/tests/unit_tests/test_primitives.py b/tests/unit_tests/test_primitives.py index 13b579b..fb1fd8a 
100644 --- a/tests/unit_tests/test_primitives.py +++ b/tests/unit_tests/test_primitives.py @@ -1,4 +1,5 @@ """Test that our fundamental dataclasses work.""" + import numpy as np import pytest import torch diff --git a/tests/unit_tests/test_testing.py b/tests/unit_tests/test_testing.py index 5d475d1..6992362 100644 --- a/tests/unit_tests/test_testing.py +++ b/tests/unit_tests/test_testing.py @@ -1,4 +1,5 @@ """Ironically test that the testing functions are working.""" + import numpy as np import pytest import torch diff --git a/tests/unit_tests/test_tokenizers/test_molecules.py b/tests/unit_tests/test_tokenizers/test_molecules.py new file mode 100644 index 0000000..a4f1433 --- /dev/null +++ b/tests/unit_tests/test_tokenizers/test_molecules.py @@ -0,0 +1,69 @@ +"""Test the molecule tokenizer.""" + +import pytest + +from depthcharge.tokenizers import MoleculeTokenizer + + +@pytest.mark.parametrize( + ["mode", "vocab", "len_vocab"], + [ + ("basic", None, 69), + ("basic", ["x", "y"], 2), + ("selfies", ["[C][O][C]", "[F][C][F]", "[O][=O]"], 4), + ("selfies", "[C][O]", 2), + ("smiles", "CN1C=NC2=C1C(=O)N(C(=O)N2C)C", 8), + ("smiles", ["CN", "CC(=O)O"], 5), + ], +) +def test_init(mode, vocab, len_vocab): + """Test initialization.""" + if mode == "smiles": + tokenizer = MoleculeTokenizer.from_smiles(vocab) + elif mode == "selfies": + tokenizer = MoleculeTokenizer.from_selfies(vocab) + else: + tokenizer = MoleculeTokenizer(vocab) + + assert len(tokenizer.selfies_vocab) == len_vocab + + +@pytest.mark.parametrize( + "molecule", + [ + "Cn1cnc2c1c(=O)n(C)c(=O)n2C", + "[C][N][C][=N][C][=C][Ring1][Branch1][C][=Branch1][C][=O][N][Branch1]" + "[C][C][C][=Branch1][C][=O][N][Ring1][=Branch2][C]", + ], +) +def test_split(molecule): + """Test that split works as expected.""" + expected = [ + "[C]", + "[N]", + "[C]", + "[=N]", + "[C]", + "[=C]", + "[Ring1]", + "[Branch1]", + "[C]", + "[=Branch1]", + "[C]", + "[=O]", + "[N]", + "[Branch1]", + "[C]", + "[C]", + "[C]", + "[=Branch1]", + "[C]", + "[=O]", + "[N]", + "[Ring1]", + "[=Branch2]", + "[C]", + ] + + tokenizer = MoleculeTokenizer() + assert expected == tokenizer.split(molecule) diff --git a/tests/unit_tests/test_tokenizers/test_peptides.py b/tests/unit_tests/test_tokenizers/test_peptides.py index 94a891c..bfd8785 100644 --- a/tests/unit_tests/test_tokenizers/test_peptides.py +++ b/tests/unit_tests/test_tokenizers/test_peptides.py @@ -1,6 +1,4 @@ """Test peptide tokenizers.""" -import math -from functools import partial import pytest import torch @@ -8,46 +6,6 @@ from depthcharge.tokenizers.peptides import PeptideTokenizer -# Calculated using Pyteomics: -# These are [b_ions, y_ions] -LESLIEK_PLUS_ONE = [ - [ - 114.09134044390001, - 243.13393353187, - 330.16596193614004, - 443.25002591327006, - 556.3340898904, - 685.37668297837, - ], - [ - 147.11280416447, - 276.15539725243997, - 389.23946122957, - 502.3235252067, - 589.3555536109699, - 718.39814669894, - ], -] - -LESLIEK_PLUS_TWO = [ - [ - 57.54930845533501, - 122.07060499932, - 165.58661920145502, - 222.12865119002004, - 278.670683178585, - 343.19197972257, - ], - [ - 74.06004031561999, - 138.581336859605, - 195.12336884817, - 251.66540083673502, - 295.18141503886994, - 359.702711582855, - ], -] - def test_proforma_init(): """Test initialization.""" @@ -65,7 +23,10 @@ def test_proforma_init(): ) for key, val in expected_tokens: - assert proforma.residues[key] == val + if isinstance(proforma, PeptideTokenizer): + assert key in proforma.residues + else: + assert proforma.residues[key] == val tokens = 
proforma.tokenize(seqs, to_strings=True)[0] expected = [ @@ -103,9 +64,13 @@ def test_proforma_init(): orig = proforma.detokenize(tokens) assert orig == ["KEILSEL"] - tokens = proforma.tokenize("LESLIEK", True, True)[0] + tokens = proforma.tokenize("LESLIEK", True, True, True)[0] assert "".join(tokens) == "KEILSEL$" + # Test a non-canonical AA: + with pytest.raises(KeyError): + PeptideTokenizer.from_proforma("TOBIN") + def test_mskb_init(): """Test that the MassIVE-KB dataset works.""" @@ -115,65 +80,32 @@ def test_mskb_init(): assert tokens == ["[Acetyl]-", "E", "D", "I", "T", "H"] -def test_precursor_ions(): - """Test calculation of precurosr m/z.""" - tokenizer = PeptideTokenizer() - - aa_mass = dict(mass.std_aa_mass) - aa_mass["a"] = 42.010565 - aa_mass["o"] = 15.994915 - pymass = partial(mass.fast_mass, ion_type="M", aa_mass=aa_mass) - close = partial(math.isclose, rel_tol=1e-6) - - seq = "LESLIEK" - assert close(tokenizer.ions(seq, 1)[0].precursor, pymass(seq, charge=1)) - assert close(tokenizer.ions(seq, 2)[0].precursor, pymass(seq, charge=2)) - assert close(tokenizer.ions(seq, 3)[0].precursor, pymass(seq, charge=3)) - - seq = "[Acetyl]-LESLIM[Oxidation]K" - with pytest.raises(ValueError): - tokenizer.ions(seq, 1) - - tokenizer = PeptideTokenizer.from_proforma([seq]) - seq2 = "aLESLIMoK" - assert close(tokenizer.ions(seq, 1)[0].precursor, pymass(seq2, charge=1)) - assert close(tokenizer.ions(seq, 2)[0].precursor, pymass(seq2, charge=2)) - assert close(tokenizer.ions(seq, 3)[0].precursor, pymass(seq2, charge=3)) - - -def test_fragment_ions(): - """Test ion calculations.""" - tokenizer = PeptideTokenizer() - ions = tokenizer.ions(["LESLIEK"], [1])[0] - expected = torch.tensor(LESLIEK_PLUS_ONE)[:, :, None] - torch.testing.assert_close(ions.fragments, expected, check_dtype=False) - - ions = tokenizer.ions(["LESLIEK"], [2])[0] - expected = torch.cat( +def test_torch_precursor_ions(): + """Test the calculation of the precursor m/z.""" + seqs = ["LESLIEK", "EDITHR"] + charges = torch.tensor([2, 3]) + tokenizer = PeptideTokenizer.from_proforma(seqs) + expected = torch.tensor( [ - torch.tensor(LESLIEK_PLUS_ONE)[:, :, None], - torch.tensor(LESLIEK_PLUS_TWO)[:, :, None], - ], - dim=2, + mass.fast_mass(s, charge=z, ion_type="M") + for s, z in zip(seqs, charges) + ] ) - torch.testing.assert_close(ions.fragments, expected, check_dtype=False) - ions = tokenizer.ions(["LESLIEK/1"], None)[0] - expected = torch.tensor(LESLIEK_PLUS_ONE)[:, :, None] - torch.testing.assert_close(ions.fragments, expected, check_dtype=False) + ions = tokenizer.calculate_precursor_ions(seqs, charges) + torch.testing.assert_close(ions, expected) - ions = tokenizer.ions(["LESLIEK/3"], None)[0] - expected = torch.cat( - [ - torch.tensor(LESLIEK_PLUS_ONE)[:, :, None], - torch.tensor(LESLIEK_PLUS_TWO)[:, :, None], - ], - dim=2, - ) - torch.testing.assert_close(ions.fragments, expected, check_dtype=False) + tokens = tokenizer.tokenize(seqs) + ions = tokenizer.calculate_precursor_ions(tokens, charges) + torch.testing.assert_close(ions, expected) + + +def test_single_peptide(): + """Test proforma from a single peptide.""" + tokenizer = PeptideTokenizer.from_proforma("[+10]-EDITHR") + out = tokenizer.tokenize("LESLIEK") + assert out.shape == (1, 7) - tokenizer = PeptideTokenizer.from_proforma(["[+10]-LESLIEK"]) - ions = tokenizer.ions(["[+10.000000]-LESLIEK"], 1)[0] - expected = torch.tensor(LESLIEK_PLUS_ONE)[:, :, None] - expected[0, :, :] += 10 - torch.testing.assert_close(ions.fragments, expected, check_dtype=False) + ion = 
tokenizer.calculate_precursor_ions("LESLIEK", 2)
+    expected = mass.fast_mass("LESLIEK", charge=2, ion_type="M")
+    torch.testing.assert_close(ion, torch.tensor([expected]))
diff --git a/tests/unit_tests/test_transformers/test_peptide_transformers.py b/tests/unit_tests/test_transformers/test_analyte_transformers.py
similarity index 50%
rename from tests/unit_tests/test_transformers/test_peptide_transformers.py
rename to tests/unit_tests/test_transformers/test_analyte_transformers.py
index d9debae..8bb4c01 100644
--- a/tests/unit_tests/test_transformers/test_peptide_transformers.py
+++ b/tests/unit_tests/test_transformers/test_analyte_transformers.py
@@ -1,22 +1,38 @@
 """Test the peptide transformers."""
+
+import pytest
 import torch
 
 from depthcharge.tokenizers import PeptideTokenizer
 from depthcharge.transformers import (
-    PeptideTransformerDecoder,
-    PeptideTransformerEncoder,
+    AnalyteTransformerDecoder,
+    AnalyteTransformerEncoder,
     SpectrumTransformerEncoder,
 )
 
 
-def test_peptide_encoder():
+@pytest.mark.filterwarnings("error")
+@pytest.mark.parametrize(
+    "model", [AnalyteTransformerEncoder, AnalyteTransformerDecoder]
+)
+def test_init(model):
+    """Test that initialization warns and errors as we expect it to."""
+    with pytest.raises(ValueError):
+        model(1)
+
+    tokenizer = PeptideTokenizer()
+    with pytest.warns(UserWarning):
+        model(tokenizer, padding_int=5)
+
+    model(tokenizer)
+
+
+def test_analyte_encoder():
     """Test that a peptide encoder will run."""
     tokenizer = PeptideTokenizer()
     peptides = tokenizer.tokenize(["LESLIEK", "PEPTIDER", "EDITHYKK"])
-    charges = torch.tensor([2, 3, 3])
-
-    model = PeptideTransformerEncoder(tokenizer, 8, 1, 12, max_charge=3)
-    emb, mask = model(peptides, charges)
+    model = AnalyteTransformerEncoder(tokenizer, 8, 2, 12)
+    emb, mask = model(peptides)
 
     # Axis 1 should be 1 longer than the longest peptide.
assert emb.shape == (3, 9, 8) @@ -26,7 +42,7 @@ def test_peptide_encoder(): assert (res[1, :] != res[2, :]).all() -def test_peptide_decoder(): +def test_analyte_decoder(): """Test that a peptide decoder will run.""" tokenizer = PeptideTokenizer() n_tokens = len(tokenizer) @@ -39,11 +55,12 @@ def test_peptide_decoder(): ) peptides = tokenizer.tokenize(["LESLIEK", "PEPTIDER"]) - precursors = torch.tensor([[100.0, 2], [200.0, 3]]) - - encoder = SpectrumTransformerEncoder(8, 1, 12) + encoder = SpectrumTransformerEncoder(8, 2, 12) memory, mem_mask = encoder(spectra[:, :, 0], spectra[:, :, 1]) - decoder = PeptideTransformerDecoder(n_tokens, 8, 1, 12, max_charge=3) - scores = decoder(peptides, precursors, memory, mem_mask) + decoder = AnalyteTransformerDecoder(n_tokens, 8, 2, 12, padding_int=0) + scores = decoder(peptides, memory=memory, memory_key_padding_mask=mem_mask) + assert scores.shape == (2, 9, len(tokenizer)) + + scores = decoder(peptides, memory=memory) assert scores.shape == (2, 9, len(tokenizer)) diff --git a/tests/unit_tests/test_transformers/test_spectrum_transformers.py b/tests/unit_tests/test_transformers/test_spectrum_transformers.py index d3fdc01..88bf43c 100644 --- a/tests/unit_tests/test_transformers/test_spectrum_transformers.py +++ b/tests/unit_tests/test_transformers/test_spectrum_transformers.py @@ -1,4 +1,5 @@ """Test the spectrum transformers.""" + import pytest import torch @@ -27,38 +28,38 @@ def batch(): def test_spectrum_encoder(batch): """Test that a spectrum encoder will run.""" - model = SpectrumTransformerEncoder(8, 1, 12) + model = SpectrumTransformerEncoder(8, 2, 12) emb, mask = model(**batch) assert emb.shape == (2, 4, 8) assert mask.sum() == 1 - model = SpectrumTransformerEncoder(8, 1, 12, peak_encoder=PeakEncoder(8)) + model = SpectrumTransformerEncoder(8, 2, 12, peak_encoder=PeakEncoder(8)) emb, mask = model(**batch) assert emb.shape == (2, 4, 8) assert mask.sum() == 1 - model = SpectrumTransformerEncoder(8, 1, 12, peak_encoder=False) + model = SpectrumTransformerEncoder(8, 2, 12, peak_encoder=False) emb, mask = model(**batch) assert emb.shape == (2, 4, 8) assert mask.sum() == 1 -def test_precursor_hook(batch): +def test_global_token_hook(batch): """Test that the hook works.""" class MyEncoder(SpectrumTransformerEncoder): """A silly class.""" - def precursor_hook(self, mz_array, intensity_array, **kwargs): + def global_token_hook(self, mz_array, intensity_array, **kwargs): """A silly hook.""" return kwargs["charge"].expand(self.d_model, -1).T - model1 = MyEncoder(8, 1, 12) + model1 = MyEncoder(8, 2, 12) emb1, mask1 = model1(**batch) assert emb1.shape == (2, 4, 8) assert mask1.sum() == 1 - model2 = SpectrumTransformerEncoder(8, 1, 12) + model2 = SpectrumTransformerEncoder(8, 2, 12) emb2, mask2 = model2(**batch) assert emb2.shape == (2, 4, 8) assert mask2.sum() == 1 diff --git a/tests/unit_tests/test_version.py b/tests/unit_tests/test_version.py index 7a7f932..b5579eb 100644 --- a/tests/unit_tests/test_version.py +++ b/tests/unit_tests/test_version.py @@ -1,4 +1,5 @@ """Test getting the version.""" + from importlib.metadata import PackageNotFoundError import depthcharge