From 5a0a62244b4c1ac3fe6e199575141bddff5d710e Mon Sep 17 00:00:00 2001 From: Mayara Moromisato <44944954+moromimay@users.noreply.github.com> Date: Mon, 10 May 2021 17:43:08 -0300 Subject: [PATCH] [MLOP-702] Debug mode for Automate Migration (#322) * Create flag debug-mode. * Fix tests. * Fix migrate test. --- butterfree/_cli/migrate.py | 35 +++++++++++++------ .../database_migration/database_migration.py | 21 ++++++++--- setup.py | 2 +- tests/unit/butterfree/_cli/test_migrate.py | 6 ++-- 4 files changed, 44 insertions(+), 20 deletions(-) diff --git a/butterfree/_cli/migrate.py b/butterfree/_cli/migrate.py index ebd21142..bfa18b46 100644 --- a/butterfree/_cli/migrate.py +++ b/butterfree/_cli/migrate.py @@ -101,6 +101,11 @@ def __fs_objects(path: str) -> Set[FeatureSetPipeline]: False, help="To generate the logs in local file 'logging.json'." ) +DEBUG_MODE = typer.Option( + False, + help="To view the queries resulting from the migration, DON'T apply the migration.", +) + class Migrate: """Execute migration operations in a Database based on pipeline Writer. @@ -112,7 +117,7 @@ class Migrate: def __init__(self, pipelines: Set[FeatureSetPipeline],) -> None: self.pipelines = pipelines - def _send_logs_to_s3(self, file_local: bool) -> None: + def _send_logs_to_s3(self, file_local: bool, debug_mode: bool) -> None: """Send all migration logs to S3.""" file_name = "../logging.json" @@ -120,11 +125,19 @@ def _send_logs_to_s3(self, file_local: bool) -> None: s3_client = boto3.client("s3") timestamp = datetime.datetime.now() - object_name = ( - f"logs/migrate/" - f"{timestamp.strftime('%Y-%m-%d')}" - f"/logging-{timestamp.strftime('%H:%M:%S')}.json" - ) + + if debug_mode: + object_name = ( + f"logs/migrate-debug-mode/" + f"{timestamp.strftime('%Y-%m-%d')}" + f"/logging-{timestamp.strftime('%H:%M:%S')}.json" + ) + else: + object_name = ( + f"logs/migrate/" + f"{timestamp.strftime('%Y-%m-%d')}" + f"/logging-{timestamp.strftime('%H:%M:%S')}.json" + ) bucket = environment.get_variable("FEATURE_STORE_S3_BUCKET") try: @@ -143,23 +156,23 @@ def _send_logs_to_s3(self, file_local: bool) -> None: json_data = json.load(json_f) print(json_data) - def run(self, generate_logs: bool = False) -> None: + def run(self, generate_logs: bool = False, debug_mode: bool = False) -> None: """Construct and apply the migrations.""" for pipeline in self.pipelines: for writer in pipeline.sink.writers: db = writer.db_config.database if db == "cassandra": migration = ALLOWED_DATABASE[db] - migration.apply_migration(pipeline.feature_set, writer) + migration.apply_migration(pipeline.feature_set, writer, debug_mode) else: logger.warning(f"Butterfree not supporting {db} Migrations yet.") - self._send_logs_to_s3(generate_logs) + self._send_logs_to_s3(generate_logs, debug_mode) @app.command("apply") def migrate( - path: str = PATH, generate_logs: bool = GENERATE_LOGS, + path: str = PATH, generate_logs: bool = GENERATE_LOGS, debug_mode: bool = DEBUG_MODE ) -> Set[FeatureSetPipeline]: """Scan and run database migrations for feature set pipelines defined under PATH. @@ -172,5 +185,5 @@ def migrate( import and instantiate them. """ pipe_set = __fs_objects(path) - Migrate(pipe_set).run(generate_logs) + Migrate(pipe_set).run(generate_logs, debug_mode) return pipe_set diff --git a/butterfree/migrations/database_migration/database_migration.py b/butterfree/migrations/database_migration/database_migration.py index de6b2f80..40192ff7 100644 --- a/butterfree/migrations/database_migration/database_migration.py +++ b/butterfree/migrations/database_migration/database_migration.py @@ -260,12 +260,15 @@ def _get_schema( db_schema = [] return db_schema - def apply_migration(self, feature_set: FeatureSet, writer: Writer,) -> None: + def apply_migration( + self, feature_set: FeatureSet, writer: Writer, debug_mode: bool + ) -> None: """Apply the migration in the respective database. Args: feature_set: the feature set. writer: the writer being used to load the feature set. + debug_mode: if active, it brings up the queries generated. """ logger.info(f"Migrating feature set: {feature_set.name}") @@ -280,8 +283,16 @@ def apply_migration(self, feature_set: FeatureSet, writer: Writer,) -> None: fs_schema, table_name, db_schema, writer.write_to_entity ) - for q in queries: - logger.info(f"Applying this query: {q} ...") - self._client.sql(q) + if debug_mode: + print( + "#### DEBUG MODE ###\n" + f"Feature set: {feature_set.name}\n" + "Queries:\n" + f"{queries}" + ) + else: + for q in queries: + logger.info(f"Applying this query: {q} ...") + self._client.sql(q) - logger.info(f"Feature Set migration finished successfully.") + logger.info(f"Feature Set migration finished successfully.") diff --git a/setup.py b/setup.py index 6c2c2b46..56cf8842 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,7 @@ from setuptools import find_packages, setup __package_name__ = "butterfree" -__version__ = "1.2.0.dev16" +__version__ = "1.2.0.dev17" __repository_url__ = "https://github.com/quintoandar/butterfree" with open("requirements.txt") as f: diff --git a/tests/unit/butterfree/_cli/test_migrate.py b/tests/unit/butterfree/_cli/test_migrate.py index 475db15f..c0751c88 100644 --- a/tests/unit/butterfree/_cli/test_migrate.py +++ b/tests/unit/butterfree/_cli/test_migrate.py @@ -17,16 +17,16 @@ def test_migrate_success(self, mocker): assert all(isinstance(fs, FeatureSetPipeline) for fs in all_fs) assert sorted([fs.feature_set.name for fs in all_fs]) == ["first", "second"] - def test_migrate_all_pairs(self, mocker): + def test_migrate_run_methods(self, mocker): mocker.patch.object(CassandraMigration, "apply_migration") mocker.patch.object(migrate.Migrate, "_send_logs_to_s3") - all_fs = migrate.migrate("tests/mocks/entities/") + all_fs = migrate.migrate("tests/mocks/entities/", False, False) assert CassandraMigration.apply_migration.call_count == 2 cassandra_pairs = [ - call(pipe.feature_set, pipe.sink.writers[1]) for pipe in all_fs + call(pipe.feature_set, pipe.sink.writers[1], False) for pipe in all_fs ] CassandraMigration.apply_migration.assert_has_calls( cassandra_pairs, any_order=True