From aa38b2c579d1c079fd983d93aa96a5e5079573f0 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Sun, 19 Jul 2020 20:31:38 +0300 Subject: [PATCH 01/47] Introduce Preprocessor --- gordo/machine/dataset/datasets.py | 11 ++++ gordo/machine/dataset/preprocessor.py | 80 +++++++++++++++++++++++++++ 2 files changed, 91 insertions(+) create mode 100644 gordo/machine/dataset/preprocessor.py diff --git a/gordo/machine/dataset/datasets.py b/gordo/machine/dataset/datasets.py index c9350108c..4e6f1c4e4 100644 --- a/gordo/machine/dataset/datasets.py +++ b/gordo/machine/dataset/datasets.py @@ -25,6 +25,7 @@ ValidDatasetKwargs, ValidDataProvider, ) +from .preprocessor import Preprocessor, normalize_preprocessor logger = logging.getLogger(__name__) @@ -89,6 +90,7 @@ def __init__( n_samples_threshold: int = 0, low_threshold=-1000, high_threshold=50000, + preprocessor: Optional[Union[Preprocessor, Dict]] = None, **_kwargs, ): """ @@ -170,6 +172,7 @@ def __init__( self.n_samples_threshold = n_samples_threshold self.low_threshold = low_threshold self.high_threshold = high_threshold + self.preprocessor = normalize_preprocessor(preprocessor) if not self.train_start_date.tzinfo or not self.train_end_date.tzinfo: raise ValueError( @@ -195,12 +198,17 @@ def _validate_dt(dt: Union[str, datetime]) -> datetime: def get_data(self) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]: + preprocessor = self.preprocessor + series_iter: Iterable[pd.Series] = self.data_provider.load_series( train_start_date=self.train_start_date, train_end_date=self.train_end_date, tag_list=list(set(self.tag_list + self.target_tag_list)), ) + if preprocessor is not None: + series_iter = preprocessor.prepare_series(series_iter) + # Resample if we have a resolution set, otherwise simply join the series. if self.resolution: data = self.join_timeseries( @@ -242,6 +250,9 @@ def get_data(self) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]: f"specified required threshold for number of rows ({self.n_samples_threshold})." ) + if preprocessor is not None: + data = preprocessor.prepare_data(data) + x_tag_names = [tag.name for tag in self.tag_list] y_tag_names = [tag.name for tag in self.target_tag_list] diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py new file mode 100644 index 000000000..98b7ba7f1 --- /dev/null +++ b/gordo/machine/dataset/preprocessor.py @@ -0,0 +1,80 @@ +import pandas as pd + +from typing import Union, Iterable +from copy import deepcopy +from abc import ABCMeta, abstractmethod +from collections import defaultdict + +_types = {} + + +def preprocessor(preprocessor_type): + def wrapper(cls): + if preprocessor_type in _types: + raise ValueError("Preprocessor with name '%s' has already been added" % preprocessor_type) + _types[preprocessor_type] = cls + return cls + return wrapper + + +def create_preprocessor(preprocessor_type, *args, **kwargs): + if preprocessor_type not in _types: + raise ValueError("Can't find a preprocessor with name '%s'" % preprocessor_type) + return _types[preprocessor_type](*args, **kwargs) + + +def normalize_preprocessor(value): + if isinstance(value, dict): + if 'type' not in value: + raise ValueError("A preprocessor type is empty") + value = deepcopy(value) + preprocessor_type = value.pop('type') + return create_preprocessor(preprocessor_type, **value) + return value + + +class Preprocessor(metaclass=ABCMeta): + @abstractmethod + def reset(self): + ... + + @abstractmethod + def prepare_series(self, series: Iterable[pd.Series]) -> Iterable[pd.Series]: + ... + + @abstractmethod + def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: + ... + + +@preprocessor('fill_gaps') +class FillGapsPreprocessor(Preprocessor): + + def __init__(self, gap_size, replace_value): + if isinstance(gap_size, str): + gap_size = pd.Timedelta(gap_size) + self.gap_size = gap_size + self.replace_value = replace_value + self._gaps = defaultdict(list) + + def reset(self): + self._gaps = defaultdict(list) + + def prepare_series(self, series: Iterable[pd.Series]) -> Iterable[pd.Series]: + result = [] + for value in series: + result.append(value) + name = value.name + idx = value.index.to_series() + df = pd.concat([idx, idx.diff().rename('Diff')], axis=1) + filtered_df = df[df['Diff'] > self.gap_size] + gaps = ((row['Time'], row['Time']+row['Diff']) for _, row in filtered_df.iterrows()) + self._gaps[name].extend(gaps) + return result + + def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: + for name, gaps in self._gaps.items(): + for gap_start, gap_end in gaps: + df.iloc[(df.index > gap_start) & (df.index < gap_end), df.columns.get_loc(name)] = self.replace_value + return df + From c4dc4f5f1b94bd2588e849924c2b120b0a5e9c3c Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Sun, 19 Jul 2020 20:54:39 +0300 Subject: [PATCH 02/47] Add Masking for LSTM models --- .../machine/model/factories/lstm_autoencoder.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/gordo/machine/model/factories/lstm_autoencoder.py b/gordo/machine/model/factories/lstm_autoencoder.py index adfe0c64f..c9ad98db8 100644 --- a/gordo/machine/model/factories/lstm_autoencoder.py +++ b/gordo/machine/model/factories/lstm_autoencoder.py @@ -1,11 +1,11 @@ # -*- coding: utf-8 -*- -from typing import Tuple, Union, Dict, Any +from typing import Tuple, Union, Dict, Any, Optional import tensorflow from tensorflow import keras from tensorflow.keras.optimizers import Optimizer -from tensorflow.keras.layers import Dense, LSTM +from tensorflow.keras.layers import Dense, LSTM, Masking from tensorflow.keras.models import Sequential as KerasSequential from gordo.machine.model.register import register_model_builder @@ -26,6 +26,7 @@ def lstm_model( optimizer: Union[str, Optimizer] = "Adam", optimizer_kwargs: Dict[str, Any] = dict(), compile_kwargs: Dict[str, Any] = dict(), + mask_value: Optional[float] = None, **kwargs, ) -> tensorflow.keras.models.Sequential: """ @@ -63,6 +64,8 @@ class (e.x. Adam(lr=0.01,beta_1=0.9, beta_2=0.999)). If no arguments are default values will be used. compile_kwargs: Dict[str, Any] Parameters to pass to ``keras.Model.compile``. + mask_value: Optional[float] + Add Masking layer with this mask_value Returns ------- @@ -71,17 +74,23 @@ class (e.x. Adam(lr=0.01,beta_1=0.9, beta_2=0.999)). If no arguments are """ n_features_out = n_features_out or n_features + with_masking = mask_value is not None check_dim_func_len("encoding", encoding_dim, encoding_func) check_dim_func_len("decoding", decoding_dim, decoding_func) model = KerasSequential() + if with_masking: + input_shape = (lookback_window, n_features) + model.add(Masking(mask_value=mask_value, input_shape=input_shape)) + # encoding layers kwargs = {"return_sequences": True} for i, (n_neurons, activation) in enumerate(zip(encoding_dim, encoding_func)): - input_shape = (lookback_window, n_neurons if i != 0 else n_features) - kwargs.update(dict(activation=activation, input_shape=input_shape)) + input_shape = (lookback_window, n_neurons if not with_masking and i != 0 else n_features) + kwargs["activation"] = activation + kwargs["input_shape "] = input_shape model.add(LSTM(n_neurons, **kwargs)) # decoding layers From a497c793313be4b62bad4fdc76b2fc2e99d3d2e6 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Sun, 19 Jul 2020 21:09:57 +0300 Subject: [PATCH 03/47] Black reformating --- gordo/machine/dataset/preprocessor.py | 27 ++++++++++++------- .../model/factories/lstm_autoencoder.py | 5 +++- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index 98b7ba7f1..023ea869f 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -11,9 +11,12 @@ def preprocessor(preprocessor_type): def wrapper(cls): if preprocessor_type in _types: - raise ValueError("Preprocessor with name '%s' has already been added" % preprocessor_type) + raise ValueError( + "Preprocessor with name '%s' has already been added" % preprocessor_type + ) _types[preprocessor_type] = cls return cls + return wrapper @@ -25,10 +28,10 @@ def create_preprocessor(preprocessor_type, *args, **kwargs): def normalize_preprocessor(value): if isinstance(value, dict): - if 'type' not in value: + if "type" not in value: raise ValueError("A preprocessor type is empty") value = deepcopy(value) - preprocessor_type = value.pop('type') + preprocessor_type = value.pop("type") return create_preprocessor(preprocessor_type, **value) return value @@ -47,9 +50,8 @@ def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: ... -@preprocessor('fill_gaps') +@preprocessor("fill_gaps") class FillGapsPreprocessor(Preprocessor): - def __init__(self, gap_size, replace_value): if isinstance(gap_size, str): gap_size = pd.Timedelta(gap_size) @@ -66,15 +68,20 @@ def prepare_series(self, series: Iterable[pd.Series]) -> Iterable[pd.Series]: result.append(value) name = value.name idx = value.index.to_series() - df = pd.concat([idx, idx.diff().rename('Diff')], axis=1) - filtered_df = df[df['Diff'] > self.gap_size] - gaps = ((row['Time'], row['Time']+row['Diff']) for _, row in filtered_df.iterrows()) + df = pd.concat([idx, idx.diff().rename("Diff")], axis=1) + filtered_df = df[df["Diff"] > self.gap_size] + gaps = ( + (row["Time"], row["Time"] + row["Diff"]) + for _, row in filtered_df.iterrows() + ) self._gaps[name].extend(gaps) return result def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: for name, gaps in self._gaps.items(): for gap_start, gap_end in gaps: - df.iloc[(df.index > gap_start) & (df.index < gap_end), df.columns.get_loc(name)] = self.replace_value + df.iloc[ + (df.index > gap_start) & (df.index < gap_end), + df.columns.get_loc(name), + ] = self.replace_value return df - diff --git a/gordo/machine/model/factories/lstm_autoencoder.py b/gordo/machine/model/factories/lstm_autoencoder.py index c9ad98db8..0114e26a3 100644 --- a/gordo/machine/model/factories/lstm_autoencoder.py +++ b/gordo/machine/model/factories/lstm_autoencoder.py @@ -88,7 +88,10 @@ class (e.x. Adam(lr=0.01,beta_1=0.9, beta_2=0.999)). If no arguments are # encoding layers kwargs = {"return_sequences": True} for i, (n_neurons, activation) in enumerate(zip(encoding_dim, encoding_func)): - input_shape = (lookback_window, n_neurons if not with_masking and i != 0 else n_features) + input_shape = ( + lookback_window, + n_neurons if not with_masking and i != 0 else n_features, + ) kwargs["activation"] = activation kwargs["input_shape "] = input_shape model.add(LSTM(n_neurons, **kwargs)) From 11112ce8cb482786cd4d34989ed51636b244efcc Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Sun, 19 Jul 2020 21:16:41 +0300 Subject: [PATCH 04/47] Fix typo --- gordo/machine/model/factories/lstm_autoencoder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gordo/machine/model/factories/lstm_autoencoder.py b/gordo/machine/model/factories/lstm_autoencoder.py index 0114e26a3..5ee673ee3 100644 --- a/gordo/machine/model/factories/lstm_autoencoder.py +++ b/gordo/machine/model/factories/lstm_autoencoder.py @@ -93,7 +93,7 @@ class (e.x. Adam(lr=0.01,beta_1=0.9, beta_2=0.999)). If no arguments are n_neurons if not with_masking and i != 0 else n_features, ) kwargs["activation"] = activation - kwargs["input_shape "] = input_shape + kwargs["input_shape"] = input_shape model.add(LSTM(n_neurons, **kwargs)) # decoding layers From 304011750bba278348e3b3db6f81bb78f6895461 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Sun, 19 Jul 2020 21:26:22 +0300 Subject: [PATCH 05/47] Additional logger --- gordo/machine/dataset/preprocessor.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index 023ea869f..36d79e014 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -1,10 +1,13 @@ +import logging import pandas as pd -from typing import Union, Iterable +from typing import Iterable from copy import deepcopy from abc import ABCMeta, abstractmethod from collections import defaultdict +logger = logging.getLogger(__name__) + _types = {} @@ -74,7 +77,15 @@ def prepare_series(self, series: Iterable[pd.Series]) -> Iterable[pd.Series]: (row["Time"], row["Time"] + row["Diff"]) for _, row in filtered_df.iterrows() ) + self._gaps[name].extend(gaps) + for name, gaps in self._gaps.items(): + logger.info( + "Found %d gap%s in '%s' time-series", + len(gaps), + "s" if len(gaps) > 1 else "", + name, + ) return result def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: From c88672d48e14751ec8bf7db9093684ffd1268743 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Sun, 19 Jul 2020 23:41:58 +0300 Subject: [PATCH 06/47] Fix with_masking condition in lstm_model() --- gordo/machine/model/factories/lstm_autoencoder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gordo/machine/model/factories/lstm_autoencoder.py b/gordo/machine/model/factories/lstm_autoencoder.py index 5ee673ee3..0bd0ac268 100644 --- a/gordo/machine/model/factories/lstm_autoencoder.py +++ b/gordo/machine/model/factories/lstm_autoencoder.py @@ -90,7 +90,7 @@ class (e.x. Adam(lr=0.01,beta_1=0.9, beta_2=0.999)). If no arguments are for i, (n_neurons, activation) in enumerate(zip(encoding_dim, encoding_func)): input_shape = ( lookback_window, - n_neurons if not with_masking and i != 0 else n_features, + n_neurons if with_masking or i != 0 else n_features, ) kwargs["activation"] = activation kwargs["input_shape"] = input_shape From a163262eaee54461d24d3194ebcc2a06fe344839 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Sun, 19 Jul 2020 23:50:25 +0300 Subject: [PATCH 07/47] Fixing test test_machine_from_config() --- tests/gordo/workflow/test_config_elements.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/gordo/workflow/test_config_elements.py b/tests/gordo/workflow/test_config_elements.py index cb5269805..02bc6c3dc 100644 --- a/tests/gordo/workflow/test_config_elements.py +++ b/tests/gordo/workflow/test_config_elements.py @@ -150,6 +150,7 @@ def test_machine_from_config(default_globals: dict): "target_tag_list": ["GRA-TE -123-456"], "train_end_date": "2018-01-02T09:00:30+00:00", "train_start_date": "2018-01-01T09:00:30+00:00", + "preprocessor": None, "type": "TimeSeriesDataset", }, "evaluation": { From d15ddc32c12794fbf847a71b46e36be236426a8b Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Mon, 20 Jul 2020 10:41:19 +0300 Subject: [PATCH 08/47] Additional anotations --- gordo/machine/dataset/preprocessor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index 36d79e014..eaad4b7d8 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -1,14 +1,14 @@ import logging import pandas as pd -from typing import Iterable +from typing import Iterable, Dict, Tuple, Union from copy import deepcopy from abc import ABCMeta, abstractmethod from collections import defaultdict logger = logging.getLogger(__name__) -_types = {} +_types: Dict[str, Tuple[pd.Timestamp, pd.Timestamp]] = {} def preprocessor(preprocessor_type): @@ -55,7 +55,7 @@ def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: @preprocessor("fill_gaps") class FillGapsPreprocessor(Preprocessor): - def __init__(self, gap_size, replace_value): + def __init__(self, gap_size: Union[str, pd.Timedelta], replace_value: float): if isinstance(gap_size, str): gap_size = pd.Timedelta(gap_size) self.gap_size = gap_size From 8115ac7010c8e2ad82f81ccfffaee51da7a4e466 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Mon, 20 Jul 2020 10:55:37 +0300 Subject: [PATCH 09/47] Fixing type anotation --- gordo/machine/dataset/preprocessor.py | 33 ++++++++++++++------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index eaad4b7d8..f3154791d 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -1,14 +1,29 @@ import logging import pandas as pd -from typing import Iterable, Dict, Tuple, Union +from typing import Iterable, Dict, Tuple, Union, Type from copy import deepcopy from abc import ABCMeta, abstractmethod from collections import defaultdict logger = logging.getLogger(__name__) -_types: Dict[str, Tuple[pd.Timestamp, pd.Timestamp]] = {} + +class Preprocessor(metaclass=ABCMeta): + @abstractmethod + def reset(self): + ... + + @abstractmethod + def prepare_series(self, series: Iterable[pd.Series]) -> Iterable[pd.Series]: + ... + + @abstractmethod + def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: + ... + + +_types: Dict[str, Type[Preprocessor]] = {} def preprocessor(preprocessor_type): @@ -39,20 +54,6 @@ def normalize_preprocessor(value): return value -class Preprocessor(metaclass=ABCMeta): - @abstractmethod - def reset(self): - ... - - @abstractmethod - def prepare_series(self, series: Iterable[pd.Series]) -> Iterable[pd.Series]: - ... - - @abstractmethod - def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: - ... - - @preprocessor("fill_gaps") class FillGapsPreprocessor(Preprocessor): def __init__(self, gap_size: Union[str, pd.Timedelta], replace_value: float): From 9ea6bad59f49eca12313f18f6d799e4d92058998 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Mon, 20 Jul 2020 10:58:36 +0300 Subject: [PATCH 10/47] Fixing type anotation for _gap variable --- gordo/machine/dataset/preprocessor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index f3154791d..0fd6e348e 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -1,7 +1,7 @@ import logging import pandas as pd -from typing import Iterable, Dict, Tuple, Union, Type +from typing import Iterable, Dict, Tuple, Union, Type, List from copy import deepcopy from abc import ABCMeta, abstractmethod from collections import defaultdict @@ -61,7 +61,7 @@ def __init__(self, gap_size: Union[str, pd.Timedelta], replace_value: float): gap_size = pd.Timedelta(gap_size) self.gap_size = gap_size self.replace_value = replace_value - self._gaps = defaultdict(list) + self._gaps: Dict[str, List[Tuple[pd.Timestamp, pd.Timestamp]]] = defaultdict(list) def reset(self): self._gaps = defaultdict(list) From b47a7053d1172d809f6de291fc955cdbdc5ab8e7 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Mon, 20 Jul 2020 10:59:34 +0300 Subject: [PATCH 11/47] Black reformating --- gordo/machine/dataset/preprocessor.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index 0fd6e348e..9fc9561cf 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -61,7 +61,9 @@ def __init__(self, gap_size: Union[str, pd.Timedelta], replace_value: float): gap_size = pd.Timedelta(gap_size) self.gap_size = gap_size self.replace_value = replace_value - self._gaps: Dict[str, List[Tuple[pd.Timestamp, pd.Timestamp]]] = defaultdict(list) + self._gaps: Dict[str, List[Tuple[pd.Timestamp, pd.Timestamp]]] = defaultdict( + list + ) def reset(self): self._gaps = defaultdict(list) From 92504fc5e55d15f6430a364f9434fcf911c4cf8d Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Mon, 20 Jul 2020 11:22:25 +0300 Subject: [PATCH 12/47] Trying to ignore mypy --- gordo/machine/dataset/preprocessor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index 9fc9561cf..b00e57a47 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -82,11 +82,11 @@ def prepare_series(self, series: Iterable[pd.Series]) -> Iterable[pd.Series]: ) self._gaps[name].extend(gaps) - for name, gaps in self._gaps.items(): + for name, gaps in self._gaps.items(): # type: ignore logger.info( "Found %d gap%s in '%s' time-series", - len(gaps), - "s" if len(gaps) > 1 else "", + len(gaps), # type: ignore + "s" if len(gaps) > 1 else "", # type: ignore name, ) return result From bca1f82a131a7d4c4d2e596a2f123675b48941af Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Mon, 20 Jul 2020 11:23:35 +0300 Subject: [PATCH 13/47] Black reformating --- gordo/machine/dataset/preprocessor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index b00e57a47..68b726c32 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -82,11 +82,11 @@ def prepare_series(self, series: Iterable[pd.Series]) -> Iterable[pd.Series]: ) self._gaps[name].extend(gaps) - for name, gaps in self._gaps.items(): # type: ignore + for name, gaps in self._gaps.items(): # type: ignore logger.info( "Found %d gap%s in '%s' time-series", - len(gaps), # type: ignore - "s" if len(gaps) > 1 else "", # type: ignore + len(gaps), # type: ignore + "s" if len(gaps) > 1 else "", # type: ignore name, ) return result From b1de5dca898be415d3bd292897acfefb2a0e8036 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Sun, 19 Jul 2020 20:31:38 +0300 Subject: [PATCH 14/47] Introduce Preprocessor --- gordo/machine/dataset/datasets.py | 11 ++++ gordo/machine/dataset/preprocessor.py | 80 +++++++++++++++++++++++++++ 2 files changed, 91 insertions(+) create mode 100644 gordo/machine/dataset/preprocessor.py diff --git a/gordo/machine/dataset/datasets.py b/gordo/machine/dataset/datasets.py index c9350108c..4e6f1c4e4 100644 --- a/gordo/machine/dataset/datasets.py +++ b/gordo/machine/dataset/datasets.py @@ -25,6 +25,7 @@ ValidDatasetKwargs, ValidDataProvider, ) +from .preprocessor import Preprocessor, normalize_preprocessor logger = logging.getLogger(__name__) @@ -89,6 +90,7 @@ def __init__( n_samples_threshold: int = 0, low_threshold=-1000, high_threshold=50000, + preprocessor: Optional[Union[Preprocessor, Dict]] = None, **_kwargs, ): """ @@ -170,6 +172,7 @@ def __init__( self.n_samples_threshold = n_samples_threshold self.low_threshold = low_threshold self.high_threshold = high_threshold + self.preprocessor = normalize_preprocessor(preprocessor) if not self.train_start_date.tzinfo or not self.train_end_date.tzinfo: raise ValueError( @@ -195,12 +198,17 @@ def _validate_dt(dt: Union[str, datetime]) -> datetime: def get_data(self) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]: + preprocessor = self.preprocessor + series_iter: Iterable[pd.Series] = self.data_provider.load_series( train_start_date=self.train_start_date, train_end_date=self.train_end_date, tag_list=list(set(self.tag_list + self.target_tag_list)), ) + if preprocessor is not None: + series_iter = preprocessor.prepare_series(series_iter) + # Resample if we have a resolution set, otherwise simply join the series. if self.resolution: data = self.join_timeseries( @@ -242,6 +250,9 @@ def get_data(self) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]: f"specified required threshold for number of rows ({self.n_samples_threshold})." ) + if preprocessor is not None: + data = preprocessor.prepare_data(data) + x_tag_names = [tag.name for tag in self.tag_list] y_tag_names = [tag.name for tag in self.target_tag_list] diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py new file mode 100644 index 000000000..98b7ba7f1 --- /dev/null +++ b/gordo/machine/dataset/preprocessor.py @@ -0,0 +1,80 @@ +import pandas as pd + +from typing import Union, Iterable +from copy import deepcopy +from abc import ABCMeta, abstractmethod +from collections import defaultdict + +_types = {} + + +def preprocessor(preprocessor_type): + def wrapper(cls): + if preprocessor_type in _types: + raise ValueError("Preprocessor with name '%s' has already been added" % preprocessor_type) + _types[preprocessor_type] = cls + return cls + return wrapper + + +def create_preprocessor(preprocessor_type, *args, **kwargs): + if preprocessor_type not in _types: + raise ValueError("Can't find a preprocessor with name '%s'" % preprocessor_type) + return _types[preprocessor_type](*args, **kwargs) + + +def normalize_preprocessor(value): + if isinstance(value, dict): + if 'type' not in value: + raise ValueError("A preprocessor type is empty") + value = deepcopy(value) + preprocessor_type = value.pop('type') + return create_preprocessor(preprocessor_type, **value) + return value + + +class Preprocessor(metaclass=ABCMeta): + @abstractmethod + def reset(self): + ... + + @abstractmethod + def prepare_series(self, series: Iterable[pd.Series]) -> Iterable[pd.Series]: + ... + + @abstractmethod + def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: + ... + + +@preprocessor('fill_gaps') +class FillGapsPreprocessor(Preprocessor): + + def __init__(self, gap_size, replace_value): + if isinstance(gap_size, str): + gap_size = pd.Timedelta(gap_size) + self.gap_size = gap_size + self.replace_value = replace_value + self._gaps = defaultdict(list) + + def reset(self): + self._gaps = defaultdict(list) + + def prepare_series(self, series: Iterable[pd.Series]) -> Iterable[pd.Series]: + result = [] + for value in series: + result.append(value) + name = value.name + idx = value.index.to_series() + df = pd.concat([idx, idx.diff().rename('Diff')], axis=1) + filtered_df = df[df['Diff'] > self.gap_size] + gaps = ((row['Time'], row['Time']+row['Diff']) for _, row in filtered_df.iterrows()) + self._gaps[name].extend(gaps) + return result + + def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: + for name, gaps in self._gaps.items(): + for gap_start, gap_end in gaps: + df.iloc[(df.index > gap_start) & (df.index < gap_end), df.columns.get_loc(name)] = self.replace_value + return df + From 2f9ef310fbafba18391ab7493c8af38eb3061b17 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Sun, 19 Jul 2020 20:54:39 +0300 Subject: [PATCH 15/47] Add Masking for LSTM models --- .../machine/model/factories/lstm_autoencoder.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/gordo/machine/model/factories/lstm_autoencoder.py b/gordo/machine/model/factories/lstm_autoencoder.py index adfe0c64f..c9ad98db8 100644 --- a/gordo/machine/model/factories/lstm_autoencoder.py +++ b/gordo/machine/model/factories/lstm_autoencoder.py @@ -1,11 +1,11 @@ # -*- coding: utf-8 -*- -from typing import Tuple, Union, Dict, Any +from typing import Tuple, Union, Dict, Any, Optional import tensorflow from tensorflow import keras from tensorflow.keras.optimizers import Optimizer -from tensorflow.keras.layers import Dense, LSTM +from tensorflow.keras.layers import Dense, LSTM, Masking from tensorflow.keras.models import Sequential as KerasSequential from gordo.machine.model.register import register_model_builder @@ -26,6 +26,7 @@ def lstm_model( optimizer: Union[str, Optimizer] = "Adam", optimizer_kwargs: Dict[str, Any] = dict(), compile_kwargs: Dict[str, Any] = dict(), + mask_value: Optional[float] = None, **kwargs, ) -> tensorflow.keras.models.Sequential: """ @@ -63,6 +64,8 @@ class (e.x. Adam(lr=0.01,beta_1=0.9, beta_2=0.999)). If no arguments are default values will be used. compile_kwargs: Dict[str, Any] Parameters to pass to ``keras.Model.compile``. + mask_value: Optional[float] + Add Masking layer with this mask_value Returns ------- @@ -71,17 +74,23 @@ class (e.x. Adam(lr=0.01,beta_1=0.9, beta_2=0.999)). If no arguments are """ n_features_out = n_features_out or n_features + with_masking = mask_value is not None check_dim_func_len("encoding", encoding_dim, encoding_func) check_dim_func_len("decoding", decoding_dim, decoding_func) model = KerasSequential() + if with_masking: + input_shape = (lookback_window, n_features) + model.add(Masking(mask_value=mask_value, input_shape=input_shape)) + # encoding layers kwargs = {"return_sequences": True} for i, (n_neurons, activation) in enumerate(zip(encoding_dim, encoding_func)): - input_shape = (lookback_window, n_neurons if i != 0 else n_features) - kwargs.update(dict(activation=activation, input_shape=input_shape)) + input_shape = (lookback_window, n_neurons if not with_masking and i != 0 else n_features) + kwargs["activation"] = activation + kwargs["input_shape "] = input_shape model.add(LSTM(n_neurons, **kwargs)) # decoding layers From 5508a953a5813b72ccd754182b72ebb04faec7b1 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Sun, 19 Jul 2020 21:09:57 +0300 Subject: [PATCH 16/47] Black reformating --- gordo/machine/dataset/preprocessor.py | 27 ++++++++++++------- .../model/factories/lstm_autoencoder.py | 5 +++- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index 98b7ba7f1..023ea869f 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -11,9 +11,12 @@ def preprocessor(preprocessor_type): def wrapper(cls): if preprocessor_type in _types: - raise ValueError("Preprocessor with name '%s' has already been added" % preprocessor_type) + raise ValueError( + "Preprocessor with name '%s' has already been added" % preprocessor_type + ) _types[preprocessor_type] = cls return cls + return wrapper @@ -25,10 +28,10 @@ def create_preprocessor(preprocessor_type, *args, **kwargs): def normalize_preprocessor(value): if isinstance(value, dict): - if 'type' not in value: + if "type" not in value: raise ValueError("A preprocessor type is empty") value = deepcopy(value) - preprocessor_type = value.pop('type') + preprocessor_type = value.pop("type") return create_preprocessor(preprocessor_type, **value) return value @@ -47,9 +50,8 @@ def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: ... -@preprocessor('fill_gaps') +@preprocessor("fill_gaps") class FillGapsPreprocessor(Preprocessor): - def __init__(self, gap_size, replace_value): if isinstance(gap_size, str): gap_size = pd.Timedelta(gap_size) @@ -66,15 +68,20 @@ def prepare_series(self, series: Iterable[pd.Series]) -> Iterable[pd.Series]: result.append(value) name = value.name idx = value.index.to_series() - df = pd.concat([idx, idx.diff().rename('Diff')], axis=1) - filtered_df = df[df['Diff'] > self.gap_size] - gaps = ((row['Time'], row['Time']+row['Diff']) for _, row in filtered_df.iterrows()) + df = pd.concat([idx, idx.diff().rename("Diff")], axis=1) + filtered_df = df[df["Diff"] > self.gap_size] + gaps = ( + (row["Time"], row["Time"] + row["Diff"]) + for _, row in filtered_df.iterrows() + ) self._gaps[name].extend(gaps) return result def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: for name, gaps in self._gaps.items(): for gap_start, gap_end in gaps: - df.iloc[(df.index > gap_start) & (df.index < gap_end), df.columns.get_loc(name)] = self.replace_value + df.iloc[ + (df.index > gap_start) & (df.index < gap_end), + df.columns.get_loc(name), + ] = self.replace_value return df - diff --git a/gordo/machine/model/factories/lstm_autoencoder.py b/gordo/machine/model/factories/lstm_autoencoder.py index c9ad98db8..0114e26a3 100644 --- a/gordo/machine/model/factories/lstm_autoencoder.py +++ b/gordo/machine/model/factories/lstm_autoencoder.py @@ -88,7 +88,10 @@ class (e.x. Adam(lr=0.01,beta_1=0.9, beta_2=0.999)). If no arguments are # encoding layers kwargs = {"return_sequences": True} for i, (n_neurons, activation) in enumerate(zip(encoding_dim, encoding_func)): - input_shape = (lookback_window, n_neurons if not with_masking and i != 0 else n_features) + input_shape = ( + lookback_window, + n_neurons if not with_masking and i != 0 else n_features, + ) kwargs["activation"] = activation kwargs["input_shape "] = input_shape model.add(LSTM(n_neurons, **kwargs)) From 9c3d4634b41985cd0385418810dceb01cc5ed11c Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Sun, 19 Jul 2020 21:16:41 +0300 Subject: [PATCH 17/47] Fix typo --- gordo/machine/model/factories/lstm_autoencoder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gordo/machine/model/factories/lstm_autoencoder.py b/gordo/machine/model/factories/lstm_autoencoder.py index 0114e26a3..5ee673ee3 100644 --- a/gordo/machine/model/factories/lstm_autoencoder.py +++ b/gordo/machine/model/factories/lstm_autoencoder.py @@ -93,7 +93,7 @@ class (e.x. Adam(lr=0.01,beta_1=0.9, beta_2=0.999)). If no arguments are n_neurons if not with_masking and i != 0 else n_features, ) kwargs["activation"] = activation - kwargs["input_shape "] = input_shape + kwargs["input_shape"] = input_shape model.add(LSTM(n_neurons, **kwargs)) # decoding layers From d849df72c4dc75fc1881d0f21b8a351dbb1743ef Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Sun, 19 Jul 2020 21:26:22 +0300 Subject: [PATCH 18/47] Additional logger --- gordo/machine/dataset/preprocessor.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index 023ea869f..36d79e014 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -1,10 +1,13 @@ +import logging import pandas as pd -from typing import Union, Iterable +from typing import Iterable from copy import deepcopy from abc import ABCMeta, abstractmethod from collections import defaultdict +logger = logging.getLogger(__name__) + _types = {} @@ -74,7 +77,15 @@ def prepare_series(self, series: Iterable[pd.Series]) -> Iterable[pd.Series]: (row["Time"], row["Time"] + row["Diff"]) for _, row in filtered_df.iterrows() ) + self._gaps[name].extend(gaps) + for name, gaps in self._gaps.items(): + logger.info( + "Found %d gap%s in '%s' time-series", + len(gaps), + "s" if len(gaps) > 1 else "", + name, + ) return result def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: From 11c927f62317f1a15ef6056265a5f29b8394ca16 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Sun, 19 Jul 2020 23:41:58 +0300 Subject: [PATCH 19/47] Fix with_masking condition in lstm_model() --- gordo/machine/model/factories/lstm_autoencoder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gordo/machine/model/factories/lstm_autoencoder.py b/gordo/machine/model/factories/lstm_autoencoder.py index 5ee673ee3..0bd0ac268 100644 --- a/gordo/machine/model/factories/lstm_autoencoder.py +++ b/gordo/machine/model/factories/lstm_autoencoder.py @@ -90,7 +90,7 @@ class (e.x. Adam(lr=0.01,beta_1=0.9, beta_2=0.999)). If no arguments are for i, (n_neurons, activation) in enumerate(zip(encoding_dim, encoding_func)): input_shape = ( lookback_window, - n_neurons if not with_masking and i != 0 else n_features, + n_neurons if with_masking or i != 0 else n_features, ) kwargs["activation"] = activation kwargs["input_shape"] = input_shape From 72756316955bfcf4bc915cdc851df1863f9fe446 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Sun, 19 Jul 2020 23:50:25 +0300 Subject: [PATCH 20/47] Fixing test test_machine_from_config() --- tests/gordo/workflow/test_config_elements.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/gordo/workflow/test_config_elements.py b/tests/gordo/workflow/test_config_elements.py index cb5269805..02bc6c3dc 100644 --- a/tests/gordo/workflow/test_config_elements.py +++ b/tests/gordo/workflow/test_config_elements.py @@ -150,6 +150,7 @@ def test_machine_from_config(default_globals: dict): "target_tag_list": ["GRA-TE -123-456"], "train_end_date": "2018-01-02T09:00:30+00:00", "train_start_date": "2018-01-01T09:00:30+00:00", + "preprocessor": None, "type": "TimeSeriesDataset", }, "evaluation": { From 6e7f8cc7cb4b288ec00c094741b4683f34197eab Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Mon, 20 Jul 2020 10:41:19 +0300 Subject: [PATCH 21/47] Additional anotations --- gordo/machine/dataset/preprocessor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index 36d79e014..eaad4b7d8 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -1,14 +1,14 @@ import logging import pandas as pd -from typing import Iterable +from typing import Iterable, Dict, Tuple, Union from copy import deepcopy from abc import ABCMeta, abstractmethod from collections import defaultdict logger = logging.getLogger(__name__) -_types = {} +_types: Dict[str, Tuple[pd.Timestamp, pd.Timestamp]] = {} def preprocessor(preprocessor_type): @@ -55,7 +55,7 @@ def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: @preprocessor("fill_gaps") class FillGapsPreprocessor(Preprocessor): - def __init__(self, gap_size, replace_value): + def __init__(self, gap_size: Union[str, pd.Timedelta], replace_value: float): if isinstance(gap_size, str): gap_size = pd.Timedelta(gap_size) self.gap_size = gap_size From 49f08cb01990e2046f758a5e7903d27a50f2cd95 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Mon, 20 Jul 2020 10:55:37 +0300 Subject: [PATCH 22/47] Fixing type anotation --- gordo/machine/dataset/preprocessor.py | 33 ++++++++++++++------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index eaad4b7d8..f3154791d 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -1,14 +1,29 @@ import logging import pandas as pd -from typing import Iterable, Dict, Tuple, Union +from typing import Iterable, Dict, Tuple, Union, Type from copy import deepcopy from abc import ABCMeta, abstractmethod from collections import defaultdict logger = logging.getLogger(__name__) -_types: Dict[str, Tuple[pd.Timestamp, pd.Timestamp]] = {} + +class Preprocessor(metaclass=ABCMeta): + @abstractmethod + def reset(self): + ... + + @abstractmethod + def prepare_series(self, series: Iterable[pd.Series]) -> Iterable[pd.Series]: + ... + + @abstractmethod + def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: + ... + + +_types: Dict[str, Type[Preprocessor]] = {} def preprocessor(preprocessor_type): @@ -39,20 +54,6 @@ def normalize_preprocessor(value): return value -class Preprocessor(metaclass=ABCMeta): - @abstractmethod - def reset(self): - ... - - @abstractmethod - def prepare_series(self, series: Iterable[pd.Series]) -> Iterable[pd.Series]: - ... - - @abstractmethod - def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: - ... - - @preprocessor("fill_gaps") class FillGapsPreprocessor(Preprocessor): def __init__(self, gap_size: Union[str, pd.Timedelta], replace_value: float): From dbd6fe1c4360dc828fabe9aeba5a8be7bff609b2 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Mon, 20 Jul 2020 10:58:36 +0300 Subject: [PATCH 23/47] Fixing type anotation for _gap variable --- gordo/machine/dataset/preprocessor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index f3154791d..0fd6e348e 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -1,7 +1,7 @@ import logging import pandas as pd -from typing import Iterable, Dict, Tuple, Union, Type +from typing import Iterable, Dict, Tuple, Union, Type, List from copy import deepcopy from abc import ABCMeta, abstractmethod from collections import defaultdict @@ -61,7 +61,7 @@ def __init__(self, gap_size: Union[str, pd.Timedelta], replace_value: float): gap_size = pd.Timedelta(gap_size) self.gap_size = gap_size self.replace_value = replace_value - self._gaps = defaultdict(list) + self._gaps: Dict[str, List[Tuple[pd.Timestamp, pd.Timestamp]]] = defaultdict(list) def reset(self): self._gaps = defaultdict(list) From 5d19438fc4ba539449e0235d4af90f121ff6d7e5 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Mon, 20 Jul 2020 10:59:34 +0300 Subject: [PATCH 24/47] Black reformating --- gordo/machine/dataset/preprocessor.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index 0fd6e348e..9fc9561cf 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -61,7 +61,9 @@ def __init__(self, gap_size: Union[str, pd.Timedelta], replace_value: float): gap_size = pd.Timedelta(gap_size) self.gap_size = gap_size self.replace_value = replace_value - self._gaps: Dict[str, List[Tuple[pd.Timestamp, pd.Timestamp]]] = defaultdict(list) + self._gaps: Dict[str, List[Tuple[pd.Timestamp, pd.Timestamp]]] = defaultdict( + list + ) def reset(self): self._gaps = defaultdict(list) From 3f86902946a57db547f114ca613209ebf919a8ae Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Mon, 20 Jul 2020 11:22:25 +0300 Subject: [PATCH 25/47] Trying to ignore mypy --- gordo/machine/dataset/preprocessor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index 9fc9561cf..b00e57a47 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -82,11 +82,11 @@ def prepare_series(self, series: Iterable[pd.Series]) -> Iterable[pd.Series]: ) self._gaps[name].extend(gaps) - for name, gaps in self._gaps.items(): + for name, gaps in self._gaps.items(): # type: ignore logger.info( "Found %d gap%s in '%s' time-series", - len(gaps), - "s" if len(gaps) > 1 else "", + len(gaps), # type: ignore + "s" if len(gaps) > 1 else "", # type: ignore name, ) return result From 2dc75bf545b3d197a89ce088b0ce821701df6ea7 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Mon, 20 Jul 2020 11:23:35 +0300 Subject: [PATCH 26/47] Black reformating --- gordo/machine/dataset/preprocessor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index b00e57a47..68b726c32 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -82,11 +82,11 @@ def prepare_series(self, series: Iterable[pd.Series]) -> Iterable[pd.Series]: ) self._gaps[name].extend(gaps) - for name, gaps in self._gaps.items(): # type: ignore + for name, gaps in self._gaps.items(): # type: ignore logger.info( "Found %d gap%s in '%s' time-series", - len(gaps), # type: ignore - "s" if len(gaps) > 1 else "", # type: ignore + len(gaps), # type: ignore + "s" if len(gaps) > 1 else "", # type: ignore name, ) return result From d49145a793d43e94aa980bc70d08dce9ab6d09d4 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Thu, 23 Jul 2020 14:50:36 +0300 Subject: [PATCH 27/47] Additinal logging --- gordo/machine/dataset/preprocessor.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index 68b726c32..07cd4fdd5 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -54,6 +54,10 @@ def normalize_preprocessor(value): return value +def gap2str(gap_start: pd.Timestamp, gap_end: pd.Timestamp): + return "from %s to %s" % (gap_start.isoformat(), gap_end.isoformat()) + + @preprocessor("fill_gaps") class FillGapsPreprocessor(Preprocessor): def __init__(self, gap_size: Union[str, pd.Timedelta], replace_value: float): @@ -89,10 +93,23 @@ def prepare_series(self, series: Iterable[pd.Series]) -> Iterable[pd.Series]: "s" if len(gaps) > 1 else "", # type: ignore name, ) + gaps_str = ", ".join( + gap2str(gap_start, gap_end) for gap_start, gap_end in gaps + ) + logger.debug("Gaps for '%s': %s", gaps_str) return result def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: for name, gaps in self._gaps.items(): + if len(gaps): + values_count = df.loc[df[name] == self.replace_value, name].count() + if values_count: + logger.warning( + "Found %d values replace_value='%s' in '%s'", + values_count, + self.replace_value, + name, + ) for gap_start, gap_end in gaps: df.iloc[ (df.index > gap_start) & (df.index < gap_end), From 8b77a4bd91e8eb88752bf3dd14a0333d188af447 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Thu, 23 Jul 2020 15:00:50 +0300 Subject: [PATCH 28/47] One additional logger --- gordo/machine/dataset/preprocessor.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index 07cd4fdd5..d366d79e4 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -97,6 +97,8 @@ def prepare_series(self, series: Iterable[pd.Series]) -> Iterable[pd.Series]: gap2str(gap_start, gap_end) for gap_start, gap_end in gaps ) logger.debug("Gaps for '%s': %s", gaps_str) + else: + logger.info("Have not found any gaps in all time-series") return result def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: From 406402d24860ca24f71d6978d313ae11caa2f057 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Thu, 23 Jul 2020 18:13:43 +0300 Subject: [PATCH 29/47] FillGapsPreprocessor.replace_lower_values --- gordo/machine/dataset/preprocessor.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index d366d79e4..52a74ce1b 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -60,11 +60,12 @@ def gap2str(gap_start: pd.Timestamp, gap_end: pd.Timestamp): @preprocessor("fill_gaps") class FillGapsPreprocessor(Preprocessor): - def __init__(self, gap_size: Union[str, pd.Timedelta], replace_value: float): + def __init__(self, gap_size: Union[str, pd.Timedelta], replace_value: float, replace_lower_values: bool = False): if isinstance(gap_size, str): gap_size = pd.Timedelta(gap_size) self.gap_size = gap_size self.replace_value = replace_value + self.replace_lower_values = replace_lower_values self._gaps: Dict[str, List[Tuple[pd.Timestamp, pd.Timestamp]]] = defaultdict( list ) @@ -102,19 +103,27 @@ def prepare_series(self, series: Iterable[pd.Series]) -> Iterable[pd.Series]: return result def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: + replace_value = self.replace_value for name, gaps in self._gaps.items(): if len(gaps): - values_count = df.loc[df[name] == self.replace_value, name].count() + if self.replace_lower_values: + condition = df[name] <= replace_value + else: + condition = df[name] == replace_value + values_count = df.loc[condition, name].count() if values_count: logger.warning( - "Found %d values replace_value='%s' in '%s'", + "Found %d values %s to replace_value='%s' in '%s'", values_count, - self.replace_value, + "lower or equal" if self.replace_lower_values else "equal", + replace_value, name, ) + if self.replace_lower_values: + df.loc[df[name] <= replace_value, name] = replace_value for gap_start, gap_end in gaps: df.iloc[ (df.index > gap_start) & (df.index < gap_end), df.columns.get_loc(name), - ] = self.replace_value + ] = replace_value return df From 1fea9ad4a1d40bc1279626cc7d60dc78d86bd231 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Thu, 23 Jul 2020 18:49:22 +0300 Subject: [PATCH 30/47] Black reformating --- gordo/machine/dataset/preprocessor.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index 52a74ce1b..b9904aaea 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -60,7 +60,12 @@ def gap2str(gap_start: pd.Timestamp, gap_end: pd.Timestamp): @preprocessor("fill_gaps") class FillGapsPreprocessor(Preprocessor): - def __init__(self, gap_size: Union[str, pd.Timedelta], replace_value: float, replace_lower_values: bool = False): + def __init__( + self, + gap_size: Union[str, pd.Timedelta], + replace_value: float, + replace_lower_values: bool = False, + ): if isinstance(gap_size, str): gap_size = pd.Timedelta(gap_size) self.gap_size = gap_size @@ -120,7 +125,7 @@ def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: name, ) if self.replace_lower_values: - df.loc[df[name] <= replace_value, name] = replace_value + df.loc[df[name] < replace_value, name] = replace_value for gap_start, gap_end in gaps: df.iloc[ (df.index > gap_start) & (df.index < gap_end), From 289346cbe85c54f004c27e87bcdabc5bd42babc8 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Fri, 24 Jul 2020 12:25:34 +0300 Subject: [PATCH 31/47] Additinal logging --- gordo/machine/dataset/preprocessor.py | 32 +++++++++++++++------------ 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index b9904aaea..691b384e4 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -109,21 +109,25 @@ def prepare_series(self, series: Iterable[pd.Series]) -> Iterable[pd.Series]: def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: replace_value = self.replace_value + logger.info( + "Preparing %d tags data DataFrame with %d gaps", + len(self._gaps), + sum(len(gaps) for gaps in self._gaps.values()), + ) for name, gaps in self._gaps.items(): - if len(gaps): - if self.replace_lower_values: - condition = df[name] <= replace_value - else: - condition = df[name] == replace_value - values_count = df.loc[condition, name].count() - if values_count: - logger.warning( - "Found %d values %s to replace_value='%s' in '%s'", - values_count, - "lower or equal" if self.replace_lower_values else "equal", - replace_value, - name, - ) + if self.replace_lower_values: + condition = df[name] <= replace_value + else: + condition = df[name] == replace_value + values_count = df.loc[condition, name].count() + if values_count: + logger.warning( + "Found %d values %s to replace_value='%s' in '%s'", + values_count, + "lower or equal" if self.replace_lower_values else "equal", + replace_value, + name, + ) if self.replace_lower_values: df.loc[df[name] < replace_value, name] = replace_value for gap_start, gap_end in gaps: From 0524dd85357e888aba0d5f03a7b3c6f9a7ab14a4 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Fri, 31 Jul 2020 17:04:16 +0300 Subject: [PATCH 32/47] fill_gaps --- gordo/machine/dataset/datasets.py | 4 ++- gordo/machine/dataset/preprocessor.py | 38 ++++++++++++++++++++++++--- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/gordo/machine/dataset/datasets.py b/gordo/machine/dataset/datasets.py index 4e6f1c4e4..05872b2ed 100644 --- a/gordo/machine/dataset/datasets.py +++ b/gordo/machine/dataset/datasets.py @@ -5,6 +5,7 @@ from datetime import datetime from dateutil.parser import isoparse from functools import wraps +from copy import copy import pandas as pd import numpy as np @@ -198,7 +199,8 @@ def _validate_dt(dt: Union[str, datetime]) -> datetime: def get_data(self) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]: - preprocessor = self.preprocessor + preprocessor = copy(self.preprocessor) + preprocessor.reset() series_iter: Iterable[pd.Series] = self.data_provider.load_series( train_start_date=self.train_start_date, diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index 691b384e4..964c061c9 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -5,6 +5,7 @@ from copy import deepcopy from abc import ABCMeta, abstractmethod from collections import defaultdict +from math import floor logger = logging.getLogger(__name__) @@ -65,12 +66,21 @@ def __init__( gap_size: Union[str, pd.Timedelta], replace_value: float, replace_lower_values: bool = False, + fill_gaps: bool = False, + resolution: Union[str, pd.Timedelta] = None, ): if isinstance(gap_size, str): gap_size = pd.Timedelta(gap_size) self.gap_size = gap_size self.replace_value = replace_value self.replace_lower_values = replace_lower_values + self.fill_gaps = fill_gaps + if resolution is None: + resolution = gap_size + else: + if isinstance(resolution, str): + resolution = pd.Timedelta(resolution) + self.resolution = resolution self._gaps: Dict[str, List[Tuple[pd.Timestamp, pd.Timestamp]]] = defaultdict( list ) @@ -131,8 +141,28 @@ def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: if self.replace_lower_values: df.loc[df[name] < replace_value, name] = replace_value for gap_start, gap_end in gaps: - df.iloc[ - (df.index > gap_start) & (df.index < gap_end), - df.columns.get_loc(name), - ] = replace_value + if self.fill_gaps: + column = df[name] + drop_index = column.iloc[ + (column.index > gap_start) & (column.index < gap_end) + ].index + df[name] = column.drop(index=drop_index) + values_count = floor((gap_end - gap_start) / self.resolution) + logger.info( + "name=%s gap_start=%s gap_end=%s len(drop_index)=%d values_count=%d", + name, + gap_start, + gap_end, + len(drop_index), + values_count, + ) + curr_ts = deepcopy(gap_start) + for _ in range(values_count): + curr_ts += self.resolution + df.at[curr_ts, name] = replace_value + else: + df.iloc[ + (df.index > gap_start) & (df.index < gap_end), + df.columns.get_loc(name), + ] = replace_value return df From 6f2d1d7c079b67806b6e2ec2d31c104859172fbf Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Wed, 5 Aug 2020 17:44:19 +0300 Subject: [PATCH 33/47] Fix issue with preprocessor=None --- gordo/machine/dataset/datasets.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/gordo/machine/dataset/datasets.py b/gordo/machine/dataset/datasets.py index 05872b2ed..f27f7292b 100644 --- a/gordo/machine/dataset/datasets.py +++ b/gordo/machine/dataset/datasets.py @@ -199,8 +199,10 @@ def _validate_dt(dt: Union[str, datetime]) -> datetime: def get_data(self) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]: - preprocessor = copy(self.preprocessor) - preprocessor.reset() + preprocessor = None + if self.preprocessor is not None: + preprocessor = copy(self.preprocessor) + preprocessor.reset() series_iter: Iterable[pd.Series] = self.data_provider.load_series( train_start_date=self.train_start_date, From 3299abb71e7817dd54350c8fb99df411d4610c92 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Wed, 5 Aug 2020 17:44:35 +0300 Subject: [PATCH 34/47] fill_gaps_in_data --- gordo/machine/dataset/preprocessor.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index 964c061c9..ec0cb1467 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -68,6 +68,7 @@ def __init__( replace_lower_values: bool = False, fill_gaps: bool = False, resolution: Union[str, pd.Timedelta] = None, + fill_gaps_in_data: bool = True, ): if isinstance(gap_size, str): gap_size = pd.Timedelta(gap_size) @@ -81,6 +82,7 @@ def __init__( if isinstance(resolution, str): resolution = pd.Timedelta(resolution) self.resolution = resolution + self.fill_gaps_in_data = fill_gaps_in_data self._gaps: Dict[str, List[Tuple[pd.Timestamp, pd.Timestamp]]] = defaultdict( list ) @@ -88,19 +90,20 @@ def __init__( def reset(self): self._gaps = defaultdict(list) + def find_gaps(self, series): + name = 'Time' + df = pd.concat([series.rename(name), series.diff().rename("Diff")], axis=1) + filtered_df = df[df["Diff"] > self.gap_size] + for _, row in filtered_df.iterrows(): + yield row[name], row[name] + row["Diff"] + def prepare_series(self, series: Iterable[pd.Series]) -> Iterable[pd.Series]: result = [] for value in series: result.append(value) name = value.name idx = value.index.to_series() - df = pd.concat([idx, idx.diff().rename("Diff")], axis=1) - filtered_df = df[df["Diff"] > self.gap_size] - gaps = ( - (row["Time"], row["Time"] + row["Diff"]) - for _, row in filtered_df.iterrows() - ) - + gaps = list(self.find_gaps(idx)) self._gaps[name].extend(gaps) for name, gaps in self._gaps.items(): # type: ignore logger.info( @@ -124,6 +127,10 @@ def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: len(self._gaps), sum(len(gaps) for gaps in self._gaps.values()), ) + if self.fill_gaps_in_data: + for name in df.columns: + gaps = list(self.find_gaps(df[name].index.to_series())) + self._gaps[name].extend(gaps) for name, gaps in self._gaps.items(): if self.replace_lower_values: condition = df[name] <= replace_value From 1b06f7374a255e461ca017292efdfa1848cedab5 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Thu, 6 Aug 2020 14:58:37 +0300 Subject: [PATCH 35/47] Replace nans --- gordo/machine/dataset/preprocessor.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index ec0cb1467..197b93e96 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -69,6 +69,7 @@ def __init__( fill_gaps: bool = False, resolution: Union[str, pd.Timedelta] = None, fill_gaps_in_data: bool = True, + replace_nans: bool = True, ): if isinstance(gap_size, str): gap_size = pd.Timedelta(gap_size) @@ -83,6 +84,7 @@ def __init__( resolution = pd.Timedelta(resolution) self.resolution = resolution self.fill_gaps_in_data = fill_gaps_in_data + self.replace_nans = replace_nans self._gaps: Dict[str, List[Tuple[pd.Timestamp, pd.Timestamp]]] = defaultdict( list ) @@ -172,4 +174,6 @@ def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: (df.index > gap_start) & (df.index < gap_end), df.columns.get_loc(name), ] = replace_value + if self.replace_nans: + df = df.fillna(self.replace_value) return df From f2f0da0e258586069956e21eb3f2c6f68ca15578 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Fri, 14 Aug 2020 11:37:24 +0300 Subject: [PATCH 36/47] mark_gaps --- gordo/machine/dataset/preprocessor.py | 118 ++++---------------------- 1 file changed, 16 insertions(+), 102 deletions(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index 197b93e96..e1d75c88d 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -16,11 +16,7 @@ def reset(self): ... @abstractmethod - def prepare_series(self, series: Iterable[pd.Series]) -> Iterable[pd.Series]: - ... - - @abstractmethod - def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: + def prepare_df(self, df: pd.DataFrame) -> pd.DataFrame: ... @@ -59,38 +55,20 @@ def gap2str(gap_start: pd.Timestamp, gap_end: pd.Timestamp): return "from %s to %s" % (gap_start.isoformat(), gap_end.isoformat()) -@preprocessor("fill_gaps") -class FillGapsPreprocessor(Preprocessor): +@preprocessor("mark_gaps") +class MarkGapsPreprocessor(Preprocessor): def __init__( self, gap_size: Union[str, pd.Timedelta], - replace_value: float, - replace_lower_values: bool = False, - fill_gaps: bool = False, - resolution: Union[str, pd.Timedelta] = None, - fill_gaps_in_data: bool = True, - replace_nans: bool = True, + mark_value: float, ): if isinstance(gap_size, str): gap_size = pd.Timedelta(gap_size) self.gap_size = gap_size - self.replace_value = replace_value - self.replace_lower_values = replace_lower_values - self.fill_gaps = fill_gaps - if resolution is None: - resolution = gap_size - else: - if isinstance(resolution, str): - resolution = pd.Timedelta(resolution) - self.resolution = resolution - self.fill_gaps_in_data = fill_gaps_in_data - self.replace_nans = replace_nans - self._gaps: Dict[str, List[Tuple[pd.Timestamp, pd.Timestamp]]] = defaultdict( - list - ) + self.mark_value = mark_value def reset(self): - self._gaps = defaultdict(list) + pass def find_gaps(self, series): name = 'Time' @@ -99,81 +77,17 @@ def find_gaps(self, series): for _, row in filtered_df.iterrows(): yield row[name], row[name] + row["Diff"] - def prepare_series(self, series: Iterable[pd.Series]) -> Iterable[pd.Series]: - result = [] - for value in series: - result.append(value) - name = value.name - idx = value.index.to_series() - gaps = list(self.find_gaps(idx)) - self._gaps[name].extend(gaps) - for name, gaps in self._gaps.items(): # type: ignore - logger.info( - "Found %d gap%s in '%s' time-series", - len(gaps), # type: ignore - "s" if len(gaps) > 1 else "", # type: ignore - name, - ) - gaps_str = ", ".join( - gap2str(gap_start, gap_end) for gap_start, gap_end in gaps - ) - logger.debug("Gaps for '%s': %s", gaps_str) - else: - logger.info("Have not found any gaps in all time-series") - return result - - def prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: - replace_value = self.replace_value + def prepare_df(self, df: pd.DataFrame) -> pd.DataFrame: logger.info( - "Preparing %d tags data DataFrame with %d gaps", + "Preparing %d tags data DataFrame", len(self._gaps), - sum(len(gaps) for gaps in self._gaps.values()), ) - if self.fill_gaps_in_data: - for name in df.columns: - gaps = list(self.find_gaps(df[name].index.to_series())) - self._gaps[name].extend(gaps) - for name, gaps in self._gaps.items(): - if self.replace_lower_values: - condition = df[name] <= replace_value - else: - condition = df[name] == replace_value - values_count = df.loc[condition, name].count() - if values_count: - logger.warning( - "Found %d values %s to replace_value='%s' in '%s'", - values_count, - "lower or equal" if self.replace_lower_values else "equal", - replace_value, - name, - ) - if self.replace_lower_values: - df.loc[df[name] < replace_value, name] = replace_value - for gap_start, gap_end in gaps: - if self.fill_gaps: - column = df[name] - drop_index = column.iloc[ - (column.index > gap_start) & (column.index < gap_end) - ].index - df[name] = column.drop(index=drop_index) - values_count = floor((gap_end - gap_start) / self.resolution) - logger.info( - "name=%s gap_start=%s gap_end=%s len(drop_index)=%d values_count=%d", - name, - gap_start, - gap_end, - len(drop_index), - values_count, - ) - curr_ts = deepcopy(gap_start) - for _ in range(values_count): - curr_ts += self.resolution - df.at[curr_ts, name] = replace_value - else: - df.iloc[ - (df.index > gap_start) & (df.index < gap_end), - df.columns.get_loc(name), - ] = replace_value - if self.replace_nans: - df = df.fillna(self.replace_value) + index_series = df.index.to_series() + gaps = list(self.find_gaps(index_series)) + if gaps: + for ts, _ in gaps: + mark_ts = ts + self.gap_size + for column in df.columns: + df.at[mark_ts, column] = self.mark_value + df = df.sort_index() return df From b246664ce70bd810be500ce58eecfed27c4cb4f9 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Fri, 14 Aug 2020 21:21:51 +0300 Subject: [PATCH 37/47] fill_nan --- gordo/machine/dataset/preprocessor.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index e1d75c88d..ee16016e8 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -61,11 +61,13 @@ def __init__( self, gap_size: Union[str, pd.Timedelta], mark_value: float, + fill_nan: bool = False, ): if isinstance(gap_size, str): gap_size = pd.Timedelta(gap_size) self.gap_size = gap_size self.mark_value = mark_value + self.fill_nan = fill_nan def reset(self): pass @@ -88,6 +90,8 @@ def prepare_df(self, df: pd.DataFrame) -> pd.DataFrame: for ts, _ in gaps: mark_ts = ts + self.gap_size for column in df.columns: + if self.fill_nan: + df[column].fillna(self.mark_value) df.at[mark_ts, column] = self.mark_value df = df.sort_index() return df From 1ee0b1c50e209d662dc2a865d269450d3d3e3c47 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Mon, 17 Aug 2020 16:03:22 +0300 Subject: [PATCH 38/47] Additional refactoring for preprocessor --- gordo/machine/dataset/datasets.py | 5 +---- gordo/machine/dataset/preprocessor.py | 2 ++ 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/gordo/machine/dataset/datasets.py b/gordo/machine/dataset/datasets.py index f27f7292b..7ed24ca44 100644 --- a/gordo/machine/dataset/datasets.py +++ b/gordo/machine/dataset/datasets.py @@ -210,9 +210,6 @@ def get_data(self) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]: tag_list=list(set(self.tag_list + self.target_tag_list)), ) - if preprocessor is not None: - series_iter = preprocessor.prepare_series(series_iter) - # Resample if we have a resolution set, otherwise simply join the series. if self.resolution: data = self.join_timeseries( @@ -255,7 +252,7 @@ def get_data(self) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]: ) if preprocessor is not None: - data = preprocessor.prepare_data(data) + data = preprocessor.prepare_df(data) x_tag_names = [tag.name for tag in self.tag_list] y_tag_names = [tag.name for tag in self.target_tag_list] diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index ee16016e8..48c0c6431 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -87,6 +87,8 @@ def prepare_df(self, df: pd.DataFrame) -> pd.DataFrame: index_series = df.index.to_series() gaps = list(self.find_gaps(index_series)) if gaps: + for ts, gap_size in gaps: + logger.debug("Found gap in index on %s for %s", ts, gap_size) for ts, _ in gaps: mark_ts = ts + self.gap_size for column in df.columns: From f37b966b2d7cae9b13baf1595c0b48c46c4ff622 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Mon, 17 Aug 2020 16:03:43 +0300 Subject: [PATCH 39/47] Support for custom TimeseriesGenerators --- gordo/machine/model/models.py | 152 ++++++++++++++++++++++++++++++++-- 1 file changed, 144 insertions(+), 8 deletions(-) diff --git a/gordo/machine/model/models.py b/gordo/machine/model/models.py index f1710fcec..649cc7126 100644 --- a/gordo/machine/model/models.py +++ b/gordo/machine/model/models.py @@ -4,7 +4,7 @@ import logging import io from pprint import pformat -from typing import Union, Callable, Dict, Any, Optional +from typing import Union, Callable, Dict, Any, Optional, List, Tuple, Type from abc import ABCMeta from copy import copy, deepcopy @@ -13,6 +13,7 @@ from tensorflow.keras.models import load_model, save_model from tensorflow.keras.preprocessing.sequence import pad_sequences, TimeseriesGenerator from tensorflow.keras.wrappers.scikit_learn import KerasRegressor as BaseWrapper +from tensorflow.python.keras.utils import data_utils import numpy as np import pandas as pd @@ -28,6 +29,8 @@ from gordo.machine.model.register import register_model_builder +from dataclasses import dataclass + logger = logging.getLogger(__name__) @@ -399,6 +402,7 @@ def __init__( kind: Union[Callable, str], lookback_window: int = 1, batch_size: int = 32, + generator: Optional[Dict] = None, **kwargs, ) -> None: """ @@ -427,6 +431,9 @@ def __init__( """ self.lookback_window = lookback_window self.batch_size = batch_size + if generator is None: + generator = {} + self.generator = generator kwargs["lookback_window"] = lookback_window kwargs["kind"] = kind kwargs["batch_size"] = batch_size @@ -522,6 +529,7 @@ def fit(self, X: np.ndarray, y: np.ndarray, **kwargs) -> "KerasLSTMForecast": batch_size=1, lookback_window=self.lookback_window, lookahead=self.lookahead, + **self.generator, ) primer_x, primer_y = tsg[0] @@ -534,6 +542,7 @@ def fit(self, X: np.ndarray, y: np.ndarray, **kwargs) -> "KerasLSTMForecast": batch_size=self.batch_size, lookback_window=self.lookback_window, lookahead=self.lookahead, + **self.generator, ) gen_kwargs = { @@ -642,12 +651,135 @@ def lookahead(self) -> int: return 0 +_timeseries_generator_types: Dict[str, Type[data_utils.Sequence]] = { + "default": TimeseriesGenerator +} + + +def timeseries_generator(generator_type: str): + def wrapper(cls: Type[data_utils.Sequence]): + if generator_type is not _timeseries_generator_types: + raise ValueError( + "TimeseriesGenerator with type '%s' has already been added" % generator_type + ) + _timeseries_generator_types[generator_type] = cls + return cls + return wrapper + + +def create_timeseeries_generator(generator_type: str, *args, **kwargs): + if generator_type not in _timeseries_generator_types: + raise ValueError("Can't find a TimeseriesGenerator with type '%s'" % generator_type) + return _timeseries_generator_types[generator_type](*args, **kwargs) + + +def marked_value_slices(arr: np.ndarray, mark_value: float) -> List[slice]: + indexes = np.where(np.all(arr == mark_value, axis=1)) + slices = [] + curr_index = 0 + for index in indexes: + slices.append(slice(curr_index, index)) + curr_index = index+1 + return slices + + +@dataclass +class SliceGeneratorContainer: + data_slice: slice + generator: Optional[data_utils.Sequence] = None + length: int = 0 + + +@timeseries_generator('split_by_mark_value') +class SplitByMarkValueTimeseriesGenerator(data_utils.Sequence): + + def __init__( + self, + data: np.ndarray, + targets: np.ndarray, + length: int, + mark_value: float, + batch_size: int = 128, + shuffle: bool = False, + min_split_size: Optional[int] = None, + ): + if len(data) != len(targets): + raise ValueError( + "Data and targets have to be of same length. " + f"Data length is {len(data)}" + f" while target length is {len(targets)}" + ) + self.succeeded_gen_containers, self.failed_gen_containers = self.create_gen_containers( + data, targets, length, mark_value, batch_size, shuffle, min_split_size=min_split_size + ) + logger.debug( + "SplitByMarkValueTimeseriesGenerator with succeeded_gen_containers=%s", + self.succeeded_gen_containers, + ) + logger.debug( + "SplitByMarkValueTimeseriesGenerator with failed_gen_containers=%s", + self.failed_gen_containers, + ) + if not self.succeeded_gen_containers: + raise ValueError( + "Seems like the time series are too small" + ) + + @staticmethod + def create_gen_containers( + data: np.ndarray, + targets: np.ndarray, + length: int, + mark_value: float, + batch_size: int, + shuffle: bool, + min_split_size: Optional[int] = None + ) -> Tuple[List[SliceGeneratorContainer], List[SliceGeneratorContainer]]: + data_slices = marked_value_slices(data, mark_value) + succeeded_gen_containers, failed_gen_containers = [], [] + for data_slice in data_slices: + gen_data = data[data_slice] + gen_targets = targets[data_slice] + try: + generator = TimeseriesGenerator( + gen_data, + gen_targets, + length=length, + batch_size=batch_size, + shuffle=shuffle, + ) + except ValueError: + failed_gen_containers.append(SliceGeneratorContainer(data_slice)) + else: + length = len(generator) + gen_container = SliceGeneratorContainer(data_slice, generator, length) + if min_split_size is not None and length < min_split_size: + failed_gen_containers.append(gen_container) + else: + succeeded_gen_containers.append(gen_container) + return succeeded_gen_containers, failed_gen_containers + + def __len__(self): + return sum(gen_container.length for gen_container in self.succeeded_gen_containers) + + def __getitem__(self, index): + i = -1 + for gen_container in self.succeeded_gen_containers: + new_i = i + gen_container.length + if index <= new_i: + gen_i = index - i - 1 + return gen_container.generator[gen_i] + i = new_i + raise IndexError(index) + + def create_keras_timeseriesgenerator( X: np.ndarray, y: Optional[np.ndarray], batch_size: int, lookback_window: int, lookahead: int, + **kwargs, ) -> tensorflow.keras.preprocessing.sequence.TimeseriesGenerator: """ Provides a `keras.preprocessing.sequence.TimeseriesGenerator` for use with @@ -701,26 +833,30 @@ def create_keras_timeseriesgenerator( 2 """ new_length = len(X) + 1 - lookahead - kwargs: Dict[str, Any] = dict(length=lookback_window, batch_size=batch_size) + gen_kwargs: Dict[str, Any] = dict(length=lookback_window, batch_size=batch_size) if lookahead == 1: - kwargs.update(dict(data=X, targets=y)) + gen_kwargs.update(dict(data=X, targets=y)) elif lookahead >= 0: pad_kw = dict(maxlen=new_length, dtype=X.dtype) if lookahead == 0: - kwargs["data"] = pad_sequences([X], padding="post", **pad_kw)[0] - kwargs["targets"] = pad_sequences([y], padding="pre", **pad_kw)[0] + gen_kwargs["data"] = pad_sequences([X], padding="post", **pad_kw)[0] + gen_kwargs["targets"] = pad_sequences([y], padding="pre", **pad_kw)[0] elif lookahead > 1: - kwargs["data"] = pad_sequences( + gen_kwargs["data"] = pad_sequences( [X], padding="post", truncating="post", **pad_kw )[0] - kwargs["targets"] = pad_sequences( + gen_kwargs["targets"] = pad_sequences( [y], padding="pre", truncating="pre", **pad_kw )[0] else: raise ValueError(f"Value of `lookahead` can not be negative, is {lookahead}") - return TimeseriesGenerator(**kwargs) + kwargs = deepcopy(kwargs) + generator_type = kwargs.pop("generator_type", "default") + kwargs.update(gen_kwargs) + + return create_timeseeries_generator(generator_type, **kwargs) From 7ed8979afe160d7343e21521872a4612fa216be5 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Mon, 17 Aug 2020 16:29:28 +0300 Subject: [PATCH 40/47] Type --- gordo/machine/model/models.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gordo/machine/model/models.py b/gordo/machine/model/models.py index 649cc7126..aac31e147 100644 --- a/gordo/machine/model/models.py +++ b/gordo/machine/model/models.py @@ -667,7 +667,7 @@ def wrapper(cls: Type[data_utils.Sequence]): return wrapper -def create_timeseeries_generator(generator_type: str, *args, **kwargs): +def create_timeseries_generator(generator_type: str, *args, **kwargs): if generator_type not in _timeseries_generator_types: raise ValueError("Can't find a TimeseriesGenerator with type '%s'" % generator_type) return _timeseries_generator_types[generator_type](*args, **kwargs) @@ -859,4 +859,4 @@ def create_keras_timeseriesgenerator( generator_type = kwargs.pop("generator_type", "default") kwargs.update(gen_kwargs) - return create_timeseeries_generator(generator_type, **kwargs) + return create_timeseries_generator(generator_type, **kwargs) From 6a87206fe9148078036621bff45744a3b3b5f961 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Mon, 17 Aug 2020 16:31:11 +0300 Subject: [PATCH 41/47] Fixing issue with ValueError --- gordo/machine/model/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gordo/machine/model/models.py b/gordo/machine/model/models.py index aac31e147..a14000c18 100644 --- a/gordo/machine/model/models.py +++ b/gordo/machine/model/models.py @@ -658,7 +658,7 @@ def lookahead(self) -> int: def timeseries_generator(generator_type: str): def wrapper(cls: Type[data_utils.Sequence]): - if generator_type is not _timeseries_generator_types: + if generator_type in _timeseries_generator_types: raise ValueError( "TimeseriesGenerator with type '%s' has already been added" % generator_type ) From 13b1e3ceccba447cf2ea1dac3f03df09284cdbd5 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Mon, 17 Aug 2020 17:05:29 +0300 Subject: [PATCH 42/47] black reformating --- gordo/machine/dataset/preprocessor.py | 5 ++-- gordo/machine/model/models.py | 36 ++++++++++++++++++--------- 2 files changed, 26 insertions(+), 15 deletions(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index 48c0c6431..b8aa1d8b7 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -73,7 +73,7 @@ def reset(self): pass def find_gaps(self, series): - name = 'Time' + name = "Time" df = pd.concat([series.rename(name), series.diff().rename("Diff")], axis=1) filtered_df = df[df["Diff"] > self.gap_size] for _, row in filtered_df.iterrows(): @@ -81,8 +81,7 @@ def find_gaps(self, series): def prepare_df(self, df: pd.DataFrame) -> pd.DataFrame: logger.info( - "Preparing %d tags data DataFrame", - len(self._gaps), + "Preparing %d tags data DataFrame", len(self._gaps), ) index_series = df.index.to_series() gaps = list(self.find_gaps(index_series)) diff --git a/gordo/machine/model/models.py b/gordo/machine/model/models.py index a14000c18..b7f1e97a1 100644 --- a/gordo/machine/model/models.py +++ b/gordo/machine/model/models.py @@ -660,16 +660,20 @@ def timeseries_generator(generator_type: str): def wrapper(cls: Type[data_utils.Sequence]): if generator_type in _timeseries_generator_types: raise ValueError( - "TimeseriesGenerator with type '%s' has already been added" % generator_type + "TimeseriesGenerator with type '%s' has already been added" + % generator_type ) _timeseries_generator_types[generator_type] = cls return cls + return wrapper def create_timeseries_generator(generator_type: str, *args, **kwargs): if generator_type not in _timeseries_generator_types: - raise ValueError("Can't find a TimeseriesGenerator with type '%s'" % generator_type) + raise ValueError( + "Can't find a TimeseriesGenerator with type '%s'" % generator_type + ) return _timeseries_generator_types[generator_type](*args, **kwargs) @@ -679,7 +683,7 @@ def marked_value_slices(arr: np.ndarray, mark_value: float) -> List[slice]: curr_index = 0 for index in indexes: slices.append(slice(curr_index, index)) - curr_index = index+1 + curr_index = index + 1 return slices @@ -690,9 +694,8 @@ class SliceGeneratorContainer: length: int = 0 -@timeseries_generator('split_by_mark_value') +@timeseries_generator("split_by_mark_value") class SplitByMarkValueTimeseriesGenerator(data_utils.Sequence): - def __init__( self, data: np.ndarray, @@ -709,8 +712,17 @@ def __init__( f"Data length is {len(data)}" f" while target length is {len(targets)}" ) - self.succeeded_gen_containers, self.failed_gen_containers = self.create_gen_containers( - data, targets, length, mark_value, batch_size, shuffle, min_split_size=min_split_size + ( + self.succeeded_gen_containers, + self.failed_gen_containers, + ) = self.create_gen_containers( + data, + targets, + length, + mark_value, + batch_size, + shuffle, + min_split_size=min_split_size, ) logger.debug( "SplitByMarkValueTimeseriesGenerator with succeeded_gen_containers=%s", @@ -721,9 +733,7 @@ def __init__( self.failed_gen_containers, ) if not self.succeeded_gen_containers: - raise ValueError( - "Seems like the time series are too small" - ) + raise ValueError("Seems like the time series are too small") @staticmethod def create_gen_containers( @@ -733,7 +743,7 @@ def create_gen_containers( mark_value: float, batch_size: int, shuffle: bool, - min_split_size: Optional[int] = None + min_split_size: Optional[int] = None, ) -> Tuple[List[SliceGeneratorContainer], List[SliceGeneratorContainer]]: data_slices = marked_value_slices(data, mark_value) succeeded_gen_containers, failed_gen_containers = [], [] @@ -760,7 +770,9 @@ def create_gen_containers( return succeeded_gen_containers, failed_gen_containers def __len__(self): - return sum(gen_container.length for gen_container in self.succeeded_gen_containers) + return sum( + gen_container.length for gen_container in self.succeeded_gen_containers + ) def __getitem__(self, index): i = -1 From a2d6c65c9fe17870b2d14e9b4e13b63ac124ede9 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Mon, 17 Aug 2020 18:33:52 +0300 Subject: [PATCH 43/47] Fix AttributeError --- gordo/machine/dataset/preprocessor.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index b8aa1d8b7..3a55c263a 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -80,9 +80,6 @@ def find_gaps(self, series): yield row[name], row[name] + row["Diff"] def prepare_df(self, df: pd.DataFrame) -> pd.DataFrame: - logger.info( - "Preparing %d tags data DataFrame", len(self._gaps), - ) index_series = df.index.to_series() gaps = list(self.find_gaps(index_series)) if gaps: From eed810742d2118f1e0ff3b8440245915bc5e124e Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Mon, 17 Aug 2020 18:49:01 +0300 Subject: [PATCH 44/47] Correct preprocessor --- gordo/machine/dataset/preprocessor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gordo/machine/dataset/preprocessor.py b/gordo/machine/dataset/preprocessor.py index 3a55c263a..670c394a2 100644 --- a/gordo/machine/dataset/preprocessor.py +++ b/gordo/machine/dataset/preprocessor.py @@ -83,8 +83,8 @@ def prepare_df(self, df: pd.DataFrame) -> pd.DataFrame: index_series = df.index.to_series() gaps = list(self.find_gaps(index_series)) if gaps: - for ts, gap_size in gaps: - logger.debug("Found gap in index on %s for %s", ts, gap_size) + for from_ts, to_ts in gaps: + logger.debug("Found gap from %s to %s", from_ts, to_ts) for ts, _ in gaps: mark_ts = ts + self.gap_size for column in df.columns: From f86146855abe7fb41ec7f4513c1ace5d7cfbca08 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Mon, 17 Aug 2020 18:51:04 +0300 Subject: [PATCH 45/47] Additional TODO --- gordo/machine/model/models.py | 1 + 1 file changed, 1 insertion(+) diff --git a/gordo/machine/model/models.py b/gordo/machine/model/models.py index b7f1e97a1..6673223b2 100644 --- a/gordo/machine/model/models.py +++ b/gordo/machine/model/models.py @@ -594,6 +594,7 @@ def predict(self, X: np.ndarray, **kwargs) -> np.ndarray: X = X.values if isinstance(X, pd.DataFrame) else X X = self._validate_and_fix_size_of_X(X) + #TODO custom timeseries generator tsg = create_keras_timeseriesgenerator( X=X, y=X, From f52656d61870ab48c1ea3f9be73fb080ecea7615 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Mon, 17 Aug 2020 18:57:39 +0300 Subject: [PATCH 46/47] Additional loggers --- gordo/machine/model/models.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gordo/machine/model/models.py b/gordo/machine/model/models.py index 6673223b2..1d72a11cd 100644 --- a/gordo/machine/model/models.py +++ b/gordo/machine/model/models.py @@ -434,6 +434,7 @@ def __init__( if generator is None: generator = {} self.generator = generator + logger.debug("KerasLSTMBaseEstimator generator %s", generator) kwargs["lookback_window"] = lookback_window kwargs["kind"] = kind kwargs["batch_size"] = batch_size @@ -871,5 +872,6 @@ def create_keras_timeseriesgenerator( kwargs = deepcopy(kwargs) generator_type = kwargs.pop("generator_type", "default") kwargs.update(gen_kwargs) + logger.debug('create_timeseries_generator %s with %s', generator_type, kwargs) return create_timeseries_generator(generator_type, **kwargs) From fb510660a864379386e9b4b86d42fd9c55fec677 Mon Sep 17 00:00:00 2001 From: Serhii Koropets Date: Mon, 17 Aug 2020 20:51:00 +0300 Subject: [PATCH 47/47] serialize generator --- gordo/machine/model/models.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/gordo/machine/model/models.py b/gordo/machine/model/models.py index 1d72a11cd..d9aa13f75 100644 --- a/gordo/machine/model/models.py +++ b/gordo/machine/model/models.py @@ -402,7 +402,6 @@ def __init__( kind: Union[Callable, str], lookback_window: int = 1, batch_size: int = 32, - generator: Optional[Dict] = None, **kwargs, ) -> None: """ @@ -431,10 +430,13 @@ def __init__( """ self.lookback_window = lookback_window self.batch_size = batch_size + + generator = kwargs.get("generator") if generator is None: generator = {} self.generator = generator logger.debug("KerasLSTMBaseEstimator generator %s", generator) + kwargs["lookback_window"] = lookback_window kwargs["kind"] = kind kwargs["batch_size"] = batch_size