Release/1.4.1 (#375)
* fix: performance improvements (#374)
ralphrass authored Sep 16, 2024
1 parent 8a15b10 commit 8d04d0b
Showing 12 changed files with 160 additions and 248 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,9 @@ Preferably use **Added**, **Changed**, **Removed** and **Fixed** topics in each

## [Unreleased]

## [1.4.1](https://github.com/quintoandar/butterfree/releases/tag/1.4.1)
* Performance Improvements ([#374](https://github.com/quintoandar/butterfree/pull/374))

## [1.4.0](https://github.com/quintoandar/butterfree/releases/tag/1.4.0)
* Add Delta support ([#370](https://github.com/quintoandar/butterfree/pull/370))

14 changes: 12 additions & 2 deletions butterfree/_cli/migrate.py
@@ -4,7 +4,7 @@
import os
import pkgutil
import sys
from typing import Set
from typing import Set, Type

import boto3
import setuptools
@@ -90,8 +90,18 @@ def __fs_objects(path: str) -> Set[FeatureSetPipeline]:

instances.add(value)

def create_instance(cls: Type[FeatureSetPipeline]) -> FeatureSetPipeline:
sig = inspect.signature(cls.__init__)
parameters = sig.parameters

if "run_date" in parameters:
run_date = datetime.datetime.today().strftime("%y-%m-%d")
return cls(run_date)

return cls()

logger.info("Creating instances...")
return set(value() for value in instances) # type: ignore
return set(create_instance(value) for value in instances) # type: ignore


PATH = typer.Argument(
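
A minimal sketch of the signature-inspection pattern the new `create_instance` helper uses. The pipeline classes below are hypothetical stand-ins; in butterfree the helper receives discovered `FeatureSetPipeline` subclasses:

```python
import datetime
import inspect


class NoArgsPipeline:
    """Hypothetical pipeline whose constructor takes no extra arguments."""


class DatedPipeline:
    """Hypothetical pipeline that accepts a run_date string."""

    def __init__(self, run_date: str):
        self.run_date = run_date


def create_instance(cls):
    # Inspect the constructor and pass run_date only when it is declared.
    parameters = inspect.signature(cls.__init__).parameters
    if "run_date" in parameters:
        # Two-digit year, matching the format string in the commit.
        run_date = datetime.datetime.today().strftime("%y-%m-%d")
        return cls(run_date)
    return cls()


print(create_instance(NoArgsPipeline))          # plain no-arg construction
print(create_instance(DatedPipeline).run_date)  # e.g. "24-09-16"
```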
14 changes: 10 additions & 4 deletions butterfree/extract/source.py
@@ -3,6 +3,7 @@
from typing import List, Optional

from pyspark.sql import DataFrame
from pyspark.storagelevel import StorageLevel

from butterfree.clients import SparkClient
from butterfree.extract.readers.reader import Reader
@@ -95,16 +96,21 @@ def construct(
DataFrame with the query result against all readers.
"""
# Step 1: Build temporary views for each reader
for reader in self.readers:
reader.build(
client=client, start_date=start_date, end_date=end_date
) # create temporary views for each reader
reader.build(client=client, start_date=start_date, end_date=end_date)

# Step 2: Execute SQL query on the combined readers
dataframe = client.sql(self.query)

# Step 3: Cache the dataframe if necessary, using memory and disk storage
if not dataframe.isStreaming and self.eager_evaluation:
dataframe.cache().count()
# Persist to ensure the DataFrame is stored in memory and on disk (if needed)
dataframe.persist(StorageLevel.MEMORY_AND_DISK)
# Trigger the cache/persist operation by performing an action
dataframe.count()

# Step 4: Run post-processing hooks on the dataframe
post_hook_df = self.run_post_hooks(dataframe)

return post_hook_df
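
A minimal sketch of the persist-then-materialize pattern applied above, assuming a local `SparkSession` and toy data for illustration. `persist(StorageLevel.MEMORY_AND_DISK)` makes the storage level explicit instead of relying on `cache()`'s default, which differs between the RDD and DataFrame APIs:

```python
from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel

spark = SparkSession.builder.master("local[2]").getOrCreate()
df = spark.range(1_000)

# persist() only records the desired storage level; nothing runs yet.
df.persist(StorageLevel.MEMORY_AND_DISK)

# The first action materializes the cache; later actions read from
# memory and spill to disk when the executors run short of RAM.
print(df.count())

# Release the cached blocks once downstream consumers are done.
df.unpersist()
```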
30 changes: 22 additions & 8 deletions butterfree/pipelines/feature_set_pipeline.py
@@ -2,6 +2,8 @@

from typing import List, Optional

from pyspark.storagelevel import StorageLevel

from butterfree.clients import SparkClient
from butterfree.dataframe_service import repartition_sort_df
from butterfree.extract import Source
@@ -209,35 +211,47 @@ def run(
soon. Use only if strictly necessary.
"""

# Step 1: Construct input dataframe from the source.
dataframe = self.source.construct(
client=self.spark_client,
start_date=self.feature_set.define_start_date(start_date),
end_date=end_date,
)

# Step 2: Repartition and sort if required, avoid if not necessary.
if partition_by:
order_by = order_by or partition_by
dataframe = repartition_sort_df(
dataframe, partition_by, order_by, num_processors
)

dataframe = self.feature_set.construct(
current_partitions = dataframe.rdd.getNumPartitions()
optimal_partitions = num_processors or current_partitions
if current_partitions != optimal_partitions:
dataframe = repartition_sort_df(
dataframe, partition_by, order_by, num_processors
)

# Step 3: Construct the feature set dataframe using defined transformations.
transformed_dataframe = self.feature_set.construct(
dataframe=dataframe,
client=self.spark_client,
start_date=start_date,
end_date=end_date,
num_processors=num_processors,
)

if dataframe.storageLevel != StorageLevel.NONE:
dataframe.unpersist() # Clear the data from the cache (disk and memory)

# Step 4: Load the data into the configured sink.
self.sink.flush(
dataframe=dataframe,
dataframe=transformed_dataframe,
feature_set=self.feature_set,
spark_client=self.spark_client,
)

if not dataframe.isStreaming:
# Step 5: Validate the output if not streaming and data volume is reasonable.
if not transformed_dataframe.isStreaming:
self.sink.validate(
dataframe=dataframe,
dataframe=transformed_dataframe,
feature_set=self.feature_set,
spark_client=self.spark_client,
)
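
A sketch of the skip-if-already-optimal check added to `run()` above. The helper name is hypothetical, and it uses plain `repartition()` where the pipeline calls butterfree's `repartition_sort_df`:

```python
from typing import Optional

from pyspark.sql import DataFrame


def maybe_repartition(df: DataFrame, num_processors: Optional[int]) -> DataFrame:
    # repartition() forces a full shuffle, so skip it when the current
    # partition count already matches the target.
    current = df.rdd.getNumPartitions()
    optimal = num_processors or current
    if current != optimal:
        return df.repartition(optimal)
    return df
```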
32 changes: 19 additions & 13 deletions butterfree/transform/aggregated_feature_set.py
@@ -387,6 +387,7 @@ def _aggregate(
]

groupby = self.keys_columns.copy()

if window is not None:
dataframe = dataframe.withColumn("window", window.get())
groupby.append("window")
@@ -410,19 +411,23 @@
"keep_rn", functions.row_number().over(partition_window)
).filter("keep_rn = 1")

# repartition to have all rows for each group at the same partition
# by doing that, we won't have to shuffle data on grouping by id
dataframe = repartition_df(
dataframe,
partition_by=groupby,
num_processors=num_processors,
)
current_partitions = dataframe.rdd.getNumPartitions()
optimal_partitions = num_processors or current_partitions

if current_partitions != optimal_partitions:
dataframe = repartition_df(
dataframe,
partition_by=groupby,
num_processors=optimal_partitions,
)

grouped_data = dataframe.groupby(*groupby)

if self._pivot_column:
if self._pivot_column and self._pivot_values:
grouped_data = grouped_data.pivot(self._pivot_column, self._pivot_values)

aggregated = grouped_data.agg(*aggregations)

return self._with_renamed_columns(aggregated, features, window)

def _with_renamed_columns(
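
A toy illustration of the pivot guard above; the local session and column names are assumptions. Passing explicit values to `pivot()` lets Spark skip the extra job it would otherwise run just to discover the distinct pivot keys:

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[2]").getOrCreate()
df = spark.createDataFrame(
    [(1, "a", 10.0), (1, "b", 5.0), (2, "a", 3.0)],
    ["id", "category", "amount"],
)

# Explicit pivot values: no preliminary scan for distinct categories.
pivoted = df.groupby("id").pivot("category", ["a", "b"]).agg(F.sum("amount"))
pivoted.show()
```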
@@ -637,12 +642,13 @@ def construct(
output_df = output_df.select(*self.columns).replace( # type: ignore
float("nan"), None
)
if not output_df.isStreaming:
if self.deduplicate_rows:
output_df = self._filter_duplicated_rows(output_df)
if self.eager_evaluation:
output_df.cache().count()

if not output_df.isStreaming and self.deduplicate_rows:
output_df = self._filter_duplicated_rows(output_df)

post_hook_df = self.run_post_hooks(output_df)

if not output_df.isStreaming and self.eager_evaluation:
post_hook_df.cache().count()

return post_hook_df
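
For reference, a self-contained sketch of the `row_number`-based dedupe behind the `keep_rn = 1` filter in `_aggregate` above. The toy schema and ordering column are assumptions, and whether `_filter_duplicated_rows` uses the same mechanism is not shown in this diff:

```python
from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.master("local[2]").getOrCreate()
df = spark.createDataFrame(
    [(1, "2024-09-15", 10), (1, "2024-09-16", 12), (2, "2024-09-16", 7)],
    ["id", "timestamp", "value"],
)

# Keep only the most recent row per key, then drop the helper column.
w = Window.partitionBy("id").orderBy(F.col("timestamp").desc())
deduped = (
    df.withColumn("keep_rn", F.row_number().over(w))
    .filter("keep_rn = 1")
    .drop("keep_rn")
)
deduped.show()
```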
7 changes: 2 additions & 5 deletions butterfree/transform/feature_set.py
@@ -436,11 +436,8 @@ def construct(
pre_hook_df,
).select(*self.columns)

if not output_df.isStreaming:
if self.deduplicate_rows:
output_df = self._filter_duplicated_rows(output_df)
if self.eager_evaluation:
output_df.cache().count()
if not output_df.isStreaming and self.deduplicate_rows:
output_df = self._filter_duplicated_rows(output_df)

output_df = self.incremental_strategy.filter_with_incremental_strategy(
dataframe=output_df, start_date=start_date, end_date=end_date
20 changes: 0 additions & 20 deletions docs/source/butterfree.configs.rst
@@ -23,26 +23,6 @@ butterfree.configs.environment module
butterfree.configs.logger module
--------------------------------

.. automodule:: butterfree.configs.logger
:members:
:undoc-members:
:show-inheritance:

.. automodule:: butterfree.configs.logger
:members:
:undoc-members:
:show-inheritance:

.. automodule:: butterfree.configs.logger
:members:
:undoc-members:
:show-inheritance:

.. automodule:: butterfree.configs.logger
:members:
:undoc-members:
:show-inheritance:

.. automodule:: butterfree.configs.logger
:members:
:undoc-members:
42 changes: 0 additions & 42 deletions docs/source/butterfree.constants.rst
@@ -31,28 +31,6 @@ butterfree.constants.migrations module
butterfree.constants.spark\_constants module
--------------------------------------------

.. automodule:: butterfree.constants.migrations
:members:
:undoc-members:
:show-inheritance:


.. automodule:: butterfree.constants.migrations
:members:
:undoc-members:
:show-inheritance:


.. automodule:: butterfree.constants.migrations
:members:
:undoc-members:
:show-inheritance:


.. automodule:: butterfree.constants.migrations
:members:
:undoc-members:
:show-inheritance:

.. automodule:: butterfree.constants.spark_constants
:members:
@@ -62,26 +40,6 @@ butterfree.constants.spark\_constants module
butterfree.constants.window\_definitions module
-----------------------------------------------

.. automodule:: butterfree.constants.window_definitions
:members:
:undoc-members:
:show-inheritance:

.. automodule:: butterfree.constants.window_definitions
:members:
:undoc-members:
:show-inheritance:

.. automodule:: butterfree.constants.window_definitions
:members:
:undoc-members:
:show-inheritance:

.. automodule:: butterfree.constants.window_definitions
:members:
:undoc-members:
:show-inheritance:

.. automodule:: butterfree.constants.window_definitions
:members:
:undoc-members:
8 changes: 0 additions & 8 deletions docs/source/butterfree.dataframe_service.rst
@@ -20,14 +20,6 @@ butterfree.dataframe\_service.partitioning module
:undoc-members:
:show-inheritance:

butterfree.dataframe\_service.repartition module
------------------------------------------------

.. automodule:: butterfree.dataframe_service.repartition
:members:
:undoc-members:
:show-inheritance:

.. automodule:: butterfree.dataframe_service.repartition
:members:
:undoc-members:
2 changes: 1 addition & 1 deletion setup.py
@@ -1,7 +1,7 @@
from setuptools import find_packages, setup

__package_name__ = "butterfree"
__version__ = "1.4.0"
__version__ = "1.4.1"
__repository_url__ = "https://github.com/quintoandar/butterfree"

with open("requirements.txt") as f: