Remove `nnbench.BenchmarkRunner` in favor of `nnbench.collect()` and `nnbench.run()`

The class had little to no useful state anyway, and the two standalone APIs play
much better with threading and concurrency, so we opt for functional APIs instead of
a monolithic class.

This is effectively an idiom change: before, we instantiated a runner as
`runner = nnbench.BenchmarkRunner()`, whereas now we collect benchmarks in the open
(`benchmarks = nnbench.collect(path, tags=...)`) and pass them as the first argument
to the new `nnbench.run()` API, which is just `BenchmarkRunner.run()` with the
path and tags arguments removed.

This also prevents erroneous caching and persistence of large params to an extent,
since the benchmark list is made explicit.
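
As a rough sketch of the idiom change (the `benchmarks/` directory, the `"cpu"` tag, and the
`batch_size` parameter below are hypothetical placeholders):

    # before: stateful runner, collection happens implicitly inside run()
    runner = nnbench.BenchmarkRunner()
    record = runner.run("benchmarks/", tags=("cpu",), params={"batch_size": 32})

    # after: collect in the open, then pass the benchmarks to the functional API
    benchmarks = nnbench.collect("benchmarks/", tags=("cpu",))
    record = nnbench.run(benchmarks, params={"batch_size": 32})
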
nicholasjng committed Dec 21, 2024
1 parent 58bf902 commit c7ba2df
380 changes: 175 additions & 205 deletions src/nnbench/runner.py
@@ -9,7 +9,6 @@
 import sys
 import time
 import uuid
-import warnings
 from collections.abc import Callable, Generator, Sequence
 from dataclasses import asdict
 from datetime import datetime
@@ -94,213 +93,184 @@ def _jsonify(val):
     return json_params
 
 
-class BenchmarkRunner:
-    """
-    An abstract benchmark runner class.
-
-    Collects benchmarks from a module or file using the ``BenchmarkRunner.collect()``
-    method.
-
-    Runs a previously collected benchmark workload with parameters in the
-    ``BenchmarkRunner.run()`` method, outputting the results to a
-    ``BenchmarkRecord`` dataclass.
-    """
-
-    benchmark_type = Benchmark
-
-    def __init__(self):
-        self.benchmarks: list[Benchmark] = list()
-
-    def clear(self) -> None:
-        """Clear all registered benchmarks."""
-        self.benchmarks.clear()
-
-    def collect(self, path_or_module: str | os.PathLike[str], tags: tuple[str, ...] = ()) -> None:
-        # TODO: functools.cache this guy
-        """
-        Discover benchmarks in a module and memoize them for later use.
-
-        Parameters
-        ----------
-        path_or_module: str | os.PathLike[str]
-            Name or path of the module to discover benchmarks in. Can also be a directory,
-            in which case benchmarks are collected from the Python files therein.
-        tags: tuple[str, ...]
-            Tags to filter for when collecting benchmarks. Only benchmarks containing either of
-            these tags are collected.
-
-        Raises
-        ------
-        ValueError
-            If the given path is not a Python file, directory, or module name.
-        """
-        ppath = Path(path_or_module)
-        if ppath.is_dir():
-            pythonpaths = (p for p in ppath.iterdir() if p.suffix == ".py")
-            for py in pythonpaths:
-                logger.debug(f"Collecting benchmarks from submodule {py.name!r}.")
-                self.collect(py, tags)
-            return
-        elif ppath.is_file():
-            logger.debug(f"Collecting benchmarks from file {ppath}.")
-            module = import_file_as_module(path_or_module)
-        elif ismodule(path_or_module):
-            module = sys.modules[str(path_or_module)]
-        else:
-            raise ValueError(
-                f"expected a module name, Python file, or directory, "
-                f"got {str(path_or_module)!r}"
-            )
-
-        # iterate through the module dict members to register
-        for k, v in module.__dict__.items():
-            if k.startswith("__") and k.endswith("__"):
-                # dunder names are ignored.
-                continue
-            elif isinstance(v, self.benchmark_type):
-                if not tags or set(tags) & set(v.tags):
-                    self.benchmarks.append(v)
-            elif isinstance(v, list | tuple | set | frozenset):
-                for bm in v:
-                    if isinstance(bm, self.benchmark_type):
-                        if not tags or set(tags) & set(bm.tags):
-                            self.benchmarks.append(bm)
-
-    def run(
-        self,
-        path_or_module: str | os.PathLike[str],
-        name: str | None = None,
-        params: dict[str, Any] | Parameters | None = None,
-        tags: tuple[str, ...] = (),
-        context: Sequence[ContextProvider] = (),
-        jsonifier: Callable[[dict[str, Any]], dict[str, Any]] = jsonify_params,
-    ) -> BenchmarkRecord:
-        """
-        Run a previously collected benchmark workload.
-
-        Parameters
-        ----------
-        path_or_module: str | os.PathLike[str]
-            Name or path of the module to discover benchmarks in. Can also be a directory,
-            in which case benchmarks are collected from the Python files therein.
-        name: str | None
-            A name for the currently started run. If None, a name will be automatically generated.
-        params: dict[str, Any] | Parameters | None
-            Parameters to use for the benchmark run. Names have to match positional and keyword
-            argument names of the benchmark functions.
-        tags: tuple[str, ...]
-            Tags to filter for when collecting benchmarks. Only benchmarks containing either of
-            these tags are collected.
-        context: Sequence[ContextProvider]
-            Additional context to log with the benchmarks in the output JSON record. Useful for
-            obtaining environment information and configuration, like CPU/GPU hardware info,
-            ML model metadata, and more.
-        jsonifier: Callable[[dict[str, Any], dict[str, Any]]]
-            A function constructing a string representation from the input parameters.
-            Defaults to ``nnbench.runner.jsonify_params()``. Must produce a dictionary containing
-            only JSON-serializable values.
-
-        Returns
-        -------
-        BenchmarkRecord
-            A JSON output representing the benchmark results. Has three top-level keys,
-            "name" giving the benchmark run name, "context" holding the context information,
-            and "benchmarks", holding an array with the benchmark results.
-        """
-        run = name or "nnbench-" + platform.node() + "-" + uuid.uuid1().hex[:8]
-
-        if not self.benchmarks:
-            self.collect(path_or_module, tags)
-
-        family_sizes: dict[str, Any] = collections.defaultdict(int)
-        family_indices: dict[str, Any] = collections.defaultdict(int)
-
-        ctx: dict[str, Any] = {}
-        for provider in context:
-            val = provider()
-            duplicates = set(ctx.keys()) & set(val.keys())
-            if duplicates:
-                dupe, *_ = duplicates
-                raise ValueError(f"got multiple values for context key {dupe!r}")
-            ctx.update(val)
-
-        # if we didn't find any benchmarks, warn and return an empty record.
-        if not self.benchmarks:
-            warnings.warn(f"No benchmarks found in path/module {str(path_or_module)!r}.")
-            return BenchmarkRecord(run=run, context=ctx, benchmarks=[])
-
-        for bm in self.benchmarks:
-            family_sizes[bm.interface.funcname] += 1
-
-        if isinstance(params, Parameters):
-            dparams = asdict(params)
-        else:
-            dparams = params or {}
-
-        results: list[dict[str, Any]] = []
-
-        def _maybe_dememo(v, expected_type):
-            """Compute and memoize a value if a memo is given for a variable."""
-            if is_memo(v) and not is_memo_type(expected_type):
-                return v()
-            return v
-
-        for benchmark in self.benchmarks:
-            bm_family = benchmark.interface.funcname
-            state = State(
-                name=benchmark.name,
-                family=bm_family,
-                family_size=family_sizes[bm_family],
-                family_index=family_indices[bm_family],
-            )
-            family_indices[bm_family] += 1
-
-            # Assemble benchmark parameters. First grab all defaults from the interface,
-            bmparams = {
-                name: _maybe_dememo(val, typ)
-                for name, typ, val in benchmark.interface.variables
-                if val != inspect.Parameter.empty
-            }
-            # ... then hydrate with the appropriate subset of input parameters.
-            bmparams |= {k: v for k, v in dparams.items() if k in benchmark.interface.names}
-            # If any arguments are still unresolved, go look them up as fixtures.
-            if set(bmparams) < set(benchmark.interface.names):
-                # TODO: This breaks for a module name (like __main__).
-                #  Since that only means that we cannot resolve fixtures when benchmarking
-                #  a module name (which makes sense), and we can always pass extra
-                #  parameters in the module case, fixing this is not as urgent.
-                p = Path(path_or_module)
-                if p.is_file():
-                    root = p.parent
-                elif p.is_dir():
-                    root = p
-                else:
-                    raise ValueError(f"expected a file or directory, got {path_or_module!r}")
-                fm = FixtureManager(root)
-                bmparams |= fm.resolve(benchmark)
-
-            res: dict[str, Any] = {
-                "name": benchmark.name,
-                "function": qualname(benchmark.fn),
-                "description": benchmark.fn.__doc__ or "",
-                "date": datetime.now().isoformat(timespec="seconds"),
-                "error_occurred": False,
-                "error_message": "",
-                "parameters": jsonifier(bmparams),
-            }
-            try:
-                benchmark.setUp(state, bmparams)
-                with timer(res):
-                    res["value"] = benchmark.fn(**bmparams)
-            except Exception as e:
-                res["error_occurred"] = True
-                res["error_message"] = str(e)
-            finally:
-                benchmark.tearDown(state, bmparams)
-            results.append(res)
-
-        return BenchmarkRecord(
-            run=run,
-            context=ctx,
-            benchmarks=results,
-        )
+def collect(path_or_module: str | os.PathLike[str], tags: tuple[str, ...] = ()) -> list[Benchmark]:
+    # TODO: functools.cache this guy
+    """
+    Discover benchmarks in a module and memoize them for later use.
+
+    Parameters
+    ----------
+    path_or_module: str | os.PathLike[str]
+        Name or path of the module to discover benchmarks in. Can also be a directory,
+        in which case benchmarks are collected from the Python files therein.
+    tags: tuple[str, ...]
+        Tags to filter for when collecting benchmarks. Only benchmarks containing either of
+        these tags are collected.
+
+    Raises
+    ------
+    ValueError
+        If the given path is not a Python file, directory, or module name.
+    """
+    benchmarks: list[Benchmark] = []
+    ppath = Path(path_or_module)
+    if ppath.is_dir():
+        pythonpaths = (p for p in ppath.iterdir() if p.suffix == ".py")
+        for py in pythonpaths:
+            logger.debug(f"Collecting benchmarks from submodule {py.name!r}.")
+            benchmarks.extend(collect(py, tags))
+        return benchmarks
+    elif ppath.is_file():
+        logger.debug(f"Collecting benchmarks from file {ppath}.")
+        module = import_file_as_module(path_or_module)
+    elif ismodule(path_or_module):
+        module = sys.modules[str(path_or_module)]
+    else:
+        raise ValueError(
+            f"expected a module name, Python file, or directory, " f"got {str(path_or_module)!r}"
+        )
+
+    # iterate through the module dict members to register
+    for k, v in module.__dict__.items():
+        if k.startswith("__") and k.endswith("__"):
+            # dunder names are ignored.
+            continue
+        elif isinstance(v, Benchmark):
+            if not tags or set(tags) & set(v.tags):
+                benchmarks.append(v)
+        elif isinstance(v, list | tuple | set | frozenset):
+            for bm in v:
+                if isinstance(bm, Benchmark):
+                    if not tags or set(tags) & set(bm.tags):
+                        benchmarks.append(bm)
+
+    return benchmarks
+
+
+def run(
+    benchmarks: Benchmark | Sequence[Benchmark],  # TODO: Support benchmark iterators
+    name: str | None = None,
+    params: dict[str, Any] | Parameters | None = None,
+    context: Sequence[ContextProvider] = (),
+    jsonifier: Callable[[dict[str, Any]], dict[str, Any]] = jsonify_params,
+) -> BenchmarkRecord:
+    """
+    Run a previously collected benchmark workload.
+
+    Parameters
+    ----------
+    benchmarks: Sequence[Benchmark]
+        The list of discovered benchmarks to run.
+    name: str | None
+        A name for the currently started run. If None, a name will be automatically generated.
+    params: dict[str, Any] | Parameters | None
+        Parameters to use for the benchmark run. Names have to match positional and keyword
+        argument names of the benchmark functions.
+    context: Sequence[ContextProvider]
+        Additional context to log with the benchmarks in the output JSON record. Useful for
+        obtaining environment information and configuration, like CPU/GPU hardware info,
+        ML model metadata, and more.
+    jsonifier: Callable[[dict[str, Any], dict[str, Any]]]
+        A function constructing a string representation from the input parameters.
+        Defaults to ``nnbench.runner.jsonify_params()``. Must produce a dictionary containing
+        only JSON-serializable values.
+
+    Returns
+    -------
+    BenchmarkRecord
+        A JSON output representing the benchmark results. Has three top-level keys,
+        "name" giving the benchmark run name, "context" holding the context information,
+        and "benchmarks", holding an array with the benchmark results.
+    """
+    _run = name or "nnbench-" + platform.node() + "-" + uuid.uuid1().hex[:8]
+
+    family_sizes: dict[str, Any] = collections.defaultdict(int)
+    family_indices: dict[str, Any] = collections.defaultdict(int)
+
+    ctx: dict[str, Any] = {}
+    for provider in context:
+        val = provider()
+        duplicates = set(ctx.keys()) & set(val.keys())
+        if duplicates:
+            dupe, *_ = duplicates
+            raise ValueError(f"got multiple values for context key {dupe!r}")
+        ctx.update(val)
+
+    if isinstance(benchmarks, Benchmark):
+        benchmarks = [benchmarks]
+
+    # if we didn't find any benchmarks, return an empty record.
+    if not benchmarks:
+        return BenchmarkRecord(run=_run, context=ctx, benchmarks=[])
+
+    for bm in benchmarks:
+        family_sizes[bm.interface.funcname] += 1
+
+    if isinstance(params, Parameters):
+        dparams = asdict(params)
+    else:
+        dparams = params or {}
+
+    results: list[dict[str, Any]] = []
+
+    def _maybe_dememo(v, expected_type):
+        """Compute and memoize a value if a memo is given for a variable."""
+        if is_memo(v) and not is_memo_type(expected_type):
+            return v()
+        return v
+
+    for benchmark in benchmarks:
+        bm_family = benchmark.interface.funcname
+        state = State(
+            name=benchmark.name,
+            family=bm_family,
+            family_size=family_sizes[bm_family],
+            family_index=family_indices[bm_family],
+        )
+        family_indices[bm_family] += 1
+
+        # Assemble benchmark parameters. First grab all defaults from the interface,
+        bmparams = {
+            name: _maybe_dememo(val, typ)
+            for name, typ, val in benchmark.interface.variables
+            if val != inspect.Parameter.empty
+        }
+        # ... then hydrate with the appropriate subset of input parameters.
+        bmparams |= {k: v for k, v in dparams.items() if k in benchmark.interface.names}
+        # If any arguments are still unresolved, go look them up as fixtures.
+        if set(bmparams) < set(benchmark.interface.names):
+            # TODO: This breaks for a module name (like __main__).
+            #  Since that only means that we cannot resolve fixtures when benchmarking
+            #  a module name (which makes sense), and we can always pass extra
+            #  parameters in the module case, fixing this is not as urgent.
+            mod = benchmark.__module__
+            file = sys.modules[mod].__file__
+            p = Path(file).parent
+            fm = FixtureManager(p)
+            bmparams |= fm.resolve(benchmark)
+
+        res: dict[str, Any] = {
+            "name": benchmark.name,
+            "function": qualname(benchmark.fn),
+            "description": benchmark.fn.__doc__ or "",
+            "date": datetime.now().isoformat(timespec="seconds"),
+            "error_occurred": False,
+            "error_message": "",
+            "parameters": jsonifier(bmparams),
+        }
+        try:
+            benchmark.setUp(state, bmparams)
+            with timer(res):
+                res["value"] = benchmark.fn(**bmparams)
+        except Exception as e:
+            res["error_occurred"] = True
+            res["error_message"] = str(e)
+        finally:
+            benchmark.tearDown(state, bmparams)
+        results.append(res)
+
+    return BenchmarkRecord(
+        run=_run,
+        context=ctx,
+        benchmarks=results,
+    )
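
For reference, a minimal usage sketch of the new functional API; the `bench.py` file, the
`"nightly"` tag, the run name, the params, and the context provider below are hypothetical
placeholders, not part of this change:

    import nnbench

    def my_context() -> dict:
        # a stand-in context provider: a callable returning a flat dict of context values
        return {"git_sha": "deadbeef"}

    # collect in the open, then run the resulting benchmark list
    benchmarks = nnbench.collect("bench.py", tags=("nightly",))
    record = nnbench.run(
        benchmarks,
        name="nightly-run",
        params={"n": 1000},
        context=(my_context,),
    )
    # the record carries the run name, the merged context, and the per-benchmark results
    print(record.run, record.context, record.benchmarks)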
