From d91e1be5e8a52e575aae3a48c294935af746d8eb Mon Sep 17 00:00:00 2001 From: Koncopd Date: Fri, 26 Jul 2024 14:19:51 +0200 Subject: [PATCH 01/22] split AnnData accessor from backed_access --- lamindb/core/_mapped_collection.py | 2 +- lamindb/core/storage/_anndata_accessor.py | 723 ++++++++++++++++++++++ lamindb/core/storage/_backed_access.py | 719 +-------------------- 3 files changed, 728 insertions(+), 716 deletions(-) create mode 100644 lamindb/core/storage/_anndata_accessor.py diff --git a/lamindb/core/_mapped_collection.py b/lamindb/core/_mapped_collection.py index 1484417f7..94581b721 100644 --- a/lamindb/core/_mapped_collection.py +++ b/lamindb/core/_mapped_collection.py @@ -10,7 +10,7 @@ from lamin_utils import logger from lamindb_setup.core.upath import UPath -from .storage._backed_access import ( +from .storage._anndata_accessor import ( ArrayType, ArrayTypes, GroupType, diff --git a/lamindb/core/storage/_anndata_accessor.py b/lamindb/core/storage/_anndata_accessor.py new file mode 100644 index 000000000..b09f0b45c --- /dev/null +++ b/lamindb/core/storage/_anndata_accessor.py @@ -0,0 +1,723 @@ +from __future__ import annotations + +import inspect +from functools import cached_property +from itertools import chain +from typing import TYPE_CHECKING, Callable, Mapping, Union + +import h5py +import numpy as np +import pandas as pd +from anndata import AnnData +from anndata import __version__ as anndata_version +from anndata._core.index import Index, _normalize_indices +from anndata._core.views import _resolve_idx +from anndata._io.h5ad import read_dataframe_legacy as read_dataframe_legacy_h5 +from anndata._io.specs.registry import get_spec, read_elem, read_elem_partial +from anndata.compat import _read_attr +from fsspec.implementations.local import LocalFileSystem +from lamin_utils import logger +from lamindb_setup.core.upath import UPath, create_mapper, infer_filesystem +from packaging import version + +if TYPE_CHECKING: + from pathlib import Path + + from fsspec.core import OpenFile + + +anndata_version_parse = version.parse(anndata_version) + +if anndata_version_parse < version.parse("0.10.0"): + if anndata_version_parse < version.parse("0.9.1"): + logger.warning( + "Full backed capabilities are not available for this version of anndata," + " please install anndata>=0.9.1." 
+ ) + + from anndata._core.sparse_dataset import SparseDataset + + # try csr for groups with no encoding_type + class CSRDataset(SparseDataset): + @property + def format_str(self) -> str: + return "csr" + + def sparse_dataset(group): + return SparseDataset(group) + +else: + from anndata._core.sparse_dataset import ( + BaseCompressedSparseDataset as SparseDataset, + ) + from anndata._core.sparse_dataset import ( # type: ignore + CSRDataset, + sparse_dataset, + ) + + def _check_group_format(*args): + pass + + CSRDataset._check_group_format = _check_group_format + + +# zarr and CSRDataset have problems with full selection +def _subset_sparse(sparse_ds: CSRDataset | SparseDataset, indices): + has_arrays = isinstance(indices[0], np.ndarray) or isinstance( + indices[1], np.ndarray + ) + if not has_arrays and indices == (slice(None), slice(None)): + return sparse_ds.to_memory() + else: + return sparse_ds[indices] + + +def get_module_name(obj): + return inspect.getmodule(obj).__name__.partition(".")[0] + + +def _records_to_df(obj): + if isinstance(obj, pd.DataFrame): + return obj + + if hasattr(obj, "dtype") and obj.dtype.names is not None: + formats = [] + for name, (dt, _) in obj.dtype.fields.items(): + if dt.char == "S": + new_dt = str(dt).replace("S", "U") + else: + new_dt = dt + formats.append((name, new_dt)) + df = pd.DataFrame(obj.astype(formats, copy=False)) + for index_name in ("index", "_index"): + if index_name in df.columns: + return df.set_index(index_name) + return df + else: + return obj + + +class AccessRegistry: + def __init__(self): + self._registry = {} + self._openers = {} + + def register_open(self, module: str): + def wrapper(func: Callable): + self._openers[module] = func + return func + + return wrapper + + def open(self, module: str, *args, **kwargs): + if module in self._openers: + return self._openers[module](*args, **kwargs) + else: + raise ValueError(f"Module {module} not found, please install it.") + + def register(self, module: str): + def wrapper(func: Callable): + func_name = func.__name__ + if func_name not in self._registry: + self._registry[func_name] = {} + self._registry[func_name][module] = func + return func + + return wrapper + + def __getattr__(self, func_name: str): + def wrapper(*args, **kwargs): + func_registry = self._registry[func_name] + for arg in chain(args, kwargs.values()): + arg_module = get_module_name(arg) + if arg_module in func_registry: + return func_registry[arg_module](*args, **kwargs) + raise ValueError(f"{func_name} is not registered for this module.") + + return wrapper + + +# storage specific functions should be registered and called through the registry +registry = AccessRegistry() + + +@registry.register_open("h5py") +def open(filepath: UPath | Path | str): + fs, file_path_str = infer_filesystem(filepath) + if isinstance(fs, LocalFileSystem): + return None, h5py.File(file_path_str, mode="r") + conn = fs.open(file_path_str, mode="rb") + try: + storage = h5py.File(conn, mode="r") + except Exception as e: + conn.close() + raise e + return conn, storage + + +@registry.register("h5py") +def read_dataframe(elem: h5py.Dataset | h5py.Group): + if isinstance(elem, h5py.Dataset): + return read_dataframe_legacy_h5(elem) + else: + return read_elem(elem) + + +@registry.register("h5py") +def safer_read_partial(elem, indices): + is_dataset = isinstance(elem, h5py.Dataset) + indices_inverse: list | None = None + encoding_type = get_spec(elem).encoding_type + # h5py selection for datasets requires sorted indices + if is_dataset or encoding_type == 
"dataframe": + indices_increasing = [] + indices_inverse = [] + for indices_dim in indices: + # should be integer or bool + # ignore bool or increasing unique integers + if ( + isinstance(indices_dim, np.ndarray) + and indices_dim.dtype != "bool" + and not np.all(np.diff(indices_dim) > 0) + ): + idx_unique, idx_inverse = np.unique(indices_dim, return_inverse=True) + indices_increasing.append(idx_unique) + indices_inverse.append(idx_inverse) + else: + indices_increasing.append(indices_dim) + indices_inverse.append(None) + indices = tuple(indices_increasing) + if all(idx is None for idx in indices_inverse): + indices_inverse = None + result = None + if encoding_type == "": + if is_dataset: + dims = len(elem.shape) + if dims == 2: + result = elem[indices] + elif dims == 1: + if indices[0] == slice(None): + result = elem[indices[1]] + elif indices[1] == slice(None): + result = elem[indices[0]] + elif isinstance(elem, h5py.Group): + try: + ds = CSRDataset(elem) + result = _subset_sparse(ds, indices) + except Exception as e: + logger.debug( + f"Encountered an exception while attempting to subset a sparse dataset by indices.\n{e}" + ) + if result is None: + raise ValueError( + "Can not get a subset of the element of type" + f" {type(elem).__name__} with an empty spec." + ) + else: + result = read_elem_partial(elem, indices=indices) + if indices_inverse is None: + return result + else: + if indices_inverse[0] is None: + if len(result.shape) == 2: + return result[:, indices_inverse[1]] + else: + return result[indices_inverse[1]] + elif indices_inverse[1] is None: + if isinstance(result, pd.DataFrame): + return result.iloc[indices_inverse[0]] + else: + return result[indices_inverse[0]] + else: + return result[tuple(indices_inverse)] + + +@registry.register("h5py") +def keys(storage: h5py.File): + attrs_keys: dict[str, list] = {} + for attr in storage.keys(): + if attr == "X": + continue + attr_obj = storage[attr] + if attr in ("obs", "var") and isinstance(attr_obj, h5py.Dataset): + keys = list(attr_obj.dtype.fields.keys()) + else: + keys = list(attr_obj.keys()) + if len(keys) > 0: + attrs_keys[attr] = keys + return attrs_keys + + +ArrayTypes = [h5py.Dataset] +GroupTypes = [h5py.Group] +StorageTypes = [h5py.File] + + +ZARR_INSTALLED = False +try: + import zarr + + ZARR_INSTALLED = True +except ImportError: + pass + +if ZARR_INSTALLED: + from anndata._io.zarr import read_dataframe_legacy as read_dataframe_legacy_zarr + + ArrayTypes.append(zarr.Array) + GroupTypes.append(zarr.Group) + StorageTypes.append(zarr.Group) + + @registry.register_open("zarr") + def open(filepath: Union[UPath, Path, str]): # noqa + fs, file_path_str = infer_filesystem(filepath) + conn = None + if isinstance(fs, LocalFileSystem): + # this is faster than through an fsspec mapper for local + open_obj = file_path_str + else: + open_obj = create_mapper(fs, file_path_str, check=True) + storage = zarr.open(open_obj, mode="r") + return conn, storage + + @registry.register("zarr") + def read_dataframe(elem: Union[zarr.Array, zarr.Group]): # noqa + if isinstance(elem, zarr.Array): + return read_dataframe_legacy_zarr(elem) + else: + return read_elem(elem) + + @registry.register("zarr") + def safer_read_partial(elem, indices): # noqa + encoding_type = get_spec(elem).encoding_type + if encoding_type == "": + if isinstance(elem, zarr.Array): + dims = len(elem.shape) + if dims == 2: + return elem.oindex[indices] + elif dims == 1: + if indices[0] == slice(None): + return elem.oindex[indices[1]] + elif indices[1] == slice(None): + return 
elem.oindex[indices[0]] + elif isinstance(elem, zarr.Group): + try: + ds = CSRDataset(elem) + return _subset_sparse(ds, indices) + except Exception as e: + logger.debug( + f"Encountered an exception while attempting to subset a sparse dataset by indices.\n{e}" + ) + raise ValueError( + "Can not get a subset of the element of type" + f" {type(elem).__name__} with an empty spec." + ) + else: + if encoding_type in ("csr_matrix", "csc_matrix"): + ds = sparse_dataset(elem) + return _subset_sparse(ds, indices) + else: + return read_elem_partial(elem, indices=indices) + + # this is needed because accessing zarr.Group.keys() directly is very slow + @registry.register("zarr") + def keys(storage: zarr.Group): # noqa + paths = storage._store.keys() + + attrs_keys: dict[str, list] = {} + obs_var_arrays = [] + + for path in paths: + if path in (".zattrs", ".zgroup"): + continue + parts = path.split("/") + if len(parts) < 2: + continue + attr = parts[0] + key = parts[1] + + if attr == "X": + continue + + if attr in ("obs", "var"): + if attr in obs_var_arrays: + continue + if key == ".zarray": + attrs_keys.pop(attr, None) + obs_var_arrays.append(attr) + + if attr not in attrs_keys: + attrs_keys[attr] = [] + + if key in (".zattrs", ".zgroup", ".zarray"): + continue + attr_keys = attrs_keys[attr] + if key not in attr_keys: + attr_keys.append(key) + + for attr in obs_var_arrays: + attrs_keys[attr] = list(storage[attr].dtype.fields.keys()) + + return {attr: keys for attr, keys in attrs_keys.items() if len(keys) > 0} + + +ArrayTypes = tuple(ArrayTypes) # type: ignore +GroupTypes = tuple(GroupTypes) # type: ignore +StorageTypes = tuple(StorageTypes) # type: ignore + + +ArrayType = Union[ArrayTypes] # type: ignore +GroupType = Union[GroupTypes] # type: ignore +StorageType = Union[StorageTypes] # type: ignore + + +def _to_memory(elem): + if isinstance(elem, ArrayTypes): + return elem[()] + elif isinstance(elem, SparseDataset): + return elem.to_memory() + else: + return elem + + +def _try_backed_full(elem): + # think what to do for compatibility with old var and obs + if isinstance(elem, ArrayTypes): + return elem + + if isinstance(elem, GroupTypes): + encoding_type = get_spec(elem).encoding_type + if encoding_type in ("csr_matrix", "csc_matrix"): + return sparse_dataset(elem) + if "h5sparse_format" in elem.attrs: + return sparse_dataset(elem) + if encoding_type == "" and "indptr" in elem: + return CSRDataset(elem) + + return read_elem(elem) + + +def _safer_read_index(elem): + if isinstance(elem, GroupTypes): + return pd.Index(read_elem(elem[_read_attr(elem.attrs, "_index")])) + elif isinstance(elem, ArrayTypes): + indices = None + for index_name in ("index", "_index"): + if index_name in elem.dtype.names: + indices = elem[index_name] + break + if indices is not None and len(indices) > 0: + if isinstance(indices[0], bytes): + indices = np.frompyfunc(lambda x: x.decode("utf-8"), 1, 1)(indices) + return pd.Index(indices) + else: + raise ValueError("Indices not found.") + else: + raise ValueError(f"Unknown elem type {type(elem)} when reading indices.") + + +class _MapAccessor: + def __init__(self, elem, name, indices=None): + self.elem = elem + self.indices = indices + self.name = name + + def __getitem__(self, key): + if self.indices is None: + return _try_backed_full(self.elem[key]) + else: + return registry.safer_read_partial(self.elem[key], indices=self.indices) + + def keys(self): + return list(self.elem.keys()) + + def __repr__(self): + """Description of the _MapAccessor object.""" + descr = f"Accessor for the 
AnnData attribute {self.name}" + descr += f"\n with keys: {self.keys()}" + return descr + + +class _AnnDataAttrsMixin: + storage: StorageType + _attrs_keys: Mapping[str, list] + + @cached_property + def obs(self) -> pd.DataFrame: + if "obs" not in self._attrs_keys: + return None + indices = getattr(self, "indices", None) + if indices is not None: + indices = (indices[0], slice(None)) + obj = registry.safer_read_partial(self.storage["obs"], indices=indices) # type: ignore + return _records_to_df(obj) + else: + return registry.read_dataframe(self.storage["obs"]) # type: ignore + + @cached_property + def var(self) -> pd.DataFrame: + if "var" not in self._attrs_keys: + return None + indices = getattr(self, "indices", None) + if indices is not None: + indices = (indices[1], slice(None)) + obj = registry.safer_read_partial(self.storage["var"], indices=indices) # type: ignore + return _records_to_df(obj) + else: + return registry.read_dataframe(self.storage["var"]) # type: ignore + + @cached_property + def uns(self): + if "uns" not in self._attrs_keys: + return None + return read_elem(self.storage["uns"]) + + @cached_property + def X(self): + indices = getattr(self, "indices", None) + if indices is not None: + return registry.safer_read_partial(self.storage["X"], indices=indices) + else: + return _try_backed_full(self.storage["X"]) + + @cached_property + def obsm(self): + if "obsm" not in self._attrs_keys: + return None + indices = getattr(self, "indices", None) + if indices is not None: + indices = (indices[0], slice(None)) + return _MapAccessor(self.storage["obsm"], "obsm", indices) + + @cached_property + def varm(self): + if "varm" not in self._attrs_keys: + return None + indices = getattr(self, "indices", None) + if indices is not None: + indices = (indices[1], slice(None)) + return _MapAccessor(self.storage["varm"], "varm", indices) + + @cached_property + def obsp(self): + if "obsp" not in self._attrs_keys: + return None + indices = getattr(self, "indices", None) + if indices is not None: + indices = (indices[0], indices[0]) + return _MapAccessor(self.storage["obsp"], "obsp", indices) + + @cached_property + def varp(self): + if "varp" not in self._attrs_keys: + return None + indices = getattr(self, "indices", None) + if indices is not None: + indices = (indices[1], indices[1]) + return _MapAccessor(self.storage["varp"], "varp", indices) + + @cached_property + def layers(self): + if "layers" not in self._attrs_keys: + return None + indices = getattr(self, "indices", None) + return _MapAccessor(self.storage["layers"], "layers", indices) + + @property + def obs_names(self): + return self._obs_names + + @property + def var_names(self): + return self._var_names + + @cached_property + def shape(self): + return len(self._obs_names), len(self._var_names) + + def to_dict(self): + prepare_adata = {} + + prepare_adata["X"] = _to_memory(self.X) + + if "uns" in self._attrs_keys: + prepare_adata["uns"] = self.uns + + for attr in ("obs", "var"): + if attr in self._attrs_keys: + prepare_adata[attr] = getattr(self, attr) + + for attr in ("obsm", "varm", "obsp", "varp", "layers"): + if attr in self._attrs_keys: + prepare_adata[attr] = {} + get_attr = getattr(self, attr) + for key in self._attrs_keys[attr]: + prepare_adata[attr][key] = _to_memory(get_attr[key]) + + if "raw" in self._attrs_keys: + prepare_adata["raw"] = self.raw.to_dict() + + return prepare_adata + + def to_memory(self): + adata = AnnData(**self.to_dict()) + return adata + + +class AnnDataAccessorSubset(_AnnDataAttrsMixin): + def __init__(self, 
storage, indices, attrs_keys, obs_names, var_names, ref_shape): + self.storage = storage + self.indices = indices + + self._attrs_keys = attrs_keys + self._obs_names, self._var_names = obs_names, var_names + + self._ref_shape = ref_shape + + def __getitem__(self, index: Index): + """Access a subset of the underlying AnnData object.""" + oidx, vidx = _normalize_indices(index, self._obs_names, self._var_names) + new_obs_names, new_var_names = self._obs_names[oidx], self._var_names[vidx] + if self.indices is not None: + oidx = _resolve_idx(self.indices[0], oidx, self._ref_shape[0]) + vidx = _resolve_idx(self.indices[1], vidx, self._ref_shape[1]) + return type(self)( + self.storage, + (oidx, vidx), + self._attrs_keys, + new_obs_names, + new_var_names, + self._ref_shape, + ) + + def __repr__(self): + """Description of the object.""" + n_obs, n_vars = self.shape + descr = f"{type(self).__name__} object with n_obs × n_vars = {n_obs} × {n_vars}" + for attr, keys in self._attrs_keys.items(): + descr += f"\n {attr}: {keys}" + return descr + + @cached_property + def raw(self): + if "raw" not in self._attrs_keys: + return None + prepare_indices = None + if self.indices is not None: + oidx = self.indices[0] + if isinstance(oidx, np.ndarray) or oidx != slice(None): + prepare_indices = oidx, slice(None) + return AnnDataRawAccessor( + self.storage["raw"], + prepare_indices, + None, + self._obs_names, + None, + self._ref_shape[0], + ) + + +class AnnDataRawAccessor(AnnDataAccessorSubset): + def __init__( + self, storage_raw, indices, attrs_keys, obs_names, var_names, ref_shape + ): + var_raw = storage_raw["var"] + + if var_names is None: + var_names = _safer_read_index(var_raw) + + if isinstance(ref_shape, int): + ref_shape = ref_shape, len(var_names) + elif isinstance(ref_shape, tuple) and len(ref_shape) < 2: + ref_shape = ref_shape[0], len(var_names) + + if attrs_keys is None: + attrs_keys = {} + if isinstance(var_raw, ArrayTypes): + attrs_keys["var"] = list(var_raw.dtype.fields.keys()) + else: + # for some reason list(var_raw.keys()) is very slow for zarr + # maybe also directly get keys from the underlying mapper + attrs_keys["var"] = list(var_raw) + if "varm" in storage_raw: + varm_keys_raw = list(storage_raw["varm"]) + if len(varm_keys_raw) > 0: + attrs_keys["varm"] = varm_keys_raw + + super().__init__( + storage_raw, indices, attrs_keys, obs_names, var_names, ref_shape + ) + + @property + def raw(self): + raise AttributeError + + +class AnnDataAccessor(_AnnDataAttrsMixin): + """Cloud-backed AnnData.""" + + def __init__( + self, + connection: OpenFile | None, + storage: StorageType, + filename: str, + ): + self._conn = connection + self.storage = storage + + self._attrs_keys = registry.keys(self.storage) + + self._name = filename + + self._obs_names = _safer_read_index(self.storage["obs"]) # type: ignore + self._var_names = _safer_read_index(self.storage["var"]) # type: ignore + + self._closed = False + + def close(self): + """Closes the connection.""" + if hasattr(self, "storage") and hasattr(self.storage, "close"): + self.storage.close() + if hasattr(self, "_conn") and hasattr(self._conn, "close"): + self._conn.close() + self._closed = True + + @property + def closed(self): + return self._closed + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def __getitem__(self, index: Index) -> AnnDataAccessorSubset: + """Access a subset of the underlying AnnData object.""" + oidx, vidx = _normalize_indices(index, self._obs_names, self._var_names) + 
new_obs_names, new_var_names = self._obs_names[oidx], self._var_names[vidx] + return AnnDataAccessorSubset( + self.storage, + (oidx, vidx), + self._attrs_keys, + new_obs_names, + new_var_names, + self.shape, + ) + + def __repr__(self): + """Description of the AnnDataAccessor object.""" + n_obs, n_vars = self.shape + descr = f"AnnDataAccessor object with n_obs × n_vars = {n_obs} × {n_vars}" + descr += f"\n constructed for the AnnData object {self._name}" + for attr, keys in self._attrs_keys.items(): + descr += f"\n {attr}: {keys}" + return descr + + @cached_property + def raw(self): + if "raw" not in self._attrs_keys: + return None + return AnnDataRawAccessor( + self.storage["raw"], None, None, self._obs_names, None, self.shape[0] + ) diff --git a/lamindb/core/storage/_backed_access.py b/lamindb/core/storage/_backed_access.py index 39e32f7cd..6cdfe791b 100644 --- a/lamindb/core/storage/_backed_access.py +++ b/lamindb/core/storage/_backed_access.py @@ -1,28 +1,13 @@ from __future__ import annotations -import inspect from dataclasses import dataclass -from functools import cached_property -from itertools import chain -from typing import TYPE_CHECKING, Callable, Mapping, Union +from typing import TYPE_CHECKING -import h5py -import numpy as np -import pandas as pd -from anndata import AnnData -from anndata import __version__ as anndata_version -from anndata._core.index import Index, _normalize_indices -from anndata._core.views import _resolve_idx -from anndata._io.h5ad import read_dataframe_legacy as read_dataframe_legacy_h5 -from anndata._io.specs.registry import get_spec, read_elem, read_elem_partial -from anndata.compat import _read_attr -from fsspec.implementations.local import LocalFileSystem -from lamin_utils import logger -from lamindb_setup.core.upath import UPath, create_mapper, infer_filesystem +from anndata._io.specs.registry import get_spec from lnschema_core import Artifact -from packaging import version -from lamindb.core.storage.paths import filepath_from_artifact +from ._anndata_accessor import AnnDataAccessor, StorageType, registry +from .paths import filepath_from_artifact if TYPE_CHECKING: from pathlib import Path @@ -31,702 +16,6 @@ from tiledbsoma import Collection as SOMACollection from tiledbsoma import Experiment as SOMAExperiment -anndata_version_parse = version.parse(anndata_version) - -if anndata_version_parse < version.parse("0.10.0"): - if anndata_version_parse < version.parse("0.9.1"): - logger.warning( - "Full backed capabilities are not available for this version of anndata," - " please install anndata>=0.9.1." 
- ) - - from anndata._core.sparse_dataset import SparseDataset - - # try csr for groups with no encoding_type - class CSRDataset(SparseDataset): - @property - def format_str(self) -> str: - return "csr" - - def sparse_dataset(group): - return SparseDataset(group) - -else: - from anndata._core.sparse_dataset import ( - BaseCompressedSparseDataset as SparseDataset, - ) - from anndata._core.sparse_dataset import ( # type: ignore - CSRDataset, - sparse_dataset, - ) - - def _check_group_format(*args): - pass - - CSRDataset._check_group_format = _check_group_format - - -# zarr and CSRDataset have problems with full selection -def _subset_sparse(sparse_ds: CSRDataset | SparseDataset, indices): - has_arrays = isinstance(indices[0], np.ndarray) or isinstance( - indices[1], np.ndarray - ) - if not has_arrays and indices == (slice(None), slice(None)): - return sparse_ds.to_memory() - else: - return sparse_ds[indices] - - -def get_module_name(obj): - return inspect.getmodule(obj).__name__.partition(".")[0] - - -def _records_to_df(obj): - if isinstance(obj, pd.DataFrame): - return obj - - if hasattr(obj, "dtype") and obj.dtype.names is not None: - formats = [] - for name, (dt, _) in obj.dtype.fields.items(): - if dt.char == "S": - new_dt = str(dt).replace("S", "U") - else: - new_dt = dt - formats.append((name, new_dt)) - df = pd.DataFrame(obj.astype(formats, copy=False)) - for index_name in ("index", "_index"): - if index_name in df.columns: - return df.set_index(index_name) - return df - else: - return obj - - -class AccessRecord: - def __init__(self): - self._registry = {} - self._openers = {} - - def register_open(self, module: str): - def wrapper(func: Callable): - self._openers[module] = func - return func - - return wrapper - - def open(self, module: str, *args, **kwargs): - if module in self._openers: - return self._openers[module](*args, **kwargs) - else: - raise ValueError(f"Module {module} not found, please install it.") - - def register(self, module: str): - def wrapper(func: Callable): - func_name = func.__name__ - if func_name not in self._registry: - self._registry[func_name] = {} - self._registry[func_name][module] = func - return func - - return wrapper - - def __getattr__(self, func_name: str): - def wrapper(*args, **kwargs): - func_registry = self._registry[func_name] - for arg in chain(args, kwargs.values()): - arg_module = get_module_name(arg) - if arg_module in func_registry: - return func_registry[arg_module](*args, **kwargs) - raise ValueError(f"{func_name} is not registered for this module.") - - return wrapper - - -# storage specific functions should be registered and called through the registry -registry = AccessRecord() - - -@registry.register_open("h5py") -def open(filepath: UPath | Path | str): - fs, file_path_str = infer_filesystem(filepath) - if isinstance(fs, LocalFileSystem): - return None, h5py.File(file_path_str, mode="r") - conn = fs.open(file_path_str, mode="rb") - try: - storage = h5py.File(conn, mode="r") - except Exception as e: - conn.close() - raise e - return conn, storage - - -@registry.register("h5py") -def read_dataframe(elem: h5py.Dataset | h5py.Group): - if isinstance(elem, h5py.Dataset): - return read_dataframe_legacy_h5(elem) - else: - return read_elem(elem) - - -@registry.register("h5py") -def safer_read_partial(elem, indices): - is_dataset = isinstance(elem, h5py.Dataset) - indices_inverse: list | None = None - encoding_type = get_spec(elem).encoding_type - # h5py selection for datasets requires sorted indices - if is_dataset or encoding_type == 
"dataframe": - indices_increasing = [] - indices_inverse = [] - for indices_dim in indices: - # should be integer or bool - # ignore bool or increasing unique integers - if ( - isinstance(indices_dim, np.ndarray) - and indices_dim.dtype != "bool" - and not np.all(np.diff(indices_dim) > 0) - ): - idx_unique, idx_inverse = np.unique(indices_dim, return_inverse=True) - indices_increasing.append(idx_unique) - indices_inverse.append(idx_inverse) - else: - indices_increasing.append(indices_dim) - indices_inverse.append(None) - indices = tuple(indices_increasing) - if all(idx is None for idx in indices_inverse): - indices_inverse = None - result = None - if encoding_type == "": - if is_dataset: - dims = len(elem.shape) - if dims == 2: - result = elem[indices] - elif dims == 1: - if indices[0] == slice(None): - result = elem[indices[1]] - elif indices[1] == slice(None): - result = elem[indices[0]] - elif isinstance(elem, h5py.Group): - try: - ds = CSRDataset(elem) - result = _subset_sparse(ds, indices) - except Exception as e: - logger.debug( - f"Encountered an exception while attempting to subset a sparse dataset by indices.\n{e}" - ) - if result is None: - raise ValueError( - "Can not get a subset of the element of type" - f" {type(elem).__name__} with an empty spec." - ) - else: - result = read_elem_partial(elem, indices=indices) - if indices_inverse is None: - return result - else: - if indices_inverse[0] is None: - if len(result.shape) == 2: - return result[:, indices_inverse[1]] - else: - return result[indices_inverse[1]] - elif indices_inverse[1] is None: - if isinstance(result, pd.DataFrame): - return result.iloc[indices_inverse[0]] - else: - return result[indices_inverse[0]] - else: - return result[tuple(indices_inverse)] - - -@registry.register("h5py") -def keys(storage: h5py.File): - attrs_keys: dict[str, list] = {} - for attr in storage.keys(): - if attr == "X": - continue - attr_obj = storage[attr] - if attr in ("obs", "var") and isinstance(attr_obj, h5py.Dataset): - keys = list(attr_obj.dtype.fields.keys()) - else: - keys = list(attr_obj.keys()) - if len(keys) > 0: - attrs_keys[attr] = keys - return attrs_keys - - -ArrayTypes = [h5py.Dataset] -GroupTypes = [h5py.Group] -StorageTypes = [h5py.File] - - -ZARR_INSTALLED = False -try: - import zarr - - ZARR_INSTALLED = True -except ImportError: - pass - -if ZARR_INSTALLED: - from anndata._io.zarr import read_dataframe_legacy as read_dataframe_legacy_zarr - - ArrayTypes.append(zarr.Array) - GroupTypes.append(zarr.Group) - StorageTypes.append(zarr.Group) - - @registry.register_open("zarr") - def open(filepath: Union[UPath, Path, str]): # noqa - fs, file_path_str = infer_filesystem(filepath) - conn = None - if isinstance(fs, LocalFileSystem): - # this is faster than through an fsspec mapper for local - open_obj = file_path_str - else: - open_obj = create_mapper(fs, file_path_str, check=True) - storage = zarr.open(open_obj, mode="r") - return conn, storage - - @registry.register("zarr") - def read_dataframe(elem: Union[zarr.Array, zarr.Group]): # noqa - if isinstance(elem, zarr.Array): - return read_dataframe_legacy_zarr(elem) - else: - return read_elem(elem) - - @registry.register("zarr") - def safer_read_partial(elem, indices): - encoding_type = get_spec(elem).encoding_type - if encoding_type == "": - if isinstance(elem, zarr.Array): - dims = len(elem.shape) - if dims == 2: - return elem.oindex[indices] - elif dims == 1: - if indices[0] == slice(None): - return elem.oindex[indices[1]] - elif indices[1] == slice(None): - return 
elem.oindex[indices[0]] - elif isinstance(elem, zarr.Group): - try: - ds = CSRDataset(elem) - return _subset_sparse(ds, indices) - except Exception as e: - logger.debug( - f"Encountered an exception while attempting to subset a sparse dataset by indices.\n{e}" - ) - raise ValueError( - "Can not get a subset of the element of type" - f" {type(elem).__name__} with an empty spec." - ) - else: - if encoding_type in ("csr_matrix", "csc_matrix"): - ds = sparse_dataset(elem) - return _subset_sparse(ds, indices) - else: - return read_elem_partial(elem, indices=indices) - - # this is needed because accessing zarr.Group.keys() directly is very slow - @registry.register("zarr") - def keys(storage: zarr.Group): - paths = storage._store.keys() - - attrs_keys: dict[str, list] = {} - obs_var_arrays = [] - - for path in paths: - if path in (".zattrs", ".zgroup"): - continue - parts = path.split("/") - if len(parts) < 2: - continue - attr = parts[0] - key = parts[1] - - if attr == "X": - continue - - if attr in ("obs", "var"): - if attr in obs_var_arrays: - continue - if key == ".zarray": - attrs_keys.pop(attr, None) - obs_var_arrays.append(attr) - - if attr not in attrs_keys: - attrs_keys[attr] = [] - - if key in (".zattrs", ".zgroup", ".zarray"): - continue - attr_keys = attrs_keys[attr] - if key not in attr_keys: - attr_keys.append(key) - - for attr in obs_var_arrays: - attrs_keys[attr] = list(storage[attr].dtype.fields.keys()) - - return {attr: keys for attr, keys in attrs_keys.items() if len(keys) > 0} - - -ArrayTypes = tuple(ArrayTypes) # type: ignore -GroupTypes = tuple(GroupTypes) # type: ignore -StorageTypes = tuple(StorageTypes) # type: ignore - - -ArrayType = Union[ArrayTypes] # type: ignore -GroupType = Union[GroupTypes] # type: ignore -StorageType = Union[StorageTypes] # type: ignore - - -def _to_memory(elem): - if isinstance(elem, ArrayTypes): - return elem[()] - elif isinstance(elem, SparseDataset): - return elem.to_memory() - else: - return elem - - -def _try_backed_full(elem): - # think what to do for compatibility with old var and obs - if isinstance(elem, ArrayTypes): - return elem - - if isinstance(elem, GroupTypes): - encoding_type = get_spec(elem).encoding_type - if encoding_type in ("csr_matrix", "csc_matrix"): - return sparse_dataset(elem) - if "h5sparse_format" in elem.attrs: - return sparse_dataset(elem) - if encoding_type == "" and "indptr" in elem: - return CSRDataset(elem) - - return read_elem(elem) - - -def _safer_read_index(elem): - if isinstance(elem, GroupTypes): - return pd.Index(read_elem(elem[_read_attr(elem.attrs, "_index")])) - elif isinstance(elem, ArrayTypes): - indices = None - for index_name in ("index", "_index"): - if index_name in elem.dtype.names: - indices = elem[index_name] - break - if indices is not None and len(indices) > 0: - if isinstance(indices[0], bytes): - indices = np.frompyfunc(lambda x: x.decode("utf-8"), 1, 1)(indices) - return pd.Index(indices) - else: - raise ValueError("Indices not found.") - else: - raise ValueError(f"Unknown elem type {type(elem)} when reading indices.") - - -class _MapAccessor: - def __init__(self, elem, name, indices=None): - self.elem = elem - self.indices = indices - self.name = name - - def __getitem__(self, key): - if self.indices is None: - return _try_backed_full(self.elem[key]) - else: - return registry.safer_read_partial(self.elem[key], indices=self.indices) - - def keys(self): - return list(self.elem.keys()) - - def __repr__(self): - """Description of the _MapAccessor object.""" - descr = f"Accessor for the 
AnnData attribute {self.name}" - descr += f"\n with keys: {self.keys()}" - return descr - - -class _AnnDataAttrsMixin: - storage: StorageType - _attrs_keys: Mapping[str, list] - - @cached_property - def obs(self) -> pd.DataFrame: - if "obs" not in self._attrs_keys: - return None - indices = getattr(self, "indices", None) - if indices is not None: - indices = (indices[0], slice(None)) - obj = registry.safer_read_partial(self.storage["obs"], indices=indices) # type: ignore - return _records_to_df(obj) - else: - return registry.read_dataframe(self.storage["obs"]) # type: ignore - - @cached_property - def var(self) -> pd.DataFrame: - if "var" not in self._attrs_keys: - return None - indices = getattr(self, "indices", None) - if indices is not None: - indices = (indices[1], slice(None)) - obj = registry.safer_read_partial(self.storage["var"], indices=indices) # type: ignore - return _records_to_df(obj) - else: - return registry.read_dataframe(self.storage["var"]) # type: ignore - - @cached_property - def uns(self): - if "uns" not in self._attrs_keys: - return None - return read_elem(self.storage["uns"]) - - @cached_property - def X(self): - indices = getattr(self, "indices", None) - if indices is not None: - return registry.safer_read_partial(self.storage["X"], indices=indices) - else: - return _try_backed_full(self.storage["X"]) - - @cached_property - def obsm(self): - if "obsm" not in self._attrs_keys: - return None - indices = getattr(self, "indices", None) - if indices is not None: - indices = (indices[0], slice(None)) - return _MapAccessor(self.storage["obsm"], "obsm", indices) - - @cached_property - def varm(self): - if "varm" not in self._attrs_keys: - return None - indices = getattr(self, "indices", None) - if indices is not None: - indices = (indices[1], slice(None)) - return _MapAccessor(self.storage["varm"], "varm", indices) - - @cached_property - def obsp(self): - if "obsp" not in self._attrs_keys: - return None - indices = getattr(self, "indices", None) - if indices is not None: - indices = (indices[0], indices[0]) - return _MapAccessor(self.storage["obsp"], "obsp", indices) - - @cached_property - def varp(self): - if "varp" not in self._attrs_keys: - return None - indices = getattr(self, "indices", None) - if indices is not None: - indices = (indices[1], indices[1]) - return _MapAccessor(self.storage["varp"], "varp", indices) - - @cached_property - def layers(self): - if "layers" not in self._attrs_keys: - return None - indices = getattr(self, "indices", None) - return _MapAccessor(self.storage["layers"], "layers", indices) - - @property - def obs_names(self): - return self._obs_names - - @property - def var_names(self): - return self._var_names - - @cached_property - def shape(self): - return len(self._obs_names), len(self._var_names) - - def to_dict(self): - prepare_adata = {} - - prepare_adata["X"] = _to_memory(self.X) - - if "uns" in self._attrs_keys: - prepare_adata["uns"] = self.uns - - for attr in ("obs", "var"): - if attr in self._attrs_keys: - prepare_adata[attr] = getattr(self, attr) - - for attr in ("obsm", "varm", "obsp", "varp", "layers"): - if attr in self._attrs_keys: - prepare_adata[attr] = {} - get_attr = getattr(self, attr) - for key in self._attrs_keys[attr]: - prepare_adata[attr][key] = _to_memory(get_attr[key]) - - if "raw" in self._attrs_keys: - prepare_adata["raw"] = self.raw.to_dict() - - return prepare_adata - - def to_memory(self): - adata = AnnData(**self.to_dict()) - return adata - - -class AnnDataAccessorSubset(_AnnDataAttrsMixin): - def __init__(self, 
storage, indices, attrs_keys, obs_names, var_names, ref_shape): - self.storage = storage - self.indices = indices - - self._attrs_keys = attrs_keys - self._obs_names, self._var_names = obs_names, var_names - - self._ref_shape = ref_shape - - def __getitem__(self, index: Index): - """Access a subset of the underlying AnnData object.""" - oidx, vidx = _normalize_indices(index, self._obs_names, self._var_names) - new_obs_names, new_var_names = self._obs_names[oidx], self._var_names[vidx] - if self.indices is not None: - oidx = _resolve_idx(self.indices[0], oidx, self._ref_shape[0]) - vidx = _resolve_idx(self.indices[1], vidx, self._ref_shape[1]) - return type(self)( - self.storage, - (oidx, vidx), - self._attrs_keys, - new_obs_names, - new_var_names, - self._ref_shape, - ) - - def __repr__(self): - """Description of the object.""" - n_obs, n_vars = self.shape - descr = f"{type(self).__name__} object with n_obs × n_vars = {n_obs} × {n_vars}" - for attr, keys in self._attrs_keys.items(): - descr += f"\n {attr}: {keys}" - return descr - - @cached_property - def raw(self): - if "raw" not in self._attrs_keys: - return None - prepare_indices = None - if self.indices is not None: - oidx = self.indices[0] - if isinstance(oidx, np.ndarray) or oidx != slice(None): - prepare_indices = oidx, slice(None) - return AnnDataRawAccessor( - self.storage["raw"], - prepare_indices, - None, - self._obs_names, - None, - self._ref_shape[0], - ) - - -class AnnDataRawAccessor(AnnDataAccessorSubset): - def __init__( - self, storage_raw, indices, attrs_keys, obs_names, var_names, ref_shape - ): - var_raw = storage_raw["var"] - - if var_names is None: - var_names = _safer_read_index(var_raw) - - if isinstance(ref_shape, int): - ref_shape = ref_shape, len(var_names) - elif isinstance(ref_shape, tuple) and len(ref_shape) < 2: - ref_shape = ref_shape[0], len(var_names) - - if attrs_keys is None: - attrs_keys = {} - if isinstance(var_raw, ArrayTypes): - attrs_keys["var"] = list(var_raw.dtype.fields.keys()) - else: - # for some reason list(var_raw.keys()) is very slow for zarr - # maybe also directly get keys from the underlying mapper - attrs_keys["var"] = list(var_raw) - if "varm" in storage_raw: - varm_keys_raw = list(storage_raw["varm"]) - if len(varm_keys_raw) > 0: - attrs_keys["varm"] = varm_keys_raw - - super().__init__( - storage_raw, indices, attrs_keys, obs_names, var_names, ref_shape - ) - - @property - def raw(self): - raise AttributeError - - -class AnnDataAccessor(_AnnDataAttrsMixin): - """Cloud-backed AnnData.""" - - def __init__( - self, - connection: OpenFile | None, - storage: StorageType, - filename: str, - ): - self._conn = connection - self.storage = storage - - self._attrs_keys = registry.keys(self.storage) - - self._name = filename - - self._obs_names = _safer_read_index(self.storage["obs"]) # type: ignore - self._var_names = _safer_read_index(self.storage["var"]) # type: ignore - - self._closed = False - - def close(self): - """Closes the connection.""" - if hasattr(self, "storage") and hasattr(self.storage, "close"): - self.storage.close() - if hasattr(self, "_conn") and hasattr(self._conn, "close"): - self._conn.close() - self._closed = True - - @property - def closed(self): - return self._closed - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.close() - - def __getitem__(self, index: Index) -> AnnDataAccessorSubset: - """Access a subset of the underlying AnnData object.""" - oidx, vidx = _normalize_indices(index, self._obs_names, self._var_names) - 
new_obs_names, new_var_names = self._obs_names[oidx], self._var_names[vidx] - return AnnDataAccessorSubset( - self.storage, - (oidx, vidx), - self._attrs_keys, - new_obs_names, - new_var_names, - self.shape, - ) - - def __repr__(self): - """Description of the AnnDataAccessor object.""" - n_obs, n_vars = self.shape - descr = f"AnnDataAccessor object with n_obs × n_vars = {n_obs} × {n_vars}" - descr += f"\n constructed for the AnnData object {self._name}" - for attr, keys in self._attrs_keys.items(): - descr += f"\n {attr}: {keys}" - return descr - - @cached_property - def raw(self): - if "raw" not in self._attrs_keys: - return None - return AnnDataRawAccessor( - self.storage["raw"], None, None, self._obs_names, None, self.shape[0] - ) - @dataclass class BackedAccessor: From bfe55e8aea67b3eb6b0d544125c261c424ae89d8 Mon Sep 17 00:00:00 2001 From: Koncopd Date: Fri, 26 Jul 2024 15:09:14 +0200 Subject: [PATCH 02/22] Add mode to open --- lamindb/_artifact.py | 6 +++--- lamindb/core/_feature_manager.py | 2 +- lamindb/core/storage/_anndata_accessor.py | 26 +++++++++++++++++------ lamindb/core/storage/_backed_access.py | 8 +++---- 4 files changed, 27 insertions(+), 15 deletions(-) diff --git a/lamindb/_artifact.py b/lamindb/_artifact.py index 8530cc2ed..60398e915 100644 --- a/lamindb/_artifact.py +++ b/lamindb/_artifact.py @@ -869,7 +869,7 @@ def backed( # docstring handled through attach_func_to_class_method def open( - self, is_run_input: bool | None = None + self, mode: str = "r", is_run_input: bool | None = None ) -> AnnDataAccessor | BackedAccessor | SOMACollection | SOMAExperiment: # ignore empty suffix for now suffixes = (".h5", ".hdf5", ".h5ad", ".zarr", ".tiledbsoma", "") @@ -888,9 +888,9 @@ def open( # consider the case where an object is already locally cached localpath = setup_settings.instance.storage.cloud_to_local_no_update(filepath) if localpath.exists(): - return backed_access(localpath, using_key) + return backed_access(localpath, mode, using_key) else: - return backed_access(filepath, using_key) + return backed_access(filepath, mode, using_key) # docstring handled through attach_func_to_class_method diff --git a/lamindb/core/_feature_manager.py b/lamindb/core/_feature_manager.py index 710235814..255cf82c2 100644 --- a/lamindb/core/_feature_manager.py +++ b/lamindb/core/_feature_manager.py @@ -241,7 +241,7 @@ def parse_feature_sets_from_anndata( from lamindb.core.storage._backed_access import backed_access using_key = settings._using_key - data_parse = backed_access(filepath, using_key) + data_parse = backed_access(filepath, using_key=using_key) else: data_parse = ad.read_h5ad(filepath, backed="r") type = "float" diff --git a/lamindb/core/storage/_anndata_accessor.py b/lamindb/core/storage/_anndata_accessor.py index b09f0b45c..07a6bff0b 100644 --- a/lamindb/core/storage/_anndata_accessor.py +++ b/lamindb/core/storage/_anndata_accessor.py @@ -3,7 +3,7 @@ import inspect from functools import cached_property from itertools import chain -from typing import TYPE_CHECKING, Callable, Mapping, Union +from typing import TYPE_CHECKING, Callable, Literal, Mapping, Union import h5py import numpy as np @@ -24,6 +24,7 @@ from pathlib import Path from fsspec.core import OpenFile + from lamindb_setup.core.types import UPathStr anndata_version_parse = version.parse(anndata_version) @@ -142,13 +143,22 @@ def wrapper(*args, **kwargs): @registry.register_open("h5py") -def open(filepath: UPath | Path | str): +def open(filepath: UPathStr, mode: str = "r"): fs, file_path_str = infer_filesystem(filepath) 
if isinstance(fs, LocalFileSystem): - return None, h5py.File(file_path_str, mode="r") - conn = fs.open(file_path_str, mode="rb") + assert mode in {"r", "r+", "a", "w", "w-"}, f"Unknown mode {mode}!" # noqa: S101 + return None, h5py.File(file_path_str, mode=mode) + if mode == "r": + conn_mode = "rb" + elif mode == "w": + conn_mode = "wb" + elif mode == "a": + conn_mode = "ab" + else: + raise ValueError(f"Unknown mode {mode}! Should be 'r', 'w' or 'a'.") + conn = fs.open(file_path_str, mode=conn_mode) try: - storage = h5py.File(conn, mode="r") + storage = h5py.File(conn, mode=mode) except Exception as e: conn.close() raise e @@ -269,7 +279,9 @@ def keys(storage: h5py.File): StorageTypes.append(zarr.Group) @registry.register_open("zarr") - def open(filepath: Union[UPath, Path, str]): # noqa + def open(filepath: UPathStr, mode: Literal["r", "r+", "a", "w", "w-"] = "r"): # noqa + assert mode in {"r", "r+", "a", "w", "w-"}, f"Unknown mode {mode}!" # noqa: S101 + fs, file_path_str = infer_filesystem(filepath) conn = None if isinstance(fs, LocalFileSystem): @@ -277,7 +289,7 @@ def open(filepath: Union[UPath, Path, str]): # noqa open_obj = file_path_str else: open_obj = create_mapper(fs, file_path_str, check=True) - storage = zarr.open(open_obj, mode="r") + storage = zarr.open(open_obj, mode=mode) return conn, storage @registry.register("zarr") diff --git a/lamindb/core/storage/_backed_access.py b/lamindb/core/storage/_backed_access.py index 6cdfe791b..af90016bb 100644 --- a/lamindb/core/storage/_backed_access.py +++ b/lamindb/core/storage/_backed_access.py @@ -28,7 +28,7 @@ class BackedAccessor: def backed_access( - artifact_or_filepath: Artifact | Path, using_key: str | None = None + artifact_or_filepath: Artifact | Path, mode: str = "r", using_key: str | None = None ) -> AnnDataAccessor | BackedAccessor | SOMACollection | SOMAExperiment: if isinstance(artifact_or_filepath, Artifact): filepath = filepath_from_artifact(artifact_or_filepath, using_key=using_key) @@ -73,11 +73,11 @@ def backed_access( SOMAType = soma.Experiment else: SOMAType = soma.Collection - return SOMAType.open(filepath_str, context=ctx) + return SOMAType.open(filepath_str, mode=mode, context=ctx) elif suffix in {".h5", ".hdf5", ".h5ad"}: - conn, storage = registry.open("h5py", filepath) + conn, storage = registry.open("h5py", filepath, mode=mode) elif suffix == ".zarr": - conn, storage = registry.open("zarr", filepath) + conn, storage = registry.open("zarr", filepath, mode=mode) else: raise ValueError( "object should have .h5, .hdf5, .h5ad, .zarr, .tiledbsoma suffix, not" From 72b12d3c938104ff763079411d9d8879c5177e90 Mon Sep 17 00:00:00 2001 From: Koncopd Date: Fri, 26 Jul 2024 15:17:36 +0200 Subject: [PATCH 03/22] fix backed backward comp --- lamindb/_artifact.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lamindb/_artifact.py b/lamindb/_artifact.py index 60398e915..77524a8e0 100644 --- a/lamindb/_artifact.py +++ b/lamindb/_artifact.py @@ -861,10 +861,10 @@ def replace( # deprecated def backed( - self, is_run_input: bool | None = None + self, mode: str = "r", is_run_input: bool | None = None ) -> AnnDataAccessor | BackedAccessor | SOMACollection | SOMAExperiment: logger.warning("`.backed()` is deprecated, use `.open()`!'") - return self.open(is_run_input) + return self.open(mode, is_run_input) # docstring handled through attach_func_to_class_method From bf9269228b8724f7aa81b4c4935550c1d76957b2 Mon Sep 17 00:00:00 2001 From: Koncopd Date: Mon, 29 Jul 2024 18:22:51 +0200 Subject: [PATCH 04/22] refactor 
tiledbsoma open --- lamindb/core/storage/_backed_access.py | 91 +++++++++++++------------- 1 file changed, 46 insertions(+), 45 deletions(-) diff --git a/lamindb/core/storage/_backed_access.py b/lamindb/core/storage/_backed_access.py index af90016bb..461da9599 100644 --- a/lamindb/core/storage/_backed_access.py +++ b/lamindb/core/storage/_backed_access.py @@ -1,7 +1,7 @@ from __future__ import annotations from dataclasses import dataclass -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Literal from anndata._io.specs.registry import get_spec from lnschema_core import Artifact @@ -10,11 +10,47 @@ from .paths import filepath_from_artifact if TYPE_CHECKING: - from pathlib import Path - from fsspec.core import OpenFile from tiledbsoma import Collection as SOMACollection from tiledbsoma import Experiment as SOMAExperiment + from upath import UPath + + +def _open_tiledbsoma( + filepath: UPath, mode: Literal["r", "w"] = "r" +) -> SOMACollection | SOMAExperiment: + try: + import tiledbsoma as soma + except ImportError as e: + raise ImportError("Please install tiledbsoma: pip install tiledbsoma") from e + filepath_str = filepath.as_posix() + if filepath.protocol == "s3": + from lamindb_setup.core._settings_storage import get_storage_region + + region = get_storage_region(filepath_str) + tiledb_config = {"vfs.s3.region": region} + storage_options = filepath.storage_options + if "key" in storage_options: + tiledb_config["vfs.s3.aws_access_key_id"] = storage_options["key"] + if "secret" in storage_options: + tiledb_config["vfs.s3.aws_secret_access_key"] = storage_options["secret"] + if "token" in storage_options: + tiledb_config["vfs.s3.aws_session_token"] = storage_options["token"] + ctx = soma.SOMATileDBContext(tiledb_config=tiledb_config) + # this is a strange bug + # for some reason iterdir futher gives incorrect results + # if cache is not invalidated + # instead of obs and ms it gives ms and ms in the list of names + filepath.fs.invalidate_cache() + else: + ctx = None + + soma_objects = [obj.name for obj in filepath.iterdir()] + if "obs" in soma_objects and "ms" in soma_objects: + SOMAType = soma.Experiment + else: + SOMAType = soma.Collection + return SOMAType.open(filepath_str, mode=mode, context=ctx) @dataclass @@ -28,7 +64,9 @@ class BackedAccessor: def backed_access( - artifact_or_filepath: Artifact | Path, mode: str = "r", using_key: str | None = None + artifact_or_filepath: Artifact | UPath, + mode: str = "r", + using_key: str | None = None, ) -> AnnDataAccessor | BackedAccessor | SOMACollection | SOMAExperiment: if isinstance(artifact_or_filepath, Artifact): filepath = filepath_from_artifact(artifact_or_filepath, using_key=using_key) @@ -38,42 +76,7 @@ def backed_access( suffix = filepath.suffix if name == "soma" or suffix == ".tiledbsoma": - try: - import tiledbsoma as soma - except ImportError as e: - raise ImportError( - "Please install tiledbsoma: pip install tiledbsoma" - ) from e - filepath_str = filepath.as_posix() - if filepath.protocol == "s3": - from lamindb_setup.core._settings_storage import get_storage_region - - region = get_storage_region(filepath_str) - tiledb_config = {"vfs.s3.region": region} - storage_options = filepath.storage_options - if "key" in storage_options: - tiledb_config["vfs.s3.aws_access_key_id"] = storage_options["key"] - if "secret" in storage_options: - tiledb_config["vfs.s3.aws_secret_access_key"] = storage_options[ - "secret" - ] - if "token" in storage_options: - tiledb_config["vfs.s3.aws_session_token"] = storage_options["token"] 
- ctx = soma.SOMATileDBContext(tiledb_config=tiledb_config)
- # this is a strange bug
- # for some reason iterdir futher gives incorrect results
- # if cache is not invalidated
- # instead of obs and ms it gives ms and ms in the list of names
- filepath.fs.invalidate_cache()
- else:
- ctx = None
-
- soma_objects = [obj.name for obj in filepath.iterdir()]
- if "obs" in soma_objects and "ms" in soma_objects:
- SOMAType = soma.Experiment
- else:
- SOMAType = soma.Collection
- return SOMAType.open(filepath_str, mode=mode, context=ctx)
+ return _open_tiledbsoma(filepath, mode=mode) # type: ignore
 elif suffix in {".h5", ".hdf5", ".h5ad"}:
 conn, storage = registry.open("h5py", filepath, mode=mode)
 elif suffix == ".zarr":
 conn, storage = registry.open("zarr", filepath, mode=mode)
 else:
 raise ValueError(
 "object should have .h5, .hdf5, .h5ad, .zarr, .tiledbsoma suffix, not"
 f" {suffix}."
 )

- if suffix == ".h5ad":
+ is_anndata = suffix == ".h5ad" or get_spec(storage).encoding_type == "anndata"
+ if is_anndata:
 return AnnDataAccessor(conn, storage, name)
 else:
- if get_spec(storage).encoding_type == "anndata":
- return AnnDataAccessor(conn, storage, name)
- else:
- return BackedAccessor(conn, storage)
+ return BackedAccessor(conn, storage)

From 1a9f157a8a7466d53eebca8ec8071e46203be7b1 Mon Sep 17 00:00:00 2001
From: Koncopd
Date: Mon, 29 Jul 2024 21:37:37 +0200
Subject: [PATCH 05/22] check modes in backed and reorder track_run_input

---
 lamindb/_artifact.py | 23 ++++++++++++++---------
 lamindb/_collection.py | 2 +-
 lamindb/core/storage/_backed_access.py | 4 ++++
 3 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/lamindb/_artifact.py b/lamindb/_artifact.py
index 77524a8e0..f0c5536da 100644
--- a/lamindb/_artifact.py
+++ b/lamindb/_artifact.py
@@ -882,26 +882,31 @@ def open(

 from lamindb.core.storage._backed_access import backed_access

- _track_run_input(self, is_run_input)
 using_key = settings._using_key
 filepath = filepath_from_artifact(self, using_key=using_key)
 # consider the case where an object is already locally cached
 localpath = setup_settings.instance.storage.cloud_to_local_no_update(filepath)
 if localpath.exists():
- return backed_access(localpath, mode, using_key)
+ access = backed_access(localpath, mode, using_key)
 else:
- return backed_access(filepath, mode, using_key)
+ access = backed_access(filepath, mode, using_key)
+ # only call if open is successful
+ _track_run_input(self, is_run_input)
+ return access


 # docstring handled through attach_func_to_class_method
 def load(self, is_run_input: bool | None = None, stream: bool = False, **kwargs) -> Any:
- _track_run_input(self, is_run_input)
 if hasattr(self, "_memory_rep") and self._memory_rep is not None:
- return self._memory_rep
- using_key = settings._using_key
- return load_to_memory(
- filepath_from_artifact(self, using_key=using_key), stream=stream, **kwargs
- )
+ access_memory = self._memory_rep
+ else:
+ using_key = settings._using_key
+ access_memory = load_to_memory(
+ filepath_from_artifact(self, using_key=using_key), stream=stream, **kwargs
+ )
+ # only call if load is successful
+ _track_run_input(self, is_run_input)
+ return access_memory


 # docstring handled through attach_func_to_class_method
diff --git a/lamindb/_collection.py b/lamindb/_collection.py
index bc1c2b37d..33d00b0f5 100644
--- a/lamindb/_collection.py
+++ b/lamindb/_collection.py
@@ -271,10 +271,10 @@ def mapped(

 # docstring handled through attach_func_to_class_method
 def cache(self, is_run_input: bool | None = None) -> list[UPath]:
- _track_run_input(self, is_run_input)
 path_list = []
 for artifact in self.ordered_artifacts.all():
path_list.append(artifact.cache()) + _track_run_input(self, is_run_input) return path_list diff --git a/lamindb/core/storage/_backed_access.py b/lamindb/core/storage/_backed_access.py index 461da9599..896f65efe 100644 --- a/lamindb/core/storage/_backed_access.py +++ b/lamindb/core/storage/_backed_access.py @@ -76,6 +76,8 @@ def backed_access( suffix = filepath.suffix if name == "soma" or suffix == ".tiledbsoma": + if mode not in {"r", "w"}: + raise ValueError("`mode` should be either 'r' or 'w' for tiledbsoma.") return _open_tiledbsoma(filepath, mode=mode) # type: ignore elif suffix in {".h5", ".hdf5", ".h5ad"}: conn, storage = registry.open("h5py", filepath, mode=mode) @@ -89,6 +91,8 @@ def backed_access( is_anndata = suffix == ".h5ad" or get_spec(storage).encoding_type == "anndata" if is_anndata: + if mode != "r": + raise ValueError("Can only access `AnnData` with mode='r'.") return AnnDataAccessor(conn, storage, name) else: return BackedAccessor(conn, storage) From 304ed89cf436c2d598079a8fe47aa959702f2466 Mon Sep 17 00:00:00 2001 From: Koncopd Date: Tue, 30 Jul 2024 12:03:13 +0200 Subject: [PATCH 06/22] restore ruff changes after rebase --- lamindb/core/storage/_anndata_accessor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lamindb/core/storage/_anndata_accessor.py b/lamindb/core/storage/_anndata_accessor.py index 07a6bff0b..c2ea32e51 100644 --- a/lamindb/core/storage/_anndata_accessor.py +++ b/lamindb/core/storage/_anndata_accessor.py @@ -279,7 +279,7 @@ def keys(storage: h5py.File): StorageTypes.append(zarr.Group) @registry.register_open("zarr") - def open(filepath: UPathStr, mode: Literal["r", "r+", "a", "w", "w-"] = "r"): # noqa + def open(filepath: UPathStr, mode: Literal["r", "r+", "a", "w", "w-"] = "r"): assert mode in {"r", "r+", "a", "w", "w-"}, f"Unknown mode {mode}!" # noqa: S101 fs, file_path_str = infer_filesystem(filepath) @@ -300,7 +300,7 @@ def read_dataframe(elem: Union[zarr.Array, zarr.Group]): # noqa return read_elem(elem) @registry.register("zarr") - def safer_read_partial(elem, indices): # noqa + def safer_read_partial(elem, indices): encoding_type = get_spec(elem).encoding_type if encoding_type == "": if isinstance(elem, zarr.Array): @@ -333,7 +333,7 @@ def safer_read_partial(elem, indices): # noqa # this is needed because accessing zarr.Group.keys() directly is very slow @registry.register("zarr") - def keys(storage: zarr.Group): # noqa + def keys(storage: zarr.Group): paths = storage._store.keys() attrs_keys: dict[str, list] = {} From 4686e6a15d36e378cc8716df550877ae2b57d2ec Mon Sep 17 00:00:00 2001 From: Koncopd Date: Tue, 30 Jul 2024 14:47:55 +0200 Subject: [PATCH 07/22] track remote tiledbsoma stores for writes --- lamindb/_artifact.py | 20 +++++++++++++-- lamindb/core/storage/_backed_access.py | 35 +++++++++++++++++++++++++- 2 files changed, 52 insertions(+), 3 deletions(-) diff --git a/lamindb/_artifact.py b/lamindb/_artifact.py index f0c5536da..f40a79409 100644 --- a/lamindb/_artifact.py +++ b/lamindb/_artifact.py @@ -880,16 +880,32 @@ def open( f" {', '.join(suffixes[:-1])}." 
) - from lamindb.core.storage._backed_access import backed_access + from lamindb.core.storage._backed_access import _track_writes_factory, backed_access using_key = settings._using_key filepath = filepath_from_artifact(self, using_key=using_key) + is_tiledbsoma = filepath.name == "soma" or filepath.suffix == ".tiledbsoma" # consider the case where an object is already locally cached localpath = setup_settings.instance.storage.cloud_to_local_no_update(filepath) - if localpath.exists(): + if not is_tiledbsoma and localpath.exists(): access = backed_access(localpath, mode, using_key) else: access = backed_access(filepath, mode, using_key) + if is_tiledbsoma and mode == "w": + if access.__class__.__name__ not in {"Collection", "Experiment"}: + raise ValueError( + "The store seems to be tiledbsoma but it is neither `Collection` nor `Experiment`." + ) + + def finalize(): + nonlocal self, filepath + _, hash, _, _ = get_stat_dir_cloud(filepath) + if self.hash != hash: + logger.warning( + "The hash of the tiledbsoma store has changed, consider updating this artifact." + ) + + access = _track_writes_factory(access, finalize) # only call if open is successfull _track_run_input(self, is_run_input) return access diff --git a/lamindb/core/storage/_backed_access.py b/lamindb/core/storage/_backed_access.py index 896f65efe..1d147a288 100644 --- a/lamindb/core/storage/_backed_access.py +++ b/lamindb/core/storage/_backed_access.py @@ -1,7 +1,7 @@ from __future__ import annotations from dataclasses import dataclass -from typing import TYPE_CHECKING, Literal +from typing import TYPE_CHECKING, Any, Callable, Literal from anndata._io.specs.registry import get_spec from lnschema_core import Artifact @@ -16,6 +16,39 @@ from upath import UPath +def _track_writes_factory(obj: Any, finalize: Callable): + closed: bool = False + + tracked_class = obj.__class__ + type_dict = {"__doc__": tracked_class.__doc__} + if hasattr(tracked_class, "__slots__"): + type_dict["__slots__"] = () + if hasattr(tracked_class, "__exit__"): + + def __exit__(self, exc_type, exc_val, exc_tb): + nonlocal closed + tracked_class.__exit__(self, exc_type, exc_val, exc_tb) + if not closed: + finalize() + closed = True + + type_dict["__exit__"] = __exit__ + if hasattr(tracked_class, "close"): + + def close(self, *args, **kwargs): + nonlocal closed + tracked_class.close(self, *args, **kwargs) + if not closed: + finalize() + closed = True + + type_dict["close"] = close + + Track = type(tracked_class.__name__ + "Track", (tracked_class,), type_dict) + obj.__class__ = Track + return obj + + def _open_tiledbsoma( filepath: UPath, mode: Literal["r", "w"] = "r" ) -> SOMACollection | SOMAExperiment: From 39e4d2a825db3856cd95a9eebe712c34a39e3352 Mon Sep 17 00:00:00 2001 From: Koncopd Date: Tue, 30 Jul 2024 15:10:58 +0200 Subject: [PATCH 08/22] ignore cahed tiledbsoma store in write mode --- lamindb/_artifact.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/lamindb/_artifact.py b/lamindb/_artifact.py index f40a79409..34dd37b93 100644 --- a/lamindb/_artifact.py +++ b/lamindb/_artifact.py @@ -884,26 +884,30 @@ def open( using_key = settings._using_key filepath = filepath_from_artifact(self, using_key=using_key) - is_tiledbsoma = filepath.name == "soma" or filepath.suffix == ".tiledbsoma" + is_tiledbsoma_w = ( + filepath.name == "soma" or filepath.suffix == ".tiledbsoma" + ) and mode == "w" # consider the case where an object is already locally cached localpath = setup_settings.instance.storage.cloud_to_local_no_update(filepath) - 
if not is_tiledbsoma and localpath.exists(): + if not is_tiledbsoma_w and localpath.exists(): access = backed_access(localpath, mode, using_key) else: access = backed_access(filepath, mode, using_key) - if is_tiledbsoma and mode == "w": + if is_tiledbsoma_w: if access.__class__.__name__ not in {"Collection", "Experiment"}: raise ValueError( "The store seems to be tiledbsoma but it is neither `Collection` nor `Experiment`." ) def finalize(): - nonlocal self, filepath + nonlocal self, filepath, localpath _, hash, _, _ = get_stat_dir_cloud(filepath) if self.hash != hash: logger.warning( "The hash of the tiledbsoma store has changed, consider updating this artifact." ) + if localpath.exists(): + shutil.rmtree(localpath) access = _track_writes_factory(access, finalize) # only call if open is successfull From c61dee12f0db869de529c561d564fed20f51f3c1 Mon Sep 17 00:00:00 2001 From: Koncopd Date: Wed, 31 Jul 2024 14:55:36 +0200 Subject: [PATCH 09/22] safer sync after changes --- lamindb/_artifact.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lamindb/_artifact.py b/lamindb/_artifact.py index 34dd37b93..958050a1f 100644 --- a/lamindb/_artifact.py +++ b/lamindb/_artifact.py @@ -901,12 +901,16 @@ def open( def finalize(): nonlocal self, filepath, localpath - _, hash, _, _ = get_stat_dir_cloud(filepath) + if not isinstance(filepath, LocalPathClasses): + _, hash, _, _ = get_stat_dir_cloud(filepath) + else: + # this can be very slow + _, hash, _, _ = hash_dir(filepath) if self.hash != hash: logger.warning( "The hash of the tiledbsoma store has changed, consider updating this artifact." ) - if localpath.exists(): + if localpath.exists() and localpath != filepath: shutil.rmtree(localpath) access = _track_writes_factory(access, finalize) From be1131d0518453201cf57869ab371a2b16b266e8 Mon Sep 17 00:00:00 2001 From: Koncopd Date: Wed, 31 Jul 2024 15:04:57 +0200 Subject: [PATCH 10/22] allow write mode for tiledbsoma only for now --- lamindb/_artifact.py | 2 ++ lamindb/core/storage/_backed_access.py | 3 +++ 2 files changed, 5 insertions(+) diff --git a/lamindb/_artifact.py b/lamindb/_artifact.py index 958050a1f..6da154d19 100644 --- a/lamindb/_artifact.py +++ b/lamindb/_artifact.py @@ -879,6 +879,8 @@ def open( " use one of the following suffixes for the object name:" f" {', '.join(suffixes[:-1])}." 
) + if self.suffix != ".tiledbsoma" and self.key != "soma" and mode != "r": + raise ValueError("Only a tiledbsoma store can be openened with `mode!='r'`.") from lamindb.core.storage._backed_access import _track_writes_factory, backed_access diff --git a/lamindb/core/storage/_backed_access.py b/lamindb/core/storage/_backed_access.py index 1d147a288..66db40375 100644 --- a/lamindb/core/storage/_backed_access.py +++ b/lamindb/core/storage/_backed_access.py @@ -16,6 +16,9 @@ from upath import UPath +# this dynamically creates a subclass of a context manager class +# and reassigns it to an instance of the superclass +# so that the instance calls finalize on close or exit def _track_writes_factory(obj: Any, finalize: Callable): closed: bool = False From 0326ebe101a0f60dc881d52a65e2aea9e2c20cd9 Mon Sep 17 00:00:00 2001 From: Koncopd Date: Fri, 2 Aug 2024 11:59:23 +0200 Subject: [PATCH 11/22] test errors --- lamindb/_artifact.py | 4 ---- tests/test_storage.py | 28 +++++++++++++++++++++++++++- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/lamindb/_artifact.py b/lamindb/_artifact.py index 6da154d19..4d841d5ab 100644 --- a/lamindb/_artifact.py +++ b/lamindb/_artifact.py @@ -896,10 +896,6 @@ def open( else: access = backed_access(filepath, mode, using_key) if is_tiledbsoma_w: - if access.__class__.__name__ not in {"Collection", "Experiment"}: - raise ValueError( - "The store seems to be tiledbsoma but it is neither `Collection` nor `Experiment`." - ) def finalize(): nonlocal self, filepath, localpath diff --git a/tests/test_storage.py b/tests/test_storage.py index 89b6e0cbd..807435506 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -9,7 +9,11 @@ import tiledbsoma import tiledbsoma.io import zarr -from lamindb.core.storage._backed_access import BackedAccessor, backed_access +from lamindb.core.storage._backed_access import ( + AnnDataAccessor, + BackedAccessor, + backed_access, +) from lamindb.core.storage._zarr import read_adata_zarr, write_adata_zarr from lamindb.core.storage.objects import infer_suffix, write_to_disk from lamindb.core.storage.paths import read_adata_h5ad @@ -83,6 +87,10 @@ def callback(*args, **kwargs): with pytest.raises(ValueError): access = backed_access(fp.with_suffix(".invalid_suffix"), using_key=None) + # can't open anndata in write mode + with pytest.raises(ValueError): + access = backed_access(fp, mode="w", using_key=None) + access = backed_access(fp, using_key=None) assert not access.closed @@ -198,6 +206,20 @@ def test_backed_zarr_not_adata(): shutil.rmtree(zarr_pth) +def test_anndata_open_mode(): + fp = ln.core.datasets.anndata_file_pbmc68k_test() + artifact = ln.Artifact(fp, key="test_adata.h5ad") + artifact.save() + + with artifact.open(mode="r") as access: + assert isinstance(access, AnnDataAccessor) + # can't open in write mode if not tiledbsoma + with pytest.raises(ValueError): + artifact.open(mode="w") + + artifact.delete(permanent=True, storage=True) + + @pytest.mark.parametrize("storage", [None, "s3://lamindb-test"]) def test_backed_tiledbsoma(storage): if storage is not None: @@ -220,6 +242,10 @@ def test_backed_tiledbsoma(storage): assert isinstance(experiment, tiledbsoma.Experiment) experiment.close() + # wrong mode, should be either r or w for tiledbsoma + with pytest.raises(ValueError): + experiment.open(mode="p") + # run deprecated backed with artifact_soma.backed(): pass From 4b090d38aff533520daa36e2a062526ce5e9d25a Mon Sep 17 00:00:00 2001 From: Koncopd Date: Fri, 2 Aug 2024 12:26:09 +0200 Subject: [PATCH 12/22] fix --- 
tests/test_storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_storage.py b/tests/test_storage.py index 807435506..c24fe282d 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -244,7 +244,7 @@ def test_backed_tiledbsoma(storage): # wrong mode, should be either r or w for tiledbsoma with pytest.raises(ValueError): - experiment.open(mode="p") + artifact_soma.open(mode="p") # run deprecated backed with artifact_soma.backed(): From e79c1f433282dd20fdb35e3d6d0b5cc5dc9ff3ba Mon Sep 17 00:00:00 2001 From: Koncopd Date: Fri, 2 Aug 2024 14:28:19 +0200 Subject: [PATCH 13/22] fix --- tests/test_storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_storage.py b/tests/test_storage.py index c24fe282d..ea15cd2d4 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -89,7 +89,7 @@ def callback(*args, **kwargs): # can't open anndata in write mode with pytest.raises(ValueError): - access = backed_access(fp, mode="w", using_key=None) + access = backed_access(fp, mode="a", using_key=None) access = backed_access(fp, using_key=None) assert not access.closed From c89c628ea64b4a1d518c18f23ae2e2b3ff9583ab Mon Sep 17 00:00:00 2001 From: Koncopd Date: Fri, 2 Aug 2024 15:37:20 +0200 Subject: [PATCH 14/22] try creating a new artifact for changed soma --- lamindb/_artifact.py | 9 +++++++-- tests/test_storage.py | 8 ++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/lamindb/_artifact.py b/lamindb/_artifact.py index 4d841d5ab..84aca9270 100644 --- a/lamindb/_artifact.py +++ b/lamindb/_artifact.py @@ -905,10 +905,15 @@ def finalize(): # this can be very slow _, hash, _, _ = hash_dir(filepath) if self.hash != hash: + from ._record import init_self_from_db + logger.warning( - "The hash of the tiledbsoma store has changed, consider updating this artifact." + "The hash of the tiledbsoma store has changed, creating a new version of the artifact." ) - if localpath.exists() and localpath != filepath: + new_version = Artifact(filepath, is_new_version_of=self).save() + init_self_from_db(self, new_version) + + if localpath != filepath and localpath.exists(): shutil.rmtree(localpath) access = _track_writes_factory(access, finalize) diff --git a/tests/test_storage.py b/tests/test_storage.py index ea15cd2d4..383262f02 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -242,6 +242,14 @@ def test_backed_tiledbsoma(storage): assert isinstance(experiment, tiledbsoma.Experiment) experiment.close() + # hash in the cloud will be different from hash on disk + # therefore the artifact will be updated + if storage is not None: + hash_on_disk = artifact_soma.hash + with artifact_soma.open(mode="w"): + pass + assert artifact_soma.hash != hash_on_disk + # wrong mode, should be either r or w for tiledbsoma with pytest.raises(ValueError): artifact_soma.open(mode="p") From 59073bbea94af0e21ffeb627be2ab68996dfe2fa Mon Sep 17 00:00:00 2001 From: Koncopd Date: Fri, 2 Aug 2024 15:55:38 +0200 Subject: [PATCH 15/22] set key to None for a new version of soma artifact --- lamindb/_artifact.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lamindb/_artifact.py b/lamindb/_artifact.py index 84aca9270..83a006431 100644 --- a/lamindb/_artifact.py +++ b/lamindb/_artifact.py @@ -910,7 +910,9 @@ def finalize(): logger.warning( "The hash of the tiledbsoma store has changed, creating a new version of the artifact." 
) - new_version = Artifact(filepath, is_new_version_of=self).save() + new_version = Artifact( + filepath, key=None, is_new_version_of=self + ).save() init_self_from_db(self, new_version) if localpath != filepath and localpath.exists(): From 641b906229a6139af52eb8785ac56eead27c42d4 Mon Sep 17 00:00:00 2001 From: Koncopd Date: Fri, 2 Aug 2024 16:15:36 +0200 Subject: [PATCH 16/22] do not error on registering in .lamindb/ --- lamindb/_artifact.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lamindb/_artifact.py b/lamindb/_artifact.py index 83a006431..2df60319b 100644 --- a/lamindb/_artifact.py +++ b/lamindb/_artifact.py @@ -342,8 +342,10 @@ def get_artifact_kwargs_from_data( else: storage = default_storage - if key is not None and key.startswith(AUTO_KEY_PREFIX): - raise ValueError(f"Key cannot start with {AUTO_KEY_PREFIX}") + # for now comment out this error to allow creating new versions of stores + # in the default folder (.lamindb) + # if key is not None and key.startswith(AUTO_KEY_PREFIX): + # raise ValueError(f"Key cannot start with {AUTO_KEY_PREFIX}") log_storage_hint( check_path_in_storage=check_path_in_storage, @@ -910,9 +912,7 @@ def finalize(): logger.warning( "The hash of the tiledbsoma store has changed, creating a new version of the artifact." ) - new_version = Artifact( - filepath, key=None, is_new_version_of=self - ).save() + new_version = Artifact(filepath, is_new_version_of=self).save() init_self_from_db(self, new_version) if localpath != filepath and localpath.exists(): From 3ae809852a44615b717c088993f61104a17047d3 Mon Sep 17 00:00:00 2001 From: Koncopd Date: Fri, 2 Aug 2024 16:25:48 +0200 Subject: [PATCH 17/22] comment out AUTO_KEY_PREFIX --- tests/test_artifact.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/test_artifact.py b/tests/test_artifact.py index 978ae2792..313c20848 100644 --- a/tests/test_artifact.py +++ b/tests/test_artifact.py @@ -197,10 +197,12 @@ def test_is_new_version_of_versioned_file(df, adata): ln.Artifact(df, df) assert error.exconly() == "ValueError: Only one non-keyword arg allowed: data" - # AUTO_KEY_PREFIX - with pytest.raises(ValueError) as error: - ln.Artifact.from_df(df, key=".lamindb/test_df.parquet") - assert error.exconly() == "ValueError: Key cannot start with .lamindb/" + +# comment out for now +# AUTO_KEY_PREFIX +# with pytest.raises(ValueError) as error: +# ln.Artifact.from_df(df, key=".lamindb/test_df.parquet") +# assert error.exconly() == "ValueError: Key cannot start with .lamindb/" def test_is_new_version_of_unversioned_file(df, adata): From c1ba0055c70aa166c99077aeeb85e196956f4a87 Mon Sep 17 00:00:00 2001 From: Koncopd Date: Fri, 2 Aug 2024 16:31:23 +0200 Subject: [PATCH 18/22] more testing --- tests/test_storage.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/tests/test_storage.py b/tests/test_storage.py index 383262f02..d891aba06 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -242,13 +242,16 @@ def test_backed_tiledbsoma(storage): assert isinstance(experiment, tiledbsoma.Experiment) experiment.close() - # hash in the cloud will be different from hash on disk - # therefore the artifact will be updated + hash_on_disk = artifact_soma.hash + with artifact_soma.open(mode="w") as store: + assert store.__class__.__name__ == "ExperimentTrack" if storage is not None: - hash_on_disk = artifact_soma.hash - with artifact_soma.open(mode="w"): - pass + # hash in the cloud will be different from hash on disk + # therefore 
the artifact will be updated assert artifact_soma.hash != hash_on_disk + else: + # hash stays the same + assert artifact_soma.hash == hash_on_disk # wrong mode, should be either r or w for tiledbsoma with pytest.raises(ValueError): From 3d38f74df0cc6a6c0361a314dab3bcdf36ee11a2 Mon Sep 17 00:00:00 2001 From: Koncopd Date: Fri, 2 Aug 2024 16:39:55 +0200 Subject: [PATCH 19/22] delete all versions in tiledbsoma test --- tests/test_storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_storage.py b/tests/test_storage.py index d891aba06..635062480 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -261,7 +261,7 @@ def test_backed_tiledbsoma(storage): with artifact_soma.backed(): pass - artifact_soma.delete(permanent=True, storage=True) + artifact_soma.versions.delete(permanent=True, storage=True) shutil.rmtree("test.tiledbsoma") if storage is not None: From 896d684922f4d2bd98cd0ce29b7a8bc089aa3584 Mon Sep 17 00:00:00 2001 From: Koncopd Date: Fri, 2 Aug 2024 17:02:16 +0200 Subject: [PATCH 20/22] test cache deletion on hash change --- tests/test_storage.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/test_storage.py b/tests/test_storage.py index 635062480..fe97e9691 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -232,15 +232,8 @@ def test_backed_tiledbsoma(storage): artifact_soma = ln.Artifact("test.tiledbsoma", description="test tiledbsoma") artifact_soma.save() - # otherwise backed (.open) will use the cached object for connection - if storage is not None: - cache_path = artifact_soma.cache() - shutil.rmtree(cache_path) - assert not cache_path.exists() - - experiment = artifact_soma.open() - assert isinstance(experiment, tiledbsoma.Experiment) - experiment.close() + # copied to cache on .save() + cache_path = artifact_soma.cache() hash_on_disk = artifact_soma.hash with artifact_soma.open(mode="w") as store: @@ -249,9 +242,16 @@ def test_backed_tiledbsoma(storage): # hash in the cloud will be different from hash on disk # therefore the artifact will be updated assert artifact_soma.hash != hash_on_disk + # delete the cached store on hash change + assert not cache_path.exists() else: # hash stays the same assert artifact_soma.hash == hash_on_disk + assert artifact_soma.path == cache_path + + experiment = artifact_soma.open() # mode="r" by default + assert isinstance(experiment, tiledbsoma.Experiment) + experiment.close() # wrong mode, should be either r or w for tiledbsoma with pytest.raises(ValueError): From 9d6b56f9e76ffd60198a53f8dd4e0c5f9aea514d Mon Sep 17 00:00:00 2001 From: Koncopd Date: Fri, 2 Aug 2024 17:33:18 +0200 Subject: [PATCH 21/22] upd lnschema-core --- sub/lnschema-core | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sub/lnschema-core b/sub/lnschema-core index 8bad20389..df6d1a8cb 160000 --- a/sub/lnschema-core +++ b/sub/lnschema-core @@ -1 +1 @@ -Subproject commit 8bad2038918426a3fd1d78461b1957da92189e18 +Subproject commit df6d1a8cbabe662d04fc5fce6d41f09faf4faa8f From b2f91c6a86012df2a5dbb7a0da013baa86b25155 Mon Sep 17 00:00:00 2001 From: Koncopd Date: Fri, 2 Aug 2024 17:56:06 +0200 Subject: [PATCH 22/22] upd lnschema-core --- sub/lnschema-core | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sub/lnschema-core b/sub/lnschema-core index df6d1a8cb..e6ce4e7ef 160000 --- a/sub/lnschema-core +++ b/sub/lnschema-core @@ -1 +1 @@ -Subproject commit df6d1a8cbabe662d04fc5fce6d41f09faf4faa8f +Subproject commit 
e6ce4e7efa27512799c8f8ae8d4063581c073d25
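
Patch 07 in this series adds `_track_writes_factory`, which hooks a finalize callback into an already-open store handle by dynamically creating a subclass of the handle's class and reassigning the instance to it, so the callback fires exactly once when the handle is closed or its context manager exits. Below is a minimal, self-contained sketch of that pattern, assuming a toy `DummyStore` class in place of a real tiledbsoma handle; `track_writes` and `DummyStore` are illustrative names only, and the actual implementation is the `_track_writes_factory` shown in the diff for `lamindb/core/storage/_backed_access.py`.

    # Sketch of the write-tracking pattern from patch 07, under the assumptions above.
    from typing import Any, Callable


    class DummyStore:
        """Toy stand-in for a tiledbsoma Experiment/Collection handle."""

        def write(self, data) -> None:
            print(f"writing {data!r}")

        def close(self) -> None:
            print("store closed")

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.close()


    def track_writes(obj: Any, finalize: Callable[[], None]) -> Any:
        """Swap obj's class for a subclass that calls finalize once after close/exit."""
        closed = False
        tracked_class = obj.__class__
        type_dict = {"__doc__": tracked_class.__doc__}

        def close(self, *args, **kwargs):
            nonlocal closed
            tracked_class.close(self, *args, **kwargs)
            if not closed:
                finalize()
                closed = True

        def __exit__(self, exc_type, exc_val, exc_tb):
            nonlocal closed
            tracked_class.__exit__(self, exc_type, exc_val, exc_tb)
            if not closed:
                finalize()
                closed = True

        type_dict["close"] = close
        type_dict["__exit__"] = __exit__
        Track = type(tracked_class.__name__ + "Track", (tracked_class,), type_dict)
        obj.__class__ = Track  # works for instances of ordinary Python classes
        return obj


    store = track_writes(DummyStore(), lambda: print("finalize: re-hash and sync"))
    with store:  # prints "writing 'x'", "store closed", then the finalize message once
        store.write("x")

Because `__exit__` delegates to the original `close()`, which in turn dispatches to the overridden `close()`, the `closed` flag guarantees the finalize callback runs only once even when both hooks fire during the same shutdown.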
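
Taken together with the mode checks from patch 05, the tests added in patches 11-20 pin down the user-facing behavior: `Artifact.open()` defaults to read mode, write mode is accepted only for tiledbsoma stores, and when a write session closes the store is re-hashed, a new artifact version is registered if the hash changed, and a stale local cache is removed. A hedged usage sketch follows, assuming a previously saved tiledbsoma artifact like the `artifact_soma` fixture in `tests/test_storage.py`; the lookup call is illustrative, not prescribed by the patches.

    import lamindb as ln
    import tiledbsoma

    # assumes an artifact was saved earlier, e.g. via
    # ln.Artifact("test.tiledbsoma", description="test tiledbsoma").save()
    artifact_soma = ln.Artifact.filter(description="test tiledbsoma").one()

    # read mode is the default and works for any backed artifact
    with artifact_soma.open() as experiment:
        assert isinstance(experiment, tiledbsoma.Experiment)

    # write mode is only allowed for tiledbsoma stores; on exiting the context
    # the store is re-hashed and a new artifact version is created if it changed
    with artifact_soma.open(mode="w") as experiment:
        pass  # modify the store in place here

    # any other mode raises a ValueError (patch 05)
    # artifact_soma.open(mode="p")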