def test_awkward(tmpdir):
    """Round-trip an awkward array through a writable in-memory catalog.

    Verifies that full and progressively sliced reads reproduce the source
    array, and that a sliced read transfers fewer bytes than a full read.
    """

    def response_size(client, key):
        # Perform one read of ``client[key]`` and report the byte size of the
        # single HTTP response it produced.
        with record_history() as history:
            client[key]
        assert len(history.responses) == 1  # sanity check
        return len(history.responses[0].content)

    app = build_app(in_memory(writable_storage=tmpdir))
    with Context.from_app(app) as context:
        client = from_context(context)

        # Write data into catalog. It will be stored as directory of buffers
        # named like 'node0-offsets' and 'node2-data'.
        array = awkward.Array(
            [
                [{"x": 1.1, "y": [1]}, {"x": 2.2, "y": [1, 2]}],
                [],
                [{"x": 3.3, "y": [1, 2, 3]}],
            ]
        )
        aac = client.write_awkward(array, key="test")

        # Read the data back out from the AwkwardArrayClient, progressively sliced.
        assert awkward.almost_equal(aac.read(), array)
        progressively_narrower = (
            slice(None),
            0,
            (0, "y"),
            (0, "y", slice(None, 1)),
        )
        for key in progressively_narrower:
            assert awkward.almost_equal(aac[key], array[key])

        # When sliced, the server sends less data.
        full_response_size = response_size(aac, slice(None))
        sliced_response_size = response_size(aac, (0, "y"))
        assert sliced_response_size < full_response_size
class AwkwardBuffersAdapter:
    """Adapter for an awkward array stored as a directory of buffer files.

    Each buffer lives in its own file inside ``directory``; the file name is
    the buffer's "suffixed form key" as listed in
    ``structure.suffixed_form_keys``.
    """

    structure_family = StructureFamily.awkward

    def __init__(
        self,
        directory,
        structure,
        metadata=None,
        specs=None,
        access_policy=None,
    ):
        """
        Parameters
        ----------
        directory : path-like supporting ``/``
            Directory holding one file per suffixed form key.
        structure : object with a ``suffixed_form_keys`` attribute
            Description of the stored array.
        metadata : dict, optional
        specs : list, optional
        access_policy : optional
        """
        self.directory = directory
        self._metadata = metadata or {}
        self._structure = structure
        self.specs = list(specs or [])
        self.access_policy = access_policy

    def metadata(self):
        "Return the metadata dict given at construction (empty if none)."
        return self._metadata

    @classmethod
    def init_storage(cls, directory, structure):
        """Create ``directory`` and describe it as a single directory Asset.

        Returns a one-element list of ``Asset`` whose ``data_uri`` is a
        ``file://localhost/...`` URI for the directory.
        """
        from ..server.schemas import Asset

        directory.mkdir()
        data_uri = parse.urlunparse(("file", "localhost", str(directory), "", "", None))
        return [Asset(data_uri=data_uri, is_directory=True)]

    def write(self, data):
        """Write each buffer in ``data`` to a file named by its key.

        Parameters
        ----------
        data : mapping of str to bytes-like
        """
        # NOTE(review): the keys are used directly as file names. If they can
        # originate from an uploaded payload, a key containing a path
        # separator or '..' would resolve outside ``self.directory``.
        # Consider validating them here or upstream.
        for form_key, value in data.items():
            with open(self.directory / form_key, "wb") as file:
                file.write(value)

    def read(self, form_keys=None):
        """Read buffers from disk.

        Parameters
        ----------
        form_keys : iterable of str, optional
            If None, read every buffer listed in the structure. Otherwise,
            read the buffers whose suffixed form key either equals one of
            ``form_keys`` or consists of one of them followed by ``-`` and a
            suffix (e.g. ``node1`` selects ``node1-offsets``).

        Returns
        -------
        dict mapping suffixed form key to bytes
        """
        available = self._structure.suffixed_form_keys
        if form_keys is None:
            # Read all.
            selected = list(available)
        else:
            exact = set(form_keys)
            # Match on "<form_key>-" rather than a bare prefix. A bare
            # ``startswith("node1")`` would also select "node10-data",
            # "node11-offsets", ... and send buffers that were not requested.
            prefixes = tuple(f"{form_key}-" for form_key in exact)
            selected = [
                key
                for key in available
                if key in exact or key.startswith(prefixes)
            ]
        buffers = {}
        for key in selected:
            with open(self.directory / key, "rb") as file:
                buffers[key] = file.read()
        return buffers

    def structure(self):
        "Return the structure object given at construction."
        return self._structure
