From 76cf9643a66ef4d790dbd4459155c11d014c48af Mon Sep 17 00:00:00 2001 From: Lukas Weidenholzer Date: Mon, 17 Jul 2023 16:08:30 +0200 Subject: [PATCH 01/20] add prototype for fit_curve --- .../process_implementations/ml/__init__.py | 1 + .../ml/curve_fitting.py | 23 +++++++++++++ tests/test_ml.py | 32 ++++++++++++++++++- 3 files changed, 55 insertions(+), 1 deletion(-) create mode 100644 openeo_processes_dask/process_implementations/ml/curve_fitting.py diff --git a/openeo_processes_dask/process_implementations/ml/__init__.py b/openeo_processes_dask/process_implementations/ml/__init__.py index 093eb051..9ce3ce0e 100644 --- a/openeo_processes_dask/process_implementations/ml/__init__.py +++ b/openeo_processes_dask/process_implementations/ml/__init__.py @@ -1 +1,2 @@ +from .curve_fitting import * from .random_forest import * diff --git a/openeo_processes_dask/process_implementations/ml/curve_fitting.py b/openeo_processes_dask/process_implementations/ml/curve_fitting.py new file mode 100644 index 00000000..806fb2bf --- /dev/null +++ b/openeo_processes_dask/process_implementations/ml/curve_fitting.py @@ -0,0 +1,23 @@ +from openeo_processes_dask.process_implementations.data_model import RasterCube +from openeo_processes_dask.process_implementations.exceptions import ( + DimensionNotAvailable, +) + +__all__ = ["fit_curve", "predict_curve"] + + +def fit_curve(data: RasterCube, parameters, function, dimension): + if dimension not in data.dims: + raise DimensionNotAvailable( + f"Provided dimension ({dimension}) not found in data.dims: {data.dims}" + ) + + rechunked_data = data.chunk({dimension: -1}) + fit_result = rechunked_data.curvefit(dimension, function, p0=parameters).drop_dims( + ["cov_i", "cov_j"] + ) + return fit_result + + +def predict_curve(): + pass diff --git a/tests/test_ml.py b/tests/test_ml.py index 58e90ca2..82547820 100644 --- a/tests/test_ml.py +++ b/tests/test_ml.py @@ -1,7 +1,17 @@ +from functools import partial + import geopandas as gpd +import numpy as np +import pytest import xgboost as xgb +from openeo_pg_parser_networkx.pg_schema import ParameterReference -from openeo_processes_dask.process_implementations.ml import fit_regr_random_forest +from openeo_processes_dask.process_implementations.cubes.apply import apply_dimension +from openeo_processes_dask.process_implementations.ml import ( + fit_curve, + fit_regr_random_forest, +) +from tests.mockdata import create_fake_rastercube def test_fit_regr_random_forest(vector_data_cube, dask_client): @@ -32,3 +42,23 @@ def test_fit_regr_random_forest_inline_geojson( ) assert isinstance(model, xgb.core.Booster) + + +@pytest.mark.parametrize("size", [(6, 5, 4, 3)]) +@pytest.mark.parametrize("dtype", [np.float64]) +def test_fit_curve( + temporal_interval, bounding_box, random_raster_data, process_registry +): + origin_cube = create_fake_rastercube( + data=random_raster_data, + spatial_extent=bounding_box, + temporal_extent=temporal_interval, + bands=["B02", "B03", "B04"], + backend="dask", + ) + + def fitFunction(t, a, b, c): + t0 = 2 * np.pi / 31557600 * t + return a + b * np.cos(t0) + c * np.sin(t0) + + result = fit_curve(origin_cube, parameters={}, function=fitFunction, dimension="t") From 40a7c43ccb2dfa24ccba6a5b200f2a3ac58631c2 Mon Sep 17 00:00:00 2001 From: Lukas Weidenholzer Date: Mon, 17 Jul 2023 17:04:01 +0200 Subject: [PATCH 02/20] add proper parameter parsing --- .../ml/curve_fitting.py | 10 ++++++---- tests/test_ml.py | 19 +++++++++++++++---- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/openeo_processes_dask/process_implementations/ml/curve_fitting.py b/openeo_processes_dask/process_implementations/ml/curve_fitting.py index 806fb2bf..a2ffb257 100644 --- a/openeo_processes_dask/process_implementations/ml/curve_fitting.py +++ b/openeo_processes_dask/process_implementations/ml/curve_fitting.py @@ -6,16 +6,18 @@ __all__ = ["fit_curve", "predict_curve"] -def fit_curve(data: RasterCube, parameters, function, dimension): +def fit_curve(data: RasterCube, parameters: list, function, dimension): + parameters = {f"param_{i}": v for i, v in enumerate(parameters)} + if dimension not in data.dims: raise DimensionNotAvailable( f"Provided dimension ({dimension}) not found in data.dims: {data.dims}" ) rechunked_data = data.chunk({dimension: -1}) - fit_result = rechunked_data.curvefit(dimension, function, p0=parameters).drop_dims( - ["cov_i", "cov_j"] - ) + fit_result = rechunked_data.curvefit( + dimension, function, p0=parameters, param_names=list(parameters.keys()) + ).drop_dims(["cov_i", "cov_j"]) return fit_result diff --git a/tests/test_ml.py b/tests/test_ml.py index 82547820..6b9d802c 100644 --- a/tests/test_ml.py +++ b/tests/test_ml.py @@ -6,6 +6,7 @@ import xgboost as xgb from openeo_pg_parser_networkx.pg_schema import ParameterReference +from openeo_processes_dask.process_implementations.core import process from openeo_processes_dask.process_implementations.cubes.apply import apply_dimension from openeo_processes_dask.process_implementations.ml import ( fit_curve, @@ -57,8 +58,18 @@ def test_fit_curve( backend="dask", ) - def fitFunction(t, a, b, c): - t0 = 2 * np.pi / 31557600 * t - return a + b * np.cos(t0) + c * np.sin(t0) + @process + def fitFunction(x, parameters): + t0 = 2 * np.pi / 31557600 * x + return parameters[0] + parameters[1] * np.cos(t0) + parameters[2] * np.sin(t0) - result = fit_curve(origin_cube, parameters={}, function=fitFunction, dimension="t") + _process = partial( + fitFunction, + x=ParameterReference(from_parameter="x"), + parameters=ParameterReference(from_parameter="parameters"), + ) + + result = fit_curve( + origin_cube, parameters=[0, 0, 0], function=fitFunction, dimension="t" + ) + assert len(result.param) == 3 From 9a66759de956acf212cee08ed4034a7ed5c61736 Mon Sep 17 00:00:00 2001 From: Lukas Weidenholzer Date: Mon, 17 Jul 2023 17:05:25 +0200 Subject: [PATCH 03/20] use partial function in test --- tests/test_ml.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_ml.py b/tests/test_ml.py index 6b9d802c..7f6faa4d 100644 --- a/tests/test_ml.py +++ b/tests/test_ml.py @@ -70,6 +70,6 @@ def fitFunction(x, parameters): ) result = fit_curve( - origin_cube, parameters=[0, 0, 0], function=fitFunction, dimension="t" + origin_cube, parameters=[0, 0, 0], function=_process, dimension="t" ) assert len(result.param) == 3 From e99dceb34bedc669e519adad0b5d9e936516d7f1 Mon Sep 17 00:00:00 2001 From: Lukas Weidenholzer Date: Thu, 20 Jul 2023 16:12:45 +0200 Subject: [PATCH 04/20] add assert to test --- tests/test_ml.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_ml.py b/tests/test_ml.py index 7f6faa4d..bc222aad 100644 --- a/tests/test_ml.py +++ b/tests/test_ml.py @@ -1,5 +1,6 @@ from functools import partial +import dask import geopandas as gpd import numpy as np import pytest @@ -73,3 +74,4 @@ def fitFunction(x, parameters): origin_cube, parameters=[0, 0, 0], function=_process, dimension="t" ) assert len(result.param) == 3 + assert isinstance(result.to_array().data, dask.array.Array) From a55decd502693b9e2b1c85a5612c3506970a236e Mon Sep 17 00:00:00 2001 From: Lukas Weidenholzer Date: Mon, 24 Jul 2023 10:39:06 +0200 Subject: [PATCH 05/20] cleanups in fit_curve --- .../process_implementations/ml/curve_fitting.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/openeo_processes_dask/process_implementations/ml/curve_fitting.py b/openeo_processes_dask/process_implementations/ml/curve_fitting.py index a2ffb257..a0b5c7b3 100644 --- a/openeo_processes_dask/process_implementations/ml/curve_fitting.py +++ b/openeo_processes_dask/process_implementations/ml/curve_fitting.py @@ -1,3 +1,5 @@ +from typing import Callable + from openeo_processes_dask.process_implementations.data_model import RasterCube from openeo_processes_dask.process_implementations.exceptions import ( DimensionNotAvailable, @@ -6,15 +8,21 @@ __all__ = ["fit_curve", "predict_curve"] -def fit_curve(data: RasterCube, parameters: list, function, dimension): - parameters = {f"param_{i}": v for i, v in enumerate(parameters)} - +def fit_curve(data: RasterCube, parameters: list, function: Callable, dimension: str): if dimension not in data.dims: raise DimensionNotAvailable( f"Provided dimension ({dimension}) not found in data.dims: {data.dims}" ) + # In the spec, parameters is a list, but xr.curvefit requires names for them, + # so we do this to generate names locally + parameters = {f"param_{i}": v for i, v in enumerate(parameters)} + + # The dimension along which to predict cannot be chunked! rechunked_data = data.chunk({dimension: -1}) + + # .curvefit returns some extra information that isn't required by the OpenEO process + # so we simply drop these here. fit_result = rechunked_data.curvefit( dimension, function, p0=parameters, param_names=list(parameters.keys()) ).drop_dims(["cov_i", "cov_j"]) From 23a68b282d2f21469c30a29a3e99f9cedf3010a8 Mon Sep 17 00:00:00 2001 From: Lukas Weidenholzer Date: Thu, 27 Jul 2023 12:40:31 +0200 Subject: [PATCH 06/20] minor edits --- .../process_implementations/ml/curve_fitting.py | 10 +++++++++- tests/test_ml.py | 2 +- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/openeo_processes_dask/process_implementations/ml/curve_fitting.py b/openeo_processes_dask/process_implementations/ml/curve_fitting.py index a0b5c7b3..4fb24e0c 100644 --- a/openeo_processes_dask/process_implementations/ml/curve_fitting.py +++ b/openeo_processes_dask/process_implementations/ml/curve_fitting.py @@ -1,5 +1,6 @@ from typing import Callable +from openeo_processes_dask.process_implementations.cubes import apply_dimension from openeo_processes_dask.process_implementations.data_model import RasterCube from openeo_processes_dask.process_implementations.exceptions import ( DimensionNotAvailable, @@ -29,5 +30,12 @@ def fit_curve(data: RasterCube, parameters: list, function: Callable, dimension: return fit_result -def predict_curve(): +def predict_curve( + data: RasterCube, + function: Callable, + parameters: RasterCube, + dimension: str, + labels: list, +): + # TODO: constrain prediction to nodata values pass diff --git a/tests/test_ml.py b/tests/test_ml.py index bc222aad..f334174a 100644 --- a/tests/test_ml.py +++ b/tests/test_ml.py @@ -48,7 +48,7 @@ def test_fit_regr_random_forest_inline_geojson( @pytest.mark.parametrize("size", [(6, 5, 4, 3)]) @pytest.mark.parametrize("dtype", [np.float64]) -def test_fit_curve( +def test_curve_fitting( temporal_interval, bounding_box, random_raster_data, process_registry ): origin_cube = create_fake_rastercube( From 991970abfb1921f013657114754e0a659ad56eb8 Mon Sep 17 00:00:00 2001 From: Lukas Weidenholzer Date: Fri, 11 Aug 2023 16:58:45 +0200 Subject: [PATCH 07/20] progress with parameter passing --- .../process_implementations/core.py | 17 +++++++++++++++-- .../process_implementations/ml/curve_fitting.py | 12 +++++++++++- tests/test_ml.py | 3 ++- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/openeo_processes_dask/process_implementations/core.py b/openeo_processes_dask/process_implementations/core.py index 1d78ec85..835c4b0f 100644 --- a/openeo_processes_dask/process_implementations/core.py +++ b/openeo_processes_dask/process_implementations/core.py @@ -44,14 +44,25 @@ def wrapper( resolved_args = [] resolved_kwargs = {} + already_resolved_params = [] # If an arg is specified in positional_parameters, put the correct key-value pair into named_parameters for arg_name, i in positional_parameters.items(): - named_parameters[arg_name] = args[i] + if isinstance(i, slice): + for n, arg_in_slice in enumerate(args[i]): + resolved_args.insert(i.start + n, arg_in_slice) + else: + resolved_args.insert(i, args[i]) + already_resolved_params.append(arg_name) for arg in args: if isinstance(arg, ParameterReference): - if arg.from_parameter in named_parameters: + if ( + arg.from_parameter in named_parameters + and arg.from_parameter not in already_resolved_params + ): resolved_args.append(named_parameters[arg.from_parameter]) + elif arg.from_parameter in already_resolved_params: + continue else: raise ProcessParameterMissing( f"Error: Process Parameter {arg.from_parameter} was missing for process {f.__name__}" @@ -61,6 +72,8 @@ def wrapper( if isinstance(arg, ParameterReference): if arg.from_parameter in named_parameters: resolved_kwargs[k] = named_parameters[arg.from_parameter] + elif arg.from_parameter in already_resolved_params: + continue else: raise ProcessParameterMissing( f"Error: Process Parameter {arg.from_parameter} was missing for process {f.__name__}" diff --git a/openeo_processes_dask/process_implementations/ml/curve_fitting.py b/openeo_processes_dask/process_implementations/ml/curve_fitting.py index 4fb24e0c..011261ca 100644 --- a/openeo_processes_dask/process_implementations/ml/curve_fitting.py +++ b/openeo_processes_dask/process_implementations/ml/curve_fitting.py @@ -22,10 +22,20 @@ def fit_curve(data: RasterCube, parameters: list, function: Callable, dimension: # The dimension along which to predict cannot be chunked! rechunked_data = data.chunk({dimension: -1}) + def wrapper(f): + def _wrap(*args, **kwargs): + return f( + *args, + **kwargs, + positional_parameters={"x": 0, "parameters": slice(1, None)}, + ) + + return _wrap + # .curvefit returns some extra information that isn't required by the OpenEO process # so we simply drop these here. fit_result = rechunked_data.curvefit( - dimension, function, p0=parameters, param_names=list(parameters.keys()) + dimension, wrapper(function), p0=parameters, param_names=list(parameters.keys()) ).drop_dims(["cov_i", "cov_j"]) return fit_result diff --git a/tests/test_ml.py b/tests/test_ml.py index f334174a..159934d0 100644 --- a/tests/test_ml.py +++ b/tests/test_ml.py @@ -60,7 +60,7 @@ def test_curve_fitting( ) @process - def fitFunction(x, parameters): + def fitFunction(x, *parameters): t0 = 2 * np.pi / 31557600 * x return parameters[0] + parameters[1] * np.cos(t0) + parameters[2] * np.sin(t0) @@ -75,3 +75,4 @@ def fitFunction(x, parameters): ) assert len(result.param) == 3 assert isinstance(result.to_array().data, dask.array.Array) + result.compute() From f4e68f87f3d4f8dd3253c1539858b7c4e95441a0 Mon Sep 17 00:00:00 2001 From: Lukas Weidenholzer Date: Sat, 12 Aug 2023 22:31:08 +0200 Subject: [PATCH 08/20] revert changes to process decorator --- .../process_implementations/core.py | 17 ++--------------- tests/test_ml.py | 4 ++-- 2 files changed, 4 insertions(+), 17 deletions(-) diff --git a/openeo_processes_dask/process_implementations/core.py b/openeo_processes_dask/process_implementations/core.py index 835c4b0f..1d78ec85 100644 --- a/openeo_processes_dask/process_implementations/core.py +++ b/openeo_processes_dask/process_implementations/core.py @@ -44,25 +44,14 @@ def wrapper( resolved_args = [] resolved_kwargs = {} - already_resolved_params = [] # If an arg is specified in positional_parameters, put the correct key-value pair into named_parameters for arg_name, i in positional_parameters.items(): - if isinstance(i, slice): - for n, arg_in_slice in enumerate(args[i]): - resolved_args.insert(i.start + n, arg_in_slice) - else: - resolved_args.insert(i, args[i]) - already_resolved_params.append(arg_name) + named_parameters[arg_name] = args[i] for arg in args: if isinstance(arg, ParameterReference): - if ( - arg.from_parameter in named_parameters - and arg.from_parameter not in already_resolved_params - ): + if arg.from_parameter in named_parameters: resolved_args.append(named_parameters[arg.from_parameter]) - elif arg.from_parameter in already_resolved_params: - continue else: raise ProcessParameterMissing( f"Error: Process Parameter {arg.from_parameter} was missing for process {f.__name__}" @@ -72,8 +61,6 @@ def wrapper( if isinstance(arg, ParameterReference): if arg.from_parameter in named_parameters: resolved_kwargs[k] = named_parameters[arg.from_parameter] - elif arg.from_parameter in already_resolved_params: - continue else: raise ProcessParameterMissing( f"Error: Process Parameter {arg.from_parameter} was missing for process {f.__name__}" diff --git a/tests/test_ml.py b/tests/test_ml.py index 159934d0..8d2bd019 100644 --- a/tests/test_ml.py +++ b/tests/test_ml.py @@ -60,7 +60,7 @@ def test_curve_fitting( ) @process - def fitFunction(x, *parameters): + def fitFunction(x, parameters): t0 = 2 * np.pi / 31557600 * x return parameters[0] + parameters[1] * np.cos(t0) + parameters[2] * np.sin(t0) @@ -75,4 +75,4 @@ def fitFunction(x, *parameters): ) assert len(result.param) == 3 assert isinstance(result.to_array().data, dask.array.Array) - result.compute() + output = result.compute() From 7b903c6d9bf6d81d2836b6e62d8bd0708159c230 Mon Sep 17 00:00:00 2001 From: Lukas Weidenholzer Date: Sun, 13 Aug 2023 00:55:39 +0200 Subject: [PATCH 09/20] add to tests --- .../process_implementations/ml/curve_fitting.py | 14 +++++++++++--- tests/test_ml.py | 10 ++++++++-- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/openeo_processes_dask/process_implementations/ml/curve_fitting.py b/openeo_processes_dask/process_implementations/ml/curve_fitting.py index 011261ca..093a5474 100644 --- a/openeo_processes_dask/process_implementations/ml/curve_fitting.py +++ b/openeo_processes_dask/process_implementations/ml/curve_fitting.py @@ -34,9 +34,17 @@ def _wrap(*args, **kwargs): # .curvefit returns some extra information that isn't required by the OpenEO process # so we simply drop these here. - fit_result = rechunked_data.curvefit( - dimension, wrapper(function), p0=parameters, param_names=list(parameters.keys()) - ).drop_dims(["cov_i", "cov_j"]) + fit_result = ( + rechunked_data.curvefit( + dimension, + wrapper(function), + p0=parameters, + param_names=list(parameters.keys()), + ) + .drop_dims(["cov_i", "cov_j"]) + .to_array() + .squeeze() + ) return fit_result diff --git a/tests/test_ml.py b/tests/test_ml.py index 8d2bd019..82fa8b28 100644 --- a/tests/test_ml.py +++ b/tests/test_ml.py @@ -70,9 +70,15 @@ def fitFunction(x, parameters): parameters=ParameterReference(from_parameter="parameters"), ) + parameters = [1, 0, 0] result = fit_curve( - origin_cube, parameters=[0, 0, 0], function=_process, dimension="t" + origin_cube, parameters=parameters, function=_process, dimension="t" ) assert len(result.param) == 3 - assert isinstance(result.to_array().data, dask.array.Array) + assert isinstance(result.data, dask.array.Array) output = result.compute() + + assert len(output.coords["bands"]) == len(origin_cube.coords["bands"]) + assert len(output.coords["x"]) == len(origin_cube.coords["x"]) + assert len(output.coords["y"]) == len(origin_cube.coords["y"]) + assert len(output.coords["param"]) == len(parameters) From f626450b5826f93a57e3d649ac808afd578ea2c9 Mon Sep 17 00:00:00 2001 From: Lukas Weidenholzer Date: Sun, 13 Aug 2023 04:29:47 +0200 Subject: [PATCH 10/20] progress on predict --- .../ml/curve_fitting.py | 31 ++++++++++++++++--- tests/test_ml.py | 5 +++ 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/openeo_processes_dask/process_implementations/ml/curve_fitting.py b/openeo_processes_dask/process_implementations/ml/curve_fitting.py index 093a5474..871a7ee5 100644 --- a/openeo_processes_dask/process_implementations/ml/curve_fitting.py +++ b/openeo_processes_dask/process_implementations/ml/curve_fitting.py @@ -1,4 +1,7 @@ -from typing import Callable +from typing import Callable, Optional + +import numpy as np +import xarray as xr from openeo_processes_dask.process_implementations.cubes import apply_dimension from openeo_processes_dask.process_implementations.data_model import RasterCube @@ -53,7 +56,27 @@ def predict_curve( function: Callable, parameters: RasterCube, dimension: str, - labels: list, + labels: Optional[list] = None, ): - # TODO: constrain prediction to nodata values - pass + # data: (x, y, t, bands) + # parameters: (x, y, bands, param) + labels = data.coords[dimension] + if np.issubdtype(labels.coords[dimension].dtype, np.datetime64): + labels = labels.astype(int) + + def wrapper(f): + def _wrap(*args, **kwargs): + return f( + parameters=args[0], + x=labels, + ) + + return _wrap + + xr.apply_ufunc( + wrapper(function), + parameters, + input_core_dims=[["param"]], + output_core_dims=[[dimension]], + dask="allowed", + ) diff --git a/tests/test_ml.py b/tests/test_ml.py index 82fa8b28..22d26b07 100644 --- a/tests/test_ml.py +++ b/tests/test_ml.py @@ -12,6 +12,7 @@ from openeo_processes_dask.process_implementations.ml import ( fit_curve, fit_regr_random_forest, + predict_curve, ) from tests.mockdata import create_fake_rastercube @@ -82,3 +83,7 @@ def fitFunction(x, parameters): assert len(output.coords["x"]) == len(origin_cube.coords["x"]) assert len(output.coords["y"]) == len(origin_cube.coords["y"]) assert len(output.coords["param"]) == len(parameters) + + predictions = predict_curve( + origin_cube, _process, output, origin_cube.openeo.temporal_dims[0] + ) From 623626b13cb61d4177b735d0d2afbde2fc31be22 Mon Sep 17 00:00:00 2001 From: Lukas Weidenholzer Date: Mon, 14 Aug 2023 14:41:54 +0200 Subject: [PATCH 11/20] get predict to work --- .../ml/curve_fitting.py | 26 ++++++++++++------- tests/test_ml.py | 21 ++++++++------- 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/openeo_processes_dask/process_implementations/ml/curve_fitting.py b/openeo_processes_dask/process_implementations/ml/curve_fitting.py index 871a7ee5..dc1a0d13 100644 --- a/openeo_processes_dask/process_implementations/ml/curve_fitting.py +++ b/openeo_processes_dask/process_implementations/ml/curve_fitting.py @@ -2,6 +2,7 @@ import numpy as np import xarray as xr +from numpy.typing import ArrayLike from openeo_processes_dask.process_implementations.cubes import apply_dimension from openeo_processes_dask.process_implementations.data_model import RasterCube @@ -52,31 +53,36 @@ def _wrap(*args, **kwargs): def predict_curve( - data: RasterCube, - function: Callable, parameters: RasterCube, + function: Callable, dimension: str, - labels: Optional[list] = None, + labels: Optional[ArrayLike] = None, ): - # data: (x, y, t, bands) # parameters: (x, y, bands, param) - labels = data.coords[dimension] - if np.issubdtype(labels.coords[dimension].dtype, np.datetime64): + if np.issubdtype(labels.dtype, np.datetime64): labels = labels.astype(int) def wrapper(f): def _wrap(*args, **kwargs): return f( - parameters=args[0], - x=labels, + *args, + positional_parameters={"parameters": 0}, + named_parameters={"x": labels}, + **kwargs, ) return _wrap - xr.apply_ufunc( + return xr.apply_ufunc( wrapper(function), parameters, + vectorize=True, input_core_dims=[["param"]], output_core_dims=[[dimension]], - dask="allowed", + dask="parallelized", + output_dtypes=[np.float32], + dask_gufunc_kwargs={ + "allow_rechunk": True, + "output_sizes": {dimension: len(labels)}, + }, ) diff --git a/tests/test_ml.py b/tests/test_ml.py index 22d26b07..2231639f 100644 --- a/tests/test_ml.py +++ b/tests/test_ml.py @@ -9,6 +9,7 @@ from openeo_processes_dask.process_implementations.core import process from openeo_processes_dask.process_implementations.cubes.apply import apply_dimension +from openeo_processes_dask.process_implementations.cubes.general import dimension_labels from openeo_processes_dask.process_implementations.ml import ( fit_curve, fit_regr_random_forest, @@ -49,9 +50,7 @@ def test_fit_regr_random_forest_inline_geojson( @pytest.mark.parametrize("size", [(6, 5, 4, 3)]) @pytest.mark.parametrize("dtype", [np.float64]) -def test_curve_fitting( - temporal_interval, bounding_box, random_raster_data, process_registry -): +def test_curve_fitting(temporal_interval, bounding_box, random_raster_data): origin_cube = create_fake_rastercube( data=random_raster_data, spatial_extent=bounding_box, @@ -77,13 +76,15 @@ def fitFunction(x, parameters): ) assert len(result.param) == 3 assert isinstance(result.data, dask.array.Array) - output = result.compute() - assert len(output.coords["bands"]) == len(origin_cube.coords["bands"]) - assert len(output.coords["x"]) == len(origin_cube.coords["x"]) - assert len(output.coords["y"]) == len(origin_cube.coords["y"]) - assert len(output.coords["param"]) == len(parameters) + assert len(result.coords["bands"]) == len(origin_cube.coords["bands"]) + assert len(result.coords["x"]) == len(origin_cube.coords["x"]) + assert len(result.coords["y"]) == len(origin_cube.coords["y"]) + assert len(result.coords["param"]) == len(parameters) predictions = predict_curve( - origin_cube, _process, output, origin_cube.openeo.temporal_dims[0] - ) + result, + _process, + origin_cube.openeo.temporal_dims[0], + labels=dimension_labels(origin_cube, "t"), + ).compute() From a40449e4b232e2b483479810175809532edce449 Mon Sep 17 00:00:00 2001 From: Lukas Weidenholzer Date: Mon, 14 Aug 2023 14:45:28 +0200 Subject: [PATCH 12/20] remove comment --- .../process_implementations/ml/curve_fitting.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/openeo_processes_dask/process_implementations/ml/curve_fitting.py b/openeo_processes_dask/process_implementations/ml/curve_fitting.py index dc1a0d13..5ca140f0 100644 --- a/openeo_processes_dask/process_implementations/ml/curve_fitting.py +++ b/openeo_processes_dask/process_implementations/ml/curve_fitting.py @@ -58,7 +58,6 @@ def predict_curve( dimension: str, labels: Optional[ArrayLike] = None, ): - # parameters: (x, y, bands, param) if np.issubdtype(labels.dtype, np.datetime64): labels = labels.astype(int) @@ -80,7 +79,7 @@ def _wrap(*args, **kwargs): input_core_dims=[["param"]], output_core_dims=[[dimension]], dask="parallelized", - output_dtypes=[np.float32], + output_dtypes=[np.float64], dask_gufunc_kwargs={ "allow_rechunk": True, "output_sizes": {dimension: len(labels)}, From f42f812bf8da48947fa6ea741b11eb015dc42492 Mon Sep 17 00:00:00 2001 From: Lukas Weidenholzer Date: Mon, 14 Aug 2023 15:03:38 +0200 Subject: [PATCH 13/20] fix up output datacube --- .../process_implementations/ml/curve_fitting.py | 12 +++++++++++- tests/test_ml.py | 4 +++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/openeo_processes_dask/process_implementations/ml/curve_fitting.py b/openeo_processes_dask/process_implementations/ml/curve_fitting.py index 5ca140f0..950d9b29 100644 --- a/openeo_processes_dask/process_implementations/ml/curve_fitting.py +++ b/openeo_processes_dask/process_implementations/ml/curve_fitting.py @@ -1,6 +1,7 @@ from typing import Callable, Optional import numpy as np +import pandas as pd import xarray as xr from numpy.typing import ArrayLike @@ -58,8 +59,10 @@ def predict_curve( dimension: str, labels: Optional[ArrayLike] = None, ): + labels_were_datetime = False if np.issubdtype(labels.dtype, np.datetime64): labels = labels.astype(int) + labels_were_datetime = True def wrapper(f): def _wrap(*args, **kwargs): @@ -72,7 +75,7 @@ def _wrap(*args, **kwargs): return _wrap - return xr.apply_ufunc( + predictions = xr.apply_ufunc( wrapper(function), parameters, vectorize=True, @@ -85,3 +88,10 @@ def _wrap(*args, **kwargs): "output_sizes": {dimension: len(labels)}, }, ) + + predictions = predictions.assign_coords({dimension: labels.data}) + + if labels_were_datetime: + predictions[dimension] = pd.DatetimeIndex(predictions[dimension].values) + + return predictions diff --git a/tests/test_ml.py b/tests/test_ml.py index 2231639f..af005a45 100644 --- a/tests/test_ml.py +++ b/tests/test_ml.py @@ -86,5 +86,7 @@ def fitFunction(x, parameters): result, _process, origin_cube.openeo.temporal_dims[0], - labels=dimension_labels(origin_cube, "t"), + labels=dimension_labels(origin_cube, origin_cube.openeo.temporal_dims[0]), ).compute() + + assert True From 688a3f120bb60563a610679fc7a8980abc75b9ff Mon Sep 17 00:00:00 2001 From: Lukas Weidenholzer Date: Mon, 14 Aug 2023 15:14:15 +0200 Subject: [PATCH 14/20] add assertions --- .../process_implementations/ml/curve_fitting.py | 1 + tests/test_ml.py | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/openeo_processes_dask/process_implementations/ml/curve_fitting.py b/openeo_processes_dask/process_implementations/ml/curve_fitting.py index 950d9b29..873f7d0e 100644 --- a/openeo_processes_dask/process_implementations/ml/curve_fitting.py +++ b/openeo_processes_dask/process_implementations/ml/curve_fitting.py @@ -64,6 +64,7 @@ def predict_curve( labels = labels.astype(int) labels_were_datetime = True + # This is necessary to pipe the arguments correctly through @process def wrapper(f): def _wrap(*args, **kwargs): return f( diff --git a/tests/test_ml.py b/tests/test_ml.py index af005a45..1325bace 100644 --- a/tests/test_ml.py +++ b/tests/test_ml.py @@ -82,11 +82,13 @@ def fitFunction(x, parameters): assert len(result.coords["y"]) == len(origin_cube.coords["y"]) assert len(result.coords["param"]) == len(parameters) + labels = dimension_labels(origin_cube, origin_cube.openeo.temporal_dims[0]) predictions = predict_curve( result, _process, origin_cube.openeo.temporal_dims[0], - labels=dimension_labels(origin_cube, origin_cube.openeo.temporal_dims[0]), + labels=labels, ).compute() - assert True + assert len(predictions.coords[origin_cube.openeo.temporal_dims[0]]) == len(labels) + assert "param" not in predictions.dims From 0b2b64adb77d0eb81da2dc3b11f86b41d3d081ef Mon Sep 17 00:00:00 2001 From: Lukas Weidenholzer Date: Mon, 14 Aug 2023 15:43:35 +0200 Subject: [PATCH 15/20] preserve dimension order --- .../process_implementations/ml/curve_fitting.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/openeo_processes_dask/process_implementations/ml/curve_fitting.py b/openeo_processes_dask/process_implementations/ml/curve_fitting.py index 873f7d0e..97c547a7 100644 --- a/openeo_processes_dask/process_implementations/ml/curve_fitting.py +++ b/openeo_processes_dask/process_implementations/ml/curve_fitting.py @@ -20,6 +20,8 @@ def fit_curve(data: RasterCube, parameters: list, function: Callable, dimension: f"Provided dimension ({dimension}) not found in data.dims: {data.dims}" ) + dims_before = list(data.dims) + # In the spec, parameters is a list, but xr.curvefit requires names for them, # so we do this to generate names locally parameters = {f"param_{i}": v for i, v in enumerate(parameters)} @@ -37,6 +39,9 @@ def _wrap(*args, **kwargs): return _wrap + expected_dims_after = list(dims_before) + expected_dims_after[dims_before.index(dimension)] = "param" + # .curvefit returns some extra information that isn't required by the OpenEO process # so we simply drop these here. fit_result = ( @@ -49,7 +54,9 @@ def _wrap(*args, **kwargs): .drop_dims(["cov_i", "cov_j"]) .to_array() .squeeze() + .transpose(*expected_dims_after) ) + return fit_result @@ -60,6 +67,8 @@ def predict_curve( labels: Optional[ArrayLike] = None, ): labels_were_datetime = False + dims_before = list(parameters.dims) + if np.issubdtype(labels.dtype, np.datetime64): labels = labels.astype(int) labels_were_datetime = True @@ -76,6 +85,9 @@ def _wrap(*args, **kwargs): return _wrap + expected_dims_after = list(dims_before) + expected_dims_after[dims_before.index("param")] = dimension + predictions = xr.apply_ufunc( wrapper(function), parameters, @@ -88,7 +100,7 @@ def _wrap(*args, **kwargs): "allow_rechunk": True, "output_sizes": {dimension: len(labels)}, }, - ) + ).transpose(*expected_dims_after) predictions = predictions.assign_coords({dimension: labels.data}) From 1fbbdd581dc0acee86c52c7fd7c71ffa48d7f9a9 Mon Sep 17 00:00:00 2001 From: Lukas Weidenholzer Date: Thu, 17 Aug 2023 13:57:21 +0200 Subject: [PATCH 16/20] add cast to datetime if appropriate --- .../process_implementations/ml/curve_fitting.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/openeo_processes_dask/process_implementations/ml/curve_fitting.py b/openeo_processes_dask/process_implementations/ml/curve_fitting.py index 97c547a7..a428f561 100644 --- a/openeo_processes_dask/process_implementations/ml/curve_fitting.py +++ b/openeo_processes_dask/process_implementations/ml/curve_fitting.py @@ -64,11 +64,17 @@ def predict_curve( parameters: RasterCube, function: Callable, dimension: str, - labels: Optional[ArrayLike] = None, + labels: ArrayLike, ): labels_were_datetime = False dims_before = list(parameters.dims) + try: + # Try parsing as datetime first + labels = np.asarray(labels, dtype=np.datetime64) + except ValueError: + labels = np.asarray(labels) + if np.issubdtype(labels.dtype, np.datetime64): labels = labels.astype(int) labels_were_datetime = True From 021e459a097dac9b68f4b6b380238c670cce1f9c Mon Sep 17 00:00:00 2001 From: Lukas Weidenholzer Date: Thu, 17 Aug 2023 13:59:29 +0200 Subject: [PATCH 17/20] keep attrs --- .../process_implementations/ml/curve_fitting.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/openeo_processes_dask/process_implementations/ml/curve_fitting.py b/openeo_processes_dask/process_implementations/ml/curve_fitting.py index a428f561..a0cd7f05 100644 --- a/openeo_processes_dask/process_implementations/ml/curve_fitting.py +++ b/openeo_processes_dask/process_implementations/ml/curve_fitting.py @@ -57,6 +57,8 @@ def _wrap(*args, **kwargs): .transpose(*expected_dims_after) ) + fit_result.attrs = data.attrs + return fit_result From 407f4fe39034207419071f04ce0337e84a1b7352 Mon Sep 17 00:00:00 2001 From: Lukas Weidenholzer Date: Mon, 21 Aug 2023 11:11:39 +0200 Subject: [PATCH 18/20] fix typo --- .../process_implementations/ml/curve_fitting.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openeo_processes_dask/process_implementations/ml/curve_fitting.py b/openeo_processes_dask/process_implementations/ml/curve_fitting.py index a0cd7f05..5a27f701 100644 --- a/openeo_processes_dask/process_implementations/ml/curve_fitting.py +++ b/openeo_processes_dask/process_implementations/ml/curve_fitting.py @@ -26,7 +26,7 @@ def fit_curve(data: RasterCube, parameters: list, function: Callable, dimension: # so we do this to generate names locally parameters = {f"param_{i}": v for i, v in enumerate(parameters)} - # The dimension along which to predict cannot be chunked! + # The dimension along which to fit the curves cannot be chunked! rechunked_data = data.chunk({dimension: -1}) def wrapper(f): From 53fbfbd327985aaf39ab577b1aa04278121ee186 Mon Sep 17 00:00:00 2001 From: Lukas Weidenholzer Date: Tue, 22 Aug 2023 14:18:24 +0200 Subject: [PATCH 19/20] add ignore_nodata --- .../process_implementations/ml/curve_fitting.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/openeo_processes_dask/process_implementations/ml/curve_fitting.py b/openeo_processes_dask/process_implementations/ml/curve_fitting.py index 5a27f701..425ce030 100644 --- a/openeo_processes_dask/process_implementations/ml/curve_fitting.py +++ b/openeo_processes_dask/process_implementations/ml/curve_fitting.py @@ -14,7 +14,13 @@ __all__ = ["fit_curve", "predict_curve"] -def fit_curve(data: RasterCube, parameters: list, function: Callable, dimension: str): +def fit_curve( + data: RasterCube, + parameters: list, + function: Callable, + dimension: str, + ignore_nodata: bool = True, +): if dimension not in data.dims: raise DimensionNotAvailable( f"Provided dimension ({dimension}) not found in data.dims: {data.dims}" @@ -50,6 +56,7 @@ def _wrap(*args, **kwargs): wrapper(function), p0=parameters, param_names=list(parameters.keys()), + skipna=ignore_nodata, ) .drop_dims(["cov_i", "cov_j"]) .to_array() From 20a83262efff21297954f56f7cf357dc846d15fa Mon Sep 17 00:00:00 2001 From: Lukas Weidenholzer Date: Tue, 22 Aug 2023 14:37:40 +0200 Subject: [PATCH 20/20] bump submodule --- openeo_processes_dask/specs/openeo-processes | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openeo_processes_dask/specs/openeo-processes b/openeo_processes_dask/specs/openeo-processes index 51cdcd8b..ad4be393 160000 --- a/openeo_processes_dask/specs/openeo-processes +++ b/openeo_processes_dask/specs/openeo-processes @@ -1 +1 @@ -Subproject commit 51cdcd8b8bb79cf5caf89222b9c2ac8a4effb89b +Subproject commit ad4be3937b799a42c291fa3ad1d87572f4bcd026