Skip to content

Commit

Permalink
✨ API to create and append to tiledbsoma stores (#1823)
Browse files Browse the repository at this point in the history
  • Loading branch information
Koncopd authored Aug 19, 2024
1 parent 92c25a9 commit 754dbe8
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 58 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
- "faq"
- "storage"
- "cli"
timeout-minutes: 7
timeout-minutes: 10

steps:
- uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion lamindb/core/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

from lamindb_setup.core.upath import LocalPathClasses, UPath, infer_filesystem

from ._anndata_sizes import size_adata
from ._backed_access import AnnDataAccessor, BackedAccessor
from ._tiledbsoma import write_tiledbsoma_store
from ._valid_suffixes import VALID_SUFFIXES
from .objects import infer_suffix, write_to_disk
from .paths import delete_storage, load_to_memory
40 changes: 2 additions & 38 deletions lamindb/core/storage/_backed_access.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Callable, Literal
from typing import TYPE_CHECKING, Any, Callable

from anndata._io.specs.registry import get_spec
from lnschema_core import Artifact

from ._anndata_accessor import AnnDataAccessor, StorageType, registry
from ._tiledbsoma import _open_tiledbsoma
from .paths import filepath_from_artifact

if TYPE_CHECKING:
Expand Down Expand Up @@ -52,43 +53,6 @@ def close(self, *args, **kwargs):
return obj


def _open_tiledbsoma(
    filepath: UPath, mode: Literal["r", "w"] = "r"
) -> SOMACollection | SOMAExperiment:
    """Open the tiledbsoma store at `filepath` as an Experiment or a Collection."""
    try:
        import tiledbsoma as soma
    except ImportError as e:
        raise ImportError("Please install tiledbsoma: pip install tiledbsoma") from e
    filepath_str = filepath.as_posix()
    if filepath.protocol == "s3":
        from lamindb_setup.core._settings_storage import get_storage_region

        region = get_storage_region(filepath_str)
        tiledb_config = {"vfs.s3.region": region}
        storage_options = filepath.storage_options
        # forward explicit S3 credentials from the path's storage options, if any
        if "key" in storage_options:
            tiledb_config["vfs.s3.aws_access_key_id"] = storage_options["key"]
        if "secret" in storage_options:
            tiledb_config["vfs.s3.aws_secret_access_key"] = storage_options["secret"]
        if "token" in storage_options:
            tiledb_config["vfs.s3.aws_session_token"] = storage_options["token"]
        ctx = soma.SOMATileDBContext(tiledb_config=tiledb_config)
        # this is a strange bug
        # for some reason iterdir further gives incorrect results
        # if cache is not invalidated
        # instead of obs and ms it gives ms and ms in the list of names
        filepath.fs.invalidate_cache()
    else:
        ctx = None

    # open as Experiment when both "obs" and "ms" are present at top level,
    # otherwise fall back to a generic Collection
    soma_objects = [obj.name for obj in filepath.iterdir()]
    if "obs" in soma_objects and "ms" in soma_objects:
        SOMAType = soma.Experiment
    else:
        SOMAType = soma.Collection
    return SOMAType.open(filepath_str, mode=mode, context=ctx)


@dataclass
class BackedAccessor:
"""h5py.File or zarr.Group accessor."""
Expand Down
110 changes: 110 additions & 0 deletions lamindb/core/storage/_tiledbsoma.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Literal

from anndata import AnnData
from lamindb_setup.core._settings_storage import get_storage_region
from lamindb_setup.core.upath import create_path
from lnschema_core import Artifact, Run

if TYPE_CHECKING:
from lamindb_setup.core.types import UPathStr
from tiledbsoma import Collection as SOMACollection
from tiledbsoma import Experiment as SOMAExperiment
from upath import UPath


def _tiledb_config_s3(storepath: UPath) -> dict:
    """Build a TileDB VFS config for S3 from the store path's region and credentials."""
    # region is always set; credentials are added only when present
    # in the path's storage options
    config = {"vfs.s3.region": get_storage_region(storepath)}
    options = storepath.storage_options
    credential_map = (
        ("key", "vfs.s3.aws_access_key_id"),
        ("secret", "vfs.s3.aws_secret_access_key"),
        ("token", "vfs.s3.aws_session_token"),
    )
    for option_name, config_name in credential_map:
        if option_name in options:
            config[config_name] = options[option_name]

    return config


def _open_tiledbsoma(
    storepath: UPath, mode: Literal["r", "w"] = "r"
) -> SOMACollection | SOMAExperiment:
    """Open the tiledbsoma store at `storepath` as an Experiment or a Collection."""
    try:
        import tiledbsoma as soma
    except ImportError as e:
        raise ImportError("Please install tiledbsoma: pip install tiledbsoma") from e

    if storepath.protocol == "s3":
        ctx = soma.SOMATileDBContext(tiledb_config=_tiledb_config_s3(storepath))
        # this is a strange bug
        # for some reason iterdir further gives incorrect results
        # if cache is not invalidated
        # instead of obs and ms it gives ms and ms in the list of names
        storepath.fs.invalidate_cache()
    else:
        ctx = None

    # open as Experiment when both "obs" and "ms" are present at top level,
    # otherwise fall back to a generic Collection
    names = {entry.name for entry in storepath.iterdir()}
    soma_type = soma.Experiment if {"obs", "ms"} <= names else soma.Collection
    return soma_type.open(storepath.as_posix(), mode=mode, context=ctx)


def write_tiledbsoma_store(
    storepath: UPathStr,
    adata: AnnData | UPathStr,
    run: Run | None = None,
    artifact_kwargs: dict | None = None,
    **kwargs,
) -> Artifact:
    """Write `AnnData` to `tiledbsoma.Experiment`.

    Reads `AnnData`, writes it to `tiledbsoma.Experiment` and creates `lamindb.Artifact`.

    See `tiledbsoma.io.from_h5ad
    <https://tiledbsoma.readthedocs.io/en/latest/_autosummary/tiledbsoma.io.from_h5ad.html>`__.

    Args:
        storepath: Target path of the `tiledbsoma.Experiment` store.
        adata: An `AnnData` object, or a path to an `.h5ad` file or zarr store.
        run: The run creating the store; resolved via `get_run` when `None`.
        artifact_kwargs: Extra keyword arguments for the `Artifact` constructor.
        **kwargs: Forwarded to `tiledbsoma.io.from_anndata`.

    Returns:
        An `Artifact` pointing to the written store.

    Raises:
        ImportError: If `tiledbsoma` is not installed.
        ValueError: If `adata` is an `AnnData` view.
    """
    try:
        import tiledbsoma as soma
        import tiledbsoma.io as soma_io
    except ImportError as e:
        raise ImportError("Please install tiledbsoma: pip install tiledbsoma") from e

    from lamindb.core._data import get_run

    if artifact_kwargs is None:
        artifact_kwargs = {}

    if not isinstance(adata, AnnData):
        from lamindb.core.storage.paths import read_adata_h5ad, read_adata_zarr

        # in case adata is somewhere in our managed s3 bucket or just in s3
        adata = create_path(adata)
        # a zarr store is a directory; an h5ad file is not
        adata = read_adata_zarr(adata) if adata.is_dir() else read_adata_h5ad(adata)
    elif adata.is_view:
        raise ValueError(
            "Can not write from an `AnnData` view, please do `adata.copy()` before passing."
        )

    run = get_run(run)
    # tag each observation with the run uid so provenance is recorded in the store
    adata.obs["lamin_run_uid"] = run.uid

    storepath = create_path(storepath)
    if storepath.protocol == "s3":
        ctx = soma.SOMATileDBContext(tiledb_config=_tiledb_config_s3(storepath))
    else:
        ctx = None

    try:
        soma_io.from_anndata(storepath.as_posix(), adata, context=ctx, **kwargs)
    finally:
        # never leave the provenance column behind on the caller's AnnData,
        # even if the write fails
        del adata.obs["lamin_run_uid"]

    return Artifact(storepath, run=run, **artifact_kwargs)
71 changes: 53 additions & 18 deletions tests/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import tiledbsoma
import tiledbsoma.io
import zarr
from lamindb.core.storage import write_tiledbsoma_store
from lamindb.core.storage._backed_access import (
AnnDataAccessor,
BackedAccessor,
Expand Down Expand Up @@ -227,42 +228,76 @@ def test_backed_tiledbsoma(storage):
ln.settings.storage = "s3://lamindb-test"

test_file = ln.core.datasets.anndata_file_pbmc68k_test()
tiledbsoma.io.from_h5ad("test.tiledbsoma", test_file, "RNA")
adata = read_adata_h5ad(test_file)
# write less
del adata.varp
del adata.obsp
del adata.layers
if storage is None:
# test local with zarr
test_file = test_file.with_suffix(".zarr")
adata.write_zarr(test_file)
else:
adata.write_h5ad(test_file)

artifact_soma = ln.Artifact("test.tiledbsoma", description="test tiledbsoma")
# fails with a view
with pytest.raises(ValueError):
write_tiledbsoma_store("test.tiledbsoma", adata[:2])

transform = ln.Transform(name="test tiledbsoma store")
transform.save()
run = ln.Run(transform)
run.save()

experiment_path = ln.settings.storage.root / "test.tiledbsoma"
if storage is not None and experiment_path.exists():
experiment_path.rmdir()

artifact_soma = write_tiledbsoma_store(
experiment_path,
test_file,
run,
artifact_kwargs={"description": "test tiledbsoma"},
measurement_name="RNA",
)
artifact_soma.save()

# copied to cache on .save()
cache_path = artifact_soma.cache()
with artifact_soma.open() as store: # mode="r" by default
assert isinstance(store, tiledbsoma.Experiment)
obs = store["obs"]
n_obs = len(obs)
assert "lamin_run_uid" in obs.schema.names
run_id = obs.read(column_names=["lamin_run_uid"]).concat().to_pandas()
assert all(run_id == run.uid)

hash_on_disk = artifact_soma.hash
cache_path = artifact_soma.cache()
hash_before_changes = artifact_soma.hash
with artifact_soma.open(mode="w") as store:
assert store.__class__.__name__ == "ExperimentTrack"
tiledbsoma.io.add_matrix_to_collection(
exp=store,
measurement_name="RNA",
collection_name="obsm",
matrix_name="test_array",
matrix_data=np.ones((n_obs, 2)),
)
assert artifact_soma.hash != hash_before_changes
if storage is not None:
# hash in the cloud will be different from hash on disk
# therefore the artifact will be updated
assert artifact_soma.hash != hash_on_disk
# delete the cached store on hash change
# cache should be ignored and deleted after the changes
assert not cache_path.exists()
else:
# hash stays the same
assert artifact_soma.hash == hash_on_disk
assert artifact_soma.path == cache_path

experiment = artifact_soma.open() # mode="r" by default
assert isinstance(experiment, tiledbsoma.Experiment)
experiment.close()

# wrong mode, should be either r or w for tiledbsoma
with pytest.raises(ValueError):
artifact_soma.open(mode="p")

# run deprecated backed
with artifact_soma.backed():
pass
# and test running without the context manager
store = artifact_soma.backed()
store.close()

artifact_soma.versions.delete(permanent=True, storage=True)
shutil.rmtree("test.tiledbsoma")

if storage is not None:
ln.settings.storage = previous_storage

0 comments on commit 754dbe8

Please sign in to comment.