🕵️ code review with @jonasvdd

jvdd committed Jan 23, 2024
1 parent cc8eb8d commit 45aa8bd
Showing 1 changed file with 74 additions and 55 deletions.
129 changes: 74 additions & 55 deletions tsflex/features/feature_collection.py
@@ -148,9 +148,7 @@ def _get_nb_output_features_without_window(self) -> int:
         )

     def _get_nb_feat_funcs(self) -> int:
-        return sum(
-            len(self._feature_desc_dict[k]) for k in self._feature_desc_dict.keys()
-        )
+        return sum(map(len, self._feature_desc_dict.values()))

     @staticmethod
     def _get_collection_key(
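For context on the `_get_nb_feat_funcs` simplification: both expressions count the total number of feature functions across all dict entries; the new one-liner just avoids the redundant key lookup. A minimal standalone sketch with a toy dict (not tsflex internals):

feature_desc_dict = {("s1", "5s"): ["mean", "std"], ("s2", "10s"): ["max"]}

old = sum(len(feature_desc_dict[k]) for k in feature_desc_dict.keys())
new = sum(map(len, feature_desc_dict.values()))
assert old == new == 3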
@@ -355,34 +353,6 @@ def f(x: pd.DataFrame):

         return pd.DataFrame(feat_out, index=group_ids).rename_axis(index=group_id_name)

-    def _group_feat_generator(
-        self,
-        grouped_df: pd.api.typing.DataFrameGroupBy,
-    ) -> Callable[[int], Tuple[pd.api.typing.DataFrameGroupBy, FuncWrapper,],]:
-        """Return a function that returns the necessary columns of the grouped data and
-        the function that needs to be applied to the grouped data.
-
-        Note that the function does not return groups, but rather the necessary columns
-        of the grouped data (i.e. the data on which the function needs to be applied).
-        To access the groups, the global `group_indices` and `group_id_name` are used.
-        """
-        keys_wins = list(self._feature_desc_dict.keys())
-        lengths = np.cumsum([len(self._feature_desc_dict[k]) for k in keys_wins])
-
-        def get_group_function(
-            idx,
-        ) -> Tuple[pd.api.typing.DataFrameGroupBy, FuncWrapper,]:
-            key_idx = np.searchsorted(lengths, idx, "right")  # right bc idx starts at 0
-            key, win = keys_wins[key_idx]
-
-            feature = self._feature_desc_dict[keys_wins[key_idx]][
-                idx - lengths[key_idx]
-            ]
-            function: FuncWrapper = feature.function
-            return grouped_df.obj[list(key)], function
-
-        return get_group_function
-
     def _stroll_feat_generator(
         self,
         series_dict: Dict[str, pd.Series],
@@ -431,6 +401,34 @@ def get_stroll_function(idx) -> Tuple[StridedRolling, FuncWrapper]:

         return get_stroll_function

+    def _group_feat_generator(
+        self,
+        grouped_df: pd.api.typing.DataFrameGroupBy,
+    ) -> Callable[[int], Tuple[pd.api.typing.DataFrameGroupBy, FuncWrapper,],]:
+        """Return a function that returns the necessary columns of the grouped data and
+        the function that needs to be applied to the grouped data.
+
+        Note that the function does not return groups, but rather the necessary columns
+        of the grouped data (i.e. the data on which the function needs to be applied).
+        To access the groups, the global `group_indices` and `group_id_name` are used.
+        """
+        keys_wins = list(self._feature_desc_dict.keys())
+        lengths = np.cumsum([len(self._feature_desc_dict[k]) for k in keys_wins])
+
+        def get_group_function(
+            idx,
+        ) -> Tuple[pd.api.typing.DataFrameGroupBy, FuncWrapper,]:
+            key_idx = np.searchsorted(lengths, idx, "right")  # right bc idx starts at 0
+            key, win = keys_wins[key_idx]
+
+            feature = self._feature_desc_dict[keys_wins[key_idx]][
+                idx - lengths[key_idx]
+            ]
+            function: FuncWrapper = feature.function
+            return grouped_df.obj[list(key)], function
+
+        return get_group_function
+
     def _check_no_multiple_windows(self, error_case: str):
         """Check whether there are no multiple windows in the feature collection.

@@ -449,8 +447,8 @@ def _check_no_multiple_windows(self, error_case: str):
                 + " (or None)"
             )

+    @staticmethod
     def _data_to_series_dict(
-        self,
         data: Union[pd.DataFrame, pd.Series, List[Union[pd.Series, pd.DataFrame]]],
         required_series: List[str],
     ) -> Dict[str, pd.Series]:
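The `self` parameter is dropped because the method uses no instance state; as a @staticmethod it can be called on the class itself, which is what the call sites further down switch to. A minimal sketch of the pattern (toy class, not the real signature):

import pandas as pd

class Demo:
    @staticmethod
    def to_dict(series_list):
        # no `self`: callable as Demo.to_dict(...) without an instance
        return {s.name: s for s in series_list}

s = pd.Series([1, 2], name="temp")
assert Demo.to_dict([s])["temp"] is s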
@@ -508,6 +506,15 @@ def _group_by_all(
         df = pd.DataFrame(series_dict)
         assert col_name in df.columns

+        # Check if there are nan values in the column on which we group
+        if df[col_name].isna().any():
+            warnings.warn(
+                f"NaN values were found in the column '{col_name}' (when expanding the "
+                + f"data to a pd.DataFrame which contains {df.columns}). "
+                + "Rows with NaN values for the grouping column will be ignored.",
+                RuntimeWarning,
+            )
+
         # GroupBy ignores all rows with NaN values for the column on which we group
         return df.groupby(col_name)
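The new warning addresses a silent pandas default: `groupby` drops rows whose group key is NaN. A toy illustration of the behavior being warned about:

import numpy as np
import pandas as pd

df = pd.DataFrame({"user": ["a", np.nan, "a", "b"], "val": [1, 2, 3, 4]})
print(df.groupby("user")["val"].sum())
# user
# a    4
# b    4
# -> the NaN-keyed row (val == 2) is silently ignored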

@@ -518,7 +525,7 @@ def _calculate_group_by_all(
         show_progress: Optional[bool],
         n_jobs: Optional[int],
         f_handler: Optional[logging.FileHandler],
-    ):
+    ) -> Union[List[pd.DataFrame], pd.DataFrame]:
         """Calculate features on each group of the grouped data.

         Parameters
@@ -589,6 +596,15 @@ def _group_by_consecutive(
             "end",
         ], "Grouping column cannot be 'start' or 'end'"

+        # Check if there are nan values in the column on which we group
+        if df[col_name].isna().any():
+            warnings.warn(
+                f"NaN values were found in the column '{col_name}' (when expanding the "
+                + f"data to a pd.DataFrame which contains {df.columns}). "
+                + "Rows with NaN values for the grouping column will be ignored.",
+                RuntimeWarning,
+            )
+
         # Drop all rows with NaN values for the column on which we group
         df.dropna(subset=[col_name], inplace=True)
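For reference, `_group_by_consecutive` groups runs of equal values rather than all equal values. A common idiom for this (a sketch with toy data; the actual tsflex implementation may differ) is compare-with-shift + cumsum:

import pandas as pd

s = pd.Series(["a", "a", "b", "b", "a"])
run_id = (s != s.shift()).cumsum()  # [1, 1, 2, 2, 3]: new id when the value changes
for gid, grp in s.groupby(run_id):
    print(gid, grp.tolist())
# 1 ['a', 'a']
# 2 ['b', 'b']
# 3 ['a']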

@@ -619,7 +635,7 @@ def _calculate_group_by_consecutive(
         group_by: str,
         return_df: Optional[bool] = False,
         **calculate_kwargs,
-    ):
+    ) -> Union[List[pd.DataFrame], pd.DataFrame]:
         """Calculate features on each consecutive group of the data.

         Parameters
@@ -648,17 +664,17 @@ def _calculate_group_by_consecutive(
             the same and are next to each other are grouped together).
         """
         # 0. Transform to dataframe
-        series_dict = self._data_to_series_dict(
+        series_dict = FeatureCollection._data_to_series_dict(
             data, self.get_required_series() + [group_by]
         )
         df = pd.DataFrame(series_dict)
         # 1. Group by `group_by` column
         consecutive_grouped_by_df = self._group_by_consecutive(df, col_name=group_by)
         # 2. Get start and end idxs of consecutive groups
         start_segment_idxs = consecutive_grouped_by_df["start"]
-        end_segment_idxs = start_segment_idxs.shift(-1).fillna(
-            consecutive_grouped_by_df["end"]
-        )
+        end_segment_idxs = start_segment_idxs.shift(-1)
+        # fill the nan value with the last end idx
+        end_segment_idxs.iloc[-1] = consecutive_grouped_by_df["end"].iloc[-1]
         # because segment end idxs are exclusive, we need to add an offset to the last
         # end idx so that all data gets used
         segment_vals = end_segment_idxs.values
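The `fillna` → explicit `iloc[-1]` change keeps the same semantics: each segment ends where the next one starts, and only the final segment needs its own end index. A toy illustration (assumed start/end column layout):

import pandas as pd

groups = pd.DataFrame({"start": [0, 3, 7], "end": [3, 7, 9]})
start_idxs = groups["start"]
end_idxs = start_idxs.shift(-1)             # [3.0, 7.0, NaN]
end_idxs.iloc[-1] = groups["end"].iloc[-1]  # [3.0, 7.0, 9.0]
print(list(zip(start_idxs, end_idxs)))      # [(0, 3.0), (3, 7.0), (7, 9.0)]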
@@ -668,21 +684,22 @@ def _calculate_group_by_consecutive(
         segment_vals[-1] += 1
         # 3. Calculate features
         try:
-            warnings.filterwarnings(
-                "ignore", category=RuntimeWarning, message="^.*segment indexes.*$"
-            )
-            warnings.filterwarnings(
-                "ignore", category=RuntimeWarning, message="^.*gaps.*$"
-            )
-            # 3.1. Calculate features using the groupby segment idxs
-            calc_results = self.calculate(
-                data=df,
-                segment_start_idxs=start_segment_idxs,
-                segment_end_idxs=end_segment_idxs,
-                **calculate_kwargs,
-            )
-
-            warnings.resetwarnings()
+            # Filter out the warnings that are raised when segment indices are passed
+            # (since users expect irregular window sizes when grouping)
+            with warnings.catch_warnings():
+                warnings.filterwarnings(
+                    "ignore", category=RuntimeWarning, message="^.*segment indexes.*$"
+                )
+                warnings.filterwarnings(
+                    "ignore", category=RuntimeWarning, message="^.*gaps.*$"
+                )
+                # 3.1. Calculate features using the groupby segment idxs
+                calc_results = self.calculate(
+                    data=df,
+                    segment_start_idxs=start_segment_idxs,
+                    segment_end_idxs=end_segment_idxs,
+                    **calculate_kwargs,
+                )

             # 3.2 Concatenate results and add the group_by column as well as the
             # start and end idxs of the segments
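The switch from module-level `filterwarnings` + `resetwarnings` to a `catch_warnings` context manager is the safer pattern: the previous filter state is restored on exit even when an exception fires, whereas `resetwarnings()` wipes all filters globally and can be skipped entirely if `self.calculate` raises. A minimal demonstration:

import warnings

with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=RuntimeWarning)
    warnings.warn("suppressed", RuntimeWarning)  # not shown
warnings.warn("visible again", RuntimeWarning)   # shown: filters were restored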
Expand Down Expand Up @@ -1124,7 +1141,7 @@ def calculate(
if not isinstance(data, pd.core.groupby.generic.DataFrameGroupBy):
# group_by_all should not be None (checked by asserts above)
# 0. Transform to dataframe
series_dict = self._data_to_series_dict(
series_dict = FeatureCollection._data_to_series_dict(
data, self.get_required_series() + [group_by_all]
)
# 1. Group by `group_by_all` column
@@ -1189,7 +1206,9 @@ def calculate(
         self._check_feature_descriptors(skip_none=False, calc_stride=stride)

         # Convert the data to a series_dict
-        series_dict = self._data_to_series_dict(data, self.get_required_series())
+        series_dict = FeatureCollection._data_to_series_dict(
+            data, self.get_required_series()
+        )

         # Determine the bounds of the series dict items and slice on them
         # TODO: is this even needed here? we also do this in the strided rolling