[WIP] Masking gaps for LSTM #1011

Open
koropets wants to merge 48 commits into master

Commits (48)
aa38b2c
Introduce Preprocessor
koropets Jul 19, 2020
c4dc4f5
Add Masking for LSTM models
koropets Jul 19, 2020
a497c79
Black reformating
koropets Jul 19, 2020
11112ce
Fix typo
koropets Jul 19, 2020
3040117
Additional logger
koropets Jul 19, 2020
c88672d
Fix with_masking condition in lstm_model()
koropets Jul 19, 2020
a163262
Fixing test test_machine_from_config()
koropets Jul 19, 2020
d15ddc3
Additional anotations
koropets Jul 20, 2020
8115ac7
Fixing type anotation
koropets Jul 20, 2020
9ea6bad
Fixing type anotation for _gap variable
koropets Jul 20, 2020
b47a705
Black reformating
koropets Jul 20, 2020
92504fc
Trying to ignore mypy
koropets Jul 20, 2020
bca1f82
Black reformating
koropets Jul 20, 2020
b1de5dc
Introduce Preprocessor
koropets Jul 19, 2020
2f9ef31
Add Masking for LSTM models
koropets Jul 19, 2020
5508a95
Black reformating
koropets Jul 19, 2020
9c3d463
Fix typo
koropets Jul 19, 2020
d849df7
Additional logger
koropets Jul 19, 2020
11c927f
Fix with_masking condition in lstm_model()
koropets Jul 19, 2020
7275631
Fixing test test_machine_from_config()
koropets Jul 19, 2020
6e7f8cc
Additional anotations
koropets Jul 20, 2020
49f08cb
Fixing type anotation
koropets Jul 20, 2020
dbd6fe1
Fixing type anotation for _gap variable
koropets Jul 20, 2020
5d19438
Black reformating
koropets Jul 20, 2020
3f86902
Trying to ignore mypy
koropets Jul 20, 2020
2dc75bf
Black reformating
koropets Jul 20, 2020
c1e0e0b
Merge branch 'masking_gaps' of github.com:equinor/gordo into masking_…
koropets Jul 20, 2020
d49145a
Additinal logging
koropets Jul 23, 2020
8b77a4b
One additional logger
koropets Jul 23, 2020
406402d
FillGapsPreprocessor.replace_lower_values
koropets Jul 23, 2020
1fea9ad
Black reformating
koropets Jul 23, 2020
289346c
Additinal logging
koropets Jul 24, 2020
0524dd8
fill_gaps
koropets Jul 31, 2020
6f2d1d7
Fix issue with preprocessor=None
koropets Aug 5, 2020
3299abb
fill_gaps_in_data
koropets Aug 5, 2020
1b06f73
Replace nans
koropets Aug 6, 2020
f2f0da0
mark_gaps
koropets Aug 14, 2020
b246664
fill_nan
koropets Aug 14, 2020
1ee0b1c
Additional refactoring for preprocessor
koropets Aug 17, 2020
f37b966
Support for custom TimeseriesGenerators
koropets Aug 17, 2020
7ed8979
Type
koropets Aug 17, 2020
6a87206
Fixing issue with ValueError
koropets Aug 17, 2020
13b1e3c
black reformating
koropets Aug 17, 2020
a2d6c65
Fix AttributeError
koropets Aug 17, 2020
eed8107
Correct preprocessor
koropets Aug 17, 2020
f861468
Additional TODO
koropets Aug 17, 2020
f52656d
Additional loggers
koropets Aug 17, 2020
fb51066
serialize generator
koropets Aug 17, 2020
12 changes: 12 additions & 0 deletions gordo/machine/dataset/datasets.py
@@ -5,6 +5,7 @@
 from datetime import datetime
 from dateutil.parser import isoparse
 from functools import wraps
+from copy import copy
 
 import pandas as pd
 import numpy as np
@@ -25,6 +26,7 @@
     ValidDatasetKwargs,
     ValidDataProvider,
 )
+from .preprocessor import Preprocessor, normalize_preprocessor
 
 logger = logging.getLogger(__name__)
 
@@ -89,6 +91,7 @@ def __init__(
         n_samples_threshold: int = 0,
         low_threshold=-1000,
         high_threshold=50000,
+        preprocessor: Optional[Union[Preprocessor, Dict]] = None,
         **_kwargs,
     ):
         """
@@ -170,6 +173,7 @@ def __init__(
         self.n_samples_threshold = n_samples_threshold
         self.low_threshold = low_threshold
         self.high_threshold = high_threshold
+        self.preprocessor = normalize_preprocessor(preprocessor)
 
         if not self.train_start_date.tzinfo or not self.train_end_date.tzinfo:
             raise ValueError(
@@ -195,6 +199,11 @@ def _validate_dt(dt: Union[str, datetime]) -> datetime:
 
     def get_data(self) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]:
 
+        preprocessor = None
+        if self.preprocessor is not None:
+            preprocessor = copy(self.preprocessor)
+            preprocessor.reset()
+
         series_iter: Iterable[pd.Series] = self.data_provider.load_series(
             train_start_date=self.train_start_date,
             train_end_date=self.train_end_date,
@@ -242,6 +251,9 @@ def get_data(self) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]:
                 f"specified required threshold for number of rows ({self.n_samples_threshold})."
             )
 
+        if preprocessor is not None:
+            data = preprocessor.prepare_df(data)
+
         x_tag_names = [tag.name for tag in self.tag_list]
         y_tag_names = [tag.name for tag in self.target_tag_list]
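
For reference, a minimal usage sketch of the new dataset argument (not part of the diff; the config values are illustrative). normalize_preprocessor, added in preprocessor.py below, accepts either a Preprocessor instance or a plain dict whose "type" key selects a registered preprocessor and whose remaining keys become constructor arguments:

    from gordo.machine.dataset.preprocessor import normalize_preprocessor

    # A dict as it might appear in a dataset config; "mark_gaps" is the only
    # type registered in this PR. Remaining keys go to MarkGapsPreprocessor().
    preprocessor = normalize_preprocessor(
        {"type": "mark_gaps", "gap_size": "3h", "mark_value": -1000.0}
    )
    # Equivalent to: MarkGapsPreprocessor(gap_size="3h", mark_value=-1000.0)
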
95 changes: 95 additions & 0 deletions gordo/machine/dataset/preprocessor.py
@@ -0,0 +1,95 @@
+import logging
+import pandas as pd
+
+from typing import Iterable, Dict, Tuple, Union, Type, List
+from copy import deepcopy
+from abc import ABCMeta, abstractmethod
+from collections import defaultdict
+from math import floor
+
+logger = logging.getLogger(__name__)
+
+
+class Preprocessor(metaclass=ABCMeta):
+    @abstractmethod
+    def reset(self):
+        ...
+
+    @abstractmethod
+    def prepare_df(self, df: pd.DataFrame) -> pd.DataFrame:
+        ...
+
+
+_types: Dict[str, Type[Preprocessor]] = {}
+
+
+def preprocessor(preprocessor_type):
+    def wrapper(cls):
+        if preprocessor_type in _types:
+            raise ValueError(
+                "Preprocessor with name '%s' has already been added" % preprocessor_type
+            )
+        _types[preprocessor_type] = cls
+        return cls
+
+    return wrapper
+
+
+def create_preprocessor(preprocessor_type, *args, **kwargs):
+    if preprocessor_type not in _types:
+        raise ValueError("Can't find a preprocessor with name '%s'" % preprocessor_type)
+    return _types[preprocessor_type](*args, **kwargs)
+
+
+def normalize_preprocessor(value):
+    if isinstance(value, dict):
+        if "type" not in value:
+            raise ValueError("A preprocessor type is empty")
+        value = deepcopy(value)
+        preprocessor_type = value.pop("type")
+        return create_preprocessor(preprocessor_type, **value)
+    return value
+
+
+def gap2str(gap_start: pd.Timestamp, gap_end: pd.Timestamp):
+    return "from %s to %s" % (gap_start.isoformat(), gap_end.isoformat())
+
+
+@preprocessor("mark_gaps")
+class MarkGapsPreprocessor(Preprocessor):
+    def __init__(
+        self,
+        gap_size: Union[str, pd.Timedelta],
+        mark_value: float,
+        fill_nan: bool = False,
+    ):
+        if isinstance(gap_size, str):
+            gap_size = pd.Timedelta(gap_size)
+        self.gap_size = gap_size
+        self.mark_value = mark_value
+        self.fill_nan = fill_nan
+
+    def reset(self):
+        pass
+
+    def find_gaps(self, series):
+        name = "Time"
+        df = pd.concat([series.rename(name), series.diff().rename("Diff")], axis=1)
+        filtered_df = df[df["Diff"] > self.gap_size]
+        for _, row in filtered_df.iterrows():
+            yield row[name] - row["Diff"], row[name]
+
+    def prepare_df(self, df: pd.DataFrame) -> pd.DataFrame:
+        index_series = df.index.to_series()
+        gaps = list(self.find_gaps(index_series))
+        if gaps:
+            for from_ts, to_ts in gaps:
+                logger.debug("Found gap from %s to %s", from_ts, to_ts)
+        for ts, _ in gaps:
+            mark_ts = ts + self.gap_size
+            for column in df.columns:
+                if self.fill_nan:
+                    df[column] = df[column].fillna(self.mark_value)
+                df.at[mark_ts, column] = self.mark_value
+        df = df.sort_index()
+        return df
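
A worked example of the mark_gaps behaviour as implemented above (timestamps and values are illustrative): with a 10-minute cadence and gap_size="1h", a two-hour hole in the index gets a single mark row, placed gap_size after the last sample before the hole:

    import pandas as pd

    from gordo.machine.dataset.preprocessor import MarkGapsPreprocessor

    index = pd.to_datetime(
        ["2020-01-01 00:00", "2020-01-01 00:10", "2020-01-01 02:10", "2020-01-01 02:20"]
    )
    df = pd.DataFrame({"tag1": [1.0, 2.0, 3.0, 4.0]}, index=index)

    out = MarkGapsPreprocessor(gap_size="1h", mark_value=-1000.0).prepare_df(df)
    # find_gaps() yields (00:10, 02:10) since the 2h step exceeds gap_size, so
    # prepare_df() inserts a row at 00:10 + 1h = 01:10 with tag1 == -1000.0.
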
20 changes: 16 additions & 4 deletions gordo/machine/model/factories/lstm_autoencoder.py
@@ -1,11 +1,11 @@
 # -*- coding: utf-8 -*-
 
-from typing import Tuple, Union, Dict, Any
+from typing import Tuple, Union, Dict, Any, Optional
 
 import tensorflow
 from tensorflow import keras
 from tensorflow.keras.optimizers import Optimizer
-from tensorflow.keras.layers import Dense, LSTM
+from tensorflow.keras.layers import Dense, LSTM, Masking
 from tensorflow.keras.models import Sequential as KerasSequential
 
 from gordo.machine.model.register import register_model_builder
@@ -26,6 +26,7 @@ def lstm_model(
     optimizer: Union[str, Optimizer] = "Adam",
     optimizer_kwargs: Dict[str, Any] = dict(),
     compile_kwargs: Dict[str, Any] = dict(),
+    mask_value: Optional[float] = None,
     **kwargs,
 ) -> tensorflow.keras.models.Sequential:
     """
@@ -63,6 +64,8 @@
         default values will be used.
     compile_kwargs: Dict[str, Any]
         Parameters to pass to ``keras.Model.compile``.
+    mask_value: Optional[float]
+        Add Masking layer with this mask_value
 
     Returns
     -------
@@ -71,17 +74,26 @@
 
     """
     n_features_out = n_features_out or n_features
+    with_masking = mask_value is not None
 
     check_dim_func_len("encoding", encoding_dim, encoding_func)
     check_dim_func_len("decoding", decoding_dim, decoding_func)
 
     model = KerasSequential()
 
+    if with_masking:
+        input_shape = (lookback_window, n_features)
+        model.add(Masking(mask_value=mask_value, input_shape=input_shape))
+
     # encoding layers
     kwargs = {"return_sequences": True}
     for i, (n_neurons, activation) in enumerate(zip(encoding_dim, encoding_func)):
-        input_shape = (lookback_window, n_neurons if i != 0 else n_features)
-        kwargs.update(dict(activation=activation, input_shape=input_shape))
+        input_shape = (
+            lookback_window,
+            n_neurons if with_masking or i != 0 else n_features,
+        )
+        kwargs["activation"] = activation
+        kwargs["input_shape"] = input_shape
         model.add(LSTM(n_neurons, **kwargs))
 
     # decoding layers
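
A quick sanity check of the new mask_value plumbing (a sketch with hypothetical parameter values, assuming the remaining encoder/decoder arguments of lstm_model have defaults): when mask_value is given, the Masking layer is the model's first layer, so Keras skips any timestep whose features all equal mask_value — e.g. the rows inserted by the mark_gaps preprocessor:

    from gordo.machine.model.factories.lstm_autoencoder import lstm_model

    # With mask_value set, layers[0] is the Masking layer added above.
    model = lstm_model(n_features=4, lookback_window=24, mask_value=-1000.0)
    print(type(model.layers[0]).__name__)  # "Masking"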