Commit

Merge branch 'rebase-staging-from-master' of github.com:quintoandar/butterfree into rebase-staging-from-master
ralphrass committed Apr 24, 2024
2 parents f6f1f09 + 2959f43 commit e6d2062
Showing 6 changed files with 0 additions and 269 deletions.
4 changes: 0 additions & 4 deletions CHANGELOG.md
@@ -58,10 +58,6 @@ Preferably use **Added**, **Changed**, **Removed** and **Fixed** topics in each
* [BUG] Fix Cassandra Connect Session ([#316](https://github.com/quintoandar/butterfree/pull/316))
* Fix method to generate agg feature name. ([#326](https://github.com/quintoandar/butterfree/pull/326))

## [1.1.3](https://github.com/quintoandar/butterfree/releases/tag/1.1.3)
### Added
* [MLOP-636] Create migration classes ([#282](https://github.com/quintoandar/butterfree/pull/282))

## [1.1.3](https://github.com/quintoandar/butterfree/releases/tag/1.1.3)
### Added
* [MLOP-599] Apply mypy to ButterFree ([#273](https://github.com/quintoandar/butterfree/pull/273))
1 change: 0 additions & 1 deletion butterfree/_cli/migrate.py
@@ -168,7 +168,6 @@ def run(self, generate_logs: bool = False, debug_mode: bool = False) -> None:

self._send_logs_to_s3(generate_logs, debug_mode)

self._send_logs_to_s3(generate_logs)

@app.command("apply")
def migrate(
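The hunk above drops a stray one-argument call to `_send_logs_to_s3` that the merge had left behind, keeping only the call that also forwards `debug_mode`. A minimal, self-contained stand-in for the resulting call pattern (only the names mirror the diff; the method bodies here are hypothetical):

```python
class MigrateSketch:
    """Hypothetical stand-in for butterfree's Migrate class; bodies are illustrative."""

    def _send_logs_to_s3(self, generate_logs: bool, debug_mode: bool = False) -> None:
        # Placeholder body: the real method uploads migration logs to S3.
        print(f"upload logs? generate_logs={generate_logs}, debug_mode={debug_mode}")

    def run(self, generate_logs: bool = False, debug_mode: bool = False) -> None:
        # ... apply migrations ...
        # After this commit, logs are sent exactly once, with both flags forwarded.
        self._send_logs_to_s3(generate_logs, debug_mode)


MigrateSketch().run(generate_logs=True)
```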
5 changes: 0 additions & 5 deletions butterfree/configs/db/cassandra_config.py
@@ -63,11 +63,6 @@ def database(self) -> str:
"""Database name."""
return "cassandra"

@property
def database(self) -> str:
"""Database name."""
return "cassandra"

@property
def username(self) -> Optional[str]:
"""Username used in connection to Cassandra DB."""
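The removed block was a second, identical definition of the `database` property. In a Python class body a later definition simply rebinds the name and replaces the earlier one, so the duplicate never changed behavior. A small generic illustration of that shadowing (not butterfree code):

```python
class Demo:
    @property
    def value(self) -> str:
        return "first"

    @property  # a later definition silently replaces the earlier one
    def value(self) -> str:
        return "second"


print(Demo().value)  # prints "second"
```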
13 changes: 0 additions & 13 deletions butterfree/load/writers/historical_feature_store_writer.py
@@ -148,19 +148,6 @@ def write(

dataframe = self._apply_transformations(dataframe)

if self.interval_mode:
partition_overwrite_mode = spark_client.conn.conf.get(
"spark.sql.sources.partitionOverwriteMode"
).lower()

if partition_overwrite_mode != "dynamic":
raise RuntimeError(
"m=load_incremental_table, "
"spark.sql.sources.partitionOverwriteMode={}, "
"msg=partitionOverwriteMode have to "
"be configured to 'dynamic'".format(partition_overwrite_mode)
)

if self.interval_mode:
partition_overwrite_mode = spark_client.conn.conf.get(
"spark.sql.sources.partitionOverwriteMode"
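What remains after the deletion is a single copy of the guard: interval-mode writes require `spark.sql.sources.partitionOverwriteMode=dynamic` so that an overwrite replaces only the partitions covered by the run. A hedged, standalone sketch of that check (assumes a local PySpark install; in the real writer the session comes from `spark_client.conn`):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# The guard kept by this commit: refuse interval-mode writes unless dynamic
# partition overwrite is configured, otherwise an overwrite could wipe the table.
mode = spark.conf.get("spark.sql.sources.partitionOverwriteMode").lower()
if mode != "dynamic":
    raise RuntimeError(
        f"spark.sql.sources.partitionOverwriteMode={mode}, "
        "partitionOverwriteMode has to be configured to 'dynamic'"
    )
```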
242 changes: 0 additions & 242 deletions tests/integration/butterfree/pipelines/test_feature_set_pipeline.py
@@ -411,245 +411,3 @@ def test_pipeline_interval_run(

# tear down
shutil.rmtree("test_folder")

def test_feature_set_pipeline_with_dates(
self,
mocked_date_df,
spark_session,
fixed_windows_output_feature_set_date_dataframe,
feature_set_pipeline,
):
# arrange
table_reader_table = "b_table"
create_temp_view(dataframe=mocked_date_df, name=table_reader_table)

historical_writer = HistoricalFeatureStoreWriter(debug_mode=True)

feature_set_pipeline.sink.writers = [historical_writer]

# act
feature_set_pipeline.run(start_date="2016-04-12", end_date="2016-04-13")

df = spark_session.sql("select * from historical_feature_store__feature_set")

# assert
assert_dataframe_equality(df, fixed_windows_output_feature_set_date_dataframe)

def test_feature_set_pipeline_with_execution_date(
self,
mocked_date_df,
spark_session,
fixed_windows_output_feature_set_date_dataframe,
feature_set_pipeline,
):
# arrange
table_reader_table = "b_table"
create_temp_view(dataframe=mocked_date_df, name=table_reader_table)

target_df = fixed_windows_output_feature_set_date_dataframe.filter(
"timestamp < '2016-04-13'"
)

historical_writer = HistoricalFeatureStoreWriter(debug_mode=True)

feature_set_pipeline.sink.writers = [historical_writer]

# act
feature_set_pipeline.run_for_date(execution_date="2016-04-12")

df = spark_session.sql("select * from historical_feature_store__feature_set")

# assert
assert_dataframe_equality(df, target_df)

def test_pipeline_with_hooks(self, spark_session):
# arrange
hook1 = AddHook(value=1)

spark_session.sql(
"select 1 as id, timestamp('2020-01-01') as timestamp, 0 as feature"
).createOrReplaceTempView("test")

target_df = spark_session.sql(
"select 1 as id, timestamp('2020-01-01') as timestamp, 6 as feature, 2020 "
"as year, 1 as month, 1 as day"
)

historical_writer = HistoricalFeatureStoreWriter(debug_mode=True)

test_pipeline = FeatureSetPipeline(
source=Source(
readers=[TableReader(id="reader", table="test",).add_post_hook(hook1)],
query="select * from reader",
).add_post_hook(hook1),
feature_set=FeatureSet(
name="feature_set",
entity="entity",
description="description",
features=[
Feature(
name="feature",
description="test",
transformation=SQLExpressionTransform(expression="feature + 1"),
dtype=DataType.INTEGER,
),
],
keys=[
KeyFeature(
name="id",
description="The user's Main ID or device ID",
dtype=DataType.INTEGER,
)
],
timestamp=TimestampFeature(),
)
.add_pre_hook(hook1)
.add_post_hook(hook1),
sink=Sink(writers=[historical_writer],).add_pre_hook(hook1),
)

# act
test_pipeline.run()
output_df = spark_session.table("historical_feature_store__feature_set")

# assert
output_df.show()
assert_dataframe_equality(output_df, target_df)

def test_pipeline_interval_run(
self, mocked_date_df, pipeline_interval_run_target_dfs, spark_session
):
"""Testing pipeline's idempotent interval run feature.
Source data:
+-------+---+-------------------+-------------------+
|feature| id| ts| timestamp|
+-------+---+-------------------+-------------------+
| 200| 1|2016-04-11 11:31:11|2016-04-11 11:31:11|
| 300| 1|2016-04-12 11:44:12|2016-04-12 11:44:12|
| 400| 1|2016-04-13 11:46:24|2016-04-13 11:46:24|
| 500| 1|2016-04-14 12:03:21|2016-04-14 12:03:21|
+-------+---+-------------------+-------------------+
The test executes 3 runs for different time intervals. The input data has 4 data
points: 2016-04-11, 2016-04-12, 2016-04-13 and 2016-04-14. The following run
specifications are:
1) Interval: from 2016-04-11 to 2016-04-13
Target table result:
+---+-------+---+-----+------+-------------------+----+
|day|feature| id|month|run_id| timestamp|year|
+---+-------+---+-----+------+-------------------+----+
| 11| 200| 1| 4| 1|2016-04-11 11:31:11|2016|
| 12| 300| 1| 4| 1|2016-04-12 11:44:12|2016|
| 13| 400| 1| 4| 1|2016-04-13 11:46:24|2016|
+---+-------+---+-----+------+-------------------+----+
2) Interval: only 2016-04-14.
Target table result:
+---+-------+---+-----+------+-------------------+----+
|day|feature| id|month|run_id| timestamp|year|
+---+-------+---+-----+------+-------------------+----+
| 11| 200| 1| 4| 1|2016-04-11 11:31:11|2016|
| 12| 300| 1| 4| 1|2016-04-12 11:44:12|2016|
| 13| 400| 1| 4| 1|2016-04-13 11:46:24|2016|
| 14| 500| 1| 4| 2|2016-04-14 12:03:21|2016|
+---+-------+---+-----+------+-------------------+----+
3) Interval: only 2016-04-11.
Target table result:
+---+-------+---+-----+------+-------------------+----+
|day|feature| id|month|run_id| timestamp|year|
+---+-------+---+-----+------+-------------------+----+
| 11| 200| 1| 4| 3|2016-04-11 11:31:11|2016|
| 12| 300| 1| 4| 1|2016-04-12 11:44:12|2016|
| 13| 400| 1| 4| 1|2016-04-13 11:46:24|2016|
| 14| 500| 1| 4| 2|2016-04-14 12:03:21|2016|
+---+-------+---+-----+------+-------------------+----+
"""
# arrange
create_temp_view(dataframe=mocked_date_df, name="input_data")

db = environment.get_variable("FEATURE_STORE_HISTORICAL_DATABASE")
path = "test_folder/historical/entity/feature_set"

spark_session.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark_session.sql(f"create database if not exists {db}")
spark_session.sql(
f"create table if not exists {db}.feature_set_interval "
f"(id int, timestamp timestamp, feature int, "
f"run_id int, year int, month int, day int);"
)

dbconfig = MetastoreConfig()
dbconfig.get_options = Mock(
return_value={"mode": "overwrite", "format_": "parquet", "path": path}
)

historical_writer = HistoricalFeatureStoreWriter(
db_config=dbconfig, interval_mode=True
)

first_run_hook = RunHook(id=1)
second_run_hook = RunHook(id=2)
third_run_hook = RunHook(id=3)

(
first_run_target_df,
second_run_target_df,
third_run_target_df,
) = pipeline_interval_run_target_dfs

test_pipeline = FeatureSetPipeline(
source=Source(
readers=[
TableReader(id="id", table="input_data",).with_incremental_strategy(
IncrementalStrategy("ts")
),
],
query="select * from id ",
),
feature_set=FeatureSet(
name="feature_set_interval",
entity="entity",
description="",
keys=[KeyFeature(name="id", description="", dtype=DataType.INTEGER,)],
timestamp=TimestampFeature(from_column="ts"),
features=[
Feature(name="feature", description="", dtype=DataType.INTEGER),
Feature(name="run_id", description="", dtype=DataType.INTEGER),
],
),
sink=Sink([historical_writer],),
)

# act and assert
dbconfig.get_path_with_partitions = Mock(
return_value=[
"test_folder/historical/entity/feature_set/year=2016/month=4/day=11",
"test_folder/historical/entity/feature_set/year=2016/month=4/day=12",
"test_folder/historical/entity/feature_set/year=2016/month=4/day=13",
]
)
test_pipeline.feature_set.add_pre_hook(first_run_hook)
test_pipeline.run(end_date="2016-04-13", start_date="2016-04-11")
first_run_output_df = spark_session.read.parquet(path)
assert_dataframe_equality(first_run_output_df, first_run_target_df)

dbconfig.get_path_with_partitions = Mock(
return_value=[
"test_folder/historical/entity/feature_set/year=2016/month=4/day=14",
]
)
test_pipeline.feature_set.add_pre_hook(second_run_hook)
test_pipeline.run_for_date("2016-04-14")
second_run_output_df = spark_session.read.parquet(path)
assert_dataframe_equality(second_run_output_df, second_run_target_df)

dbconfig.get_path_with_partitions = Mock(
return_value=[
"test_folder/historical/entity/feature_set/year=2016/month=4/day=11",
]
)
test_pipeline.feature_set.add_pre_hook(third_run_hook)
test_pipeline.run_for_date("2016-04-11")
third_run_output_df = spark_session.read.parquet(path)
assert_dataframe_equality(third_run_output_df, third_run_target_df)

# tear down
shutil.rmtree("test_folder")
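The deleted `test_pipeline_interval_run` exercised idempotent interval runs: with dynamic partition overwrite, re-running a single day rewrites only that day's partition. A small standalone demonstration of that Spark behavior (assumes PySpark; not part of the butterfree test suite):

```python
import tempfile

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
path = tempfile.mkdtemp()

# First run: write two daily partitions.
first = spark.createDataFrame([(1, 200, 11), (1, 300, 12)], ["id", "feature", "day"])
first.write.mode("overwrite").partitionBy("day").parquet(path)

# Second run: rewrite only day=11; with dynamic overwrite, day=12 is untouched,
# which is what makes re-running an interval idempotent.
rerun = spark.createDataFrame([(1, 999, 11)], ["id", "feature", "day"])
rerun.write.mode("overwrite").partitionBy("day").parquet(path)

# Expected: day=11 now holds feature=999 while day=12 still holds feature=300.
spark.read.parquet(path).orderBy("day").show()
```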
4 changes: 0 additions & 4 deletions tests/unit/butterfree/_cli/test_migrate.py
@@ -9,10 +9,6 @@

runner = CliRunner()

def test_migrate_all_pairs(self, mocker):
mocker.patch.object(MetastoreMigration, "apply_migration")
mocker.patch.object(CassandraMigration, "apply_migration")
mocker.patch.object(migrate.Migrate, "_send_logs_to_s3")

class TestMigrate:
def test_migrate_success(self, mocker):
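The removed fragment was an orphaned duplicate of a test body that patches the migration classes; the surviving tests inside `TestMigrate` keep those patches. A generic sketch of the pytest-mock pattern involved (illustrative only, with a hypothetical stub class, not the actual TestMigrate code):

```python
class MetastoreMigrationStub:
    """Hypothetical stand-in for the real migration class."""

    def apply_migration(self) -> None:
        raise RuntimeError("should never run for real inside a unit test")


def test_apply_migration_is_patched(mocker):
    # pytest-mock's `mocker` fixture: patch.object swaps the method for a MagicMock
    # and restores the original automatically when the test ends.
    mocked = mocker.patch.object(MetastoreMigrationStub, "apply_migration")
    MetastoreMigrationStub().apply_migration()
    mocked.assert_called_once()
```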
