Skip to content

Commit

Permalink
[MLOP-702] Debug mode for Automate Migration (#322)
Browse files Browse the repository at this point in the history
* Create the debug-mode flag.

* Fix tests.

* Fix migrate test.
  • Loading branch information
moromimay authored May 10, 2021
1 parent bb7ed77 commit 5a0a622
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 20 deletions.
35 changes: 24 additions & 11 deletions butterfree/_cli/migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ def __fs_objects(path: str) -> Set[FeatureSetPipeline]:
False, help="To generate the logs in local file 'logging.json'."
)

# CLI flag: when enabled, the migration queries are only printed (via the
# migration's debug path) and nothing is applied to the database.
DEBUG_MODE = typer.Option(
    False,
    help="To view the queries resulting from the migration, DON'T apply the migration.",
)
class Migrate:
"""Execute migration operations in a Database based on pipeline Writer.
Expand All @@ -112,19 +117,27 @@ class Migrate:
def __init__(self, pipelines: Set[FeatureSetPipeline],) -> None:
self.pipelines = pipelines

def _send_logs_to_s3(self, file_local: bool) -> None:
def _send_logs_to_s3(self, file_local: bool, debug_mode: bool) -> None:
"""Send all migration logs to S3."""
file_name = "../logging.json"

if not file_local and os.path.exists(file_name):
s3_client = boto3.client("s3")

timestamp = datetime.datetime.now()
object_name = (
f"logs/migrate/"
f"{timestamp.strftime('%Y-%m-%d')}"
f"/logging-{timestamp.strftime('%H:%M:%S')}.json"
)

if debug_mode:
object_name = (
f"logs/migrate-debug-mode/"
f"{timestamp.strftime('%Y-%m-%d')}"
f"/logging-{timestamp.strftime('%H:%M:%S')}.json"
)
else:
object_name = (
f"logs/migrate/"
f"{timestamp.strftime('%Y-%m-%d')}"
f"/logging-{timestamp.strftime('%H:%M:%S')}.json"
)
bucket = environment.get_variable("FEATURE_STORE_S3_BUCKET")

try:
Expand All @@ -143,23 +156,23 @@ def _send_logs_to_s3(self, file_local: bool) -> None:
json_data = json.load(json_f)
print(json_data)

def run(self, generate_logs: bool = False, debug_mode: bool = False) -> None:
    """Construct and apply the migrations for every configured pipeline.

    Args:
        generate_logs: keep the migration logs in a local file instead of
            shipping them to S3.
        debug_mode: forwarded to the migration so the resulting queries are
            only shown, not executed.
    """
    for pipeline in self.pipelines:
        for writer in pipeline.sink.writers:
            db = writer.db_config.database
            if db != "cassandra":
                # Only Cassandra migrations are implemented at this point.
                logger.warning(f"Butterfree not supporting {db} Migrations yet.")
                continue
            ALLOWED_DATABASE[db].apply_migration(
                pipeline.feature_set, writer, debug_mode
            )

    self._send_logs_to_s3(generate_logs, debug_mode)


@app.command("apply")
def migrate(
path: str = PATH, generate_logs: bool = GENERATE_LOGS,
path: str = PATH, generate_logs: bool = GENERATE_LOGS, debug_mode: bool = DEBUG_MODE
) -> Set[FeatureSetPipeline]:
"""Scan and run database migrations for feature set pipelines defined under PATH.
Expand All @@ -172,5 +185,5 @@ def migrate(
import and instantiate them.
"""
pipe_set = __fs_objects(path)
Migrate(pipe_set).run(generate_logs)
Migrate(pipe_set).run(generate_logs, debug_mode)
return pipe_set
21 changes: 16 additions & 5 deletions butterfree/migrations/database_migration/database_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,12 +260,15 @@ def _get_schema(
db_schema = []
return db_schema

def apply_migration(self, feature_set: FeatureSet, writer: Writer,) -> None:
def apply_migration(
self, feature_set: FeatureSet, writer: Writer, debug_mode: bool
) -> None:
"""Apply the migration in the respective database.
Args:
feature_set: the feature set.
writer: the writer being used to load the feature set.
debug_mode: if active, it brings up the queries generated.
"""
logger.info(f"Migrating feature set: {feature_set.name}")

Expand All @@ -280,8 +283,16 @@ def apply_migration(self, feature_set: FeatureSet, writer: Writer,) -> None:
fs_schema, table_name, db_schema, writer.write_to_entity
)

for q in queries:
logger.info(f"Applying this query: {q} ...")
self._client.sql(q)
if debug_mode:
print(
"#### DEBUG MODE ###\n"
f"Feature set: {feature_set.name}\n"
"Queries:\n"
f"{queries}"
)
else:
for q in queries:
logger.info(f"Applying this query: {q} ...")
self._client.sql(q)

logger.info(f"Feature Set migration finished successfully.")
logger.info(f"Feature Set migration finished successfully.")
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from setuptools import find_packages, setup

__package_name__ = "butterfree"
__version__ = "1.2.0.dev16"
__version__ = "1.2.0.dev17"
__repository_url__ = "https://github.com/quintoandar/butterfree"

with open("requirements.txt") as f:
Expand Down
6 changes: 3 additions & 3 deletions tests/unit/butterfree/_cli/test_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ def test_migrate_success(self, mocker):
assert all(isinstance(fs, FeatureSetPipeline) for fs in all_fs)
assert sorted([fs.feature_set.name for fs in all_fs]) == ["first", "second"]

def test_migrate_all_pairs(self, mocker):
def test_migrate_run_methods(self, mocker):
mocker.patch.object(CassandraMigration, "apply_migration")
mocker.patch.object(migrate.Migrate, "_send_logs_to_s3")

all_fs = migrate.migrate("tests/mocks/entities/")
all_fs = migrate.migrate("tests/mocks/entities/", False, False)

assert CassandraMigration.apply_migration.call_count == 2

cassandra_pairs = [
call(pipe.feature_set, pipe.sink.writers[1]) for pipe in all_fs
call(pipe.feature_set, pipe.sink.writers[1], False) for pipe in all_fs
]
CassandraMigration.apply_migration.assert_has_calls(
cassandra_pairs, any_order=True
Expand Down

0 comments on commit 5a0a622

Please sign in to comment.