docs: add better code-docs on buffer optimisation #564

Merged · 17 commits · Dec 18, 2024
23 changes: 17 additions & 6 deletions src/dask_awkward/layers/layers.py
@@ -15,7 +15,7 @@
_dask_uses_tasks = hasattr(dask.blockwise, "Task")

if _dask_uses_tasks:
from dask.blockwise import Task, TaskRef # type: ignore
from dask.blockwise import Task, TaskRef

if TYPE_CHECKING:
from awkward import Array as AwkwardArray
@@ -37,6 +37,7 @@ def from_blockwise(cls, layer: Blockwise) -> AwkwardBlockwiseLayer:
return ob

def mock(self) -> AwkwardBlockwiseLayer:
"""Mock this layer without evaluating it"""
layer = copy.copy(self)
nb = layer.numblocks
layer.numblocks = {k: tuple(1 for _ in v) for k, v in nb.items()}
@@ -216,23 +217,24 @@ def prepare_for_projection(self) -> tuple[AwkwardInputLayer, TypeTracerReport, T
"""Mock the input layer as starting with a data-less typetracer.
This method is used to create new dask task graphs that
operate purely on typetracer Arrays (that is, arrays with
awkward structure but without real data buffers). This allows
us to test which parts of a real awkward array will be used in
Awkward structure but without real data buffers). This allows
us to test which parts of a real Awkward array will be used in
a real computation. We do this by running a graph which starts
with mocked AwkwardInputLayers.

We mock an AwkwardInputLayer in these steps:
1. Ask the IO function to prepare a new meta array, and return
1. Ask the IO function to prepare a new meta Array, and return
any transient state.
2. Build a new AwkwardInputLayer whose IO function just returns
this meta (typetracer) array
this meta (typetracer) Array
3. Return the new input layer and the transient state

When this new layer is added to a dask task graph and that
When this new layer is added to a dask task graph, and that
graph is computed, the report object will be mutated.
Inspecting the report object after the compute tells us which
buffers from the original form would be required for a real
compute with the same graph.

Returns
-------
AwkwardInputLayer
@@ -283,6 +285,15 @@ def project(
)

def necessary_columns(self, report: TypeTracerReport, state: T) -> frozenset[str]:
"""Report the necessary _columns_ implied by a given buffer optimisation state.

Each IO source usually has the notion of a "column". For uproot, that's a TTree key,
whilst Parquet has "fields". Awkward operates at the _buffer_ level, which is nearly always
a lower-level representation. As such, when users want to answer the question "which IO
columns does this graph require?", we need to map between buffer names and the IO-source columns.

This routine asks the IO function to perform that remapping, without needing to know anything about how it does so.
"""
assert self.is_columnar
return cast(ImplementsNecessaryColumns, self.io_func).necessary_columns(
report=report, state=state
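The typetracer mechanism that `prepare_for_projection` describes can be exercised on its own. The sketch below is illustrative rather than dask-awkward's actual IO machinery, and assumes awkward 2.x: it labels the buffers of a form, builds a data-less (typetracer) array together with a report, runs a toy computation, and then reads back which buffers were touched — the same information the mocked input layers feed to the optimizer.

```python
# Minimal, standalone sketch (assumes awkward 2.x); not dask-awkward's IO path.
import awkward as ak
from awkward.typetracer import typetracer_with_report

records = ak.Array([{"x": 1, "y": [1.0, 2.0]}, {"x": 2, "y": []}])

# Label every buffer in the form so the report can refer to it by key.
form = records.layout.form_with_key()  # keys like "node0", "node1", ...

# `meta` has the Awkward structure of `records`, but no real data buffers.
meta, report = typetracer_with_report(form)
meta = ak.Array(meta)

# Stand-in for computing a graph that only uses the "x" field.
_ = meta.x * 2

# Only the buffer keys backing "x" show up here; "y" was never needed.
print(report.data_touched)
```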
2 changes: 1 addition & 1 deletion src/dask_awkward/lib/core.py
@@ -62,7 +62,7 @@
)

if _dask_uses_tasks:
from dask.blockwise import TaskRef # type: ignore
from dask.blockwise import TaskRef

if TYPE_CHECKING:
from awkward.contents.content import Content
2 changes: 2 additions & 0 deletions src/dask_awkward/lib/inspect.py
@@ -121,6 +121,8 @@ def report_necessary_columns(
r"""Get columns necessary to compute a collection

This function is specific to sources that are columnar (e.g. Parquet).
It determines the names of IO-specific "columns" (e.g. TTree keys,
or Parquet fields) that are required by the necessary-buffers optimization.

Parameters
----------
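As a usage sketch (the file name, field names, and printed layer name are hypothetical), this is how the reported columns look from user code:

```python
import dask_awkward as dak

# Hypothetical Parquet dataset containing fields "pt" and "eta".
events = dak.from_parquet("events.parquet")
selected = events.pt[events.pt > 10]

# Maps each columnar input layer to the IO columns the optimized graph would read.
print(dak.report_necessary_columns(selected))
# e.g. {"from-parquet-abc123": frozenset({"pt"})}
```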
53 changes: 34 additions & 19 deletions src/dask_awkward/lib/optimize.py
@@ -3,9 +3,10 @@
import copy
import logging
import warnings
from collections.abc import Iterable, Mapping, Sequence
from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any, cast, no_type_check

import awkward as ak
import dask.config
from awkward.typetracer import touch_data
from dask.blockwise import Blockwise, fuse_roots, optimize_blockwise
@@ -22,7 +23,7 @@
from dask_awkward.utils import first

if _dask_uses_tasks:
from dask.blockwise import GraphNode, Task, TaskRef # type: ignore
from dask.blockwise import GraphNode, Task, TaskRef

if TYPE_CHECKING:
from awkward._nplikes.typetracer import TypeTracerReport
@@ -85,9 +86,17 @@ def optimize(dsk: HighLevelGraph, keys: Sequence[Key], **_: Any) -> Mapping:
def _prepare_buffer_projection(
dsk: HighLevelGraph, keys: Sequence[Key]
) -> tuple[dict[str, TypeTracerReport], dict[str, Any]] | None:
"""Pair layer names with lists of necessary columns."""
import awkward as ak
"""Prepare for buffer projection by building and evaluating a version of the graph that
has been annotated for typetracer reporting.

Parameters
----------
dsk : HighLevelGraph
Task graph to optimize.
keys : Sequence[Key]
Sequence of keys to optimize with respect to.
"""
# Skip early if we can't meaningfully optimise any layers
if not _has_projectable_awkward_io_layer(dsk):
return None

@@ -97,39 +106,50 @@ def _prepare_buffer_projection(

for name, lay in dsk.layers.items():
if isinstance(lay, AwkwardInputLayer):
# The layer supports buffer projection
if lay.is_projectable:
# Insert mocked array into layers, replacing generation func
# Keep track of mocked state
# Replace input layer with one that is ready for input projection using a report
# Store the report for subsequent retrieval, and cache the transient state
# that column projection later needs to finalise the optimisation
(
projection_layers[name],
layer_to_reports[name],
layer_to_projection_state[name],
) = lay.prepare_for_projection()
# Layers that don't support buffer projection might support mocking
# This means that we at least do not have to compute them during evaluation of the optimisation graph
elif lay.is_mockable:
projection_layers[name] = lay.mock()
# Layers that don't support buffer projection might support mocking
# This means that we at least do not have to compute them during evaluation of the optimisation graph
elif hasattr(lay, "mock"):
projection_layers[name] = lay.mock()

# Ensure that the buffers of each output are entirely touched
for name in _ak_output_layer_names(dsk):
projection_layers[name] = _mock_output(projection_layers[name])

hlg = HighLevelGraph(projection_layers, dsk.dependencies)

# The caller should apply this optimisation with respect to a number of output keys
minimal_keys: set[Key] = set()
for k in keys:
if isinstance(k, tuple) and len(k) == 2:
minimal_keys.add((k[0], 0))
else:
minimal_keys.add(k)

# now we try to compute for each possible output layer key (leaf
# Now we try to compute for each possible output layer key (leaf
# node on partition 0); this will cause the typetracer reports to
# get correct fields/columns touched. If the result is a record or
# an array, we of course want to touch all of the data/fields.
try:
for layer in hlg.layers.values():
layer.__dict__.pop("_cached_dict", None)

results = get_sync(hlg, list(minimal_keys))

# Touch all the buffers associated with the given output keys
for out in results:
if isinstance(out, (ak.Array, ak.Record)):
touch_data(out)
@@ -185,14 +205,15 @@ def optimize_columns(dsk: HighLevelGraph, keys: Sequence[Key]) -> HighLevelGraph
New, optimized task graph with column-projected ``AwkwardInputLayer``.

"""
# 1. Build-and-evaluate typetracer-annotated graph
projection_data = _prepare_buffer_projection(dsk, keys)
if projection_data is None:
return dsk

# Unpack result
# 2. Unpack result
layer_to_reports, layer_to_projection_state = projection_data

# Project layers using projection state
# 3. Project layers using projection state from (1)
layers = dict(dsk.layers)
for name, state in layer_to_projection_state.items():
layers[name] = cast(AwkwardInputLayer, layers[name]).project(
@@ -227,10 +248,10 @@ def _ak_output_layer_names(dsk: HighLevelGraph) -> list[str]:

def _has_projectable_awkward_io_layer(dsk: HighLevelGraph) -> bool:
"""Check if a graph at least one AwkwardInputLayer that is project-able."""
for _, v in dsk.layers.items():
if isinstance(v, AwkwardInputLayer) and v.is_projectable:
return True
return False
return any(
isinstance(v, AwkwardInputLayer) and v.is_projectable
for v in dsk.layers.values()
)


def _touch_all_data(*args, **kwargs):
@@ -432,9 +453,3 @@ def _recursive_replace(args, layer, parent, indices):
else:
args2.append(arg)
return args2


def _buffer_keys_for_layer(
buffer_keys: Iterable[str], known_buffer_keys: frozenset[str]
) -> set[str]:
return {k for k in buffer_keys if k in known_buffer_keys}
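For orientation, the column projection implemented in this module runs as part of the default optimization pass when a collection is computed. A hedged sketch of how that looks from user code is below; the `awkward.optimization.enabled` config key is assumed to match dask-awkward's shipped defaults, and the file name is made up:

```python
import dask
import dask_awkward as dak

events = dak.from_parquet("events.parquet")  # hypothetical input
result = events.x + 1

# Computing `result` triggers optimize_columns, so only the "x" column
# is read from storage.
optimized = result.compute()

# Assumed config key: disable the optimization pass to compare against a
# full, unprojected read.
with dask.config.set({"awkward.optimization.enabled": False}):
    unoptimized = result.compute()
```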
14 changes: 6 additions & 8 deletions src/dask_awkward/lib/unproject_layout.py
@@ -391,16 +391,14 @@ def _unproject_layout(form, layout, length, backend):


def unproject_layout(form: Form | None, layout: Content) -> Content:
"""Does nothing! Currently returns the passed in layout unchanged!
"""Rehydrate a layout to include all parts of an original form.

Rehydrate a layout to include all parts of an original form.

When we perform the necessary columns optimization we drop fields
When we perform the necessary columns optimization, we drop fields
that are not necessary for a computed result. Sometimes we have
task graphs that expect to see fields in name only (but no their
data). To protect against FieldNotFound exception we "unproject"
or "rehydrate" the layout with the original form. This reapplys
all original fields, but the ones that were orignally projected
task graphs that expect to see fields in name only (but not their
data). To protect against `FieldNotFound` exceptions we "unproject"
or "rehydrate" the layout with the original form. This restores
the original structure, but fields that were originally projected
away are data-less.

Parameters
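A hypothetical end-to-end illustration of the behaviour this docstring describes (the field names and values are made up): a projected layout that carries only the `x` buffers is rehydrated against the original form, so the dropped `y` field is visible by name but remains data-less.

```python
import awkward as ak
from dask_awkward.lib.unproject_layout import unproject_layout

original = ak.Array([{"x": 1, "y": 1.0}, {"x": 2, "y": 2.0}])
original_form = original.layout.form

# What a column-projected read would produce: only "x" carries real data.
projected = ak.Array([{"x": 1}, {"x": 2}])

rehydrated = ak.Array(unproject_layout(original_form, projected.layout))
print(rehydrated.fields)  # ["x", "y"] -- "y" exists in name only, backed by placeholders
```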