From be800cf23c4fcd14ebb4a4211ed65360c91a6634 Mon Sep 17 00:00:00 2001 From: Garrett Bischof Date: Wed, 27 Nov 2019 17:16:04 -0500 Subject: [PATCH] replaced documents_to_xarray --- databroker/core.py | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/databroker/core.py b/databroker/core.py index 554898fc3..6c90fd0ac 100644 --- a/databroker/core.py +++ b/databroker/core.py @@ -1074,18 +1074,26 @@ def __repr__(self): out = f"" return out + def _to_xarray(self, fill='yes'): + + def stream_gen(): + for i in itertools.count(): + partition = self.read_partition({'index': i, 'fill': fill, + 'partition_size': 'auto'}) + if not partition: + break + yield from partition + + stream = stream_gen() + + arraypages = [eventpage_to_arraypage(doc) for name, doc + in stream if name == 'event_page'] + arraypage = concat_arraypages(array_pages) + datasetpage = arraypage_to_datasetpage(array_page) + return datasetpage + def _open_dataset(self): - self._ds = documents_to_xarray( - start_doc=self._run_start_doc, - stop_doc=self._run_stop_doc, - descriptor_docs=self._descriptors, - get_event_pages=self._get_event_pages, - filler=self.fillers['delayed'], - get_resource=self._get_resource, - lookup_resource_for_datum=self._lookup_resource_for_datum, - get_datum_pages=self._get_datum_pages, - include=self.include, - exclude=self.exclude) + self._ds = _to_xarray()['data'] def read(self): """ @@ -1434,7 +1442,7 @@ def parse_handler_registry(handler_registry): intake.container.container_map['bluesky-event-stream'] = RemoteBlueskyEventStream -def concat_dataarray_pages(dataarray_pages): +def concat_arraypages(dataarray_pages): """ Combines a iterable of dataarray_pages to a single dataarray_page. @@ -1470,7 +1478,7 @@ def concat_dataarray_pages(dataarray_pages): for key in data_keys}} -def event_page_to_dataarray_page(event_page, dims=None, coords=None): +def eventpage_to_arraypage(event_page, dims=None, coords=None): """ Converts the event_page's data, timestamps, and filled to xarray.DataArray. @@ -1509,7 +1517,7 @@ def event_page_to_dataarray_page(event_page, dims=None, coords=None): for key in data_keys}} -def dataarray_page_to_dataset_page(dataarray_page): +def arraypage_to_datasetpage(dataarray_page): """ Converts the dataarray_page's data, timestamps, and filled to xarray.DataSet.