Commit
✨ Track write operations on array stores (#1756)
Koncopd authored Aug 2, 2024
1 parent 8e5a957 commit d369754
Showing 9 changed files with 915 additions and 777 deletions.
66 changes: 50 additions & 16 deletions lamindb/_artifact.py
@@ -342,8 +342,10 @@ def get_artifact_kwargs_from_data(
     else:
         storage = default_storage
 
-    if key is not None and key.startswith(AUTO_KEY_PREFIX):
-        raise ValueError(f"Key cannot start with {AUTO_KEY_PREFIX}")
+    # for now comment out this error to allow creating new versions of stores
+    # in the default folder (.lamindb)
+    # if key is not None and key.startswith(AUTO_KEY_PREFIX):
+    # raise ValueError(f"Key cannot start with {AUTO_KEY_PREFIX}")
 
     log_storage_hint(
         check_path_in_storage=check_path_in_storage,
@@ -861,15 +863,15 @@ def replace(
 
 # deprecated
 def backed(
-    self, is_run_input: bool | None = None
+    self, mode: str = "r", is_run_input: bool | None = None
 ) -> AnnDataAccessor | BackedAccessor | SOMACollection | SOMAExperiment:
     logger.warning("`.backed()` is deprecated, use `.open()`!'")
-    return self.open(is_run_input)
+    return self.open(mode, is_run_input)
 
 
 # docstring handled through attach_func_to_class_method
 def open(
-    self, is_run_input: bool | None = None
+    self, mode: str = "r", is_run_input: bool | None = None
 ) -> AnnDataAccessor | BackedAccessor | SOMACollection | SOMAExperiment:
     # ignore empty suffix for now
     suffixes = (".h5", ".hdf5", ".h5ad", ".zarr", ".tiledbsoma", "")
@@ -879,29 +881,61 @@ def open(
             " use one of the following suffixes for the object name:"
             f" {', '.join(suffixes[:-1])}."
         )
+    if self.suffix != ".tiledbsoma" and self.key != "soma" and mode != "r":
+        raise ValueError("Only a tiledbsoma store can be opened with `mode!='r'`.")
 
-    from lamindb.core.storage._backed_access import backed_access
+    from lamindb.core.storage._backed_access import _track_writes_factory, backed_access
 
-    _track_run_input(self, is_run_input)
     using_key = settings._using_key
     filepath = filepath_from_artifact(self, using_key=using_key)
+    is_tiledbsoma_w = (
+        filepath.name == "soma" or filepath.suffix == ".tiledbsoma"
+    ) and mode == "w"
     # consider the case where an object is already locally cached
     localpath = setup_settings.instance.storage.cloud_to_local_no_update(filepath)
-    if localpath.exists():
-        return backed_access(localpath, using_key)
+    if not is_tiledbsoma_w and localpath.exists():
+        access = backed_access(localpath, mode, using_key)
     else:
-        return backed_access(filepath, using_key)
+        access = backed_access(filepath, mode, using_key)
+    if is_tiledbsoma_w:
+
+        def finalize():
+            nonlocal self, filepath, localpath
+            if not isinstance(filepath, LocalPathClasses):
+                _, hash, _, _ = get_stat_dir_cloud(filepath)
+            else:
+                # this can be very slow
+                _, hash, _, _ = hash_dir(filepath)
+            if self.hash != hash:
+                from ._record import init_self_from_db
+
+                logger.warning(
+                    "The hash of the tiledbsoma store has changed, creating a new version of the artifact."
+                )
+                new_version = Artifact(filepath, is_new_version_of=self).save()
+                init_self_from_db(self, new_version)
+
+            if localpath != filepath and localpath.exists():
+                shutil.rmtree(localpath)
+
+        access = _track_writes_factory(access, finalize)
+    # only call if open is successful
+    _track_run_input(self, is_run_input)
+    return access
 
 
 # docstring handled through attach_func_to_class_method
 def load(self, is_run_input: bool | None = None, stream: bool = False, **kwargs) -> Any:
-    _track_run_input(self, is_run_input)
     if hasattr(self, "_memory_rep") and self._memory_rep is not None:
-        return self._memory_rep
-    using_key = settings._using_key
-    return load_to_memory(
-        filepath_from_artifact(self, using_key=using_key), stream=stream, **kwargs
-    )
+        access_memory = self._memory_rep
+    else:
+        using_key = settings._using_key
+        access_memory = load_to_memory(
+            filepath_from_artifact(self, using_key=using_key), stream=stream, **kwargs
+        )
+    # only call if load is successful
+    _track_run_input(self, is_run_input)
+    return access_memory
 
 
 # docstring handled through attach_func_to_class_method
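Taken together, the `open()` changes let a tiledbsoma artifact be opened for writing, with the store re-hashed afterwards and a new artifact version created if its contents changed. Below is a hedged usage sketch under those assumptions; the artifact key is a placeholder and the actual write calls via the tiledbsoma API are not part of this diff:

    import lamindb as ln

    # Placeholder lookup: any artifact backed by a ".tiledbsoma" store
    # (or a store named "soma") applies; other suffixes still require mode="r".
    artifact = ln.Artifact.filter(key="scrna/experiment.tiledbsoma").one()

    store = artifact.open(mode="w")  # write-tracked accessor
    # ... write to the store via the tiledbsoma API ...
    store.close()
    # Per the diff, the finalize hook wired in by _track_writes_factory
    # re-hashes the store; if the hash changed, a new artifact version is
    # created and a stale local cache is removed.

The `_track_writes_factory` itself lives in `lamindb/core/storage/_backed_access.py`, which is not expanded in this view, so the following is only a minimal sketch of how a wrapper could pair an open store with a finalize callback; the class and function names here are invented for illustration and are not lamindb API:

    from typing import Any, Callable


    class _WriteTrackingStore:
        """Proxy an open array store and run a callback after it is closed."""

        def __init__(self, store: Any, finalize: Callable[[], None]) -> None:
            self._store = store
            self._finalize = finalize

        def __getattr__(self, name: str) -> Any:
            # delegate reads and writes to the wrapped store
            return getattr(self._store, name)

        def close(self) -> None:
            self._store.close()
            self._finalize()  # e.g. re-hash the store and version the artifact

        def __enter__(self) -> "_WriteTrackingStore":
            return self

        def __exit__(self, exc_type, exc, tb) -> None:
            self.close()


    def track_writes(store: Any, finalize: Callable[[], None]) -> _WriteTrackingStore:
        return _WriteTrackingStore(store, finalize)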
2 changes: 1 addition & 1 deletion lamindb/_collection.py
@@ -271,10 +271,10 @@ def mapped(
 
 # docstring handled through attach_func_to_class_method
 def cache(self, is_run_input: bool | None = None) -> list[UPath]:
-    _track_run_input(self, is_run_input)
     path_list = []
     for artifact in self.ordered_artifacts.all():
         path_list.append(artifact.cache())
+    _track_run_input(self, is_run_input)
     return path_list
 
 
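As in `load()` and `open()` above, the run-input record is now written only after every `cache()` call has succeeded. A minimal illustration of this ordering, with placeholder names rather than lamindb internals:

    def cache_collection(artifacts, record_run_input) -> list:
        # Materialize all member artifacts first; if any cache() call raises,
        # the collection is never recorded as a run input.
        paths = [artifact.cache() for artifact in artifacts]
        record_run_input()  # reached only on success
        return paths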
2 changes: 1 addition & 1 deletion lamindb/core/_feature_manager.py
@@ -241,7 +241,7 @@ def parse_feature_sets_from_anndata(
             from lamindb.core.storage._backed_access import backed_access
 
             using_key = settings._using_key
-            data_parse = backed_access(filepath, using_key)
+            data_parse = backed_access(filepath, using_key=using_key)
         else:
             data_parse = ad.read_h5ad(filepath, backed="r")
         type = "float"
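This call site switches to a keyword argument because `backed_access` now takes a `mode` parameter ahead of `using_key` (see the calls in `_artifact.py` above); passing `using_key` positionally would be read as the mode. The updated signature is not shown in this diff, so the sketch below is inferred from those call sites and its defaults are assumptions:

    # Inferred shape of the helper in lamindb/core/storage/_backed_access.py;
    # only the parameter order matters here, the defaults are guesses.
    def backed_access(filepath, mode: str = "r", using_key: str | None = None):
        ...

    # Hence: backed_access(filepath, using_key=using_key) rather than
    # backed_access(filepath, using_key).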
2 changes: 1 addition & 1 deletion lamindb/core/_mapped_collection.py
@@ -10,7 +10,7 @@
 from lamin_utils import logger
 from lamindb_setup.core.upath import UPath
 
-from .storage._backed_access import (
+from .storage._anndata_accessor import (
     ArrayType,
     ArrayTypes,
     GroupType,
(Diffs for the remaining 5 changed files are not expanded in this view.)
