From 346ed33234b346549fe9ae480a1c34f3ab3902ae Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Fri, 10 Nov 2023 14:59:38 +0000 Subject: [PATCH 1/6] up --- climetlab/loaders/__init__.py | 21 +++++++++++++++++++++ climetlab/scripts/create.py | 17 ++++++++++++++--- climetlab/sources/era5_accumulations.py | 6 +++++- 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/climetlab/loaders/__init__.py b/climetlab/loaders/__init__.py index a401fec1..9cfd7d58 100644 --- a/climetlab/loaders/__init__.py +++ b/climetlab/loaders/__init__.py @@ -18,12 +18,29 @@ from functools import cached_property import numpy as np +import tqdm from climetlab.core.order import build_remapping # noqa:F401 from climetlab.utils import progress_bar from climetlab.utils.config import LoadersConfig from climetlab.utils.humanize import bytes, seconds + +def compute_directory_size(path): + if not os.path.isdir(path): + return None + size = 0 + n = 0 + for dirpath, _, filenames in tqdm.tqdm( + os.walk(path), desc="Computing size", leave=False + ): + for filename in filenames: + file_path = os.path.join(dirpath, filename) + size += os.path.getsize(file_path) + n += 1 + return size, n + + LOG = logging.getLogger(__name__) VERSION = "0.13" @@ -948,6 +965,10 @@ def print_info(self): except Exception as e: print(e) + def add_total_size(self, **kwargs): + size, n = compute_directory_size(self.path) + self.update_metadata(total_size=size, total_number_of_files=n) + def add_statistics(self, no_write, **kwargs): do_write = not no_write diff --git a/climetlab/scripts/create.py b/climetlab/scripts/create.py index 293326e9..c01ec805 100644 --- a/climetlab/scripts/create.py +++ b/climetlab/scripts/create.py @@ -49,6 +49,10 @@ class LoadersCmd: "--statistics", dict(action="store_true", help="Compute statistics."), ), + total_size=( + "--total-size", + dict(action="store_true", help="Compute total size."), + ), config=( "--config", dict( @@ -165,16 +169,16 @@ def callback(*msg): kwargs["print"] = callback loader_class = LOADERS[format] - lst = [args.load, args.statistics, args.init] + lst = [args.load, args.statistics, args.init, args.total_size] if sum(1 for x in lst if x) > 1: raise ValueError( "Too many options provided." - 'Must choose exactly one option in "--load", "--statistics", "--init"' + 'Must choose exactly one option in "--load", "--statistics", "--init", "--total-size"' ) if sum(1 for x in lst if x) < 1: raise ValueError( "Not enough options provided." - 'Must choose exactly one option in "--load", "--statistics", "--init"' + 'Must choose exactly one option in "--load", "--statistics", "--init", "--total-size"' ) if args.parts: assert args.load, "Use --parts only with --load" @@ -220,3 +224,10 @@ def dummy_context(): ), "--statistics requires only --target, no --config." loader = loader_class.from_zarr(**kwargs) loader.add_statistics(**kwargs) + + if args.total_size: + assert ( + args.config is None + ), "--total-size requires only --target, no --config." 
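# compute_directory_size(path) added above walks the directory tree and returns a
# (total_bytes, file_count) tuple, or None when the path is not a directory;
# add_total_size() then stores both values via
# update_metadata(total_size=size, total_number_of_files=n). A minimal usage
# sketch with a hypothetical path (triggered by the new --total-size option):
#
#   result = compute_directory_size("/data/my-dataset.zarr")
#   if result is not None:
#       size, n = result
#       print(f"{n} files, {size:,} bytes")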
+ loader = loader_class.from_zarr(**kwargs) + loader.add_total_size(**kwargs) diff --git a/climetlab/sources/era5_accumulations.py b/climetlab/sources/era5_accumulations.py index 05e086eb..ae851cc1 100644 --- a/climetlab/sources/era5_accumulations.py +++ b/climetlab/sources/era5_accumulations.py @@ -112,10 +112,14 @@ def __init__(self, *args, **kwargs): era_request = dict(**request) + type_ = request.get("type", "an") + if type_ == "an": + type_ = "fc" + era_request.update( { "class": "ea", - "type": "fc", + "type": type_, "levtype": "sfc", "date": [d.strftime("%Y-%m-%d") for d in dates], "time": sorted(times), From f97a880f1b37e185cc0d3dffe88850f340eecf60 Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Fri, 10 Nov 2023 20:56:05 +0000 Subject: [PATCH 2/6] clean. remove dead code --- climetlab/loaders/__init__.py | 104 ++++----------- climetlab/scripts/create.py | 233 ---------------------------------- 2 files changed, 25 insertions(+), 312 deletions(-) delete mode 100644 climetlab/scripts/create.py diff --git a/climetlab/loaders/__init__.py b/climetlab/loaders/__init__.py index 9cfd7d58..ce450e1a 100644 --- a/climetlab/loaders/__init__.py +++ b/climetlab/loaders/__init__.py @@ -23,7 +23,7 @@ from climetlab.core.order import build_remapping # noqa:F401 from climetlab.utils import progress_bar from climetlab.utils.config import LoadersConfig -from climetlab.utils.humanize import bytes, seconds +from climetlab.utils.humanize import seconds def compute_directory_size(path): @@ -766,7 +766,6 @@ def _variables_names(self): def initialise(self): """Create empty zarr from self.main_config and self.path""" - import pandas as pd import zarr self.print("config loaded ok:") @@ -853,30 +852,40 @@ def initialise(self): metadata["frequency"] = frequency metadata["start_date"] = dates[0].isoformat() metadata["end_date"] = dates[-1].isoformat() - pd_dates_kwargs = dict( - start=metadata["start_date"], - end=metadata["end_date"], - freq=f"{metadata['frequency']}h", - unit="s", + + def rebuild_dates(start, end, frequency): + assert isinstance(frequency, int), frequency + start = np.datetime64(start) + end = np.datetime64(end) + delta = np.timedelta64(frequency, "h") + res = [] + while start <= end: + res.append(start) + start += delta + return np.array(res).astype("datetime64[s]") + + dates_ = rebuild_dates( + metadata["start_date"], + metadata["end_date"], + metadata["frequency"], ) - pd_dates = pd.date_range(**pd_dates_kwargs) - def check_dates(input_handler, pd_dates, total_shape): + def check_dates(input_handler, dates_, total_shape): for i, loop in enumerate(input_handler.loops): print(f"Loop {i}: ", loop._info) - if pd_dates.size != total_shape[0]: + if len(dates_) != total_shape[0]: raise ValueError( - f"Final date size {pd_dates.size} (from {pd_dates[0]} to {pd_dates[-1]}, " + f"Final date size {len(dates_)} (from {dates_[0]} to {dates_[-1]}, " f"{frequency=}) does not match data shape {total_shape[0]}. {total_shape=}" ) - if pd_dates.size != len(dates): + if len(dates_) != len(dates): raise ValueError( - f"Final date size {pd_dates.size} (from {pd_dates[0]} to {pd_dates[-1]}, " + f"Final date size {len(dates_)} (from {dates_[0]} to {dates_[-1]}, " f"{frequency=}) does not match data shape {len(dates)} (from {dates[0]} to " - f"{dates[-1]}). {pd_dates_kwargs}" + f"{dates[-1]})." 
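# rebuild_dates() introduced above replaces pandas.date_range for building the
# date axis: it steps from start to end in `frequency`-hour increments and
# returns a numpy datetime64[s] array. Tracing the code with illustrative
# arguments:
#
#   rebuild_dates("2020-01-01T00:00:00", "2020-01-02T00:00:00", 6)
#   # -> array(['2020-01-01T00:00:00', '2020-01-01T06:00:00',
#   #           '2020-01-01T12:00:00', '2020-01-01T18:00:00',
#   #           '2020-01-02T00:00:00'], dtype='datetime64[s]')
#
# check_dates() then verifies that this axis matches both the first dimension
# of the target array and the list of datetimes collected from the input loops.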
) - check_dates(self.input_handler, pd_dates, total_shape) + check_dates(self.input_handler, dates_, total_shape) metadata.update(self.main_config.get("force_metadata", {})) @@ -886,8 +895,7 @@ def check_dates(input_handler, pd_dates, total_shape): self.z.create_dataset("data", shape=total_shape, chunks=chunks, dtype=dtype) - np_dates = pd_dates.to_numpy() - self._add_dataset(name="dates", array=np_dates) + self._add_dataset(name="dates", array=dates_) self._add_dataset(name="latitudes", array=grid_points[0]) self._add_dataset(name="longitudes", array=grid_points[1]) @@ -1130,65 +1138,3 @@ def compute_statistics(self, ds, statistics_start, statistics_end): check_stats(**{k: v[i] for k, v in stats.items()}, msg=f"{i} {name}") return stats - - -class HDF5Loader: - def __init__(self, *args, **kwargs): - raise NotImplementedError() - - def append_array(self, *args, **kwargs): - raise NotImplementedError("Appending do HDF5 not yet implemented") - - def create_array( - self, - dataset, - shape, - chunks, - dtype, - metadata, - grid_points, - nloops, - ): - import h5py - - if not isinstance(chunks, tuple): - chunks = None - - print( - f"Creating HDD5 file '{self.path}', with {dataset=}, {shape=}, {chunks=} and {dtype=}" - ) - - self.h5 = h5py.File(self.path, mode="w") - array = self.h5.create_dataset( - dataset, - chunks=chunks, - maxshape=shape, - dtype=dtype, - data=np.empty( - shape - ) # Can we avoid that? Looks like its needed for chuncking - # data = h5py.Empty(dtype), - ) - return array - - def close(self): - self.h5.close() - del self.h5 - - def print_info(self): - import h5py - - def h5_tree(h5, depth=0): - for k, v in h5.items(): - if isinstance(v, h5py._hl.group.Group): - h5_tree(v, depth + 1) - else: - print(" " * (depth * 3), k, v) - for p, q in v.attrs.items(): - print(" " * (depth * 3 + 3), p, q) - - size = os.path.getsize(self.path) - print(f"HDF5 file {self.path}: {size:,} ({bytes(size)})") - with h5py.File(self.path, mode="r") as f: - print("Content:") - h5_tree(f, 1) diff --git a/climetlab/scripts/create.py b/climetlab/scripts/create.py deleted file mode 100644 index c01ec805..00000000 --- a/climetlab/scripts/create.py +++ /dev/null @@ -1,233 +0,0 @@ -# (C) Copyright 2023 ECMWF. -# -# This software is licensed under the terms of the Apache Licence Version 2.0 -# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. -# In applying this licence, ECMWF does not waive the privileges and immunities -# granted to it by virtue of its status as an intergovernmental organisation -# nor does it submit to any jurisdiction. -# - -import os -import time -from contextlib import contextmanager - -from climetlab import settings -from climetlab.loaders import HDF5Loader, ZarrLoader -from climetlab.utils.humanize import list_to_human - -from .tools import parse_args - -LAST_CALLBACK = 0 -LAST_MESSAGE = "" - - -class LoadersCmd: - @parse_args( - # dataset=( - # "--dataset", - # dict( - # help="Name of the HDF5 dataset to use" - # " (default from config or 'dataset')" - # ), - # ), - target=( - "--target", - dict( - help="Where to store the final data. " - "Currently only a path to a new ZARR is supported." 
- ), - ), - init=( - "--init", - dict(action="store_true", help="Initialise zarr."), - ), - load=( - "--load", - dict(action="store_true", help="Load data into zarr."), - ), - statistics=( - "--statistics", - dict(action="store_true", help="Compute statistics."), - ), - total_size=( - "--total-size", - dict(action="store_true", help="Compute total size."), - ), - config=( - "--config", - dict( - help="Use with --init. A yaml file that describes which data to use as input" - " and how to organise them in the target." - ), - ), - parts=( - "--parts", - dict(nargs="+", help="Use with --load. Part(s) of the data to process."), - ), - no_write=( - "--no-write", - dict( - action="store_true", help="Only compute statistics, do not write them." - ), - ), - cache_dir=( - "--cache-dir", - dict( - help="Use with --load. Location of cache directory for temporary data." - ), - ), - format=( - "--format", - dict( - help="The format of the target storage into which to load the data" - " (default is inferred from target path extension)" - " only .zarr is currently supported." - ), - ), - no_check=( - "--no-check", - dict(action="store_true", help="Skip checks."), - ), - force=( - "--force", - dict(action="store_true", help="Overwrite if already exists."), - ), - timeout=( - "--timeout", - dict( - type=int, - default=0, - help="Stop with error (SIGALARM) after TIMEOUT seconds.", - ), - ), - ) - def do_create(self, args): - create(args) - - -def create(args): - format = args.format - - if args.timeout: - import signal - - signal.alarm(args.timeout) - - if format is None: - _, ext = os.path.splitext(args.target) - format = ext[1:] - assert format == "zarr", f"Unsupported format={format}" - - def no_callback(*args, **kwargs): - print(*args, **kwargs) - return - - if os.environ.get("CLIMETLAB_CREATE_SHELL_CALLBACK"): - - def callback(*msg): - global LAST_CALLBACK, LAST_MESSAGE - - msg = [str(_) for _ in msg] - msg = "\n".join(msg) - - if time.time() - LAST_CALLBACK < 10 and LAST_MESSAGE[:10] == msg[:10]: - return - - import shlex - import subprocess - import traceback - - cmd = os.environ.get("CLIMETLAB_CREATE_SHELL_CALLBACK") - cmd = cmd.format(msg) - try: - print(f"Running {cmd}") - args = shlex.split(cmd) # shlex honors the quotes - subprocess.Popen(args) - except Exception as e: - print(f"Exception when running {cmd}" + traceback.format_exc()) - print(e) - - LAST_CALLBACK = time.time() - LAST_MESSAGE = msg - - callback("Starting-zarr-loader.") - else: - callback = no_callback - - LOADERS = dict( - zarr=ZarrLoader, - h5=HDF5Loader, - hdf5=HDF5Loader, - hdf=HDF5Loader, - ) - if format not in LOADERS: - lst = list_to_human(list(LOADERS.keys()), "or") - raise ValueError(f"Invalid format '{format}', must be one of {lst}.") - - kwargs = vars(args) - kwargs["path"] = kwargs["target"] - kwargs["print"] = callback - loader_class = LOADERS[format] - - lst = [args.load, args.statistics, args.init, args.total_size] - if sum(1 for x in lst if x) > 1: - raise ValueError( - "Too many options provided." - 'Must choose exactly one option in "--load", "--statistics", "--init", "--total-size"' - ) - if sum(1 for x in lst if x) < 1: - raise ValueError( - "Not enough options provided." 
- 'Must choose exactly one option in "--load", "--statistics", "--init", "--total-size"' - ) - if args.parts: - assert args.load, "Use --parts only with --load" - if args.no_write: - assert args.statistics, "Use --no-write only with --statistics" - - @contextmanager - def dummy_context(): - yield - - context = dummy_context() - if kwargs["cache_dir"]: - context = settings.temporary("cache-directory", kwargs["cache_dir"]) - - with context: - if args.init: - assert args.config, "--init requires --config" - assert args.target, "--init requires --target" - - import zarr - - try: - zarr.open(args.target, "r") - if not args.force: - raise Exception( - f"{args.target} already exists. Use --force to overwrite." - ) - except zarr.errors.PathNotFoundError: - pass - - loader = loader_class.from_config(partial=True, **kwargs) - loader.initialise() - exit() - - if args.load: - assert args.config is None, "--load requires only a --target, no --config." - loader = loader_class.from_zarr(**kwargs) - loader.load(**kwargs) - - if args.statistics: - assert ( - args.config is None - ), "--statistics requires only --target, no --config." - loader = loader_class.from_zarr(**kwargs) - loader.add_statistics(**kwargs) - - if args.total_size: - assert ( - args.config is None - ), "--total-size requires only --target, no --config." - loader = loader_class.from_zarr(**kwargs) - loader.add_total_size(**kwargs) From 80a93156d2274bbd5ebf3cd54bb244707ac22c9c Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Mon, 13 Nov 2023 10:46:13 +0000 Subject: [PATCH 3/6] Revert "clean. remove dead code" This reverts commit f97a880f1b37e185cc0d3dffe88850f340eecf60. --- climetlab/loaders/__init__.py | 104 +++++++++++---- climetlab/scripts/create.py | 233 ++++++++++++++++++++++++++++++++++ 2 files changed, 312 insertions(+), 25 deletions(-) create mode 100644 climetlab/scripts/create.py diff --git a/climetlab/loaders/__init__.py b/climetlab/loaders/__init__.py index ce450e1a..9cfd7d58 100644 --- a/climetlab/loaders/__init__.py +++ b/climetlab/loaders/__init__.py @@ -23,7 +23,7 @@ from climetlab.core.order import build_remapping # noqa:F401 from climetlab.utils import progress_bar from climetlab.utils.config import LoadersConfig -from climetlab.utils.humanize import seconds +from climetlab.utils.humanize import bytes, seconds def compute_directory_size(path): @@ -766,6 +766,7 @@ def _variables_names(self): def initialise(self): """Create empty zarr from self.main_config and self.path""" + import pandas as pd import zarr self.print("config loaded ok:") @@ -852,40 +853,30 @@ def initialise(self): metadata["frequency"] = frequency metadata["start_date"] = dates[0].isoformat() metadata["end_date"] = dates[-1].isoformat() - - def rebuild_dates(start, end, frequency): - assert isinstance(frequency, int), frequency - start = np.datetime64(start) - end = np.datetime64(end) - delta = np.timedelta64(frequency, "h") - res = [] - while start <= end: - res.append(start) - start += delta - return np.array(res).astype("datetime64[s]") - - dates_ = rebuild_dates( - metadata["start_date"], - metadata["end_date"], - metadata["frequency"], + pd_dates_kwargs = dict( + start=metadata["start_date"], + end=metadata["end_date"], + freq=f"{metadata['frequency']}h", + unit="s", ) + pd_dates = pd.date_range(**pd_dates_kwargs) - def check_dates(input_handler, dates_, total_shape): + def check_dates(input_handler, pd_dates, total_shape): for i, loop in enumerate(input_handler.loops): print(f"Loop {i}: ", loop._info) - if len(dates_) != total_shape[0]: + if 
pd_dates.size != total_shape[0]: raise ValueError( - f"Final date size {len(dates_)} (from {dates_[0]} to {dates_[-1]}, " + f"Final date size {pd_dates.size} (from {pd_dates[0]} to {pd_dates[-1]}, " f"{frequency=}) does not match data shape {total_shape[0]}. {total_shape=}" ) - if len(dates_) != len(dates): + if pd_dates.size != len(dates): raise ValueError( - f"Final date size {len(dates_)} (from {dates_[0]} to {dates_[-1]}, " + f"Final date size {pd_dates.size} (from {pd_dates[0]} to {pd_dates[-1]}, " f"{frequency=}) does not match data shape {len(dates)} (from {dates[0]} to " - f"{dates[-1]})." + f"{dates[-1]}). {pd_dates_kwargs}" ) - check_dates(self.input_handler, dates_, total_shape) + check_dates(self.input_handler, pd_dates, total_shape) metadata.update(self.main_config.get("force_metadata", {})) @@ -895,7 +886,8 @@ def check_dates(input_handler, dates_, total_shape): self.z.create_dataset("data", shape=total_shape, chunks=chunks, dtype=dtype) - self._add_dataset(name="dates", array=dates_) + np_dates = pd_dates.to_numpy() + self._add_dataset(name="dates", array=np_dates) self._add_dataset(name="latitudes", array=grid_points[0]) self._add_dataset(name="longitudes", array=grid_points[1]) @@ -1138,3 +1130,65 @@ def compute_statistics(self, ds, statistics_start, statistics_end): check_stats(**{k: v[i] for k, v in stats.items()}, msg=f"{i} {name}") return stats + + +class HDF5Loader: + def __init__(self, *args, **kwargs): + raise NotImplementedError() + + def append_array(self, *args, **kwargs): + raise NotImplementedError("Appending do HDF5 not yet implemented") + + def create_array( + self, + dataset, + shape, + chunks, + dtype, + metadata, + grid_points, + nloops, + ): + import h5py + + if not isinstance(chunks, tuple): + chunks = None + + print( + f"Creating HDD5 file '{self.path}', with {dataset=}, {shape=}, {chunks=} and {dtype=}" + ) + + self.h5 = h5py.File(self.path, mode="w") + array = self.h5.create_dataset( + dataset, + chunks=chunks, + maxshape=shape, + dtype=dtype, + data=np.empty( + shape + ) # Can we avoid that? Looks like its needed for chuncking + # data = h5py.Empty(dtype), + ) + return array + + def close(self): + self.h5.close() + del self.h5 + + def print_info(self): + import h5py + + def h5_tree(h5, depth=0): + for k, v in h5.items(): + if isinstance(v, h5py._hl.group.Group): + h5_tree(v, depth + 1) + else: + print(" " * (depth * 3), k, v) + for p, q in v.attrs.items(): + print(" " * (depth * 3 + 3), p, q) + + size = os.path.getsize(self.path) + print(f"HDF5 file {self.path}: {size:,} ({bytes(size)})") + with h5py.File(self.path, mode="r") as f: + print("Content:") + h5_tree(f, 1) diff --git a/climetlab/scripts/create.py b/climetlab/scripts/create.py new file mode 100644 index 00000000..c01ec805 --- /dev/null +++ b/climetlab/scripts/create.py @@ -0,0 +1,233 @@ +# (C) Copyright 2023 ECMWF. +# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. 
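# The revert above restores the pandas-based date axis in the zarr loader. For
# reference, the restored call is roughly equivalent to the sketch below; note
# that the `unit="s"` keyword of pandas.date_range only exists in recent pandas
# releases (2.x -- an assumption to verify against the pinned dependency):
#
#   import pandas as pd
#   pd_dates = pd.date_range(start="2020-01-01T00:00:00",
#                            end="2020-01-02T00:00:00",
#                            freq="6h", unit="s")
#   np_dates = pd_dates.to_numpy()   # datetime64[s] values stored as "dates"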
+# + +import os +import time +from contextlib import contextmanager + +from climetlab import settings +from climetlab.loaders import HDF5Loader, ZarrLoader +from climetlab.utils.humanize import list_to_human + +from .tools import parse_args + +LAST_CALLBACK = 0 +LAST_MESSAGE = "" + + +class LoadersCmd: + @parse_args( + # dataset=( + # "--dataset", + # dict( + # help="Name of the HDF5 dataset to use" + # " (default from config or 'dataset')" + # ), + # ), + target=( + "--target", + dict( + help="Where to store the final data. " + "Currently only a path to a new ZARR is supported." + ), + ), + init=( + "--init", + dict(action="store_true", help="Initialise zarr."), + ), + load=( + "--load", + dict(action="store_true", help="Load data into zarr."), + ), + statistics=( + "--statistics", + dict(action="store_true", help="Compute statistics."), + ), + total_size=( + "--total-size", + dict(action="store_true", help="Compute total size."), + ), + config=( + "--config", + dict( + help="Use with --init. A yaml file that describes which data to use as input" + " and how to organise them in the target." + ), + ), + parts=( + "--parts", + dict(nargs="+", help="Use with --load. Part(s) of the data to process."), + ), + no_write=( + "--no-write", + dict( + action="store_true", help="Only compute statistics, do not write them." + ), + ), + cache_dir=( + "--cache-dir", + dict( + help="Use with --load. Location of cache directory for temporary data." + ), + ), + format=( + "--format", + dict( + help="The format of the target storage into which to load the data" + " (default is inferred from target path extension)" + " only .zarr is currently supported." + ), + ), + no_check=( + "--no-check", + dict(action="store_true", help="Skip checks."), + ), + force=( + "--force", + dict(action="store_true", help="Overwrite if already exists."), + ), + timeout=( + "--timeout", + dict( + type=int, + default=0, + help="Stop with error (SIGALARM) after TIMEOUT seconds.", + ), + ), + ) + def do_create(self, args): + create(args) + + +def create(args): + format = args.format + + if args.timeout: + import signal + + signal.alarm(args.timeout) + + if format is None: + _, ext = os.path.splitext(args.target) + format = ext[1:] + assert format == "zarr", f"Unsupported format={format}" + + def no_callback(*args, **kwargs): + print(*args, **kwargs) + return + + if os.environ.get("CLIMETLAB_CREATE_SHELL_CALLBACK"): + + def callback(*msg): + global LAST_CALLBACK, LAST_MESSAGE + + msg = [str(_) for _ in msg] + msg = "\n".join(msg) + + if time.time() - LAST_CALLBACK < 10 and LAST_MESSAGE[:10] == msg[:10]: + return + + import shlex + import subprocess + import traceback + + cmd = os.environ.get("CLIMETLAB_CREATE_SHELL_CALLBACK") + cmd = cmd.format(msg) + try: + print(f"Running {cmd}") + args = shlex.split(cmd) # shlex honors the quotes + subprocess.Popen(args) + except Exception as e: + print(f"Exception when running {cmd}" + traceback.format_exc()) + print(e) + + LAST_CALLBACK = time.time() + LAST_MESSAGE = msg + + callback("Starting-zarr-loader.") + else: + callback = no_callback + + LOADERS = dict( + zarr=ZarrLoader, + h5=HDF5Loader, + hdf5=HDF5Loader, + hdf=HDF5Loader, + ) + if format not in LOADERS: + lst = list_to_human(list(LOADERS.keys()), "or") + raise ValueError(f"Invalid format '{format}', must be one of {lst}.") + + kwargs = vars(args) + kwargs["path"] = kwargs["target"] + kwargs["print"] = callback + loader_class = LOADERS[format] + + lst = [args.load, args.statistics, args.init, args.total_size] + if sum(1 for x in lst if 
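# The progress callback defined above is opt-in: when the
# CLIMETLAB_CREATE_SHELL_CALLBACK environment variable is set, its value is used
# as a shell command template, cmd.format(msg) substitutes the progress message
# (so the template is expected to contain "{}"), and the result is run through
# shlex.split() + subprocess.Popen(). A message is skipped only when fewer than
# 10 seconds have elapsed AND its first 10 characters match the previous one.
# A hypothetical template value:
#
#   CLIMETLAB_CREATE_SHELL_CALLBACK='logger -t climetlab-create "{}"'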
x) > 1: + raise ValueError( + "Too many options provided." + 'Must choose exactly one option in "--load", "--statistics", "--init", "--total-size"' + ) + if sum(1 for x in lst if x) < 1: + raise ValueError( + "Not enough options provided." + 'Must choose exactly one option in "--load", "--statistics", "--init", "--total-size"' + ) + if args.parts: + assert args.load, "Use --parts only with --load" + if args.no_write: + assert args.statistics, "Use --no-write only with --statistics" + + @contextmanager + def dummy_context(): + yield + + context = dummy_context() + if kwargs["cache_dir"]: + context = settings.temporary("cache-directory", kwargs["cache_dir"]) + + with context: + if args.init: + assert args.config, "--init requires --config" + assert args.target, "--init requires --target" + + import zarr + + try: + zarr.open(args.target, "r") + if not args.force: + raise Exception( + f"{args.target} already exists. Use --force to overwrite." + ) + except zarr.errors.PathNotFoundError: + pass + + loader = loader_class.from_config(partial=True, **kwargs) + loader.initialise() + exit() + + if args.load: + assert args.config is None, "--load requires only a --target, no --config." + loader = loader_class.from_zarr(**kwargs) + loader.load(**kwargs) + + if args.statistics: + assert ( + args.config is None + ), "--statistics requires only --target, no --config." + loader = loader_class.from_zarr(**kwargs) + loader.add_statistics(**kwargs) + + if args.total_size: + assert ( + args.config is None + ), "--total-size requires only --target, no --config." + loader = loader_class.from_zarr(**kwargs) + loader.add_total_size(**kwargs) From 8a201a69332e305dcf8b269c7b9b3fa0971fca9a Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Tue, 14 Nov 2023 16:10:59 +0000 Subject: [PATCH 4/6] removing dependencies with zarr building code --- climetlab/core/__init__.py | 2 +- climetlab/core/index.py | 3 +- climetlab/indexing/database/sql.py | 2 +- climetlab/readers/grib/index/sql.py | 3 +- climetlab/scripts/create.py | 233 ----- climetlab/utils/config.py | 1246 --------------------------- 6 files changed, 4 insertions(+), 1485 deletions(-) delete mode 100644 climetlab/scripts/create.py delete mode 100644 climetlab/utils/config.py diff --git a/climetlab/core/__init__.py b/climetlab/core/__init__.py index 75a32d76..347b0d2d 100644 --- a/climetlab/core/__init__.py +++ b/climetlab/core/__init__.py @@ -11,7 +11,7 @@ from collections import defaultdict import climetlab -from climetlab.loaders import build_remapping +from climetlab.core.order import build_remapping LOG = logging.getLogger(__name__) diff --git a/climetlab/core/index.py b/climetlab/core/index.py index b19fb2f1..3251050a 100644 --- a/climetlab/core/index.py +++ b/climetlab/core/index.py @@ -15,9 +15,8 @@ from collections import defaultdict import climetlab as cml -from climetlab.core.order import normalize_order_by +from climetlab.core.order import build_remapping, normalize_order_by from climetlab.core.select import normalize_selection -from climetlab.loaders import build_remapping from climetlab.sources import Source LOG = logging.getLogger(__name__) diff --git a/climetlab/indexing/database/sql.py b/climetlab/indexing/database/sql.py index c0ab9b49..935a2edf 100644 --- a/climetlab/indexing/database/sql.py +++ b/climetlab/indexing/database/sql.py @@ -19,8 +19,8 @@ import numpy as np import climetlab as cml +from climetlab.core.order import build_remapping from climetlab.indexing.database.json import json_serialiser -from climetlab.loaders import 
build_remapping from climetlab.utils import tqdm from climetlab.utils.parts import Part diff --git a/climetlab/readers/grib/index/sql.py b/climetlab/readers/grib/index/sql.py index f64e0f8c..0101a107 100644 --- a/climetlab/readers/grib/index/sql.py +++ b/climetlab/readers/grib/index/sql.py @@ -11,7 +11,7 @@ from collections import namedtuple from climetlab.core.constants import DATETIME -from climetlab.core.order import normalize_order_by +from climetlab.core.order import build_remapping, normalize_order_by from climetlab.core.select import normalize_selection from climetlab.decorators import cached_method, normalize, normalize_grib_key_values from climetlab.indexing.database.sql import ( @@ -20,7 +20,6 @@ SqlRemapping, SqlSelection, ) -from climetlab.loaders import build_remapping from climetlab.readers.grib.index.db import FieldsetInFilesWithDBIndex from climetlab.utils.serialise import register_serialisation diff --git a/climetlab/scripts/create.py b/climetlab/scripts/create.py deleted file mode 100644 index c01ec805..00000000 --- a/climetlab/scripts/create.py +++ /dev/null @@ -1,233 +0,0 @@ -# (C) Copyright 2023 ECMWF. -# -# This software is licensed under the terms of the Apache Licence Version 2.0 -# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. -# In applying this licence, ECMWF does not waive the privileges and immunities -# granted to it by virtue of its status as an intergovernmental organisation -# nor does it submit to any jurisdiction. -# - -import os -import time -from contextlib import contextmanager - -from climetlab import settings -from climetlab.loaders import HDF5Loader, ZarrLoader -from climetlab.utils.humanize import list_to_human - -from .tools import parse_args - -LAST_CALLBACK = 0 -LAST_MESSAGE = "" - - -class LoadersCmd: - @parse_args( - # dataset=( - # "--dataset", - # dict( - # help="Name of the HDF5 dataset to use" - # " (default from config or 'dataset')" - # ), - # ), - target=( - "--target", - dict( - help="Where to store the final data. " - "Currently only a path to a new ZARR is supported." - ), - ), - init=( - "--init", - dict(action="store_true", help="Initialise zarr."), - ), - load=( - "--load", - dict(action="store_true", help="Load data into zarr."), - ), - statistics=( - "--statistics", - dict(action="store_true", help="Compute statistics."), - ), - total_size=( - "--total-size", - dict(action="store_true", help="Compute total size."), - ), - config=( - "--config", - dict( - help="Use with --init. A yaml file that describes which data to use as input" - " and how to organise them in the target." - ), - ), - parts=( - "--parts", - dict(nargs="+", help="Use with --load. Part(s) of the data to process."), - ), - no_write=( - "--no-write", - dict( - action="store_true", help="Only compute statistics, do not write them." - ), - ), - cache_dir=( - "--cache-dir", - dict( - help="Use with --load. Location of cache directory for temporary data." - ), - ), - format=( - "--format", - dict( - help="The format of the target storage into which to load the data" - " (default is inferred from target path extension)" - " only .zarr is currently supported." 
- ), - ), - no_check=( - "--no-check", - dict(action="store_true", help="Skip checks."), - ), - force=( - "--force", - dict(action="store_true", help="Overwrite if already exists."), - ), - timeout=( - "--timeout", - dict( - type=int, - default=0, - help="Stop with error (SIGALARM) after TIMEOUT seconds.", - ), - ), - ) - def do_create(self, args): - create(args) - - -def create(args): - format = args.format - - if args.timeout: - import signal - - signal.alarm(args.timeout) - - if format is None: - _, ext = os.path.splitext(args.target) - format = ext[1:] - assert format == "zarr", f"Unsupported format={format}" - - def no_callback(*args, **kwargs): - print(*args, **kwargs) - return - - if os.environ.get("CLIMETLAB_CREATE_SHELL_CALLBACK"): - - def callback(*msg): - global LAST_CALLBACK, LAST_MESSAGE - - msg = [str(_) for _ in msg] - msg = "\n".join(msg) - - if time.time() - LAST_CALLBACK < 10 and LAST_MESSAGE[:10] == msg[:10]: - return - - import shlex - import subprocess - import traceback - - cmd = os.environ.get("CLIMETLAB_CREATE_SHELL_CALLBACK") - cmd = cmd.format(msg) - try: - print(f"Running {cmd}") - args = shlex.split(cmd) # shlex honors the quotes - subprocess.Popen(args) - except Exception as e: - print(f"Exception when running {cmd}" + traceback.format_exc()) - print(e) - - LAST_CALLBACK = time.time() - LAST_MESSAGE = msg - - callback("Starting-zarr-loader.") - else: - callback = no_callback - - LOADERS = dict( - zarr=ZarrLoader, - h5=HDF5Loader, - hdf5=HDF5Loader, - hdf=HDF5Loader, - ) - if format not in LOADERS: - lst = list_to_human(list(LOADERS.keys()), "or") - raise ValueError(f"Invalid format '{format}', must be one of {lst}.") - - kwargs = vars(args) - kwargs["path"] = kwargs["target"] - kwargs["print"] = callback - loader_class = LOADERS[format] - - lst = [args.load, args.statistics, args.init, args.total_size] - if sum(1 for x in lst if x) > 1: - raise ValueError( - "Too many options provided." - 'Must choose exactly one option in "--load", "--statistics", "--init", "--total-size"' - ) - if sum(1 for x in lst if x) < 1: - raise ValueError( - "Not enough options provided." - 'Must choose exactly one option in "--load", "--statistics", "--init", "--total-size"' - ) - if args.parts: - assert args.load, "Use --parts only with --load" - if args.no_write: - assert args.statistics, "Use --no-write only with --statistics" - - @contextmanager - def dummy_context(): - yield - - context = dummy_context() - if kwargs["cache_dir"]: - context = settings.temporary("cache-directory", kwargs["cache_dir"]) - - with context: - if args.init: - assert args.config, "--init requires --config" - assert args.target, "--init requires --target" - - import zarr - - try: - zarr.open(args.target, "r") - if not args.force: - raise Exception( - f"{args.target} already exists. Use --force to overwrite." - ) - except zarr.errors.PathNotFoundError: - pass - - loader = loader_class.from_config(partial=True, **kwargs) - loader.initialise() - exit() - - if args.load: - assert args.config is None, "--load requires only a --target, no --config." - loader = loader_class.from_zarr(**kwargs) - loader.load(**kwargs) - - if args.statistics: - assert ( - args.config is None - ), "--statistics requires only --target, no --config." - loader = loader_class.from_zarr(**kwargs) - loader.add_statistics(**kwargs) - - if args.total_size: - assert ( - args.config is None - ), "--total-size requires only --target, no --config." 
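# Commit 4 above repoints every consumer of build_remapping from
# climetlab.loaders to climetlab.core.order (core/__init__.py, core/index.py,
# indexing/database/sql.py, readers/grib/index/sql.py), so the zarr-building
# modules climetlab/scripts/create.py and climetlab/utils/config.py can be
# deleted without breaking the indexing code. The remaining import and the call
# pattern used by the deleted config.py:
#
#   from climetlab.core.order import build_remapping
#   remapping = build_remapping({}, patches={"number": {None: 0}})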
- loader = loader_class.from_zarr(**kwargs) - loader.add_total_size(**kwargs) diff --git a/climetlab/utils/config.py b/climetlab/utils/config.py deleted file mode 100644 index 9461535c..00000000 --- a/climetlab/utils/config.py +++ /dev/null @@ -1,1246 +0,0 @@ -# (C) Copyright 2023 ECMWF. -# -# This software is licensed under the terms of the Apache Licence Version 2.0 -# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. -# In applying this licence, ECMWF does not waive the privileges and immunities -# granted to it by virtue of its status as an intergovernmental organisation -# nor does it submit to any jurisdiction. -# -import datetime -import itertools -import logging -import math -import os -import re -import time -import warnings -from collections import defaultdict -from copy import deepcopy -from functools import cached_property - -import numpy as np - -from climetlab.core.order import build_remapping, normalize_order_by -from climetlab.utils import load_json_or_yaml -from climetlab.utils.humanize import seconds - -LOG = logging.getLogger(__name__) - - -class DictObj(dict): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - for key, value in self.items(): - if isinstance(value, dict): - self[key] = DictObj(value) - continue - if isinstance(value, list): - self[key] = [ - DictObj(item) if isinstance(item, dict) else item for item in value - ] - continue - - def __getattr__(self, attr): - try: - return self[attr] - except KeyError: - raise AttributeError(attr) - - def __setattr__(self, attr, value): - self[attr] = value - - -class Config(DictObj): - def __init__(self, config): - if isinstance(config, str): - self.config_path = os.path.realpath(config) - config = load_json_or_yaml(config) - super().__init__(config) - - -class Inputs(list): - _do_load = None - - def __init__(self, args): - assert isinstance(args[0], (dict, Input)), args[0] - args = [c if isinstance(c, Input) else Input(c) for c in args] - super().__init__(args) - - def substitute(self, *args, **kwargs): - return Inputs([i.substitute(*args, **kwargs) for i in self]) - - def get_datetimes(self): - # get datetime from each input - # and make sure they are the same or None - datetimes = None - previous_name = None - for i in self: - new = i.get_datetimes() - if new is None: - continue - new = sorted(list(new)) - if datetimes is None: - datetimes = new - - if datetimes != new: - raise ValueError( - "Mismatch in datetimes", previous_name, datetimes, i.name, new - ) - previous_name = i.name - - if datetimes is None: - raise ValueError(f"No datetimes found in {self}") - - return datetimes - - def do_load(self, partial=False): - if self._do_load is None or self._do_load[1] != partial: - datasets = {} - for i in self: - i = i.substitute(vars=datasets) - ds = i.do_load(partial=partial) - datasets[i.name] = ds - - out = None - for ds in datasets.values(): - if out is None: - out = ds - else: - out += ds - - from climetlab.readers.grib.index import FieldSet - - assert isinstance(out, FieldSet), type(out) - self._do_load = (out, partial) - - return self._do_load[0] - - def __repr__(self) -> str: - return "\n".join(str(i) for i in self) - - -class DatetimeGetter: - def __init__(self, kwargs) -> None: - self.kwargs = kwargs - - @property - def values(self): - raise NotImplementedError(type(self)) - - -class MarsDatetimeGetter(DatetimeGetter): - def __init__(self, kwargs) -> None: - super().__init__(kwargs) - - date = self.kwargs.get("date", []) - hdate = self.kwargs.get("hdate", []) - time = 
self.kwargs.get("time", [0]) - step = self.kwargs.get("step", [0]) - - from climetlab.utils.dates import to_datetime_list - - date = to_datetime_list(date) - hdate = to_datetime_list(hdate) - time = make_list_int(time) - step = make_list_int(step) - - pass - - -class StandardMarsDatetimeGetter(MarsDatetimeGetter): - pass - - -class HindcastMarsDatetimeGetter(DatetimeGetter): - pass - - -class Era5AccumulationDatetimeGetter(DatetimeGetter): - pass - - -class ConstantDatetimeGetter(DatetimeGetter): - @property - def values(self): - return None - - -class Input: - _inheritance_done = False - _inheritance_others = None - _do_load = None - - def __init__(self, dic): - assert isinstance(dic, dict), dic - assert len(dic) == 1, dic - - self.name = list(dic.keys())[0] - self.config = dic[self.name] - - if self.name == "forcing" or self.name == "constants": - if "source_or_dataset" in self.config: - # add $ to source_or_dataset for constants source. - # climetlab will be refactored to remove this. - assert self.config["source_or_dataset"][0] != "$", self.config[ - "source_or_dataset" - ] - self.config["source_or_dataset"] = ( - "$" + self.config["source_or_dataset"] - ) - - self.kwargs = self.config.get("kwargs", {}) - self.inherit = self.config.get("inherit", []) - self.function = self.config.get("function", None) - - def get_datetimes(self): - name = self.kwargs.get("name", None) - - assert name in [ - "era5-accumulations", - "constants", - "mars", - ], f"{name} not implemented" - - if name == "constants": - return None - - if name == "era5-accumulations": - return None - - if name == "mars": - is_hindast = "hdate" in self.kwargs - - date = self.kwargs.get("date", []) - hdate = self.kwargs.get("hdate", []) - time = self.kwargs.get("time", [0]) - step = self.kwargs.get("step", [0]) - - from climetlab.utils.dates import to_datetime_list - - date = to_datetime_list(date) - hdate = to_datetime_list(hdate) - time = make_list_int(time) - step = make_list_int(step) - - assert isinstance(date, (list, tuple)), date - assert isinstance(time, (list, tuple)), time - assert isinstance(step, (list, tuple)), step - - if is_hindast: - assert isinstance(hdate, (list, tuple)), hdate - if len(date) > 1 and len(hdate) > 1: - raise NotImplementedError( - ( - f"Cannot have multiple dates in {self} " - "when using hindcast {date=}, {hdate=}" - ) - ) - date = hdate - del hdate - - if len(step) > 1 and len(time) > 1: - raise NotImplementedError( - f"Cannot have multiple steps and multiple times in {self}" - ) - - datetimes = set() - for d, t, s in itertools.product(date, time, step): - new = build_datetime(date=d, time=t, step=s) - if new in datetimes: - raise DuplicateDateTimeError( - f"Duplicate datetime '{new}' when processing << {self} >> already in {datetimes}" - ) - datetimes.add(new) - return sorted(list(datetimes)) - - raise ValueError(f"{name=} Cannot count number of elements in {self}") - - def do_load(self, partial=False): - if not self._do_load or self._do_load[1] != partial: - from climetlab import load_dataset, load_source - - func = { - None: load_source, - "load_source": load_source, - "load_dataset": load_dataset, - }[self.function] - - kwargs = dict(**self.kwargs) - - if partial: - if "date" in kwargs and isinstance(kwargs["date"], list): - kwargs["date"] = [kwargs["date"][0]] - - LOG.info(f"Loading {self.name} with {func} {kwargs}") - ds = func(**kwargs) - - LOG.info(f" Loading {self.name} of len {len(ds)}: {ds}") - self._do_load = (ds, partial) - return self._do_load[0] - - def get_first_field(self): - 
return self.do_load()[0] - - def process_inheritance(self, others): - for o in others: - if o == self: - continue - name = o.name - if name.startswith("$"): - name = name[1:] - if name not in self.inherit: - continue - if not o._inheritance_done: - o.process_inheritance(others) - - kwargs = {} - kwargs.update(o.kwargs) - kwargs.update(self.kwargs) # self.kwargs has priority - self.kwargs = kwargs - - self._inheritance_others = others - self._inheritance_done = True - - def __repr__(self) -> str: - def repr(v): - if isinstance(v, list): - return f"{'/'.join(str(x) for x in v)}" - return str(v) - - details = ", ".join(f"{k}={repr(v)}" for k, v in self.kwargs.items()) - return f"Input({self.name}, {details})<{self.inherit}" - - def substitute(self, *args, **kwargs): - new_kwargs = substitute(self.kwargs.copy(), *args, **kwargs) - i = Input( - { - self.name: dict( - kwargs=new_kwargs, - inherit=self.inherit, - function=self.function, - ) - } - ) - # if self._inheritance_others: - # i.process_inheritance(self._inheritance_others) - return i - - -def make_list_int(value): - if isinstance(value, str): - if "/" not in value: - return [value] - bits = value.split("/") - if len(bits) == 3 and bits[1].lower() == "to": - value = list(range(int(bits[0]), int(bits[2]) + 1, 1)) - - elif len(bits) == 5 and bits[1].lower() == "to" and bits[3].lower() == "by": - value = list(range(int(bits[0]), int(bits[2]) + int(bits[4]), int(bits[4]))) - - if isinstance(value, list): - return value - if isinstance(value, tuple): - return value - if isinstance(value, int): - return [value] - - raise ValueError(f"Cannot make list from {value}") - - -def build_datetime(date, time, step): - if isinstance(date, str): - from climetlab.utils.dates import to_datetime - - date = to_datetime(date) - - if isinstance(time, int): - if time < 24: - time = f"{time:02d}00" - else: - time = f"{time:04d}" - - assert isinstance(date, datetime.datetime), date - assert date.hour == 0 and date.minute == 0 and date.second == 0, date - - assert isinstance(time, str), time - assert len(time) == 4, time - assert int(time) >= 0 and int(time) < 2400, time - if 0 < int(time) < 100: - LOG.warning(f"{time=}, using time with minutes is unusual.") - - dt = datetime.datetime( - year=date.year, - month=date.month, - day=date.day, - hour=int(time[0:2]), - minute=int(time[2:4]), - ) - - if step: - dt += datetime.timedelta(hours=step) - - return dt - - -class InputHandler: - def __init__(self, loops, input, output, partial=False): - inputs = Inputs(input) - self.output = output - self.loops = [ - c - if isinstance(c, Loop) and c.inputs == inputs - else Loop(c, inputs, parent=self, partial=partial) - for c in loops - ] - if not self.loops: - raise NotImplementedError("No loop") - - def iter_cubes(self): - for loop in self.loops: - yield from loop.iterate() - - @cached_property - def n_iter_loops(self): - return sum([loop.n_iter_loops for loop in self.loops]) - - @property - def first_cube(self): - return self.first_cube_creator.to_cube() - - @property - def first_cube_creator(self): - for loop in self.loops: - for cube_creator in loop.iterate(): - return cube_creator - - @cached_property - def chunking(self): - return self.first_cube.chunking(self.output.chunking) - - @cached_property - def n_cubes(self): - n = 0 - for loop in self.loops: - for i in loop.iterate(): - n += 1 - return n - - @cached_property - def _info(self): - infos = [loop._info for loop in self.loops] - - # check all are the same - ref = infos[0] - for c in infos: - assert 
(np.array(ref.grid_points) == np.array(c.grid_points)).all(), ( - "grid_points mismatch", - c.grid_points, - ref.grid_points, - type(ref.grid_points), - ) - assert ref.resolution == c.resolution, ( - "resolution mismatch", - c.resolution, - ref.resolution, - ) - assert ref.variables == c.variables, ( - "variables mismatch", - c.variables, - ref.variables, - ) - - coords = deepcopy(ref.coords) - assert ( - "valid_datetime" in coords - ), f"valid_datetime not found in coords {coords}" - coords["valid_datetime"] = self.get_datetimes() - - for info in infos: - for name, values in info.coords.items(): - if name == "valid_datetime": - continue - assert values == ref.coords[name], (values, ref.coords[name]) - - return Info( - ref.first_field, - ref.grid_points, - ref.resolution, - coords, - ref.variables, - ref.data_request, - ) - - @property - def first_field(self): - return self._info.first_field - - @property - def grid_points(self): - return self._info.grid_points - - @property - def resolution(self): - return self._info.resolution - - @property - def data_request(self): - return self._info.data_request - - @property - def coords(self): - return self._info.coords - - @property - def variables(self): - return self._info.variables - - @property - def shape(self): - shape = [len(c) for c in self.coords.values()] - - field_shape = list(self.first_field.shape) - if self.output.flatten_grid: - field_shape = [math.prod(field_shape)] - - return shape + field_shape - - @cached_property - def _datetimes_and_frequency(self): - # merge datetimes from all loops and check there are no duplicates - datetimes = set() - for i in self.loops: - assert isinstance(i, Loop), i - new = i.get_datetimes() - for d in new: - assert d not in datetimes, (d, datetimes) - datetimes.add(d) - datetimes = sorted(list(datetimes)) - - def check(datetimes): - if not datetimes: - raise ValueError("No datetimes found.") - if len(datetimes) == 1: - raise ValueError("Only one datetime found.") - - delta = None - for i in range(1, len(datetimes)): - new = (datetimes[i] - datetimes[i - 1]).total_seconds() / 3600 - if not delta: - delta = new - continue - if new != delta: - raise ValueError( - f"Datetimes are not regularly spaced: " - f"delta={new} hours (date {i-1}={datetimes[i-1]} date {i}={datetimes[i]}) " - f"Expecting {delta} hours (date {0}={datetimes[0]} date {1}={datetimes[1]}) " - ) - - check(datetimes) - - freq = (datetimes[1] - datetimes[0]).total_seconds() / 3600 - assert round(freq) == freq, freq - assert int(freq) == freq, freq - frequency = int(freq) - - return datetimes, frequency - - @property - def frequency(self): - return self._datetimes_and_frequency[1] - - def get_datetimes(self): - return self._datetimes_and_frequency[0] - - def __repr__(self): - return "InputHandler\n " + "\n ".join(str(i) for i in self.loops) - - -class Loop(dict): - def __init__(self, dic, inputs, partial=False, parent=None): - assert isinstance(dic, dict), dic - assert len(dic) == 1, dic - super().__init__(dic) - - self.parent = parent - self.name = list(dic.keys())[0] - self.config = deepcopy(dic[self.name]) - self.partial = partial - - if "applies_to" not in self.config: - # if applies_to is not specified, apply to all inputs - self.config.applies_to = [i.name for i in inputs] - assert "applies_to" in self.config, self.config - applies_to = self.config.pop("applies_to") - self.applies_to_inputs = Inputs( - [input for input in inputs if input.name in applies_to] - ) - for i in self.applies_to_inputs: - 
i.process_inheritance(self.applies_to_inputs) - - self.values = {} - for k, v in self.config.items(): - self.values[k] = self.expand(v) - - def expand(self, values): - return expand(values) - - def __repr__(self) -> str: - def repr_lengths(v): - return f"{','.join([str(len(x)) for x in v])}" - - lenghts = [f"{k}({repr_lengths(v)})" for k, v in self.values.items()] - return f"Loop({self.name}, {','.join(lenghts)}) {self.config}" - - @cached_property - def n_iter_loops(self): - return len(list(itertools.product(*self.values.values()))) - - def iterate(self): - for items in itertools.product(*self.values.values()): - yield CubeCreator( - inputs=self.applies_to_inputs, - vars=dict(zip(self.values.keys(), items)), - loop_config=self.config, - output=self.parent.output, - partial=self.partial, - ) - - @property - def first(self): - return CubeCreator( - inputs=self.applies_to_inputs, - vars={k: lst[0] for k, lst in self.values.items() if lst}, - loop_config=self.config, - output=self.parent.output, - partial=self.partial, - ) - - @cached_property - def _info(self): - first_info = self.first._info - coords = deepcopy(first_info.coords) - assert ( - "valid_datetime" in coords - ), f"valid_datetime not found in coords {coords}" - coords["valid_datetime"] = self.get_datetimes() - return Info( - first_field=first_info.first_field, - grid_points=first_info.grid_points, - resolution=first_info.resolution, - coords=coords, - variables=first_info.variables, - data_request=first_info.data_request, - ) - - def get_datetimes(self): - # merge datetimes from all cubecreators and check there are no duplicates - datetimes = set() - - for i in self.iterate(): - assert isinstance(i, CubeCreator), i - new = i.get_datetimes() - - duplicates = datetimes.intersection(set(new)) - if duplicates: - raise DuplicateDateTimeError( - f"{len(duplicates)} duplicated datetimes " - f"'{sorted(list(duplicates))[0]},...' 
when processing << {self} >>" - ) - - datetimes = datetimes.union(set(new)) - return sorted(list(datetimes)) - - -class DuplicateDateTimeError(ValueError): - pass - - -class CubeCreator: - def __init__(self, inputs, vars, loop_config, output, partial=False): - self._loop_config = loop_config - self._vars = vars - self._inputs = inputs - self.output = output - self.partial = partial - - self.inputs = inputs.substitute(vars=vars, ignore_missing=True) - - @property - def length(self): - return 1 - - def __repr__(self) -> str: - out = f"CubeCreator ({self.length}):\n" - out += f" loop_config: {self._loop_config}" - out += f" vars: {self._vars}\n" - out += " Inputs:\n" - for _i, i in zip(self._inputs, self.inputs): - out += f"- {_i}\n" - out += f" {i}\n" - return out - - def do_load(self): - return self.inputs.do_load(self.partial) - - def get_datetimes(self): - return self.inputs.get_datetimes() - - def to_cube(self): - cube, _ = self._to_data_and_cube() - return cube - - def _to_data_and_cube(self): - data = self.do_load() - - start = time.time() - LOG.info("Sorting dataset %s %s", self.output.order_by, self.output.remapping) - cube = data.cube( - self.output.order_by, - remapping=self.output.remapping, - patches={"number": {None: 0}}, - flatten_values=self.output.flatten_grid, - ) - cube = cube.squeeze() - LOG.info(f"Sorting done in {seconds(time.time()-start)}.") - - def check(actual_dic, requested_dic): - assert self.output.statistics in actual_dic - - for key in set(list(actual_dic.keys()) + list(requested_dic.keys())): - actual = actual_dic[key] - requested = requested_dic[key] - - actual = list(actual) - - if requested == "ascending": - assert actual == sorted( - actual - ), f"Requested= {requested} Actual= {actual}" - continue - assert actual == requested, f"Requested= {requested} Actual= {actual}" - - check(actual_dic=cube.user_coords, requested_dic=self.output.order_by) - - return cube, data - - @property - def _info(self): - cube, data = self._to_data_and_cube() - - first_field = data[0] - data_request = self._get_data_request(data) - grid_points = first_field.grid_points() - resolution = first_field.resolution - coords = cube.user_coords - variables = list(coords[list(coords.keys())[1]]) - - return Info( - first_field, grid_points, resolution, coords, variables, data_request - ) - - def _get_data_request(self, data): - date = None - params_levels = defaultdict(set) - params_steps = defaultdict(set) - - for field in data: - if not hasattr(field, "as_mars"): - continue - if date is None: - date = field.valid_datetime() - if field.valid_datetime() != date: - continue - - as_mars = field.as_mars() - step = as_mars.get("step") - levtype = as_mars.get("levtype", "sfc") - param = as_mars["param"] - levelist = as_mars.get("levelist", None) - area = field.mars_area - grid = field.mars_grid - - if levelist is None: - params_levels[levtype].add(param) - else: - params_levels[levtype].add((param, levelist)) - - if step: - params_steps[levtype].add((param, step)) - - def sort(old_dic): - new_dic = {} - for k, v in old_dic.items(): - new_dic[k] = sorted(list(v)) - return new_dic - - params_steps = sort(params_steps) - params_levels = sort(params_levels) - - out = dict( - param_level=params_levels, param_step=params_steps, area=area, grid=grid - ) - return out - - -def _format_list(x): - if isinstance(x, (list, tuple)): - if isinstance(x[0], datetime.datetime): - is_regular = True - delta = x[1] - x[0] - for prev, current in zip(x[:-1], x[1:]): - if current - prev != delta: - is_regular = False - 
break - if is_regular: - return f"{_format_list(x[0])}/to/{_format_list(x[-1])}/by/{delta.total_seconds()/3600}" - - txt = "/".join(_format_list(_) for _ in x) - if len(txt) > 200: - txt = txt[:50] + "..." + txt[-50:] - return txt - - if isinstance(x, datetime.datetime): - return x.strftime("%Y-%m-%d.%H:%M") - return str(x) - - -class Info: - def __init__( - self, first_field, grid_points, resolution, coords, variables, data_request - ): - assert len(set(variables)) == len(variables), ( - "Duplicate variables", - variables, - ) - - assert grid_points[0].shape == grid_points[1].shape, ( - grid_points[0].shape, - grid_points[1].shape, - grid_points[0], - grid_points[1], - ) - - assert len(grid_points) == 2, grid_points - - expected = math.prod(first_field.shape) - assert len(grid_points[0]) == expected, (len(grid_points[0]), expected) - - self.first_field = first_field - self.grid_points = grid_points - self.resolution = resolution - self.coords = coords - self.variables = variables - self.data_request = data_request - - def __repr__(self): - shape = ( - f"{','.join([str(len(v)) for k, v in self.coords.items()])}," - + f"{','.join([str(_) for _ in self.first_field.shape])}" - ) - shape = shape.rjust(20) - return ( - f"Info(first_field={self.first_field}, " - f"resolution={self.resolution}, " - f"variables={'/'.join(self.variables)})" - f" coords={', '.join([k + ':' + _format_list(v) for k, v in self.coords.items()])}" - f" {shape}" - ) - - -class Purpose: - def __init__(self, name): - self.name = name - - def __str__(self): - return str(self.name) - - def __call__(self, config): - pass - - @classmethod - def dict_to_str(cls, x): - if isinstance(x, str): - return x - return list(x.keys())[0] - - -class NonePurpose(Purpose): - def __call__(self, config): - config.output.flatten_grid = config.output.get("flatten_grid", False) - config.output.ensemble_dimension = config.output.get( - "ensemble_dimension", False - ) - - -class AifsPurpose(Purpose): - def __call__(self, config): - def check_dict_value_and_set(dic, key, value): - if key in dic: - if dic[key] != value: - raise ValueError( - f"Cannot use {key}={dic[key]} with {self} purpose. Must use {value}." 
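# The "aifs" purpose above pins the output layout: flatten_grid must be True,
# ensemble_dimension must be 2, and order_by must end up with exactly three
# entries, "valid_datetime" first and "number" last ("number" is inserted
# automatically if missing). A fragment that would pass these checks -- the
# middle order_by key is illustrative, whatever the dataset actually orders on:
#
#   output = {
#       "flatten_grid": True,
#       "ensemble_dimension": 2,
#       "order_by": ["valid_datetime", "param_level", "number"],
#       "statistics": "param_level",   # must name one of the order_by keys
#   }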
- ) - dic[key] = value - - def ensure_element_in_list(lst, elt, index): - if elt in lst: - assert lst[index] == elt - return lst - - _lst = [self.dict_to_str(d) for d in lst] - if elt in _lst: - assert _lst[index] == elt - return lst - - return lst[:index] + [elt] + lst[index:] - - check_dict_value_and_set(config.output, "flatten_grid", True) - check_dict_value_and_set(config.output, "ensemble_dimension", 2) - - assert isinstance(config.output.order_by, (list, tuple)), config.output.order_by - config.output.order_by = ensure_element_in_list( - config.output.order_by, "number", config.output.ensemble_dimension - ) - - order_by = config.output.order_by - assert len(order_by) == 3, order_by - assert self.dict_to_str(order_by[0]) == "valid_datetime", order_by - assert self.dict_to_str(order_by[2]) == "number", order_by - - -PURPOSES = {None: NonePurpose, "aifs": AifsPurpose} - - -class LoadersConfig(Config): - def __init__(self, config, *args, **kwargs): - super().__init__(config, *args, **kwargs) - - if "description" not in self: - raise ValueError("Must provide a description in the config.") - - purpose = PURPOSES[self.get("purpose")](self.get("purpose")) - purpose(self) - - if not isinstance(self.input, (tuple, list)): - LOG.warning(f"{self.input=} is not a list") - self.input = [self.input] - - if "loops" in self: - warnings.warn("Should use loop instead of loops in config") - assert "loop" not in self - self.loop = self.pop("loops") - - if not isinstance(self.loop, list): - assert isinstance(self.loop, dict), self.loop - self.loop = [dict(loop_a=self.loop)] - - if "order" in self.output: - raise ValueError(f"Do not use 'order'. Use order_by in {config}") - if "order_by" in self.output: - self.output.order_by = normalize_order_by(self.output.order_by) - - self.output.remapping = self.output.get("remapping", {}) - self.output.remapping = build_remapping( - self.output.remapping, patches={"number": {None: 0}} - ) - - self.output.chunking = self.output.get("chunking", {}) - self.output.dtype = self.output.get("dtype", "float32") - - self.reading_chunks = self.get("reading_chunks") - assert "flatten_values" not in self.output - assert "flatten_grid" in self.output - - # The axis along which we append new data - # TODO: assume grid points can be 2d as well - self.output.append_axis = 0 - - assert "statistics" in self.output - statistics_axis_name = self.output.statistics - statistics_axis = -1 - for i, k in enumerate(self.output.order_by): - if k == statistics_axis_name: - statistics_axis = i - - assert ( - statistics_axis >= 0 - ), f"{self.output.statistics} not in {list(self.output.order_by.keys())}" - - self.statistics_names = self.output.order_by[statistics_axis_name] - - # TODO: consider 2D grid points - self.statistics_axis = statistics_axis - - def input_handler(self, partial=False): - return InputHandler(self.loop, self.input, output=self.output, partial=partial) - - -def substitute(x, vars=None, ignore_missing=False): - """Recursively substitute environment variables and dict values in a nested list ot dict of string. - substitution is performed using the environment var (if UPPERCASE) or the input dictionary. - - - >>> substitute({'bar': '$bar'}, {'bar': '43'}) - {'bar': '43'} - - >>> substitute({'bar': '$BAR'}, {'BAR': '43'}) - Traceback (most recent call last): - ... 
- KeyError: 'BAR' - - >>> substitute({'bar': '$BAR'}, ignore_missing=True) - {'bar': '$BAR'} - - >>> os.environ["BAR"] = "42" - >>> substitute({'bar': '$BAR'}) - {'bar': '42'} - - >>> substitute('$bar', {'bar': '43'}) - '43' - - >>> substitute('$hdates_from_date($date, 2015, 2018)', {'date': '2023-05-12'}) - '2015-05-12/2016-05-12/2017-05-12/2018-05-12' - - """ - if vars is None: - vars = {} - if isinstance(x, (tuple, list)): - return [substitute(y, vars, ignore_missing=ignore_missing) for y in x] - - if isinstance(x, dict): - return { - k: substitute(v, vars, ignore_missing=ignore_missing) for k, v in x.items() - } - - if isinstance(x, str): - if "$" not in x: - return x - - lst = [] - - for i, bit in enumerate(re.split(r"(\$(\w+)(\([^\)]*\))?)", x)): - i %= 4 - if i in [2, 3]: - continue - if i == 1: - try: - if "(" in bit: - # substitute by a function - FUNCTIONS = dict(hdates_from_date=hdates_from_date) - - pattern = r"\$(\w+)\(([^)]*)\)" - match = re.match(pattern, bit) - assert match, bit - - function_name = match.group(1) - params = [p.strip() for p in match.group(2).split(",")] - params = [ - substitute(p, vars, ignore_missing=ignore_missing) - for p in params - ] - - bit = FUNCTIONS[function_name](*params) - - elif bit.upper() == bit: - # substitute by the var env if $UPPERCASE - bit = os.environ[bit[1:]] - else: - # substitute by the value in the 'vars' dict - bit = vars[bit[1:]] - except KeyError as e: - if not ignore_missing: - raise e - - if bit != x: - bit = substitute(bit, vars, ignore_missing=ignore_missing) - - lst.append(bit) - - lst = [_ for _ in lst if _ != ""] - if len(lst) == 1: - return lst[0] - - out = [] - for elt in lst: - # if isinstance(elt, str): - # elt = [elt] - assert isinstance(elt, (list, tuple)), elt - out += elt - return out - - return x - - -def hdates_from_date(date, start_year, end_year): - """ - Returns a list of dates in the format '%Y%m%d' between start_year and end_year (inclusive), - with the year of the input date. - - Args: - date (str or datetime): The input date. - start_year (int): The start year. - end_year (int): The end year. - - Returns: - List[str]: A list of dates in the format '%Y%m%d'. - """ - if not str(start_year).isdigit(): - raise ValueError(f"start_year must be an int: {start_year}") - if not str(end_year).isdigit(): - raise ValueError(f"end_year must be an int: {end_year}") - start_year = int(start_year) - end_year = int(end_year) - - from climetlab.utils.dates import to_datetime - - if isinstance(date, (list, tuple)): - if len(date) != 1: - raise NotImplementedError(f"{date} should have only one element.") - date = date[0] - - date = to_datetime(date) - assert not (date.hour or date.minute or date.second), date - - hdates = [date.replace(year=year) for year in range(start_year, end_year + 1)] - return "/".join(d.strftime("%Y-%m-%d") for d in hdates) - - -class Expand(list): - """ - This class is used to expand loops. - It creates a list of list in self.groups. 
- """ - - def __init__(self, config, **kwargs): - self._config = config - self.kwargs = kwargs - self.groups = [] - self.parse_config() - - def parse_config(self): - from climetlab.utils.dates import to_datetime - - self.start = self._config.get("start") - if self.start is not None: - self.start = to_datetime(self.start) - self.end = self._config.get("end", self._config.get("stop")) - if self.end is not None: - self.end = to_datetime(self.end) - self.step = self._config.get("step", self._config.get("frequency", 1)) - self.group_by = self._config.get("group_by") - - -class HindcastExpand(Expand): - def __init__(self, config, **kwargs): - super().__init__(config, **kwargs) - self.groups = [["todo", "todo"]] - - -class ValuesExpand(Expand): - def __init__(self, config, **kwargs): - super().__init__(config, **kwargs) - values = self._config["values"] - values = [[v] if not isinstance(v, list) else v for v in values] - for v in self._config["values"]: - if not isinstance(v, (tuple, list)): - v = [v] - self.groups.append(v) - - -class StartStopExpand(Expand): - def __init__(self, config, **kwargs): - super().__init__(config, **kwargs) - - x = self.start - all = [] - while x <= self.end: - all.append(x) - x += self.step - - result = [list(g) for _, g in itertools.groupby(all, key=self.grouper_key)] - self.groups = [[self.format(x) for x in g] for g in result] - - def parse_config(self): - if "stop" in self._config: - raise ValueError(f"Use 'end' not 'stop' in loop. {self._config}") - super().parse_config() - - def format(self, x): - return x - - -class GroupByDays: - def __init__(self, days): - self.days = days - - def __call__(self, dt): - year = dt.year - days = (dt - datetime.datetime(year, 1, 1)).days - x = (year, days // self.days) - # print(x) - return x - - -class DateStartStopExpand(StartStopExpand): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - def parse_config(self): - super().parse_config() - assert isinstance(self.start, datetime.date), (type(self.start), self.start) - assert isinstance(self.end, datetime.date), (type(self.end), self.end) - self.step = datetime.timedelta(days=self.step) - - if isinstance(self.group_by, int) and self.group_by > 0: - self.grouper_key = GroupByDays(self.group_by) - else: - self.grouper_key = { - 0: lambda dt: 0, # only one group - "monthly": lambda dt: (dt.year, dt.month), - "daily": lambda dt: (dt.year, dt.month, dt.day), - "MMDD": lambda dt: (dt.month, dt.day), - }[self.group_by] - - def format(self, x): - return x.isoformat() - - -class IntStartStopExpand(StartStopExpand): - def grouper_key(self, x): - return { - 1: lambda x: 0, # only one group - None: lambda x: x, # one group per value - }[self.group_by](x) - - -def _expand_class(values): - if isinstance(values, list): - values = {"values": values} - - assert isinstance(values, dict), values - - if isinstance(values.get("values"), list): - assert len(values) == 1, f"No other config keys implemented. 
{values}" - return ValuesExpand - - if values.get("type") == "hindcast": - return HindcastExpand - - if start := values.get("start"): - if isinstance(start, datetime.datetime): - return DateStartStopExpand - if values.get("group_by") in [ - "monthly", - "daily", - "weekly", - "fortnightly", - ] or isinstance(values.get("group_by"), int): - return DateStartStopExpand - return IntStartStopExpand - - raise ValueError(f"Cannot expand loop from {values}") - - -def expand(values, **kwargs): - cls = _expand_class(values) - return cls(values, **kwargs).groups From a8d64011919bba5a64b50abc60190023b8aea123 Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Sat, 18 Nov 2023 16:11:15 +0000 Subject: [PATCH 5/6] remove zarr creation --- climetlab/loaders/__init__.py | 1194 --------------------------------- climetlab/scripts/main.py | 2 - 2 files changed, 1196 deletions(-) delete mode 100644 climetlab/loaders/__init__.py diff --git a/climetlab/loaders/__init__.py b/climetlab/loaders/__init__.py deleted file mode 100644 index 9cfd7d58..00000000 --- a/climetlab/loaders/__init__.py +++ /dev/null @@ -1,1194 +0,0 @@ -# (C) Copyright 2023 ECMWF. -# -# This software is licensed under the terms of the Apache Licence Version 2.0 -# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. -# In applying this licence, ECMWF does not waive the privileges and immunities -# granted to it by virtue of its status as an intergovernmental organisation -# nor does it submit to any jurisdiction. -# - - -import datetime -import logging -import os -import re -import time -import uuid -import warnings -from functools import cached_property - -import numpy as np -import tqdm - -from climetlab.core.order import build_remapping # noqa:F401 -from climetlab.utils import progress_bar -from climetlab.utils.config import LoadersConfig -from climetlab.utils.humanize import bytes, seconds - - -def compute_directory_size(path): - if not os.path.isdir(path): - return None - size = 0 - n = 0 - for dirpath, _, filenames in tqdm.tqdm( - os.walk(path), desc="Computing size", leave=False - ): - for filename in filenames: - file_path = os.path.join(dirpath, filename) - size += os.path.getsize(file_path) - n += 1 - return size, n - - -LOG = logging.getLogger(__name__) - -VERSION = "0.13" - - -class DatasetName: - def __init__( - self, - name, - resolution=None, - start_date=None, - end_date=None, - frequency=None, - ): - self.name = name - self.parsed = self._parse(name) - - self.messages = [] - - self.check_parsed() - self.check_resolution(resolution) - self.check_frequency(frequency) - self.check_start_date(start_date) - self.check_end_date(end_date) - - if self.messages: - self.messages.append( - f"{self} is parsed as :" - + "/".join(f"{k}={v}" for k, v in self.parsed.items()) - ) - - @property - def is_valid(self): - return not self.messages - - @property - def error_message(self): - out = " And ".join(self.messages) - if out: - out = out[0].upper() + out[1:] - return out - - def raise_if_not_valid(self, print=print): - if not self.is_valid: - for m in self.messages: - print(m) - raise ValueError(self.error_message) - - def _parse(self, name): - pattern = r"^(\w+)-(\w+)-(\w+)-(\w+)-(\w\w\w\w)-(\w+)-(\w+)-([\d\-]+)-(\d+h)-v(\d+)-?(.*)$" - match = re.match(pattern, name) - - parsed = {} - if match: - keys = [ - "use_case", - "class_", - "type_", - "stream", - "expver", - "source", - "resolution", - "period", - "frequency", - "version", - "additional", - ] - parsed = {k: v for k, v in zip(keys, match.groups())} - - period = 
parsed["period"].split("-") - assert len(period) in (1, 2), (name, period) - parsed["start_date"] = period[0] - if len(period) == 1: - parsed["end_date"] = period[0] - if len(period) == 2: - parsed["end_date"] = period[1] - - return parsed - - def __str__(self): - return self.name - - def check_parsed(self): - if not self.parsed: - self.messages.append( - ( - f"the dataset name {self} does not follow naming convention. " - "See here for details: " - "https://confluence.ecmwf.int/display/DWF/Datasets+available+as+zarr" - ) - ) - - def check_resolution(self, resolution): - if ( - self.parsed.get("resolution") - and self.parsed["resolution"][0] not in "0123456789on" - ): - self.messages.append( - ( - f"the resolution {self.parsed['resolution'] } should start " - f"with a number or 'o' or 'n' in the dataset name {self}." - ) - ) - - if resolution is None: - return - resolution_str = str(resolution).replace(".", "p").lower() - self._check_missing("resolution", resolution_str) - self._check_mismatch("resolution", resolution_str) - - def check_frequency(self, frequency): - if frequency is None: - return - frequency_str = f"{frequency}h" - self._check_missing("frequency", frequency_str) - self._check_mismatch("frequency", frequency_str) - - def check_start_date(self, start_date): - if start_date is None: - return - start_date_str = str(start_date.year) - self._check_missing("first date", start_date_str) - self._check_mismatch("start_date", start_date_str) - - def check_end_date(self, end_date): - if end_date is None: - return - end_date_str = str(end_date.year) - self._check_missing("end_date", end_date_str) - self._check_mismatch("end_date", end_date_str) - - def _check_missing(self, key, value): - if value not in self.name: - self.messages.append( - (f"the {key} is {value}, but is missing in {self.name}.") - ) - - def _check_mismatch(self, key, value): - if self.parsed.get(key) and self.parsed[key] != value: - self.messages.append( - (f"the {key} is {value}, but is {self.parsed[key]} in {self.name}.") - ) - - -def check_data_values(arr, *, name: str, log=[]): - min, max = arr.min(), arr.max() - assert not (np.isnan(arr).any()), (name, min, max, *log) - - if min == 9999.0: - warnings.warn(f"Min value 9999 for {name}") - if max == 9999.0: - warnings.warn(f"Max value 9999 for {name}") - - if name == ["lsm", "insolation"]: # 0. to 1. - assert max <= 1, (name, min, max, *log) - assert min >= 0, (name, min, max, *log) - - if name == "2t": # surface temp between -100 celcius and +100 celcius - assert max <= 373.15, (name, min, max, *log) - assert min >= 173.15, (name, min, max, *log) - - -def check_stats(minimum, maximum, mean, msg, **kwargs): - tolerance = (abs(minimum) + abs(maximum)) * 0.01 - if (mean - minimum < -tolerance) or (mean - minimum < -tolerance): - raise ValueError( - f"Mean is not in min/max interval{msg} : we should have {minimum} <= {mean} <= {maximum}" - ) - - -def _prepare_serialisation(o): - if isinstance(o, dict): - dic = {} - for k, v in o.items(): - v = _prepare_serialisation(v) - if k == "order_by": - # zarr attributes are saved with sort_keys=True - # and ordered dict are reordered. 
- # This is a problem for "order_by" - # We ensure here that the order_by key contains - # a list of dict - v = [{kk: vv} for kk, vv in v.items()] - dic[k] = v - return dic - - if isinstance(o, (list, tuple)): - return [_prepare_serialisation(v) for v in o] - - if o in (None, True, False): - return o - - if isinstance(o, (str, int, float)): - return o - - if isinstance(o, (datetime.date, datetime.datetime)): - return o.isoformat() - - return str(o) - - -class ArrayLike: - def flush(): - pass - - -class DummyArrayLike(ArrayLike): - """""" - - def __init__(self, array, shape): - self.array = array - - def __getattribute__(self, __name: str): - return super().__getattribute__(__name) - - def new_key(self, key, values_shape): - return key - - -class FastWriter(ArrayLike): - """ - A class that provides a caching mechanism for writing to a NumPy-like array. - - The `FastWriter` instance is initialized with a NumPy-like array and its shape. - The array is used to store the final data, while the cache is used to temporarily - store the data before flushing it to the array. The cache is a NumPy array of the same - shape as the final array, initialized with zeros. - - The `flush` method copies the contents of the cache to the final array. - """ - - def __init__(self, array, shape): - self.array = array - self.shape = shape - self.dtype = array.dtype - self.cache = np.zeros(shape, dtype=self.dtype) - - def __setitem__(self, key, value): - self.cache[key] = value - - def __getitem__(self, key): - return self.cache[key] - - def new_key(self, key, values_shape): - return self.array.new_key(key, values_shape) - - def flush(self): - self.array[:] = self.cache[:] - - def compute_statistics(self, statistics_registry, names): - nvars = self.shape[1] - - stats_shape = (self.shape[0], nvars) - - count = np.zeros(stats_shape, dtype=np.int64) - sums = np.zeros(stats_shape, dtype=np.float64) - squares = np.zeros(stats_shape, dtype=np.float64) - - minimum = np.zeros(stats_shape, dtype=np.float64) - maximum = np.zeros(stats_shape, dtype=np.float64) - - for i, chunk in enumerate(self.cache): - values = chunk.reshape((nvars, -1)) - minimum[i] = np.min(values, axis=1) - maximum[i] = np.max(values, axis=1) - sums[i] = np.sum(values, axis=1) - squares[i] = np.sum(values * values, axis=1) - count[i] = values.shape[1] - - stats = { - "minimum": minimum, - "maximum": maximum, - "sums": sums, - "squares": squares, - "count": count, - } - new_key = self.array.new_key(slice(None, None), self.shape) - assert self.array.axis == 0, self.array.axis - # print("new_key", new_key, self.array.offset, self.array.axis) - new_key = new_key[0] - statistics_registry[new_key] = stats - return stats - - def save_statistics(self, icube, statistics_registry, names): - now = time.time() - self.compute_statistics(statistics_registry, names) - LOG.info(f"Computed statistics in {seconds(time.time()-now)}.") - # for k, v in stats.items(): - # with open(f"stats_{icube}_{k}.npy", "wb") as f: - # np.save(f, v) - - -class OffsetView(ArrayLike): - """ - A view on a portion of the large_array. - 'axis' is the axis along which the offset applies. - 'shape' is the shape of the view. - """ - - def __init__(self, large_array, *, offset, axis, shape): - self.large_array = large_array - self.dtype = large_array.dtype - self.offset = offset - self.axis = axis - self.shape = shape - - def new_key(self, key, values_shape): - if isinstance(key, slice): - # Ensure that the slice covers the entire view along the axis. 
- print(self.shape) - assert key.start is None and key.stop is None, key - - # Create a new key for indexing the large array. - new_key = tuple( - slice(self.offset, self.offset + values_shape[i]) - if i == self.axis - else slice(None) - for i in range(len(self.shape)) - ) - else: - # For non-slice keys, adjust the key based on the offset and axis. - new_key = tuple( - k + self.offset if i == self.axis else k for i, k in enumerate(key) - ) - return new_key - - def __setitem__(self, key, values): - new_key = self.new_key(key, values.shape) - - start = time.time() - LOG.info("Writing data to disk") - self.large_array[new_key] = values - LOG.info(f"Writing data done in {seconds(time.time()-start)}.") - - -class CubesFilter: - def __init__(self, *, loader, parts, **kwargs): - self.loader = loader - - if parts is None: - self.parts = None - return - - if len(parts) == 1: - part = parts[0] - if part.lower() in ["all", "*"]: - self.parts = None - return - - if "/" in part: - i_chunk, n_chunks = part.split("/") - i_chunk, n_chunks = int(i_chunk), int(n_chunks) - - total = len(self.loader.registry.get_flags()) - assert i_chunk > 0, f"Chunk number {i_chunk} must be positive." - if n_chunks > total: - warnings.warn( - f"Number of chunks {n_chunks} is larger than the total number of chunks: {total}+1." - ) - - chunk_size = total / n_chunks - parts = [ - x - for x in range(total) - if x >= (i_chunk - 1) * chunk_size and x < i_chunk * chunk_size - ] - - parts = [int(_) for _ in parts] - LOG.info(f"Running parts: {parts}") - if not parts: - warnings.warn(f"Nothing to do for chunk {i_chunk}.") - - self.parts = parts - - def __call__(self, i): - if self.parts is None: - return True - return i in self.parts - - -class Loader: - def __init__(self, *, path, config, print=print, partial=False, **kwargs): - np.seterr( - all="raise" - ) # Catch all floating point errors, including overflow, sqrt(<0), etc - - self.main_config = LoadersConfig(config) - self.input_handler = self.main_config.input_handler(partial) - self.path = path - self.kwargs = kwargs - self.print = print - self.registry = ZarrBuiltRegistry(self.path) - self.statistics_registry = ZarrStatisticsRegistry(self.path) - - def load(self, **kwargs): - import zarr - - self.z = zarr.open(self.path, mode="r+") - self.registry.add_to_history("loading_data_start", parts=kwargs.get("parts")) - - filter = CubesFilter(loader=self, **kwargs) - ncubes = self.input_handler.n_cubes - for icube, cubecreator in enumerate(self.input_handler.iter_cubes()): - if not filter(icube): - continue - if self.registry.get_flag(icube): - LOG.info(f" -> Skipping {icube} total={ncubes} (already done)") - continue - self.print(f" -> Processing i={icube} total={ncubes}") - - cube = cubecreator.to_cube() - shape = cube.extended_user_shape - chunks = cube.chunking(self.input_handler.output.chunking) - axis = self.input_handler.output.append_axis - - slice = self.registry.get_slice_for(icube) - - LOG.info( - f"Building ZARR '{self.path}' i={icube} total={ncubes} (total shape ={shape}) at {slice}, {chunks=}" - ) - self.print(f"Building ZARR (total shape ={shape}) at {slice}, {chunks=}") - - offset = slice.start - array = OffsetView( - self.z["data"], - offset=offset, - axis=axis, - shape=shape, - ) - array = FastWriter(array, shape=shape) - self.load_datacube(cube, array) - - array.save_statistics( - icube, self.statistics_registry, self._variables_names - ) - - array.flush() - - self.registry.set_flag(icube) - - self.registry.add_to_history("loading_data_end", 
parts=kwargs.get("parts")) - self.registry.add_provenance(name="provenance_load") - - def load_datacube(self, cube, array): - start = time.time() - load = 0 - save = 0 - - reading_chunks = None - total = cube.count(reading_chunks) - self.print(f"Loading datacube {cube}") - bar = progress_bar( - iterable=cube.iterate_cubelets(reading_chunks), - total=total, - desc=f"Loading datacube {cube}", - ) - for i, cubelet in enumerate(bar): - now = time.time() - data = cubelet.to_numpy() - bar.set_description( - f"Loading {i}/{total} {str(cubelet)} ({data.shape}) {cube=}" - ) - load += time.time() - now - - j = cubelet.extended_icoords[1] - check_data_values( - data[:], - name=self._variables_names[j], - log=[i, j, data.shape, cubelet.extended_icoords], - ) - - now = time.time() - array[cubelet.extended_icoords] = data - save += time.time() - now - - now = time.time() - save += time.time() - now - - LOG.info("Written") - self.print_info() - LOG.info("Written.") - - self.print( - f"Elapsed: {seconds(time.time() - start)}," - f" load time: {seconds(load)}," - f" write time: {seconds(save)}." - ) - LOG.info( - f"Elapsed: {seconds(time.time() - start)}," - f" load time: {seconds(load)}," - f" write time: {seconds(save)}." - ) - - -def add_zarr_dataset( - *, - name, - dtype=None, - fill_value=np.nan, - zarr_root, - shape=None, - array=None, - overwrite=True, - **kwargs, -): - if dtype is None: - assert array is not None, (name, shape, array, dtype, zarr_root, fill_value) - dtype = array.dtype - - if shape is None: - assert array is not None, (name, shape, array, dtype, zarr_root, fill_value) - shape = array.shape - else: - assert array is None, (name, shape, array, dtype, zarr_root, fill_value) - array = np.full(shape, fill_value, dtype=dtype) - - a = zarr_root.create_dataset( - name, - shape=shape, - dtype=dtype, - overwrite=overwrite, - **kwargs, - ) - a[...] 
= array - return a - - -class ZarrRegistry: - synchronizer_name = None # to be defined in subclasses - - def __init__(self, path): - assert self.synchronizer_name is not None, self.synchronizer_name - - import zarr - - assert isinstance(path, str), path - self.zarr_path = path - self.synchronizer = zarr.ProcessSynchronizer(self._synchronizer_path) - - @property - def _synchronizer_path(self): - return self.zarr_path + "-" + self.synchronizer_name + ".sync" - - def _open_write(self): - import zarr - - return zarr.open(self.zarr_path, mode="r+", synchronizer=self.synchronizer) - - def _open_read(self, sync=True): - import zarr - - if sync: - return zarr.open(self.zarr_path, mode="r", synchronizer=self.synchronizer) - else: - return zarr.open(self.zarr_path, mode="r") - - def new_dataset(self, *args, **kwargs): - z = self._open_write() - zarr_root = z["_build"] - add_zarr_dataset(*args, zarr_root=zarr_root, overwrite=True, **kwargs) - - def add_to_history(self, action, **kwargs): - new = dict( - action=action, - timestamp=datetime.datetime.utcnow().isoformat(), - ) - new.update(kwargs) - - z = self._open_write() - history = z.attrs.get("history", []) - history.append(new) - z.attrs["history"] = history - - -class ZarrStatisticsRegistry(ZarrRegistry): - names = [ - "mean", - "stdev", - "minimum", - "maximum", - "sums", - "squares", - "count", - ] - build_names = [ - "minimum", - "maximum", - "sums", - "squares", - "count", - ] - synchronizer_name = "statistics" - - def __init__(self, path): - super().__init__(path) - - def create(self): - z = self._open_read() - shape = z["data"].shape - shape = (shape[0], shape[1]) - - for name in self.build_names: - if name == "count": - self.new_dataset(name=name, shape=shape, fill_value=0, dtype=np.int64) - else: - self.new_dataset( - name=name, shape=shape, fill_value=np.nan, dtype=np.float64 - ) - self.add_to_history("statistics_initialised") - - def __setitem__(self, key, stats): - z = self._open_write() - - LOG.info(f"Writting stats for {key}") - for name in self.build_names: - LOG.info(f"Writting stats for {key} {name} {stats[name].shape}") - z["_build"][name][key] = stats[name] - LOG.info(f"Written stats for {key}") - - def get_by_name(self, name): - z = self._open_read() - return z["_build"][name] - - -class ZarrBuiltRegistry(ZarrRegistry): - name_lengths = "lengths" - name_flags = "flags" - lengths = None - flags = None - z = None - synchronizer_name = "build" - - def get_slice_for(self, i): - lengths = self.get_lengths() - assert i >= 0 and i < len(lengths) - - start = sum(lengths[:i]) - stop = sum(lengths[: (i + 1)]) - return slice(start, stop) - - def get_lengths(self): - z = self._open_read() - return list(z["_build"][self.name_lengths][:]) - - def get_flags(self, **kwargs): - z = self._open_read(**kwargs) - LOG.info(list(z["_build"][self.name_flags][:])) - return list(z["_build"][self.name_flags][:]) - - def get_flag(self, i): - z = self._open_read() - return z["_build"][self.name_flags][i] - - def set_flag(self, i, value=True): - z = self._open_write() - z.attrs["latest_write_timestamp"] = datetime.datetime.utcnow().isoformat() - z["_build"][self.name_flags][i] = value - - def create(self, lengths, overwrite=False): - self.new_dataset(name=self.name_lengths, array=np.array(lengths, dtype="i4")) - self.new_dataset( - name=self.name_flags, array=np.array([False] * len(lengths), dtype=bool) - ) - self.add_to_history("initialised") - - def reset(self, lengths): - return self.create(lengths, overwrite=True) - - def add_provenance(self, name): - 
from ecml_tools.provenance import gather_provenance_info - - z = self._open_write() - z.attrs[name] = gather_provenance_info() - - -class ZarrLoader(Loader): - writer = None - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.z = None - - @classmethod - def from_config(cls, *, config, path, **kwargs): - # config is the path to the config file - # or a dict with the config - obj = cls(config=config, path=path, **kwargs) - return obj - - @classmethod - def from_zarr(cls, *, config, path, **kwargs): - import zarr - - assert os.path.exists(path), path - z = zarr.open(path, mode="r") - config = z.attrs["_create_yaml_config"] - # config = yaml.safe_load(z.attrs["_yaml_dump"])["_create_yaml_config"] - kwargs.get("print", print)("Config loaded from zarr: ", config) - return cls.from_config(config=config, path=path, **kwargs) - - def iter_loops(self): - for vars in self.input_handler.iter_loops(): - yield vars - - def _compute_lengths(self, multiply): - def squeeze_dict(dic): - keys = list(dic.keys()) - assert len(dic) == 1, keys - return dic[keys[0]] - - lengths = [] - for i, vars in enumerate(self.iter_loops()): - lst = squeeze_dict(vars) - assert isinstance(lst, (tuple, list)), lst - lengths.append(len(lst)) - print("i vars", i, vars, lengths, lst, f"{multiply=}") - - lengths = [x * multiply for x in lengths] - return lengths - - @property - def _variables_names(self): - return self.main_config.output.order_by[self.main_config.output.statistics] - - def initialise(self): - """Create empty zarr from self.main_config and self.path""" - import pandas as pd - import zarr - - self.print("config loaded ok:") - print(self.main_config) - print("-------------------------") - - total_shape = self.input_handler.shape - self.print(f"total_shape = {total_shape}") - print("-------------------------") - - grid_points = self.input_handler.grid_points - print(f"gridpoints size: {[len(i) for i in grid_points]}") - print("-------------------------") - - dates = self.input_handler.get_datetimes() - self.print(f"Found {len(dates)} datetimes.") - print( - f"Dates: Found {len(dates)} datetimes, in {self.input_handler.n_cubes} cubes: ", - end="", - ) - lengths = [str(len(c.get_datetimes())) for c in self.input_handler.iter_cubes()] - print("+".join(lengths)) - self.print(f"Found {len(dates)} datetimes {'+'.join(lengths)}.") - print("-------------------------") - - variables_names = self.input_handler.variables - self.print( - f"Found {len(variables_names)} variables : {','.join(variables_names)}." 
- ) - - assert ( - variables_names - == self.main_config.output.order_by[self.main_config.output.statistics] - ), ( - f"Requested= {self.main_config.output.order_by[self.main_config.output.statistics]} " - f"Actual= {variables_names}" - ) - - resolution = self.input_handler.resolution - print(f"{resolution=}") - - chunks = self.input_handler.chunking - print(f"{chunks=}") - dtype = self.main_config.output.dtype - - self.print( - f"Creating ZARR '{self.path}', with {total_shape=}, {chunks=} and {dtype=}" - ) - - frequency = self.input_handler.frequency - assert isinstance(frequency, int), frequency - - if not self.kwargs["no_check"]: - basename, ext = os.path.splitext(os.path.basename(self.path)) - - ds_name = DatasetName( - basename, - resolution, - dates[0], - dates[-1], - frequency, - ) - ds_name.raise_if_not_valid(print=self.print) - - metadata = {} - metadata["uuid"] = str(uuid.uuid4()) - - metadata.update(self.main_config.get("add_metadata", {})) - - metadata["_create_yaml_config"] = _prepare_serialisation(self.main_config) - - metadata["description"] = self.main_config.description - metadata["resolution"] = resolution - - metadata["data_request"] = self.input_handler.data_request - - metadata["order_by"] = self.main_config.output.order_by - metadata["remapping"] = self.main_config.output.remapping - metadata["flatten_grid"] = self.main_config.output.flatten_grid - metadata["ensemble_dimension"] = self.main_config.output.ensemble_dimension - - metadata["variables"] = variables_names - metadata["version"] = VERSION - metadata["frequency"] = frequency - metadata["start_date"] = dates[0].isoformat() - metadata["end_date"] = dates[-1].isoformat() - pd_dates_kwargs = dict( - start=metadata["start_date"], - end=metadata["end_date"], - freq=f"{metadata['frequency']}h", - unit="s", - ) - pd_dates = pd.date_range(**pd_dates_kwargs) - - def check_dates(input_handler, pd_dates, total_shape): - for i, loop in enumerate(input_handler.loops): - print(f"Loop {i}: ", loop._info) - if pd_dates.size != total_shape[0]: - raise ValueError( - f"Final date size {pd_dates.size} (from {pd_dates[0]} to {pd_dates[-1]}, " - f"{frequency=}) does not match data shape {total_shape[0]}. {total_shape=}" - ) - if pd_dates.size != len(dates): - raise ValueError( - f"Final date size {pd_dates.size} (from {pd_dates[0]} to {pd_dates[-1]}, " - f"{frequency=}) does not match data shape {len(dates)} (from {dates[0]} to " - f"{dates[-1]}). 
{pd_dates_kwargs}" - ) - - check_dates(self.input_handler, pd_dates, total_shape) - - metadata.update(self.main_config.get("force_metadata", {})) - - # write data - self.z = zarr.open(self.path, mode="w") - self.z.create_group("_build") - - self.z.create_dataset("data", shape=total_shape, chunks=chunks, dtype=dtype) - - np_dates = pd_dates.to_numpy() - self._add_dataset(name="dates", array=np_dates) - - self._add_dataset(name="latitudes", array=grid_points[0]) - self._add_dataset(name="longitudes", array=grid_points[1]) - - self.z = None - - self.update_metadata(**metadata) - - self.registry.create(lengths=lengths) - self.statistics_registry.create() - - self.registry.add_to_history("init finished") - - def update_metadata(self, **kwargs): - import zarr - - z = zarr.open(self.path, mode="w+") - for k, v in kwargs.items(): - if isinstance(v, np.datetime64): - v = v.astype(datetime.datetime) - if isinstance(v, datetime.date): - v = v.isoformat() - z.attrs[k] = v - - def statistics_start_indice(self): - return self._statistics_subset_indices[0] - - def statistics_end_indice(self): - return self._statistics_subset_indices[1] - - def _actual_statistics_start(self): - return self._statistics_subset_indices[2] - - def _actual_statistics_end(self): - return self._statistics_subset_indices[3] - - @cached_property - def _statistics_subset_indices(self): - statistics_start = self.main_config.output.get("statistics_start") - statistics_end = self.main_config.output.get("statistics_end") - try: - from ecml_tools.data import open_dataset - except ImportError: - raise Exception("Need to pip install ecml_tools[zarr]") - - if statistics_end is None: - warnings.warn( - "No statistics_end specified, using last date of the dataset." - ) - ds = open_dataset(self.path) - subset = ds.dates_interval_to_indices(statistics_start, statistics_end) - - return (subset[0], subset[-1], ds.dates[subset[0]], ds.dates[subset[-1]]) - - def _add_dataset(self, *args, **kwargs): - import zarr - - # print('add_dataset', args, kwargs) - - z = self.z - if z is None: - z = zarr.open(self.path, mode="r+") - - return add_zarr_dataset(*args, **kwargs, zarr_root=z) - - def print_info(self): - assert self.z is not None - try: - print(self.z["data"].info) - except Exception as e: - print(e) - print("...") - try: - print(self.z["data"].info) - except Exception as e: - print(e) - - def add_total_size(self, **kwargs): - size, n = compute_directory_size(self.path) - self.update_metadata(total_size=size, total_number_of_files=n) - - def add_statistics(self, no_write, **kwargs): - do_write = not no_write - - incomplete = not all(self.registry.get_flags(sync=False)) - if do_write and incomplete: - raise Exception( - f"Zarr {self.path} is not fully built, not writing statistics." 
- ) - - statistics_start = self.main_config.output.get("statistics_start") - statistics_end = self.main_config.output.get("statistics_end") - - if do_write: - self.registry.add_to_history( - "compute_statistics_start", - start=statistics_start, - end=statistics_end, - ) - - try: - from ecml_tools.data import open_dataset - except ImportError: - raise Exception("Need to pip install ecml_tools") - ds = open_dataset(self.path) - - stats = self.compute_statistics(ds, statistics_start, statistics_end) - - print( - "\n".join( - ( - f"{v.rjust(10)}: " - f"min/max = {stats['minimum'][j]:.6g} {stats['maximum'][j]:.6g}" - " \t: " - f"mean/stdev = {stats['mean'][j]:.6g} {stats['stdev'][j]:.6g}" - ) - for j, v in enumerate(ds.variables) - ) - ) - - if do_write: - for k in [ - "mean", - "stdev", - "minimum", - "maximum", - "sums", - "squares", - "count", - ]: - self._add_dataset(name=k, array=stats[k]) - - self.update_metadata( - statistics_start_date=self._actual_statistics_start(), - statistics_end_date=self._actual_statistics_end(), - ) - - self.registry.add_to_history( - "compute_statistics_end", - start=statistics_start, - end=statistics_end, - ) - - self.registry.add_provenance(name="provenance_statistics") - - def compute_statistics(self, ds, statistics_start, statistics_end): - i_start = self.statistics_start_indice() - i_end = self.statistics_end_indice() - - i_len = i_end + 1 - i_start - - self.print( - f"Statistics computed on {i_len}/{len(ds.dates)} samples " - f"first={ds.dates[i_start]} " - f"last={ds.dates[i_end]}" - ) - if i_end < i_start: - raise ValueError( - f"Cannot compute statistics on an empty interval." - f" Requested : {ds.dates[i_start]} {ds.dates[i_end]}." - f" Available: {ds.dates[0]=} {ds.dates[-1]=}" - ) - - reg = self.statistics_registry - - maximum = reg.get_by_name("maximum")[i_start : i_end + 1] - minimum = reg.get_by_name("minimum")[i_start : i_end + 1] - sums = reg.get_by_name("sums")[i_start : i_end + 1] - squares = reg.get_by_name("squares")[i_start : i_end + 1] - count = reg.get_by_name("count")[i_start : i_end + 1] - - assert len(maximum) == i_len, (len(maximum), i_len) - assert len(minimum) == i_len, (len(minimum), i_len) - assert len(sums) == i_len, (len(sums), i_len) - assert len(squares) == i_len, (len(squares), i_len) - assert len(count) == i_len, (len(count), i_len) - - assert not np.isnan(minimum).any(), minimum - assert not np.isnan(maximum).any(), maximum - assert not np.isnan(sums).any(), sums - assert not np.isnan(squares).any(), squares - # assert all(count > 0), count - - _minimum = np.amin(minimum, axis=0) - _maximum = np.amax(maximum, axis=0) - _count = np.sum(count, axis=0) - _sums = np.sum(sums, axis=0) - _squares = np.sum(squares, axis=0) - _mean = _sums / _count - - assert all(_count[0] == c for c in _count), _count - - x = _squares / _count - _mean * _mean - # remove negative variance due to numerical errors - # x[- 1e-15 < (x / (np.sqrt(_squares / _count) + np.abs(_mean))) < 0] = 0 - if not (x >= 0).all(): - print(x) - print(ds.variables) - print(_count) - for i, (var, y) in enumerate(zip(ds.variables, x)): - if y < 0: - print( - var, - y, - _maximum[i], - _minimum[i], - _mean[i], - _count[i], - _sums[i], - _squares[i], - ) - - print(var, np.min(sums[i]), np.max(sums[i]), np.argmin(sums[i])) - print( - var, - np.min(squares[i]), - np.max(squares[i]), - np.argmin(squares[i]), - ) - print(var, np.min(count[i]), np.max(count[i]), np.argmin(count[i])) - - raise ValueError("Negative variance") - - _stdev = np.sqrt(x) - - stats = { - "mean": _mean, 
- "stdev": _stdev, - "minimum": _minimum, - "maximum": _maximum, - "sums": _sums, - "squares": _squares, - "count": _count, - } - - for v in stats.values(): - assert v.shape == stats["mean"].shape - - for i, name in enumerate(ds.variables): - check_stats(**{k: v[i] for k, v in stats.items()}, msg=f"{i} {name}") - - return stats - - -class HDF5Loader: - def __init__(self, *args, **kwargs): - raise NotImplementedError() - - def append_array(self, *args, **kwargs): - raise NotImplementedError("Appending do HDF5 not yet implemented") - - def create_array( - self, - dataset, - shape, - chunks, - dtype, - metadata, - grid_points, - nloops, - ): - import h5py - - if not isinstance(chunks, tuple): - chunks = None - - print( - f"Creating HDD5 file '{self.path}', with {dataset=}, {shape=}, {chunks=} and {dtype=}" - ) - - self.h5 = h5py.File(self.path, mode="w") - array = self.h5.create_dataset( - dataset, - chunks=chunks, - maxshape=shape, - dtype=dtype, - data=np.empty( - shape - ) # Can we avoid that? Looks like its needed for chuncking - # data = h5py.Empty(dtype), - ) - return array - - def close(self): - self.h5.close() - del self.h5 - - def print_info(self): - import h5py - - def h5_tree(h5, depth=0): - for k, v in h5.items(): - if isinstance(v, h5py._hl.group.Group): - h5_tree(v, depth + 1) - else: - print(" " * (depth * 3), k, v) - for p, q in v.attrs.items(): - print(" " * (depth * 3 + 3), p, q) - - size = os.path.getsize(self.path) - print(f"HDF5 file {self.path}: {size:,} ({bytes(size)})") - with h5py.File(self.path, mode="r") as f: - print("Content:") - h5_tree(f, 1) diff --git a/climetlab/scripts/main.py b/climetlab/scripts/main.py index a997e122..1795f9b8 100644 --- a/climetlab/scripts/main.py +++ b/climetlab/scripts/main.py @@ -23,7 +23,6 @@ from .cache import CacheCmd from .check import CheckCmd from .completion import CompletionCmd -from .create import LoadersCmd from .grib import GribCmd from .grib_info import GribInfoCmd from .settings import SettingsCmd @@ -68,7 +67,6 @@ class CliMetLabApp( BenchmarkCmd, GribInfoCmd, AvailabilityCmd, - LoadersCmd, TestDataCmd, *get_plugins(), ): From a4db36b24ef50bba859bbd5a05ce935dd8a8044f Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Mon, 20 Nov 2023 21:14:43 +0000 Subject: [PATCH 6/6] Bump version 0.19.0 --- climetlab/version | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/climetlab/version b/climetlab/version index 3f2bb467..1cf0537c 100644 --- a/climetlab/version +++ b/climetlab/version @@ -1 +1 @@ -0.18.15 +0.19.0
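For reference, the loader code deleted with climetlab/loaders/__init__.py in PATCH 5/6 accumulates per-chunk minimum/maximum/sums/squares/count arrays and later reduces them into dataset-wide statistics (see the removed ZarrLoader.compute_statistics above). Below is a minimal, self-contained sketch of that reduction only; the function name, signature and the synthetic demo data are illustrative and are not part of the climetlab API.

import numpy as np

def reduce_statistics(minimum, maximum, sums, squares, count):
    # Combine per-chunk accumulators of shape (n_chunks, n_vars)
    # into dataset-wide statistics of shape (n_vars,).
    total_count = count.sum(axis=0)
    total_sums = sums.sum(axis=0)
    total_squares = squares.sum(axis=0)
    mean = total_sums / total_count
    variance = total_squares / total_count - mean * mean
    # A negative value here would indicate a numerical problem; the removed
    # code raised ValueError("Negative variance") in that case.
    assert (variance >= 0).all(), "negative variance"
    return {
        "minimum": minimum.min(axis=0),
        "maximum": maximum.max(axis=0),
        "mean": mean,
        "stdev": np.sqrt(variance),
        "sums": total_sums,
        "squares": total_squares,
        "count": total_count,
    }

if __name__ == "__main__":
    # Synthetic accumulators built from raw data of shape (n_chunks, n_vars, n_points).
    data = np.random.rand(4, 3, 10)
    stats = reduce_statistics(
        data.min(axis=2),
        data.max(axis=2),
        data.sum(axis=2),
        (data * data).sum(axis=2),
        np.full(data.shape[:2], data.shape[2]),
    )
    print({k: v.shape for k, v in stats.items()})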