feat(MLOP-1985): optional params (#347)
* feat: optional params
ralphrass authored Nov 13, 2023
1 parent 97e44fa commit 9bcca0e
Showing 3 changed files with 39 additions and 7 deletions.
13 changes: 11 additions & 2 deletions butterfree/extract/source.py
@@ -49,13 +49,22 @@ class Source(HookableComponent):
         temporary views regarding each reader and, after, will run the
         desired query and return a dataframe.
+        The `eager_evaluation` param forces Spark to apply the currently
+        mapped changes to the DataFrame. When this parameter is set to
+        False, Spark follows its standard behaviour of lazy evaluation.
+        Lazy evaluation can improve Spark's performance as it allows
+        Spark to build the best version of the execution plan.
     """
 
-    def __init__(self, readers: List[Reader], query: str) -> None:
+    def __init__(
+        self, readers: List[Reader], query: str, eager_evaluation: bool = True,
+    ) -> None:
         super().__init__()
         self.enable_pre_hooks = False
         self.readers = readers
         self.query = query
+        self.eager_evaluation = eager_evaluation
 
     def construct(
         self, client: SparkClient, start_date: str = None, end_date: str = None
@@ -87,7 +96,7 @@ def construct(
 
         dataframe = client.sql(self.query)
 
-        if not dataframe.isStreaming:
+        if not dataframe.isStreaming and self.eager_evaluation:
             dataframe.cache().count()
 
         post_hook_df = self.run_post_hooks(dataframe)
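As a usage illustration (not part of the diff): a minimal sketch of the new flag on Source, assuming butterfree's documented TableReader and SparkClient APIs; the database and table names are placeholders.

```python
from butterfree.clients import SparkClient
from butterfree.extract import Source
from butterfree.extract.readers import TableReader

# Placeholder database/table names; any registered reader works the same way.
source = Source(
    readers=[TableReader(id="events", database="my_db", table="events")],
    query="SELECT * FROM events",
    eager_evaluation=False,  # skip the cache().count() materialization on batch reads
)

# With eager_evaluation=False the plan stays lazy: nothing runs until a
# later action (write, count, collect) triggers it.
dataframe = source.construct(client=SparkClient())
```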
17 changes: 14 additions & 3 deletions butterfree/transform/aggregated_feature_set.py
@@ -197,14 +197,23 @@ def __init__(
         keys: List[KeyFeature],
         timestamp: TimestampFeature,
         features: List[Feature],
+        deduplicate_rows: bool = True,
+        eager_evaluation: bool = True,
     ):
         self._windows: List[Any] = []
         self._pivot_column: Optional[str] = None
         self._pivot_values: Optional[List[Union[bool, float, int, str]]] = []
         self._distinct_subset: List[Any] = []
         self._distinct_keep: Optional[str] = None
         super(AggregatedFeatureSet, self).__init__(
-            name, entity, description, keys, timestamp, features,
+            name,
+            entity,
+            description,
+            keys,
+            timestamp,
+            features,
+            deduplicate_rows,
+            eager_evaluation,
         )
 
     @property
@@ -626,8 +635,10 @@ def construct(
             float("nan"), None
         )
         if not output_df.isStreaming:
-            output_df = self._filter_duplicated_rows(output_df)
-            output_df.cache().count()
+            if self.deduplicate_rows:
+                output_df = self._filter_duplicated_rows(output_df)
+            if self.eager_evaluation:
+                output_df.cache().count()
 
         post_hook_df = self.run_post_hooks(output_df)
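The batch branch of construct in both feature sets now reduces to the same two independent checks. A standalone PySpark sketch of that logic, for illustration only (finalize and dedup_fn are hypothetical stand-ins; _filter_duplicated_rows is internal to butterfree):

```python
from typing import Callable

from pyspark.sql import DataFrame


def finalize(
    df: DataFrame,
    dedup_fn: Callable[[DataFrame], DataFrame],
    deduplicate_rows: bool = True,
    eager_evaluation: bool = True,
) -> DataFrame:
    # Streaming dataframes skip both steps: they support neither cache()
    # nor a full dedup pass over unbounded data.
    if not df.isStreaming:
        if deduplicate_rows:
            df = dedup_fn(df)
        if eager_evaluation:
            # cache() only marks the plan; count() is the action that
            # forces Spark to materialize it right away.
            df.cache().count()
    return df
```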
16 changes: 14 additions & 2 deletions butterfree/transform/feature_set.py
@@ -97,6 +97,12 @@ class FeatureSet(HookableComponent):
         values over key columns and timestamp column, we do this in order to reduce
         our dataframe (regarding the number of rows). A detailed explanation of this
         method can be found at filter_duplicated_rows docstring.
+        The `eager_evaluation` param forces Spark to apply the currently
+        mapped changes to the DataFrame. When this parameter is set to
+        False, Spark follows its standard behaviour of lazy evaluation.
+        Lazy evaluation can improve Spark's performance as it allows
+        Spark to build the best version of the execution plan.
     """
 
     def __init__(
@@ -107,6 +113,8 @@ def __init__(
         keys: List[KeyFeature],
         timestamp: TimestampFeature,
         features: List[Feature],
+        deduplicate_rows: bool = True,
+        eager_evaluation: bool = True,
     ) -> None:
         super().__init__()
         self.name = name
@@ -116,6 +124,8 @@ def __init__(
         self.timestamp = timestamp
         self.features = features
         self.incremental_strategy = IncrementalStrategy(column=TIMESTAMP_COLUMN)
+        self.deduplicate_rows = deduplicate_rows
+        self.eager_evaluation = eager_evaluation
 
     @property
     def name(self) -> str:
@@ -426,8 +436,10 @@ def construct(
         ).select(*self.columns)
 
         if not output_df.isStreaming:
-            output_df = self._filter_duplicated_rows(output_df)
-            output_df.cache().count()
+            if self.deduplicate_rows:
+                output_df = self._filter_duplicated_rows(output_df)
+            if self.eager_evaluation:
+                output_df.cache().count()
 
         output_df = self.incremental_strategy.filter_with_incremental_strategy(
             dataframe=output_df, start_date=start_date, end_date=end_date
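At the feature-set level, both flags are plain constructor arguments. A hypothetical FeatureSet overriding the new defaults (names are invented; the Feature/KeyFeature/TimestampFeature and DataType imports follow butterfree's documented examples):

```python
from butterfree.constants import DataType
from butterfree.transform import FeatureSet
from butterfree.transform.features import Feature, KeyFeature, TimestampFeature

feature_set = FeatureSet(
    name="user_metrics",  # invented names throughout
    entity="user",
    description="Example feature set exercising the new optional params.",
    keys=[
        KeyFeature(name="user_id", description="User key.", dtype=DataType.INTEGER),
    ],
    timestamp=TimestampFeature(),
    features=[
        Feature(name="purchases", description="Purchase count.", dtype=DataType.INTEGER),
    ],
    deduplicate_rows=False,  # keep duplicate key/timestamp rows
    eager_evaluation=False,  # leave the plan lazy; no cache().count()
)
```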

1 comment on commit 9bcca0e

@chip-n-dale (bot) commented on 9bcca0e on Nov 13, 2023


Hi @ralphrass!

The GitLeaks SecTool reported some possibly exposed credentials/secrets; how about giving them a look?

GitLeaks Alert Sync
[
  {
    "line": "    webhook: REDACTED",
    "lineNumber": 141,
    "offender": "REDACTED",
    "offenderEntropy": -1,
    "commit": "b6a5daf28abc035f74b9685aab573d384680b9d1",
    "repo": "butterfree",
    "repoURL": "",
    "leakURL": "",
    "rule": "Slack Webhook",
    "commitMessage": "initial commit\n",
    "author": "Alvaro",
    "email": "[email protected]",
    "file": ".drone.yml",
    "date": "2020-01-03T14:21:51-03:00",
    "tags": "key, slack"
  },
  {
    "line": "    webhook: REDACTED",
    "lineNumber": 159,
    "offender": "REDACTED",
    "offenderEntropy": -1,
    "commit": "b6697aa708fec0c5a9e3af0b2713cee6f45ff675",
    "repo": "butterfree",
    "repoURL": "",
    "leakURL": "",
    "rule": "Slack Webhook",
    "commitMessage": "hail to the butterfree\n",
    "author": "Alvaro",
    "email": "[email protected]",
    "file": ".drone.yml",
    "date": "2020-01-03T11:07:44-03:00",
    "tags": "key, slack"
  }
]

In case of false positives, more information is available in the GitLeaks FAQ.
If you have any other problem or question during this process, be sure to contact us in the Security space on GChat!
