From 232ae41d937491b3d88a0134a6bf5f3595fa67e2 Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Fri, 4 Aug 2023 16:00:55 +0200 Subject: [PATCH 1/4] Commit WIP code for experimental fragment branch. --- pyproject.toml | 2 +- quartical/apps/summary.py | 31 +++++++++++++++------------ quartical/data_handling/angles.py | 8 +++---- quartical/data_handling/chunking.py | 9 +++++--- quartical/data_handling/ms_handler.py | 29 +++++++++++++++++-------- quartical/data_handling/predict.py | 20 ++++++++++------- 6 files changed, 60 insertions(+), 39 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index ceee48d2..df3ca683 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,7 @@ columnar = "^1.4.1" "ruamel.yaml" = "^0.17.26" dask = {extras = ["diagnostics"], version = "^2023.1.0"} distributed = "^2023.1.0" -dask-ms = {extras = ["s3", "xarray", "zarr"], version = "^0.2.16"} +dask-ms = {git = "https://github.com/ratt-ru/dask-ms.git", branch = "multisource-experimental", extras = ["s3", "xarray", "zarr"]} codex-africanus = {extras = ["dask", "scipy", "astropy", "python-casacore"], version = "^0.3.4"} astro-tigger-lsm = "^1.7.2" loguru = "^0.7.0" diff --git a/quartical/apps/summary.py b/quartical/apps/summary.py index c6e5d915..1d0a8dd2 100644 --- a/quartical/apps/summary.py +++ b/quartical/apps/summary.py @@ -1,6 +1,9 @@ import argparse from pathlib import Path -from daskms import xds_from_storage_ms, xds_from_storage_table +from daskms.experimental.multisource import ( + xds_from_ms_fragment, + xds_from_table_fragment +) from daskms.fsspec_store import DaskMSStore import numpy as np import dask.array as da @@ -44,7 +47,7 @@ def configure_loguru(output_dir): def antenna_info(path): # NOTE: Assume one dataset for now. - ant_xds = xds_from_storage_table(path + "::ANTENNA")[0] + ant_xds = xds_from_table_fragment(path + "::ANTENNA")[0] antenna_names = ant_xds.NAME.values antenna_mounts = ant_xds.MOUNT.values @@ -64,7 +67,7 @@ def antenna_info(path): def data_desc_info(path): - dd_xds_list = xds_from_storage_table( # noqa + dd_xds_list = xds_from_table_fragment( # noqa path + "::DATA_DESCRIPTION", group_cols=["__row__"], chunks={"row": 1, "chan": -1} @@ -76,7 +79,7 @@ def data_desc_info(path): def feed_info(path): - feed_xds_list = xds_from_storage_table( + feed_xds_list = xds_from_table_fragment( path + "::FEED", group_cols=["SPECTRAL_WINDOW_ID"], chunks={"row": -1} @@ -106,7 +109,7 @@ def feed_info(path): def flag_cmd_info(path): - flag_cmd_xds = xds_from_storage_table(path + "::FLAG_CMD") # noqa + flag_cmd_xds = xds_from_table_fragment(path + "::FLAG_CMD") # noqa # Not printing any summary information for this subtable yet - not sure # what is relevant. @@ -114,7 +117,7 @@ def flag_cmd_info(path): def field_info(path): - field_xds = xds_from_storage_table(path + "::FIELD")[0] + field_xds = xds_from_table_fragment(path + "::FIELD")[0] ids = [i for i in field_xds.SOURCE_ID.values] names = [n for n in field_xds.NAME.values] @@ -141,7 +144,7 @@ def field_info(path): def history_info(path): - history_xds = xds_from_storage_table(path + "::HISTORY")[0] # noqa + history_xds = xds_from_table_fragment(path + "::HISTORY")[0] # noqa # Not printing any summary information for this subtable yet - not sure # what is relevant. @@ -149,7 +152,7 @@ def history_info(path): def observation_info(path): - observation_xds = xds_from_storage_table(path + "::OBSERVATION")[0] # noqa + observation_xds = xds_from_table_fragment(path + "::OBSERVATION")[0] # noqa # Not printing any summary information for this subtable yet - not sure # what is relevant. @@ -157,7 +160,7 @@ def observation_info(path): def polarization_info(path): - polarization_xds = xds_from_storage_table(path + "::POLARIZATION")[0] + polarization_xds = xds_from_table_fragment(path + "::POLARIZATION")[0] corr_types = polarization_xds.CORR_TYPE.values @@ -175,7 +178,7 @@ def polarization_info(path): def processor_info(path): - processor_xds = xds_from_storage_table(path + "::PROCESSOR")[0] # noqa + processor_xds = xds_from_table_fragment(path + "::PROCESSOR")[0] # noqa # Not printing any summary information for this subtable yet - not sure # what is relevant. @@ -183,7 +186,7 @@ def processor_info(path): def spw_info(path): - spw_xds_list = xds_from_storage_table( + spw_xds_list = xds_from_table_fragment( path + "::SPECTRAL_WINDOW", group_cols=["__row__"], chunks={"row": 1, "chan": -1} @@ -207,7 +210,7 @@ def spw_info(path): def state_info(path): - state_xds = xds_from_storage_table(path + "::STATE")[0] # noqa + state_xds = xds_from_table_fragment(path + "::STATE")[0] # noqa # Not printing any summary information for this subtable yet - not sure # what is relevant. @@ -226,7 +229,7 @@ def source_info(path): def pointing_info(path): - pointing_xds = xds_from_storage_table(path + "::POINTING")[0] # noqa + pointing_xds = xds_from_table_fragment(path + "::POINTING")[0] # noqa # Not printing any summary information for this subtable yet - not sure # what is relevant. @@ -355,7 +358,7 @@ def summary(): # Open the data, grouping by the usual columns. Use these datasets to # produce some useful summaries. - data_xds_list = xds_from_storage_ms( + data_xds_list = xds_from_ms_fragment( path, index_cols=("TIME",), columns=("TIME", "FLAG", "FLAG_ROW", "DATA"), diff --git a/quartical/data_handling/angles.py b/quartical/data_handling/angles.py index 343ab466..f7e6098e 100644 --- a/quartical/data_handling/angles.py +++ b/quartical/data_handling/angles.py @@ -2,7 +2,7 @@ import casacore.measures import casacore.quanta as pq -from daskms import xds_from_storage_table +from daskms.experimental.multisource import xds_from_table_fragment import dask.array as da import threading from dask.graph_manipulation import clone @@ -24,9 +24,9 @@ def make_parangle_xds_list(ms_path, data_xds_list): # This may need to be more sophisticated. TODO: Can we guarantee that # these only ever have one element? - anttab = xds_from_storage_table(ms_path + "::ANTENNA")[0] - feedtab = xds_from_storage_table(ms_path + "::FEED")[0] - fieldtab = xds_from_storage_table(ms_path + "::FIELD")[0] + anttab = xds_from_table_fragment(ms_path + "::ANTENNA")[0] + feedtab = xds_from_table_fragment(ms_path + "::FEED")[0] + fieldtab = xds_from_table_fragment(ms_path + "::FIELD")[0] # We do this eagerly to make life easier. feeds = feedtab.POLARIZATION_TYPE.values diff --git a/quartical/data_handling/chunking.py b/quartical/data_handling/chunking.py index ce6fe353..4003e737 100644 --- a/quartical/data_handling/chunking.py +++ b/quartical/data_handling/chunking.py @@ -1,7 +1,10 @@ import dask.delayed as dd import numpy as np import dask.array as da -from daskms import xds_from_storage_ms, xds_from_storage_table +from daskms.experimental.multisource import ( + xds_from_ms_fragment, + xds_from_table_fragment +) def compute_chunking(ms_opts, compute=True): @@ -10,7 +13,7 @@ def compute_chunking(ms_opts, compute=True): # necessary to determine initial chunking over row and chan. TODO: Test # multi-SPW/field cases. Implement a memory budget. - indexing_xds_list = xds_from_storage_ms( + indexing_xds_list = xds_from_ms_fragment( ms_opts.path, columns=("TIME", "INTERVAL"), index_cols=("TIME",), @@ -24,7 +27,7 @@ def compute_chunking(ms_opts, compute=True): compute=False ) - spw_xds_list = xds_from_storage_table( + spw_xds_list = xds_from_table_fragment( ms_opts.path + "::SPECTRAL_WINDOW", group_cols=["__row__"], columns=["CHAN_FREQ", "CHAN_WIDTH"], diff --git a/quartical/data_handling/ms_handler.py b/quartical/data_handling/ms_handler.py index 3b923432..edafa2fa 100644 --- a/quartical/data_handling/ms_handler.py +++ b/quartical/data_handling/ms_handler.py @@ -2,9 +2,12 @@ import warnings import dask.array as da import numpy as np -from daskms import (xds_from_storage_ms, - xds_from_storage_table, - xds_to_storage_table) +from daskms import xds_to_storage_table +from daskms.experimental.multisource import ( + xds_from_ms_fragment, + xds_from_table_fragment, + xds_to_table_fragment +) from dask.graph_manipulation import clone from loguru import logger from quartical.weights.weights import initialize_weights @@ -28,7 +31,7 @@ def read_xds_list(model_columns, ms_opts): data_xds_list: A list of appropriately chunked xarray datasets. """ - antenna_xds = xds_from_storage_table(ms_opts.path + "::ANTENNA")[0] + antenna_xds = xds_from_table_fragment(ms_opts.path + "::ANTENNA")[0] n_ant = antenna_xds.dims["row"] @@ -36,7 +39,7 @@ def read_xds_list(model_columns, ms_opts): "observation.", n_ant) # Determine the number/type of correlations present in the measurement set. - pol_xds = xds_from_storage_table(ms_opts.path + "::POLARIZATION")[0] + pol_xds = xds_from_table_fragment(ms_opts.path + "::POLARIZATION")[0] try: corr_types = [CORR_TYPES[ct] for ct in pol_xds.CORR_TYPE.values[0]] @@ -56,7 +59,7 @@ def read_xds_list(model_columns, ms_opts): # probably need to be done on a per xds basis. Can probably be accomplished # by merging the field xds grouped by DDID into data grouped by DDID. - field_xds = xds_from_storage_table(ms_opts.path + "::FIELD")[0] + field_xds = xds_from_table_fragment(ms_opts.path + "::FIELD")[0] phase_dir = np.squeeze(field_xds.PHASE_DIR.values) field_names = field_xds.NAME.values @@ -90,7 +93,7 @@ def read_xds_list(model_columns, ms_opts): schema[ms_opts.weight_column] = {'dims': ('chan', 'corr')} try: - data_xds_list = xds_from_storage_ms( + data_xds_list = xds_from_ms_fragment( ms_opts.path, columns=columns, index_cols=("TIME",), @@ -103,7 +106,7 @@ def read_xds_list(model_columns, ms_opts): f"Invalid/missing column specified. Underlying error: {e}." ) from e - spw_xds_list = xds_from_storage_table( + spw_xds_list = xds_from_table_fragment( ms_opts.path + "::SPECTRAL_WINDOW", group_cols=["__row__"], columns=["CHAN_FREQ", "CHAN_WIDTH"], @@ -213,7 +216,7 @@ def write_xds_list(xds_list, ref_xds_list, ms_path, output_opts): if not (output_opts.products or output_opts.flags): return [None] * len(xds_list) # Write nothing to the MS. - pol_xds = xds_from_storage_table(ms_path + "::POLARIZATION")[0] + pol_xds = xds_from_table_fragment(ms_path + "::POLARIZATION")[0] corr_types = [CORR_TYPES[ct] for ct in pol_xds.CORR_TYPE.values[0]] ms_n_corr = len(corr_types) @@ -302,6 +305,14 @@ def write_xds_list(xds_list, ref_xds_list, ms_path, output_opts): rechunk=True # Needed to ensure zarr chunks map correctly to disk. ) + # write_xds_list = xds_to_table_fragment( + # xds_list, + # "delta1.ms", + # ms_path, + # columns=output_cols, + # rechunk=True # Needed to ensure zarr chunks map correctly to disk. + # ) + return write_xds_list diff --git a/quartical/data_handling/predict.py b/quartical/data_handling/predict.py index 6bac1ef4..cc8bda04 100644 --- a/quartical/data_handling/predict.py +++ b/quartical/data_handling/predict.py @@ -7,7 +7,8 @@ import dask from xarray import DataArray, Dataset from dask.graph_manipulation import clone -from daskms import xds_from_storage_table +from daskms.experimental.multisource import xds_from_table_fragment + from loguru import logger import numpy as np import Tigger @@ -310,21 +311,24 @@ def get_support_tables(ms_path): "SPECTRAL_WINDOW", "POLARIZATION", "FEED")} # All rows at once - lazy_tables = {"ANTENNA": xds_from_storage_table(n["ANTENNA"]), - "FEED": xds_from_storage_table(n["FEED"])} + lazy_tables = {"ANTENNA": xds_from_table_fragment(n["ANTENNA"]), + "FEED": xds_from_table_fragment(n["FEED"])} compute_tables = { # NOTE: Even though this has a fixed shape, I have ammended it to # also group by row. This just makes life fractionally easier. - "DATA_DESCRIPTION": xds_from_storage_table(n["DATA_DESCRIPTION"], - group_cols="__row__"), + "DATA_DESCRIPTION": xds_from_table_fragment( + n["DATA_DESCRIPTION"], group_cols="__row__" + ), # Variably shaped, need a dataset per row "FIELD": - xds_from_storage_table(n["FIELD"], group_cols="__row__"), + xds_from_table_fragment(n["FIELD"], group_cols="__row__"), "SPECTRAL_WINDOW": - xds_from_storage_table(n["SPECTRAL_WINDOW"], group_cols="__row__"), + xds_from_table_fragment( + n["SPECTRAL_WINDOW"], group_cols="__row__" + ), "POLARIZATION": - xds_from_storage_table(n["POLARIZATION"], group_cols="__row__"), + xds_from_table_fragment(n["POLARIZATION"], group_cols="__row__"), } lazy_tables.update(dask.compute(compute_tables)[0]) From ec2f4967ea8e3ed5d0814b0e9b2db075d9dffe50 Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Fri, 4 Aug 2023 16:24:14 +0200 Subject: [PATCH 2/4] Import from fragments. --- quartical/apps/summary.py | 2 +- quartical/data_handling/angles.py | 2 +- quartical/data_handling/chunking.py | 2 +- quartical/data_handling/ms_handler.py | 3 ++- quartical/data_handling/predict.py | 2 +- 5 files changed, 6 insertions(+), 5 deletions(-) diff --git a/quartical/apps/summary.py b/quartical/apps/summary.py index 1d0a8dd2..0d3dab90 100644 --- a/quartical/apps/summary.py +++ b/quartical/apps/summary.py @@ -1,6 +1,6 @@ import argparse from pathlib import Path -from daskms.experimental.multisource import ( +from daskms.experimental.fragments import ( xds_from_ms_fragment, xds_from_table_fragment ) diff --git a/quartical/data_handling/angles.py b/quartical/data_handling/angles.py index f7e6098e..7911a24f 100644 --- a/quartical/data_handling/angles.py +++ b/quartical/data_handling/angles.py @@ -2,7 +2,7 @@ import casacore.measures import casacore.quanta as pq -from daskms.experimental.multisource import xds_from_table_fragment +from daskms.experimental.fragments import xds_from_table_fragment import dask.array as da import threading from dask.graph_manipulation import clone diff --git a/quartical/data_handling/chunking.py b/quartical/data_handling/chunking.py index 4003e737..867a28a7 100644 --- a/quartical/data_handling/chunking.py +++ b/quartical/data_handling/chunking.py @@ -1,7 +1,7 @@ import dask.delayed as dd import numpy as np import dask.array as da -from daskms.experimental.multisource import ( +from daskms.experimental.fragments import ( xds_from_ms_fragment, xds_from_table_fragment ) diff --git a/quartical/data_handling/ms_handler.py b/quartical/data_handling/ms_handler.py index edafa2fa..e1682cfd 100644 --- a/quartical/data_handling/ms_handler.py +++ b/quartical/data_handling/ms_handler.py @@ -3,7 +3,7 @@ import dask.array as da import numpy as np from daskms import xds_to_storage_table -from daskms.experimental.multisource import ( +from daskms.experimental.fragments import ( xds_from_ms_fragment, xds_from_table_fragment, xds_to_table_fragment @@ -305,6 +305,7 @@ def write_xds_list(xds_list, ref_xds_list, ms_path, output_opts): rechunk=True # Needed to ensure zarr chunks map correctly to disk. ) + # TODO: This needs to be controlled in a sensible way. # write_xds_list = xds_to_table_fragment( # xds_list, # "delta1.ms", diff --git a/quartical/data_handling/predict.py b/quartical/data_handling/predict.py index cc8bda04..e3015e5e 100644 --- a/quartical/data_handling/predict.py +++ b/quartical/data_handling/predict.py @@ -7,7 +7,7 @@ import dask from xarray import DataArray, Dataset from dask.graph_manipulation import clone -from daskms.experimental.multisource import xds_from_table_fragment +from daskms.experimental.fragments import xds_from_table_fragment from loguru import logger import numpy as np From 1bb7448996ec60f273032df48876b4d54d0b2ee4 Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Thu, 10 Aug 2023 14:55:40 +0200 Subject: [PATCH 3/4] Add output.fragment_path, which, if set, causes QC to write columns to a dask-ms fragment. --- quartical/config/argument_schema.yaml | 9 ++++++++ quartical/data_handling/ms_handler.py | 30 ++++++++++++++------------- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/quartical/config/argument_schema.yaml b/quartical/config/argument_schema.yaml index ac7d768c..7fa416a0 100644 --- a/quartical/config/argument_schema.yaml +++ b/quartical/config/argument_schema.yaml @@ -193,6 +193,15 @@ output: Name of directory in which QuartiCal logging outputs will be stored. s3 is not currently supported for these outputs. + fragment_path: + dtype: Optional[str] + info: + If set, instead of mutating the input by e.g. writing flags, instead + writes a fragment to this location. A fragment is a zarr backed data + format that is read and dynamically combined with any parent datasets. + This allows QuartiCal to operate in an entirely read-only fashion. + This option is experimental. + log_to_terminal: default: true dtype: bool diff --git a/quartical/data_handling/ms_handler.py b/quartical/data_handling/ms_handler.py index e1682cfd..4768d62e 100644 --- a/quartical/data_handling/ms_handler.py +++ b/quartical/data_handling/ms_handler.py @@ -298,21 +298,23 @@ def write_xds_list(xds_list, ref_xds_list, ms_path, output_opts): with warnings.catch_warnings(): # We anticipate spurious warnings. warnings.simplefilter("ignore") - write_xds_list = xds_to_storage_table( - xds_list, - ms_path, - columns=output_cols, - rechunk=True # Needed to ensure zarr chunks map correctly to disk. - ) - # TODO: This needs to be controlled in a sensible way. - # write_xds_list = xds_to_table_fragment( - # xds_list, - # "delta1.ms", - # ms_path, - # columns=output_cols, - # rechunk=True # Needed to ensure zarr chunks map correctly to disk. - # ) + if output_opts.fragment_path: + write_xds_list = xds_to_table_fragment( + xds_list, + output_opts.fragment_path, + ms_path, + columns=output_cols, + rechunk=True # Ensure zarr chunks map correctly to disk. + ) + + else: + write_xds_list = xds_to_storage_table( + xds_list, + ms_path, + columns=output_cols, + rechunk=True # Ensure zarr chunks map correctly to disk. + ) return write_xds_list From d8a971969c0774705493ef69141c082b99c4d89b Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Thu, 21 Sep 2023 10:33:12 +0200 Subject: [PATCH 4/4] Depend on dask-ms master. --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index cba975d1..af3e5f49 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,7 @@ columnar = "^1.4.1" "ruamel.yaml" = "^0.17.26" dask = {extras = ["diagnostics"], version = "^2023.1.0"} distributed = "^2023.1.0" -dask-ms = {git = "https://github.com/ratt-ru/dask-ms.git", branch = "multisource-experimental", extras = ["s3", "xarray", "zarr"]} +dask-ms = {git = "https://github.com/ratt-ru/dask-ms.git", extras = ["s3", "xarray", "zarr"]} codex-africanus = {extras = ["dask", "scipy", "astropy", "python-casacore"], version = "^0.3.4"} astro-tigger-lsm = "^1.7.2" loguru = "^0.7.0"