class CatalogAwkwardAdapter(CatalogNodeAdapter):
    """Catalog node for awkward data; forwards reads and writes to its adapter."""

    async def read(self, *args, **kwargs):
        "Call ``read`` on the underlying adapter via ``ensure_awaitable``."
        adapter = await self.get_adapter()
        return await ensure_awaitable(adapter.read, *args, **kwargs)

    async def write(self, *args, **kwargs):
        "Call ``write`` on the underlying adapter via ``ensure_awaitable``."
        adapter = await self.get_adapter()
        return await ensure_awaitable(adapter.write, *args, **kwargs)
def upgrade():
    """Add 'awkward' to the ``structurefamily`` enum type on PostgreSQL.

    No statement is issued for any other database dialect.
    """
    dialect_name = op.get_bind().engine.dialect.name
    if dialect_name != "postgresql":
        return

    # The statement is executed inside Alembic's autocommit block rather than
    # in the migration's enclosing transaction.
    with op.get_context().autocommit_block():
        statement = sa.text(
            "ALTER TYPE structurefamily ADD VALUE IF NOT EXISTS 'awkward' AFTER 'array'"
        )
        op.execute(statement)


def downgrade():
    """Not supported: always raises ``NotImplementedError``."""
    # This _could_ be implemented but we will wait for a need since we are
    # still in alpha releases.
    raise NotImplementedError
class AwkwardArrayClient(BaseClient):
    """Client for a node whose structure family is 'awkward'."""

    def __repr__(self):
        # TODO Include some summary of the structure. Probably
        # lift __repr__ code from awkward itself here.
        return f"<{type(self).__name__}>"

    def write(self, container):
        """Upload buffers to the node's 'full' link as a zip archive.

        Parameters
        ----------
        container : mapping of str to bytes-like
            Buffers keyed by (suffixed) form key, as produced by
            ``awkward.to_buffers``.
        """
        handle_error(
            self.context.http_client.put(
                self.item["links"]["full"],
                content=bytes(to_zipped_buffers(container, {})),
                headers={"Content-Type": "application/zip"},
            )
        )

    def read(self, slice=...):
        """Fetch the array, requesting only the buffers that ``slice`` needs.

        Parameters
        ----------
        slice : any index accepted by ``awkward.Array.__getitem__``, optional
            Defaults to ``...`` (everything).

        Returns
        -------
        The result of applying ``slice`` to the array rebuilt from the
        downloaded buffers.
        """
        structure = self.structure()
        form = awkward.forms.from_dict(structure.form)
        # Apply the slice to a data-less typetracer first, so that the report
        # records which form keys the slice actually touches.
        typetracer, report = awkward.typetracer.typetracer_with_report(
            form,
            forget_length=True,
        )
        proxy_array = awkward.Array(typetracer)
        # TODO Ask awkward to promote _touch_data to a public method.
        proxy_array[slice].layout._touch_data(recursive=True)
        form_keys_touched = set(report.data_touched)
        projected_form = project_form(form, form_keys_touched)
        # The order is not important, but sort so that the request is deterministic.
        params = {"form_key": sorted(form_keys_touched)}
        content = handle_error(
            self.context.http_client.get(
                self.item["links"]["full"],
                headers={"Accept": "application/zip"},
                params=params,
            )
        ).read()
        container = from_zipped_buffers(content)
        projected_array = awkward.from_buffers(
            projected_form, structure.length, container
        )
        return projected_array[slice]

    def __getitem__(self, slice):
        "Equivalent to ``self.read(slice=slice)``."
        return self.read(slice=slice)
