diff --git a/src/dask_awkward/layers/layers.py b/src/dask_awkward/layers/layers.py index f994227a..a3348a5b 100644 --- a/src/dask_awkward/layers/layers.py +++ b/src/dask_awkward/layers/layers.py @@ -15,7 +15,7 @@ _dask_uses_tasks = hasattr(dask.blockwise, "Task") if _dask_uses_tasks: - from dask.blockwise import Task, TaskRef # type: ignore + from dask.blockwise import Task, TaskRef if TYPE_CHECKING: from awkward import Array as AwkwardArray @@ -37,6 +37,7 @@ def from_blockwise(cls, layer: Blockwise) -> AwkwardBlockwiseLayer: return ob def mock(self) -> AwkwardBlockwiseLayer: + """Mock this layer without evaluating it""" layer = copy.copy(self) nb = layer.numblocks layer.numblocks = {k: tuple(1 for _ in v) for k, v in nb.items()} @@ -216,23 +217,24 @@ def prepare_for_projection(self) -> tuple[AwkwardInputLayer, TypeTracerReport, T """Mock the input layer as starting with a data-less typetracer. This method is used to create new dask task graphs that operate purely on typetracer Arrays (that is, array with - awkward structure but without real data buffers). This allows - us to test which parts of a real awkward array will be used in + Awkward structure but without real data buffers). This allows + us to test which parts of a real Awkward array will be used in a real computation. We do this by running a graph which starts with mocked AwkwardInputLayers. We mock an AwkwardInputLayer in these steps: - 1. Ask the IO function to prepare a new meta array, and return + 1. Ask the IO function to prepare a new meta Array, and return any transient state. 2. Build a new AwkwardInputLayer whose IO function just returns - this meta (typetracer) array + this meta (typetracer) Array 3. Return the new input layer and the transient state - When this new layer is added to a dask task graph and that + When this new layer is added to a dask task graph, and that graph is computed, the report object will be mutated. Inspecting the report object after the compute tells us which buffers from the original form would be required for a real compute with the same graph. + Returns ------- AwkwardInputLayer @@ -283,6 +285,15 @@ def project( ) def necessary_columns(self, report: TypeTracerReport, state: T) -> frozenset[str]: + """Report the necessary _columns_ implied by a given buffer optimisation state. + + Each IO source usually has the notion of a "column". For uproot, that's a TTree key, + whilst Parquet has "fields". Awkward operates at the _buffer_ level, which is nearly-always + a lower-level representation. As such, when users want to answer the question "which IO-columns" + does this graph require, we need to map between buffer names and the IO-source columns. + + This routine asks the IO function to perform that remapping, without knowing anything about what it does. + """ assert self.is_columnar return cast(ImplementsNecessaryColumns, self.io_func).necessary_columns( report=report, state=state diff --git a/src/dask_awkward/lib/core.py b/src/dask_awkward/lib/core.py index 6becf1b1..9838d625 100644 --- a/src/dask_awkward/lib/core.py +++ b/src/dask_awkward/lib/core.py @@ -62,7 +62,7 @@ ) if _dask_uses_tasks: - from dask.blockwise import TaskRef # type: ignore + from dask.blockwise import TaskRef if TYPE_CHECKING: from awkward.contents.content import Content diff --git a/src/dask_awkward/lib/inspect.py b/src/dask_awkward/lib/inspect.py index 3f63fc22..9c3baa28 100644 --- a/src/dask_awkward/lib/inspect.py +++ b/src/dask_awkward/lib/inspect.py @@ -121,6 +121,8 @@ def report_necessary_columns( r"""Get columns necessary to compute a collection This function is specific to sources that are columnar (e.g. Parquet). + It determines the names of IO-specific "columns" (e.g. TTree keys, + or Parquet fields) that are required by the necessary-buffers optimization. Parameters ---------- diff --git a/src/dask_awkward/lib/optimize.py b/src/dask_awkward/lib/optimize.py index 6f5b4a4c..53834671 100644 --- a/src/dask_awkward/lib/optimize.py +++ b/src/dask_awkward/lib/optimize.py @@ -3,9 +3,10 @@ import copy import logging import warnings -from collections.abc import Iterable, Mapping, Sequence +from collections.abc import Mapping, Sequence from typing import TYPE_CHECKING, Any, cast, no_type_check +import awkward as ak import dask.config from awkward.typetracer import touch_data from dask.blockwise import Blockwise, fuse_roots, optimize_blockwise @@ -22,7 +23,7 @@ from dask_awkward.utils import first if _dask_uses_tasks: - from dask.blockwise import GraphNode, Task, TaskRef # type: ignore + from dask.blockwise import GraphNode, Task, TaskRef if TYPE_CHECKING: from awkward._nplikes.typetracer import TypeTracerReport @@ -85,9 +86,17 @@ def optimize(dsk: HighLevelGraph, keys: Sequence[Key], **_: Any) -> Mapping: def _prepare_buffer_projection( dsk: HighLevelGraph, keys: Sequence[Key] ) -> tuple[dict[str, TypeTracerReport], dict[str, Any]] | None: - """Pair layer names with lists of necessary columns.""" - import awkward as ak + """Prepare for buffer projection by building and evaluating a version of the graph that + has been annotated for typetracer reporting. + Parameters + ---------- + dsk : HighLevelGraph + Task graph to optimize. + keys : list[str] + Sequence of keys to optimize with respect to. + """ + # Skip early if we can't meaningfully optimise any layers if not _has_projectable_awkward_io_layer(dsk): return None @@ -97,24 +106,32 @@ def _prepare_buffer_projection( for name, lay in dsk.layers.items(): if isinstance(lay, AwkwardInputLayer): + # The layer supports buffer projection if lay.is_projectable: - # Insert mocked array into layers, replacing generation func - # Keep track of mocked state + # Replace input layer with one that is ready for input projection using a report + # Store the report for subsequent retrieval, and cache the transient state + # that column projection later needs to finalise the optimisation ( projection_layers[name], layer_to_reports[name], layer_to_projection_state[name], ) = lay.prepare_for_projection() + # Layers that don't support buffer projection might support mocking + # This means that we at least do not have to compute them during evaluation of the optimisation graph elif lay.is_mockable: projection_layers[name] = lay.mock() + # Layers that don't support buffer projection might support mocking + # This means that we at least do not have to compute them during evaluation of the optimisation graph elif hasattr(lay, "mock"): projection_layers[name] = lay.mock() + # Ensure that the buffers of each output are entirely touched for name in _ak_output_layer_names(dsk): projection_layers[name] = _mock_output(projection_layers[name]) hlg = HighLevelGraph(projection_layers, dsk.dependencies) + # The caller should apply this optimisation with respect to a number of output keys minimal_keys: set[Key] = set() for k in keys: if isinstance(k, tuple) and len(k) == 2: @@ -122,14 +139,17 @@ def _prepare_buffer_projection( else: minimal_keys.add(k) - # now we try to compute for each possible output layer key (leaf + # Now we try to compute for each possible output layer key (leaf # node on partition 0); this will cause the typetacer reports to # get correct fields/columns touched. If the result is a record or # an array we of course want to touch all of the data/fields. try: for layer in hlg.layers.values(): layer.__dict__.pop("_cached_dict", None) + results = get_sync(hlg, list(minimal_keys)) + + # Touch all the buffers associated with the given output keys for out in results: if isinstance(out, (ak.Array, ak.Record)): touch_data(out) @@ -185,14 +205,15 @@ def optimize_columns(dsk: HighLevelGraph, keys: Sequence[Key]) -> HighLevelGraph New, optimized task graph with column-projected ``AwkwardInputLayer``. """ + # 1. Build-and-evaluate typetracer-annotated graph projection_data = _prepare_buffer_projection(dsk, keys) if projection_data is None: return dsk - # Unpack result + # 2. Unpack result layer_to_reports, layer_to_projection_state = projection_data - # Project layers using projection state + # 3. Project layers using projection state from (1) layers = dict(dsk.layers) for name, state in layer_to_projection_state.items(): layers[name] = cast(AwkwardInputLayer, layers[name]).project( @@ -227,10 +248,10 @@ def _ak_output_layer_names(dsk: HighLevelGraph) -> list[str]: def _has_projectable_awkward_io_layer(dsk: HighLevelGraph) -> bool: """Check if a graph at least one AwkwardInputLayer that is project-able.""" - for _, v in dsk.layers.items(): - if isinstance(v, AwkwardInputLayer) and v.is_projectable: - return True - return False + return any( + isinstance(v, AwkwardInputLayer) and v.is_projectable + for v in dsk.layers.values() + ) def _touch_all_data(*args, **kwargs): @@ -432,9 +453,3 @@ def _recursive_replace(args, layer, parent, indices): else: args2.append(arg) return args2 - - -def _buffer_keys_for_layer( - buffer_keys: Iterable[str], known_buffer_keys: frozenset[str] -) -> set[str]: - return {k for k in buffer_keys if k in known_buffer_keys} diff --git a/src/dask_awkward/lib/unproject_layout.py b/src/dask_awkward/lib/unproject_layout.py index 493e3150..e2bc46c0 100644 --- a/src/dask_awkward/lib/unproject_layout.py +++ b/src/dask_awkward/lib/unproject_layout.py @@ -391,16 +391,14 @@ def _unproject_layout(form, layout, length, backend): def unproject_layout(form: Form | None, layout: Content) -> Content: - """Does nothing! Currently returns the passed in layout unchanged! + """Rehydrate a layout to include all parts of an original form. - Rehydrate a layout to include all parts of an original form. - - When we perform the necessary columns optimization we drop fields + When we perform the necessary columns optimization, we drop fields that are not necessary for a computed result. Sometimes we have - task graphs that expect to see fields in name only (but no their - data). To protect against FieldNotFound exception we "unproject" - or "rehydrate" the layout with the original form. This reapplys - all original fields, but the ones that were orignally projected + task graphs that expect to see fields in name only (but not their + data). To protect against `FieldNotFound` exceptions we "unproject" + or "rehydrate" the layout with the original form. This restores + the original structure, but fields that were orignally projected away are data-less. Parameters