From 583bf3e632a62f69a9ca7c23bddfcd6725d51198 Mon Sep 17 00:00:00 2001 From: Vince Reuter Date: Sun, 24 Nov 2024 04:00:31 +0100 Subject: [PATCH] finish first-pass at signal analysis program and integration with pipeline, #337 --- bin/cli/run_processing_pipeline.py | 21 +++- bin/cli/run_signal_analysis.py | 159 +++++++++++++++++++++-------- 2 files changed, 138 insertions(+), 42 deletions(-) diff --git a/bin/cli/run_processing_pipeline.py b/bin/cli/run_processing_pipeline.py index 19f03d84..f891eb86 100644 --- a/bin/cli/run_processing_pipeline.py +++ b/bin/cli/run_processing_pipeline.py @@ -10,7 +10,7 @@ import sys from typing import * -from expression import Result +from expression import Option, Result from gertils import ExtantFile, ExtantFolder import pandas as pd import pypiper @@ -41,6 +41,7 @@ from zip_spot_image_files_for_tracing import workflow as run_spot_zipping from tracing import workflow as run_chromatin_tracing from locus_spot_visualisation_data_preparation import workflow as get_locus_spot_data_file_src_dst_pairs +from run_processing_pipeline import workflow as signal_analysis_workflow DECON_STAGE_NAME = "deconvolution" @@ -434,10 +435,19 @@ def filter_spots_for_nuclei(rounds_config: ExtantFile, params_config: ExtantFile class LooptracePipeline(pypiper.Pipeline): """Main looptrace processing pipeline""" - def __init__(self, rounds_config: ExtantFile, params_config: ExtantFile, images_folder: ExtantFolder, output_folder: ExtantFolder, **pl_mgr_kwargs: Any) -> None: + def __init__( + self, + rounds_config: ExtantFile, + params_config: ExtantFile, + images_folder: ExtantFolder, + output_folder: ExtantFolder, + signal_config: Option[ExtantFile], + **pl_mgr_kwargs: Any, + ) -> None: self.rounds_config = rounds_config self.params_config = params_config self.images_folder = images_folder + self.signal_config = signal_config super(LooptracePipeline, self).__init__(name=PIPE_NAME, outfolder=str(output_folder.path), **pl_mgr_kwargs) def stages(self) -> list[pypiper.Stage]: @@ -544,6 +554,11 @@ def stages(self) -> list[pypiper.Stage]: func=run_locus_spot_viewing_prep, f_kwargs=rounds_params, ), + pypiper.Stage( + name="cross_channel_signal_analysis", + func=signal_analysis_workflow, + f_kwargs={**rounds_params, "maybe_signal_config": self.signal_config}, + ), ] @@ -553,6 +568,7 @@ def parse_cli(args: Iterable[str]) -> argparse.Namespace: parser.add_argument("--params-config", type=ExtantFile.from_string, required=True, help="Path to the parameters configuration file") parser.add_argument("-I", "--images-folder", type=ExtantFolder.from_string, required=True, help="Path to the root folder with imaging data to process") parser.add_argument("--pypiper-folder", type=ExtantFolder.from_string, required=True, help="Path to folder for pypiper output") + parser.add_argument("--signal-config", type=ExtantFile.from_string, help="Path to signal analysis config file, if using that feature") parser.add_argument(NO_TEE_LOGS_OPTNAME, action="store_true", help="Do not tee logging output from pypiper manager") parser = pypiper.add_pypiper_args( parser, @@ -566,6 +582,7 @@ def init(opts: argparse.Namespace) -> LooptracePipeline: kwargs = { "rounds_config": opts.rounds_config, "params_config": opts.params_config, + "signal_config": Option.of_obj(opts.signal_config), "images_folder": opts.images_folder, "output_folder": opts.pypiper_folder, } diff --git a/bin/cli/run_signal_analysis.py b/bin/cli/run_signal_analysis.py index 4d18a2b6..a59c39db 100644 --- a/bin/cli/run_signal_analysis.py +++ b/bin/cli/run_signal_analysis.py @@ -1,32 +1,60 @@ """Analyze pixel value statistics in regions of interest, across channels.""" import argparse -from collections import Counter +from collections import Counter, defaultdict from dataclasses import dataclass from enum import Enum +import json import logging +from operator import itemgetter +from pathlib import Path import sys -from typing import Iterable, Mapping, Optional, TypeAlias, TypeVar +from typing import TYPE_CHECKING, Iterable, Mapping, TypeAlias, TypeVar from expression import Option, Result, compose, snd from expression.collections import Seq, seq -from expression.core import result +from expression.core import option, result from gertils import ExtantFile, compute_pixel_statistics +from gertils.geometry import ImagePoint3D +from gertils.pixel_value_statistics import Numeric as PixelStatValue from gertils.types import ImagingChannel +if TYPE_CHECKING: + import numpy.typing as npt import pandas as pd +from spotfishing.roi_tools import get_centroid_from_record +from looptrace import FIELD_OF_VIEW_COLUMN from looptrace.ImageHandler import ImageHandler +from looptrace.SpotPicker import SpotPicker from looptrace.utilities import find_first_option, get_either, wrap_exception, wrap_error_message _A = TypeVar("_") +SIGNAL_CHANNEL_COLUMN = "signalChannel" + def _parse_cmdl(cmdl: list[str]) -> argparse.Namespace: parser = argparse.ArgumentParser( description="Analyze pixel value statistics in regions of interest, across channels.", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) - parser.add_argument() + parser.add_argument( + "--rounds-config", + required=True, + type=ExtantFile.from_string, + help="Path to the imaging rounds configuration file", + ) + parser.add_argument( + "--params-config", + required=True, + type=ExtantFile.from_string, + help="Path to the looptrace parameters configuration file", + ) + parser.add_argument( + "--signal-config", + type=ExtantFile.from_string, + help="Path to the configuration file declaring ROI diameters and channels to analyse", + ) return parser.parse_args(cmdl) @@ -35,7 +63,7 @@ class RoiType(Enum): Regional = "spots_for_voxels_definition_file" @property - def image_handler_attribute(self) -> str: + def file_attribute_on_image_handler(self) -> str: return self.value @classmethod @@ -93,6 +121,79 @@ def from_mapping(cls, data: Mapping[str, object]) -> Result["AnalyticalSpecifica ))) +# TODO: refactor with https://github.com/gerlichlab/gertils/issues/32 +def run_cross_channel_signal_analysis( + *, + img: npt.ArrayLike, + roi_diameter: int, + channels: Iterable[ImagingChannel], + points: Iterable[ImagePoint3D], +) -> Iterable[dict[str, PixelStatValue]]: + for pt in points: + yield compute_pixel_statistics( + img=img, + pt=pt, + channels=channels, + diameter=roi_diameter, + channel_column=SIGNAL_CHANNEL_COLUMN, + ) + + +def workflow(*, rounds_config: ExtantFile, params_config: ExtantFile, maybe_signal_config: Option[ExtantFile]) -> None: + match maybe_signal_config: + case option.Option(tag="none", none=_): + logging.info("No signal config file, skipping cross-channel signal analysis") + return + case option.Option(tag="some", some=signal_config_file): + conf_path: Path = signal_config_file.path + logging.info("Parsing signal analysis configuration: %s", conf_path) + with conf_path.open(mode="r") as fh: + conf_data = json.load(fh) + if not isinstance(conf_data, list): + raise TypeError(f"Parsed signal config data (from {conf_path}) is {type(conf_data).__name__}") + try: + analysis_specs: list[AnalyticalSpecification] = list(map(AnalyticalSpecification.from_mapping, conf_data)) + except (TypeError, ValueError) as e: + logging.error("Failed to parse analytical specifications (from %s): %s", conf_path, e) + raise + + H = ImageHandler(rounds_config=rounds_config, params_config=params_config) + S = SpotPicker(H) + + for spec in analysis_specs: + # Get the ROIs of this type. + roi_type: RoiType = spec.roi_type + logging.info("Analyzing signal for ROI type '%s'", roi_type.name) + rois_file: Path = getattr(H, roi_type.file_attribute_on_image_handler) + all_rois: pd.DataFrame = pd.read_csv(rois_file, index_col=False) + + # Build up the records for this ROI type, for all FOVs. + by_raw_channel: Mapping[int, list[dict]] = defaultdict + for fov, img in S.iter_fov_img_pairs(): + logging.info(f"Analysing signal for FOV: {fov}") + rois: pd.DataFrame = all_rois[all_rois[FIELD_OF_VIEW_COLUMN] == fov] + logging.debug("ROI count: %d", rois.shape[0]) + for _, r in rois.iterrows(): + pt: ImagePoint3D = get_centroid_from_record(r) + for stats in compute_pixel_statistics( + img=img, + pt=pt, + channels=spec.channels, + diameter=spec.roi_diameter, + channel_column=SIGNAL_CHANNEL_COLUMN, + ): + ch: int = stats[SIGNAL_CHANNEL_COLUMN] + by_raw_channel[ch].append({**r.to_dict(), **stats}) + + # Write the output file for this ROI type, across all FOVs. + for raw_channel, records in sorted(by_raw_channel.items(), key=itemgetter(0)): + stats_frame: pd.DataFrame = pd.DataFrame(records) + fn = f"signal_analysis__rois_{roi_type.name}__channel_{raw_channel}.csv" + outfile: Path = Path(H.analysis_path) / fn + logging.info("Writing output file: %s", outfile) + stats_frame.to_csv(outfile, index=False) + + def _ensure_unique(items: Iterable[_A]) -> Result[set[_A], str]: try: counts = Counter(items) @@ -102,14 +203,6 @@ def _ensure_unique(items: Iterable[_A]) -> Result[set[_A], str]: return Result.Ok(set(counts.keys())) if not repeats else Result.Error(f"{len(repeats)} repeated item(s); counts: {repeats}") -def _parse_roi_diameter(obj: object) -> Result[int, str]: - match obj: - case int(): - return Result.Ok(obj) - case _: - return Result.Error(f"ROI diameter must be integer, not {type(obj).__name__} ({obj})") - - def _parse_imaging_channels(channels: object) -> Result[list[ImagingChannel], Seq[str]]: State: TypeAlias = Result[Seq[ImagingChannel], Seq[str]] @@ -130,6 +223,14 @@ def proc1(acc: State, obj: object) -> State: .map(lambda seq: seq.to_list()) +def _parse_roi_diameter(obj: object) -> Result[int, str]: + match obj: + case int(): + return Result.Ok(obj) + case _: + return Result.Error(f"ROI diameter must be integer, not {type(obj).__name__} ({obj})") + + @wrap_error_message("Getting list from object") @wrap_exception(TypeError) def _list_from_object(obj: object) -> Result[list[object], str]: @@ -142,35 +243,13 @@ def _unsafe_parse_imaging_channel(obj: object) -> Result[ImagingChannel, str]: return ImagingChannel(obj) -def run_cross_channel_signal_analysis( - rounds_config: ExtantFile, - params_config: ExtantFile, - signal_config: Optional[ExtantFile], - ) -> None: - if signal_config is None: - logging.info("No signal config file, skipping cross-channel signal analysis") - return - H = ImageHandler(rounds_config=rounds_config, params_config=params_config) - logging.info("Reading ROIs file: %s", str(H.region)) - rois = pd.read_csv(H.spots_for_voxels_definition_file, index_col=False) - # TODO: apply drift correction (at least coarse for now) - compute_pixel_statistics( - # TODO: image - # TODO: point - channels=H.iter_signal_analysis_channels(), - diameter=H.signal_analysis_roi_diameter, - channel_column="signalChannel", - ) - # TODO: write to output files - logging.info("Done with cross-channel signal analysis") - - -def workflow() -> None: - pass - - def main(cmdl: list[str]) -> None: opts: argparse.Namespace = _parse_cmdl(cmdl) + workflow( + rounds_config=opts.rounds_config, + params_config=opts.params_config, + maybe_signal_config=Option.of_obj(opts.signal_config), + ) if __name__ == "__main__":