def write_awkward(
    self,
    array,
    *,
    key=None,
    metadata=None,
    dims=None,
    specs=None,
):
    """
    Write an AwkwardArray.

    The array is decomposed with ``awkward.to_buffers``; a new node is
    created from the resulting form, length and buffer names, and the
    buffers are then uploaded through the new node's client.

    Parameters
    ----------
    array: awkward.Array
    key : str, optional
        Key (name) for this new node. If None, the server will provide a unique key.
    metadata : dict, optional
        User metadata. May be nested. Must contain only basic types
        (e.g. numbers, strings, lists, dicts) that are JSON-serializable.
    dims : List[str], optional
        A label for each dimension of the array.
        NOTE(review): this argument is not referenced anywhere in the body
        below, so it currently has no effect -- confirm whether it should
        be stored or dropped.
    specs : List[Spec], optional
        List of names that are used to label that the data and/or metadata
        conform to some named standard specification.

    Returns
    -------
    The client returned by ``self.new(...)`` for the newly created node.
    """
    import awkward

    from ..structures.awkward import AwkwardStructure

    # ``container`` maps each buffer name to its bytes; its keys are recorded
    # in the structure as the suffixed form keys.
    form, length, container = awkward.to_buffers(array)
    structure = AwkwardStructure(
        length=length,
        form=form.to_dict(),
        suffixed_form_keys=list(container),
    )
    client = self.new(
        StructureFamily.awkward,
        structure,
        key=key,
        metadata=metadata,
        specs=specs,
    )
    client.write(container)
    return client
@serialization_registry.register("awkward", "application/zip")
def to_zipped_buffers(container, metadata):
    """Pack named buffers into a single uncompressed zip archive.

    Parameters
    ----------
    container : mapping of str to bytes-like
        Buffers keyed by form key; each becomes one archive member.
    metadata : dict
        Accepted for the serializer signature; not used here.

    Returns
    -------
    memoryview over the bytes of the archive
    """
    file = io.BytesIO()
    # Pack multiple buffers into a zipfile, uncompressed. This enables
    # multiple buffers in a single response, with random access. The
    # entire payload *may* be compressed using Tiled's normal compression
    # mechanisms.
    #
    # ZIP_STORED is a compression *method*, so it belongs to ``compression``;
    # ``compresslevel`` has no effect for stored members.
    with zipfile.ZipFile(file, "w", compression=zipfile.ZIP_STORED) as archive:
        for form_key, buffer in container.items():
            archive.writestr(form_key, buffer)
    return file.getbuffer()


@deserialization_registry.register("awkward", "application/zip")
def from_zipped_buffers(buffer):
    """Unpack a zip archive into a dict of member name -> bytes.

    Parameters
    ----------
    buffer : bytes-like
        The raw bytes of a zip archive.
    """
    file = io.BytesIO(buffer)
    with zipfile.ZipFile(file, "r") as archive:
        return {form_key: archive.read(form_key) for form_key in archive.namelist()}
@router.get(
    "/awkward/full/{path:path}",
    response_model=schemas.Response,
    name="full awkward array",
)
async def awkward_full(
    request: Request,
    entry=SecureEntry(scopes=["read:data"]),
    # slice=Depends(slice_),
    form_key: Optional[List[str]] = Query(None, min_length=1),
    format: Optional[str] = None,
    filename: Optional[str] = None,
    serialization_registry=Depends(get_serialization_registry),
    settings: BaseSettings = Depends(get_settings),
):
    """
    Fetch AwkwardArray buffers, optionally restricted to given form keys.

    Responds 404 if the node is not awkward data, 400 if the selected
    buffers exceed ``settings.response_bytesize_limit``, and 406 if no
    supported media type can satisfy the request.
    """
    structure_family = entry.structure_family
    if structure_family != StructureFamily.awkward:
        raise HTTPException(
            status_code=404,
            detail=f"Cannot read {entry.structure_family} structure with /awkward/full route.",
        )
    with record_timing(request.state.metrics, "read"):
        # The plural vs. singular mismatch is due to the way query parameters
        # are given as ?form_key=A&form_key=B&form_key=C.
        container = await ensure_awaitable(entry.read, form_key)
    total_bytesize = sum(len(buffer) for buffer in container.values())
    if total_bytesize > settings.response_bytesize_limit:
        # This route takes no 'slice' parameter; the way to shrink the
        # response is to request fewer buffers.
        raise HTTPException(
            status_code=400,
            detail=(
                f"Response would exceed {settings.response_bytesize_limit}. "
                "Use '?form_key=...' to request fewer buffers."
            ),
        )
    try:
        with record_timing(request.state.metrics, "pack"):
            return await construct_data_response(
                structure_family,
                serialization_registry,
                container,
                entry.metadata(),
                request,
                format,
                specs=getattr(entry, "specs", []),
                expires=getattr(entry, "content_stale_at", None),
                filename=filename,
            )
    except UnsupportedMediaTypes as err:
        raise HTTPException(status_code=406, detail=err.args[0])


