Skip to content

Commit

Permalink
feat: add fit_curve and predict_curve (#139)
Browse files Browse the repository at this point in the history
* add prototype for fit_curve

* add proper parameter parsing

* use partial function in test

* add assert to test

* cleanups in fit_curve

* minor edits

* progress with parameter passing

* revert changes to process decorator

* add to tests

* progress on predict

* get predict to work

* remove comment

* fix up output datacube

* add assertions

* preserve dimension order

* add cast to datetime if appropriate

* keep attrs

* fix typo

* add ignore_nodata

* bump submodule
  • Loading branch information
LukeWeidenwalker authored Aug 22, 2023
1 parent c4aacb2 commit 3a55003
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .curve_fitting import *
from .random_forest import *
125 changes: 125 additions & 0 deletions openeo_processes_dask/process_implementations/ml/curve_fitting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from typing import Callable, Optional

import numpy as np
import pandas as pd
import xarray as xr
from numpy.typing import ArrayLike

from openeo_processes_dask.process_implementations.cubes import apply_dimension
from openeo_processes_dask.process_implementations.data_model import RasterCube
from openeo_processes_dask.process_implementations.exceptions import (
DimensionNotAvailable,
)

__all__ = ["fit_curve", "predict_curve"]


def fit_curve(
data: RasterCube,
parameters: list,
function: Callable,
dimension: str,
ignore_nodata: bool = True,
):
if dimension not in data.dims:
raise DimensionNotAvailable(
f"Provided dimension ({dimension}) not found in data.dims: {data.dims}"
)

dims_before = list(data.dims)

# In the spec, parameters is a list, but xr.curvefit requires names for them,
# so we do this to generate names locally
parameters = {f"param_{i}": v for i, v in enumerate(parameters)}

# The dimension along which to fit the curves cannot be chunked!
rechunked_data = data.chunk({dimension: -1})

def wrapper(f):
def _wrap(*args, **kwargs):
return f(
*args,
**kwargs,
positional_parameters={"x": 0, "parameters": slice(1, None)},
)

return _wrap

expected_dims_after = list(dims_before)
expected_dims_after[dims_before.index(dimension)] = "param"

# .curvefit returns some extra information that isn't required by the OpenEO process
# so we simply drop these here.
fit_result = (
rechunked_data.curvefit(
dimension,
wrapper(function),
p0=parameters,
param_names=list(parameters.keys()),
skipna=ignore_nodata,
)
.drop_dims(["cov_i", "cov_j"])
.to_array()
.squeeze()
.transpose(*expected_dims_after)
)

fit_result.attrs = data.attrs

return fit_result


def predict_curve(
parameters: RasterCube,
function: Callable,
dimension: str,
labels: ArrayLike,
):
labels_were_datetime = False
dims_before = list(parameters.dims)

try:
# Try parsing as datetime first
labels = np.asarray(labels, dtype=np.datetime64)
except ValueError:
labels = np.asarray(labels)

if np.issubdtype(labels.dtype, np.datetime64):
labels = labels.astype(int)
labels_were_datetime = True

# This is necessary to pipe the arguments correctly through @process
def wrapper(f):
def _wrap(*args, **kwargs):
return f(
*args,
positional_parameters={"parameters": 0},
named_parameters={"x": labels},
**kwargs,
)

return _wrap

expected_dims_after = list(dims_before)
expected_dims_after[dims_before.index("param")] = dimension

predictions = xr.apply_ufunc(
wrapper(function),
parameters,
vectorize=True,
input_core_dims=[["param"]],
output_core_dims=[[dimension]],
dask="parallelized",
output_dtypes=[np.float64],
dask_gufunc_kwargs={
"allow_rechunk": True,
"output_sizes": {dimension: len(labels)},
},
).transpose(*expected_dims_after)

predictions = predictions.assign_coords({dimension: labels.data})

if labels_were_datetime:
predictions[dimension] = pd.DatetimeIndex(predictions[dimension].values)

return predictions
2 changes: 1 addition & 1 deletion openeo_processes_dask/specs/openeo-processes
62 changes: 61 additions & 1 deletion tests/test_ml.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,21 @@
from functools import partial

import dask
import geopandas as gpd
import numpy as np
import pytest
import xgboost as xgb
from openeo_pg_parser_networkx.pg_schema import ParameterReference

from openeo_processes_dask.process_implementations.ml import fit_regr_random_forest
from openeo_processes_dask.process_implementations.core import process
from openeo_processes_dask.process_implementations.cubes.apply import apply_dimension
from openeo_processes_dask.process_implementations.cubes.general import dimension_labels
from openeo_processes_dask.process_implementations.ml import (
fit_curve,
fit_regr_random_forest,
predict_curve,
)
from tests.mockdata import create_fake_rastercube


def test_fit_regr_random_forest(vector_data_cube, dask_client):
Expand Down Expand Up @@ -32,3 +46,49 @@ def test_fit_regr_random_forest_inline_geojson(
)

assert isinstance(model, xgb.core.Booster)


@pytest.mark.parametrize("size", [(6, 5, 4, 3)])
@pytest.mark.parametrize("dtype", [np.float64])
def test_curve_fitting(temporal_interval, bounding_box, random_raster_data):
origin_cube = create_fake_rastercube(
data=random_raster_data,
spatial_extent=bounding_box,
temporal_extent=temporal_interval,
bands=["B02", "B03", "B04"],
backend="dask",
)

@process
def fitFunction(x, parameters):
t0 = 2 * np.pi / 31557600 * x
return parameters[0] + parameters[1] * np.cos(t0) + parameters[2] * np.sin(t0)

_process = partial(
fitFunction,
x=ParameterReference(from_parameter="x"),
parameters=ParameterReference(from_parameter="parameters"),
)

parameters = [1, 0, 0]
result = fit_curve(
origin_cube, parameters=parameters, function=_process, dimension="t"
)
assert len(result.param) == 3
assert isinstance(result.data, dask.array.Array)

assert len(result.coords["bands"]) == len(origin_cube.coords["bands"])
assert len(result.coords["x"]) == len(origin_cube.coords["x"])
assert len(result.coords["y"]) == len(origin_cube.coords["y"])
assert len(result.coords["param"]) == len(parameters)

labels = dimension_labels(origin_cube, origin_cube.openeo.temporal_dims[0])
predictions = predict_curve(
result,
_process,
origin_cube.openeo.temporal_dims[0],
labels=labels,
).compute()

assert len(predictions.coords[origin_cube.openeo.temporal_dims[0]]) == len(labels)
assert "param" not in predictions.dims

0 comments on commit 3a55003

Please sign in to comment.