
Commit

Merge remote-tracking branch 'origin/main' into myst
hobu committed Jan 7, 2025
2 parents 136288e + 1b962f1 commit 86b2f43
Showing 14 changed files with 231 additions and 147 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/main.yml
@@ -18,7 +18,6 @@ jobs:
test:
name: ${{ matrix.os }} py${{ matrix.python-version }}
runs-on: ${{ matrix.os }}
environment: testing
timeout-minutes: 30
defaults:
run:
@@ -52,8 +51,8 @@ jobs:
pip list
- name: Run Tests
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
AWS_ACCESS_KEY_ID: ${{ secrets.SM_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.SM_AWS_SECRET_ACCESS_KEY }}
run: |
pytest -vv --durations=0
5 changes: 3 additions & 2 deletions src/silvimetric/__init__.py
@@ -1,11 +1,12 @@
__version__ = '1.3.0'
__version__ = '1.3.1'

from .resources.bounds import Bounds
from .resources.extents import Extents
from .resources.storage import Storage
from .resources.metric import Metric, run_metrics
from .resources.metric import Metric
from .resources.metrics import grid_metrics, l_moments, percentiles, statistics, all_metrics
from .resources.metrics import product_moments
from .resources.taskgraph import Graph
from .resources.log import Log
from .resources.data import Data
from .resources.attribute import Attribute, Pdal_Attributes, Attributes
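The package-level change above swaps the `run_metrics` export for the new `Graph` task-graph runner and the metric dictionaries. A minimal usage sketch, assuming `Graph` accepts a collection of `Metric` objects and that `run` takes a points DataFrame keyed by the `xi`/`yi` cell columns used elsewhere in this commit (exact signatures are not confirmed by the diff):

```python
# Hedged sketch: running metrics through the new Graph API instead of run_metrics.
# Assumes Graph(metrics).run(df) returns one row of metric values per (xi, yi) cell.
import pandas as pd
from silvimetric import Graph, grid_metrics

points = pd.DataFrame({
    'xi': [0, 0, 1],          # cell index columns used by the shatter pipeline
    'yi': [0, 0, 1],
    'Z': [10.0, 12.5, 9.1],   # elevation values aggregated by the Z metrics
})

graph = Graph(list(grid_metrics.values()))
results = graph.run(points)
print(results)
```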
30 changes: 15 additions & 15 deletions src/silvimetric/cli/common.py
@@ -128,25 +128,25 @@ def dask_handle(dasktype: str, scheduler: str, workers: int, threads: int,
p.register()

elif scheduler == 'distributed':
raise Exception("Dask distributed scheduler is currently disabled. "
"Please select another scheduler ('single-threaded' or 'local')")
# raise Exception("Dask distributed scheduler is currently disabled. "
# "Please select another scheduler ('single-threaded' or 'local')")

# TODO: this is disabled for now. Distributed dask keeps crashing when
# used for shatter.

# dask_config['scheduler'] = scheduler
# if dasktype == 'processes':
# cluster = LocalCluster(processes=True, n_workers=workers, threads_per_worker=threads)
# elif dasktype == 'threads':
# cluster = LocalCluster(processes=False, n_workers=workers, threads_per_worker=threads)
# else:
# raise ValueError(f"Invalid value for 'dasktype', {dasktype}")

# client = Client(cluster)
# client.get_versions(check=True)
# dask_config['distributed.client'] = client
# if watch:
# webbrowser.open(client.cluster.dashboard_link)
dask_config['scheduler'] = scheduler
if dasktype == 'processes':
cluster = LocalCluster(processes=True, n_workers=workers, threads_per_worker=threads)
elif dasktype == 'threads':
cluster = LocalCluster(processes=False, n_workers=workers, threads_per_worker=threads)
else:
raise ValueError(f"Invalid value for 'dasktype', {dasktype}")

client = Client(cluster)
client.get_versions(check=True)
dask_config['distributed.client'] = client
if watch:
webbrowser.open(client.cluster.dashboard_link)

elif scheduler == 'single-threaded':
dask_config['scheduler'] = scheduler
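The re-enabled `distributed` branch above builds a `LocalCluster`, attaches a `Client`, checks versions, and exposes the dashboard link. A standalone sketch of the same setup (worker and thread counts here are illustrative):

```python
# Standalone sketch of the LocalCluster/Client setup performed by the 'distributed' branch.
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(processes=True, n_workers=4, threads_per_worker=2)
client = Client(cluster)
client.get_versions(check=True)       # fail fast if client/scheduler/worker versions differ
print(client.cluster.dashboard_link)  # dashboard URL the CLI opens when 'watch' is set
```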
30 changes: 12 additions & 18 deletions src/silvimetric/commands/shatter.py
@@ -11,9 +11,11 @@
from dask.delayed import Delayed
import dask.array as da
import dask.bag as db
import dask.dataframe as dd
from dask.diagnostics import ProgressBar

from .. import Extents, Storage, Data, ShatterConfig, run_metrics
from .. import Extents, Storage, Data, ShatterConfig
from ..resources.taskgraph import Graph

def get_data(extents: Extents, filename: str, storage: Storage):
"""
@@ -58,14 +60,13 @@ def arrange(points: pd.DataFrame, leaf, attrs: list[str]):

return points

def get_metrics(data_in, storage: Storage):
def run_graph(data_in, metrics):
"""
Run DataFrames through metric processes
"""
if data_in is None:
return None
graph = Graph(metrics)

return run_metrics(data_in, storage.config.metrics)
return graph.run(data_in)

def agg_list(data_in):
"""
@@ -74,12 +75,6 @@ def agg_list(data_in):
if data_in is None:
return None

# grouped = data_in.groupby(['xi','yi'])
# grouped = grouped.agg(list)

# return grouped.assign(count=lambda x: [len(z) for z in grouped.Z])

# # coerce datatypes to object so we can store np arrays as cell values
old_dtypes = data_in.dtypes
xyi_dtypes = { 'xi': np.float64, 'yi': np.float64 }
o = np.dtype('O')
@@ -151,10 +146,12 @@ def pc_filter(d: pd.DataFrame):
return False
return not d.empty

graph = Graph(storage.config.metrics).init()

points: db.Bag = leaf_bag.map(get_data, config.filename, storage)
arranged: db.Bag = points.map(arrange, leaf_bag, attrs)
filtered = arranged.filter(pc_filter)
metrics: db.Bag = filtered.map(run_metrics, storage.config.metrics)
filtered: db.Bag = arranged.filter(pc_filter)
metrics: db.Bag = filtered.map(run_graph, storage.config.metrics)
lists: db.Bag = filtered.map(agg_list)
joined: db.Bag = lists.map(join, metrics)
writes: db.Bag = joined.map(write, storage, timestamp)
@@ -191,13 +188,14 @@ def kill_gracefully(signum, frame):

processes = get_processes(leaves, config, storage)

## If dask is distributed, use the futures feature
dc = get_client()
if dc is not None:
pc_futures = futures_of(processes.persist())
for batch in as_completed(pc_futures, with_results=True).batches():
for future, pack in batch:
if isinstance(pack, CancelledError):
# skip cancelled futures and keep draining the batch
continue
for pc in pack:
config.point_count = config.point_count + pc
@@ -230,10 +228,6 @@ def shatter(config: ShatterConfig) -> int:
:return: Number of points processed.
"""

if get_client() is not None:
raise AttributeError("Dask distributed scheduler is currently disabled "
"for SilviMetric. Use a different scheduler to continue.")

# get start time in milliseconds
config.start_time = datetime.datetime.now().timestamp() * 1000
# set up tiledb
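The shatter changes above keep the `dask.bag` map/filter pipeline over leaf extents but route metric computation through `run_graph`. A self-contained sketch of that pipeline shape with plain Python objects (the leaf, metric, and write logic is stubbed out):

```python
# Illustrative dask.bag pipeline shaped like the shatter flow: load -> arrange -> filter -> process.
import dask.bag as db

leaves = db.from_sequence(range(8), npartitions=4)                     # stand-in for leaf extents
points = leaves.map(lambda leaf: {'leaf': leaf, 'count': leaf * 100})  # stand-in for get_data/arrange
filtered = points.filter(lambda d: d['count'] > 0)                     # stand-in for pc_filter
counts = filtered.map(lambda d: d['count'])                            # stand-in for run_graph/write

print(sum(counts.compute()))  # the real pipeline sums written point counts in much the same way
```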
1 change: 1 addition & 0 deletions src/silvimetric/resources/attribute.py
@@ -23,6 +23,7 @@ def __init__(self, name: str, dtype) -> None:
raise AttributeError(f"Invalid dtype passed to Attribute: {dtype}") from e

def make_array(self, data, copy=False):
"""Create Pandas Extension array for TileDB compatibility."""
return AttributeArray(data=data, copy=copy)

def entry_name(self) -> str:
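For context on the newly documented `make_array` helper, a hedged sketch of constructing an `Attribute` and wrapping raw values; the accepted dtype form and the list argument are assumptions, not confirmed by the diff:

```python
# Hedged sketch: wrap raw values in the pandas ExtensionArray used for TileDB-compatible columns.
import numpy as np
from silvimetric import Attribute

z = Attribute('Z', np.float64)       # assumes a numpy dtype is a valid 'dtype' argument
arr = z.make_array([1.0, 2.0, 3.0])  # returns the AttributeArray described by the docstring above
print(z.entry_name(), len(arr))
```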
2 changes: 1 addition & 1 deletion src/silvimetric/resources/log.py
@@ -13,7 +13,7 @@

try:
import websocket
from pythonjsonlogger import jsonlogger
from pythonjsonlogger import json as jsonlogger
except ImportError:
WebSocketHandler = None
pass
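The import change above appears to track python-json-logger 3.x, where `JsonFormatter` is provided via the `json` submodule. A minimal wiring sketch (logger name and log fields are illustrative):

```python
# Minimal sketch using the python-json-logger 3.x import path adopted above.
import logging
from pythonjsonlogger import json as jsonlogger

handler = logging.StreamHandler()
handler.setFormatter(jsonlogger.JsonFormatter())  # emits each record as a JSON object

log = logging.getLogger('silvimetric-example')
log.addHandler(handler)
log.setLevel(logging.INFO)
log.info('shatter started', extra={'tile_count': 14})
```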
68 changes: 6 additions & 62 deletions src/silvimetric/resources/metric.py
@@ -11,17 +11,13 @@
import dill
import pandas as pd

import dask
from dask.delayed import Delayed
from distributed import Future
from .attribute import Attribute

MetricFn = Callable[[pd.DataFrame, Any], pd.DataFrame]
FilterFn = Callable[[pd.DataFrame, Optional[Union[Any, None]]], pd.DataFrame]
mutex = Lock()

# Derived information about a cell of points
## TODO should create list of metrics as classes that derive from Metric?
class Metric():
"""
A Metric is a TileDB entry representing derived cell data. There is a base set of
@@ -89,6 +85,7 @@ def entry_name(self, attr: str) -> str:
return f'm_{attr}_{self.name}'

def sanitize_and_run(self, d, locs, args):
"""Sanitize arguments, find the indices """
# Args are the return values of previous DataFrame aggregations.
# In order to access the correct location, we need a map of groupby
# indices to their locations and then grab the correct index from args
@@ -97,14 +94,11 @@ def sanitize_and_run(self, d, locs, args):
attrs = [a.entry_name(attr) for a in self.dependencies]

if isinstance(args, pd.DataFrame):
try:
with mutex:
idx = locs.loc[d.index[0]]
xi = idx.xi
yi = idx.yi
pass_args = [args.at[(xi,yi), a] for a in attrs]
except KeyError as e:
print(e)
with mutex:
idx = locs.loc[d.index[0]]
xi = idx.xi
yi = idx.yi
pass_args = [args.at[(xi,yi), a] for a in attrs]
else:
pass_args = args

@@ -180,7 +174,6 @@ def run_filters(self, data: pd.DataFrame) -> pd.DataFrame:
data = ndf
return data


def to_json(self) -> dict[str, any]:
return {
'name': self.name,
@@ -238,52 +231,3 @@ def __call__(self, data: pd.DataFrame) -> pd.DataFrame:

def __repr__(self) -> str:
return f"Metric_{self.name}"

def get_methods(data: Union[pd.DataFrame, Delayed], metrics: Metric | list[Metric],
uuid=None) -> list[Delayed]:
"""
Create Metric dependency graph by iterating through desired metrics and
their dependencies, creating Delayed objects that can be run later.
"""
# identity for this graph, can be created before or during this method
# call, but needs to be the same across this graph, and unique compared
# to other graphs
if uuid is None:
uuid = uuid4()

# don't duplicate a delayed object
if not isinstance(data, Delayed):
data = dask.delayed(data)

if isinstance(metrics, Metric):
metrics = [ metrics ]

# iterate through metrics and their dependencies.
# uuid here will help guide metrics to use the same dependency method
# calls from dask
seq = []
for m in metrics:
if not isinstance(m, Metric):
continue
ddeps = get_methods(data, m.dependencies, uuid)
dd = dask.delayed(m.do)(data, *ddeps,
dask_key_name=f'{m.name}-{str(uuid)}')
seq.append(dd)

return seq

def run_metrics(data: pd.DataFrame, metrics: Union[Metric, list[Metric]]) -> pd.DataFrame:
"""
Collect Metric dependency graph and run it, then merge the results together.
"""
graph = get_methods(data, metrics)

# try returning just the graph and see if that can speed things up
# return graph
computed_list = dask.persist(*graph, optimize_graph=True)

def merge(x, y):
return x.merge(y, on=['xi','yi'])
merged = reduce(merge, computed_list)

return merged
39 changes: 35 additions & 4 deletions src/silvimetric/resources/metrics/__init__.py
@@ -1,13 +1,44 @@
import copy
from ..attribute import Attributes as A

from ..metric import Metric
from .percentiles import percentiles
from .l_moments import l_moments
from .stats import statistics
from .p_moments import product_moments

all_metrics: dict[str, Metric] = dict(percentiles | l_moments | statistics |
product_moments)

#TODO make each one of these have a version with the NumberOfReturns>2 filter

grid_metrics: dict[str, Metric] = dict(percentiles | l_moments | statistics |
product_moments)
gr_perc = copy.deepcopy(percentiles)
gr_l_moments = copy.deepcopy(l_moments)
gr_stats = copy.deepcopy(statistics)
gr_p_moments = copy.deepcopy(product_moments)

all_metrics: dict[str, Metric] = dict(percentiles | l_moments | statistics |
product_moments)
for p in gr_perc.values():
p.attributes = [A['Z'], A['Intensity']]
for l in gr_l_moments.values():
l.attributes = [A['Z'], A['Intensity']]

gr_stats['cumean'].attributes = [A['Z']]
gr_stats['sqmean'].attributes = [A['Z']]
gr_stats['abovemean'].attributes = [A['Z']]
gr_stats['abovemode'].attributes = [A['Z']]
gr_stats['profilearea'].attributes = [A['Z']]

gr_stats['iq'].attributes = [A['Z'], A['Intensity']]
gr_stats['crr'].attributes = [A['Z'], A['Intensity']]
gr_stats['min'].attributes = [A['Z'], A['Intensity']]
gr_stats['max'].attributes = [A['Z'], A['Intensity']]
gr_stats['mode'].attributes = [A['Z'], A['Intensity']]
gr_stats['median'].attributes = [A['Z'], A['Intensity']]
gr_stats['stddev'].attributes = [A['Z'], A['Intensity']]
gr_stats['cv'].attributes = [A['Z'], A['Intensity']]

for s in gr_stats.values():
s.attributes = [A['Z'], A['Intensity']]

grid_metrics: dict[str, Metric] = dict(gr_perc | gr_l_moments | gr_stats |
gr_p_moments)
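The rework above builds `grid_metrics` from deep copies so its attribute assignments can diverge from `all_metrics`. A quick inspection sketch, assuming each `Metric` keeps an `attributes` list of `Attribute` objects with a `name` field as suggested by the code in this commit:

```python
# Hedged sketch: compare attribute assignments between all_metrics and grid_metrics entries.
from silvimetric import all_metrics, grid_metrics

print(len(all_metrics), 'metrics total;', len(grid_metrics), 'grid metrics')
for name, metric in list(grid_metrics.items())[:5]:
    attr_names = [a.name for a in metric.attributes]
    print(f'{name}: {attr_names}')  # expected to show Z/Intensity per the assignments above
```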