@router.put("/awkward/full/{path:path}")
async def put_awkward_full(
    request: Request,
    entry=SecureEntry(scopes=["write:data"]),
    deserialization_registry=Depends(get_deserialization_registry),
):
    """
    Replace the buffers of an AwkwardArray node with the uploaded payload.

    Responds 404 if the node is not awkward data and 405 if it has no
    ``write`` method.
    """
    # Reject inapplicable requests before buffering the request body.
    if entry.structure_family != StructureFamily.awkward:
        raise HTTPException(
            status_code=404, detail="This route is not applicable to this node."
        )
    if not hasattr(entry, "write"):
        raise HTTPException(status_code=405, detail="This node cannot be written to.")
    body = await request.body()
    media_type = request.headers["content-type"]
    deserializer = deserialization_registry.dispatch(
        StructureFamily.awkward, media_type
    )
    data = await ensure_awaitable(deserializer, body)
    await ensure_awaitable(entry.write, data)
    return json_or_msgpack(request, None)
@dataclass
class AwkwardStructure:
    """Description of an awkward array: its length, form, and buffer names."""

    length: int
    form: dict
    suffixed_form_keys: list[str]

    @classmethod
    def from_json(cls, structure):
        "Build an instance from a dict whose keys match the field names."
        return cls(**structure)


def project_form(form, form_keys_touched):
    """Return a copy of ``form`` pruned to the parts that were touched.

    Parameters
    ----------
    form : awkward.forms.Form
    form_keys_touched : container of str
        Form keys whose data is needed.

    Returns
    -------
    awkward.forms.Form or None
        None when a non-record node's form key is not in
        ``form_keys_touched``. Record nodes are always returned, with only
        the fields whose projection is not None.

    Raises
    ------
    NotImplementedError
        If a ``UnionForm`` is encountered.
    """
    # See https://github.com/bluesky/tiled/issues/450
    if isinstance(form, awkward.forms.RecordForm):
        if form.fields is None:
            original_fields = [None] * len(form.contents)
        else:
            original_fields = form.fields

        fields = []
        contents = []
        for field, content in zip(original_fields, form.contents):
            projected = project_form(content, form_keys_touched)
            if projected is not None:
                fields.append(field)
                # Keep the *projected* child. Appending the original
                # ``content`` would retain untouched sub-fields of nested
                # records, whose buffers are never fetched.
                contents.append(projected)

        if form.fields is None:
            fields = None

        return form.copy(fields=fields, contents=contents)

    elif isinstance(form, awkward.forms.UnionForm):
        raise NotImplementedError

    elif isinstance(form, (awkward.forms.NumpyForm, awkward.forms.EmptyForm)):
        if form.form_key in form_keys_touched:
            return form.copy()
        else:
            return None

    else:
        if form.form_key in form_keys_touched:
            # NOTE(review): if this node is touched but its content is not,
            # the recursive call returns None and None is passed as
            # ``content`` -- confirm whether that case can occur.
            return form.copy(content=project_form(form.content, form_keys_touched))
        else:
            return None