Skip to content

Commit

Permalink
Merge pull request #18 from mzmine/validation_pipeline
Browse files Browse the repository at this point in the history
First implementation of SpectrumValidator and Modification
  • Loading branch information
omokshyna authored Nov 1, 2023
2 parents d0fe8e6 + 292c771 commit 4d30d73
Showing 1 changed file with 79 additions and 6 deletions.
85 changes: 79 additions & 6 deletions library_spectra_validation/validation_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,84 @@
Add index handling - when a spectrum is processed, its id is added to the corresponding list
"""

import logging
from typing import Iterable, List, Optional, Union

from matchms.filtering.SpectrumProcessor import SpectrumProcessor

logger = logging.getLogger("matchms")


class ValidationPipeline:
    """Stub for a pipeline that runs all checks and modifications on a spectrum.

    NOTE(review): in the diff view these lines are interleaved with the new
    imports and look like the removed side of the change -- confirm whether
    this class should still exist.
    """

    def __init__(self, spectrum) -> None:
        # The spectrum is currently not stored or used.
        pass

    def run(spectrum):
        """Return the modifications made to the spectrum (currently always empty).

        NOTE(review): there is no ``self`` parameter, so when called on an
        instance the instance itself is bound to ``spectrum`` -- confirm intent.
        """
        #here run all the checks and modifications
        # Initialise the result so the stub returns a value instead of
        # raising NameError on the previously undefined name.
        modifications = {}
        return modifications #{'quality':, 'metadata adduct': {'previous':..., 'updated':....}}
class Modification:
    """Plain record of one metadata change.

    Holds the name of the metadata field, its value before and after the
    change, a log message, and a flag telling whether a user validated it.
    """

    def __init__(self, metadata_field, before, after, logging_message, validated_by_user):
        # Attributes are assigned left to right, so the order in ``vars(self)``
        # is: metadata_field, before, after, logging_message, validated_by_user.
        self.metadata_field, self.before, self.after = metadata_field, before, after
        self.logging_message, self.validated_by_user = logging_message, validated_by_user


def find_modifications(spectrum_old, spectrum_new, logging_message: str):
    """Checks which modifications have been made in a filter step.

    A fixed set of metadata fields is read with ``.get(field)`` from both
    spectra; one ``Modification`` is created for every field whose value
    differs.

    Args:
        spectrum_old: Spectrum before the filter step; must provide ``.get``.
        spectrum_new: Spectrum after the filter step; must provide ``.get``.
        logging_message: Message stored on every created ``Modification``.

    Returns:
        A list of ``Modification`` objects, each with
        ``validated_by_user=False``; empty when no checked field changed.
    """
    metadata_fields_to_check = ("parent_mass", "precursor_mz", "adduct", "smiles",
                                "compound_name", "inchi", "inchikey", "charge", "ionmode")
    modifications = []
    for metadata_field in metadata_fields_to_check:
        before = spectrum_old.get(metadata_field)
        # Use ``.get`` here as well: the previous code called the spectrum
        # object itself, which fails as soon as a field actually differs.
        after = spectrum_new.get(metadata_field)
        if before != after:
            modifications.append(
                Modification(metadata_field=metadata_field,
                             before=before,
                             after=after,
                             logging_message=logging_message,
                             validated_by_user=False))
    return modifications


class SpectrumRepairer(SpectrumProcessor):
    """Spectrum processor that records the metadata changes made by its filters.

    Instead of returning the processed spectrum, it runs every filter in
    ``self.filters`` in order and collects a ``Modification`` for each checked
    metadata field a filter changed.
    """

    def __init__(self, predefined_pipeline: Optional[str] = 'default',
                 additional_filters: Iterable[Union[str, List[dict]]] = ()):
        """Forward the pipeline name and extra filters to ``SpectrumProcessor``."""
        super().__init__(predefined_pipeline,
                         additional_filters)

    def process_spectrum(self, spectrum,
                         processing_report=None):
        """Disabled; use ``process_spectrum_store_modifications`` instead.

        Raises:
            AttributeError: Always.
        """
        raise AttributeError("process spectrum is not a valid method of SpectrumRepairer")

    def process_spectrum_store_modifications(self, spectrum) -> List[Modification]:
        """Apply all filters in sequence and return the modifications they made.

        Args:
            spectrum: The spectrum to repair; each filter receives the output
                of the previous one.

        Returns:
            All ``Modification`` objects found, in filter order.

        Raises:
            TypeError: If no filters are configured.
            AttributeError: If a filter returns ``None`` instead of a spectrum.
        """
        if not self.filters:
            raise TypeError("No filters to process")
        modifications = []
        for filter_func in self.filters:
            # todo capture logging
            logging_message = ""
            spectrum_out = filter_func(spectrum)
            # Check for None before comparing: the comparison reads from
            # spectrum_out and would otherwise fail with an unrelated error.
            if spectrum_out is None:
                raise AttributeError("SpectrumRepairer is only expected to repair spectra, not set to None")
            modifications += find_modifications(spectrum_old=spectrum,
                                                spectrum_new=spectrum_out,
                                                logging_message=logging_message)
            spectrum = spectrum_out
        return modifications


class SpectrumValidator(SpectrumProcessor):
    """Spectrum processor that reports which requirement filters a spectrum fails.

    Every filter in ``self.filters`` is applied to the same input spectrum; a
    filter that returns ``None`` counts as a failed requirement.
    """

    def __init__(self):
        """Set up the processor with only the requirement filters.

        The mapping of filter name to the metadata fields it checks is still
        to be filled in; until then no filters are registered.
        """
        # todo add the fields each requirement checks.
        # Expected shape: {filter_name: [fields_checked]}. Kept empty for now;
        # the previous placeholder used undefined names and raised NameError.
        fields_checked_by_filter = {}
        super().__init__(predefined_pipeline=None,
                         additional_filters=list(fields_checked_by_filter))

    def process_spectrum(self, spectrum,
                         processing_report=None):
        """Disabled; use ``process_spectrum_store_failed_filters`` instead.

        Raises:
            AttributeError: Always.
        """
        raise AttributeError("process spectrum is not a valid method of SpectrumValidator")

    def process_spectrum_store_failed_filters(self, spectrum) -> List[str]:
        """Run every filter on the spectrum and collect one entry per failure.

        Args:
            spectrum: The spectrum to validate; passed unchanged to each filter.

        Returns:
            One logging message per filter that returned ``None``. The message
            is an empty string until log capturing is implemented.

        Raises:
            TypeError: If no filters are configured.
        """
        if not self.filters:
            raise TypeError("No filters to process")
        failed_requirements = []
        for filter_func in self.filters:
            # todo capture logging
            logging_message = ""
            spectrum_out = filter_func(spectrum)
            if spectrum_out is None:
                # append, not +=: extending a list with a string adds its
                # characters one by one (and nothing for an empty string).
                failed_requirements.append(logging_message)
        return failed_requirements

0 comments on commit 4d30d73

Please sign in to comment.