Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Track write operations on array stores #1756

Merged
merged 22 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 50 additions & 16 deletions lamindb/_artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,10 @@ def get_artifact_kwargs_from_data(
else:
storage = default_storage

if key is not None and key.startswith(AUTO_KEY_PREFIX):
raise ValueError(f"Key cannot start with {AUTO_KEY_PREFIX}")
# for now comment out this error to allow creating new versions of stores
# in the default folder (.lamindb)
# if key is not None and key.startswith(AUTO_KEY_PREFIX):
# raise ValueError(f"Key cannot start with {AUTO_KEY_PREFIX}")

log_storage_hint(
check_path_in_storage=check_path_in_storage,
Expand Down Expand Up @@ -861,15 +863,15 @@ def replace(

# deprecated
def backed(
self, is_run_input: bool | None = None
self, mode: str = "r", is_run_input: bool | None = None
) -> AnnDataAccessor | BackedAccessor | SOMACollection | SOMAExperiment:
logger.warning("`.backed()` is deprecated, use `.open()`!'")
return self.open(is_run_input)
return self.open(mode, is_run_input)


# docstring handled through attach_func_to_class_method
def open(
self, is_run_input: bool | None = None
self, mode: str = "r", is_run_input: bool | None = None
) -> AnnDataAccessor | BackedAccessor | SOMACollection | SOMAExperiment:
# ignore empty suffix for now
suffixes = (".h5", ".hdf5", ".h5ad", ".zarr", ".tiledbsoma", "")
Expand All @@ -879,29 +881,61 @@ def open(
" use one of the following suffixes for the object name:"
f" {', '.join(suffixes[:-1])}."
)
if self.suffix != ".tiledbsoma" and self.key != "soma" and mode != "r":
raise ValueError("Only a tiledbsoma store can be openened with `mode!='r'`.")

from lamindb.core.storage._backed_access import backed_access
from lamindb.core.storage._backed_access import _track_writes_factory, backed_access

_track_run_input(self, is_run_input)
using_key = settings._using_key
filepath = filepath_from_artifact(self, using_key=using_key)
is_tiledbsoma_w = (
filepath.name == "soma" or filepath.suffix == ".tiledbsoma"
) and mode == "w"
# consider the case where an object is already locally cached
localpath = setup_settings.instance.storage.cloud_to_local_no_update(filepath)
if localpath.exists():
return backed_access(localpath, using_key)
if not is_tiledbsoma_w and localpath.exists():
access = backed_access(localpath, mode, using_key)
else:
return backed_access(filepath, using_key)
access = backed_access(filepath, mode, using_key)
if is_tiledbsoma_w:

def finalize():
nonlocal self, filepath, localpath
if not isinstance(filepath, LocalPathClasses):
_, hash, _, _ = get_stat_dir_cloud(filepath)
else:
# this can be very slow
_, hash, _, _ = hash_dir(filepath)
if self.hash != hash:
from ._record import init_self_from_db

logger.warning(
"The hash of the tiledbsoma store has changed, creating a new version of the artifact."
)
new_version = Artifact(filepath, is_new_version_of=self).save()
init_self_from_db(self, new_version)

if localpath != filepath and localpath.exists():
shutil.rmtree(localpath)

access = _track_writes_factory(access, finalize)
# only call if open is successfull
_track_run_input(self, is_run_input)
return access


# docstring handled through attach_func_to_class_method
def load(self, is_run_input: bool | None = None, stream: bool = False, **kwargs) -> Any:
_track_run_input(self, is_run_input)
if hasattr(self, "_memory_rep") and self._memory_rep is not None:
return self._memory_rep
using_key = settings._using_key
return load_to_memory(
filepath_from_artifact(self, using_key=using_key), stream=stream, **kwargs
)
access_memory = self._memory_rep
else:
using_key = settings._using_key
access_memory = load_to_memory(
filepath_from_artifact(self, using_key=using_key), stream=stream, **kwargs
)
# only call if load is successfull
_track_run_input(self, is_run_input)
return access_memory


# docstring handled through attach_func_to_class_method
Expand Down
2 changes: 1 addition & 1 deletion lamindb/_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,10 @@ def mapped(

# docstring handled through attach_func_to_class_method
def cache(self, is_run_input: bool | None = None) -> list[UPath]:
_track_run_input(self, is_run_input)
path_list = []
for artifact in self.ordered_artifacts.all():
path_list.append(artifact.cache())
_track_run_input(self, is_run_input)
return path_list


Expand Down
2 changes: 1 addition & 1 deletion lamindb/core/_feature_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@
from lamindb.core.storage._backed_access import backed_access

using_key = settings._using_key
data_parse = backed_access(filepath, using_key)
data_parse = backed_access(filepath, using_key=using_key)

Check warning on line 244 in lamindb/core/_feature_manager.py

View check run for this annotation

Codecov / codecov/patch

lamindb/core/_feature_manager.py#L244

Added line #L244 was not covered by tests
else:
data_parse = ad.read_h5ad(filepath, backed="r")
type = "float"
Expand Down
2 changes: 1 addition & 1 deletion lamindb/core/_mapped_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from lamin_utils import logger
from lamindb_setup.core.upath import UPath

from .storage._backed_access import (
from .storage._anndata_accessor import (
ArrayType,
ArrayTypes,
GroupType,
Expand Down
Loading
Loading