* [MLOP-636] Create migration classes (#282)
* [MLOP-635] Rebase Incremental Job/Interval Run branch for test on selected feature sets (#278)
* Add interval branch modifications.
* Add interval_runs notebook.
* Add tests.
* Apply style (black, flack8 and mypy).
* Fix tests.
* Change version to create package dev.
* Allow slide selection (#293)
* Fix Slide Duration Typo (#295)
* [MLOP-637] Implement diff method (#292)
* [MLOP-640] Create CLI with migrate command (#298)
* [MLOP-645] Implement query method, cassandra (#291)
* [MLOP-671] Implement get_schema on Spark client (#301)
* [MLOP-648] Implement query method, metastore (#294)
* Fix Validation Step (#302)
* [MLOP-647] [MLOP-646] Apply migrations (#300)
* add apply migration method
* add test apply migration
* add migrate actor with tests
* mypy compliant
* fix test interaction with mocked object
* Rebase and some adjusts.

Co-authored-by: Mayara Moromisato <[email protected]>

* [BUG] Apply create_partitions to historical validate (#303)
* Apply create_partitions to historical validate.
* Remove comments and adjusts.
* [BUG] Fix key path for validate read (#304)
* Fix key path
* bump version

Co-authored-by: AlvaroMarquesAndrade <1a789766b1c4c8b679e80f11fa6d63d42fa4bcdf>

* [FIX] Add Partition types for Metastore (#305)
* [MLOP-639] Track logs in S3 (#306)
* Apply tracking logs and logging config.
* Adjusts in CLI and logging.conf.
* Some adjusts.
* Change version to generate new dev package
* Fix version.
* Apply style.
* Add new assert in the migrate unit test.
* [BUG] Change logging config (#307)
* Change logging config.
* Some adjusts.
* Remove a code smell.
* Change solution for tracking logs (#308)
* Change tracking logs method.
* Change version to generate dev package.
* Change path name in S3
* Read and write consistency level options (#309)
* modify cassandra client to be region aware
* add option for the user to set read and write consistency levels on cassandra config
* add tests
* use env vars instead
* Update butterfree/configs/db/cassandra_config.py

Co-authored-by: Rodrigo Martins de Oliveira <[email protected]>

* Update butterfree/configs/db/cassandra_config.py

Co-authored-by: Rodrigo Martins de Oliveira <[email protected]>
Co-authored-by: Rodrigo Martins de Oliveira <[email protected]>

* Fix kafka reader. (#310)
* Fix path validate. (#311)
* Add local dc property (#312)
* add local dc property
* update version
* Remove metastore migrate (#313)
* Remove metastore migrate.
* Change version to create a dev package.
* Fix link in our docs. (#315)
* [BUG] Fix Cassandra Connect Session (#316)
* Fix Cassandra Connect Session.
* Apply style.
* Fix migration query. (#318)
* Fix migration query add type key. (#319)
* Fix db-config condition (#321)
* Fix db-config condition.
* Apply style.
* MLOP-642 Document migration in Butterfree (#320)
* update docs
* add more information and reference new cli.md file
* [MLOP-702] Debug mode for Automate Migration (#322)
* Create flag debug-mode.
* Fix tests.
* Fix migrate test.
* [MLOP-727] Improve logging messages (#325)
* Fix logging message for local file
* Remove json import
* [MLOP-728] Improve logging messages (#324)
* Improve logs.
* Revert debug-mode condition.
* Fix method to generate agg feature name. (#326)
* [MLOP-691] Include step to add partition to SparkMetastore during writing of Butterfree (#327)
* Change writer type for interval mode.
* Some adjusts.
* Release 1.2.0

Co-authored-by: AlvaroMarquesAndrade <[email protected]>
Co-authored-by: Igor Gustavo Hoelscher <[email protected]>
Co-authored-by: Felipe Victorino Caputo <[email protected]>
Co-authored-by: Rodrigo Martins de Oliveira <[email protected]>
Co-authored-by: Gabriel Brandão <[email protected]>
1 parent e443db9 · commit 1058c31 · Showing 124 changed files with 6,475 additions and 553 deletions.
```diff
@@ -47,6 +47,7 @@ coverage.xml
 *.cover
 .hypothesis/
 *cov.xml
+test_folder/

 # Translations
 *.mo
```
Empty file.
New file (9 lines):

```python
import typer

from butterfree._cli import migrate

app = typer.Typer()
app.add_typer(migrate.app, name="migrate")

if __name__ == "__main__":
    app()
```
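This entry point wires the `migrate` sub-app into a root Typer app, so commands are invoked as `<root> migrate <subcommand>`. The same nested-subcommand shape can be sketched with only the standard library's `argparse` (a hedged stdlib illustration of the pattern, not Butterfree's actual code; all names here are hypothetical):

```python
import argparse

def build_parser() -> argparse.ArgumentParser:
    # Root command with a "migrate" group owning an "apply" subcommand,
    # mirroring app.add_typer(migrate.app, name="migrate").
    parser = argparse.ArgumentParser(prog="butterfree")
    sub = parser.add_subparsers(dest="command")

    migrate = sub.add_parser("migrate", help="Database migration commands.")
    migrate_sub = migrate.add_subparsers(dest="subcommand")

    apply_cmd = migrate_sub.add_parser("apply", help="Apply migrations.")
    apply_cmd.add_argument("path", help="Path to feature set pipelines.")
    apply_cmd.add_argument("--generate-logs", action="store_true")
    apply_cmd.add_argument("--debug-mode", action="store_true")
    return parser

args = build_parser().parse_args(["migrate", "apply", "./pipelines", "--debug-mode"])
print(args.command, args.subcommand, args.path, args.debug_mode)
```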
New file (188 lines) — imports and module-discovery helpers:

```python
import datetime
import importlib
import inspect
import os
import pkgutil
import sys
from typing import Set

import boto3
import setuptools
import typer
from botocore.exceptions import ClientError

from butterfree.configs import environment
from butterfree.configs.logger import __logger
from butterfree.migrations.database_migration import ALLOWED_DATABASE
from butterfree.pipelines import FeatureSetPipeline

app = typer.Typer(help="Apply the automatic migrations in a database.")

logger = __logger("migrate", True)


def __find_modules(path: str) -> Set[str]:
    modules = set()
    for pkg in setuptools.find_packages(path):
        modules.add(pkg)
        pkg_path = path + "/" + pkg.replace(".", "/")

        # different usage for older python3 versions
        if sys.version_info.minor < 6:
            for _, name, is_pkg in pkgutil.iter_modules([pkg_path]):
                if not is_pkg:
                    modules.add(pkg + "." + name)
        else:
            for info in pkgutil.iter_modules([pkg_path]):
                if not info.ispkg:
                    modules.add(pkg + "." + info.name)
    return modules


def __fs_objects(path: str) -> Set[FeatureSetPipeline]:
    logger.info(f"Looking for python modules under {path}...")
    modules = __find_modules(path)
    if not modules:
        logger.error(f"Path: {path} not found!")
        return set()

    logger.info(f"Importing modules...")
    package = ".".join(path.strip("/").split("/"))
    imported = set(
        importlib.import_module(f".{name}", package=package) for name in modules
    )

    logger.info(f"Scanning modules...")
    content = {
        module: set(
            filter(
                lambda x: not x.startswith("__"),  # filter "__any__" attributes
                set(item for item in dir(module)),
            )
        )
        for module in imported
    }

    instances = set()
    for module, items in content.items():
        for item in items:
            value = getattr(module, item)
            if not value:
                continue

            # filtering non-classes
            if not inspect.isclass(value):
                continue

            # filtering abstractions
            if inspect.isabstract(value):
                continue

            # filtering classes that doesn't inherit from FeatureSetPipeline
            if not issubclass(value, FeatureSetPipeline):
                continue

            # filtering FeatureSetPipeline itself
            if value == FeatureSetPipeline:
                continue

            instances.add(value)

    logger.info("Creating instances...")
    return set(value() for value in instances)
```
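`__find_modules` walks the packages under a path and collects the dotted names of their non-package modules via `pkgutil.iter_modules`. A minimal stdlib-only sketch of that discovery step (no setuptools, run against a throwaway directory; the `pipelines` package name is made up for the example):

```python
import os
import pkgutil
import tempfile

# Build a tiny package layout on disk:
#   <tmp>/pipelines/__init__.py
#   <tmp>/pipelines/users.py
tmp = tempfile.mkdtemp()
pkg_dir = os.path.join(tmp, "pipelines")
os.makedirs(pkg_dir)
open(os.path.join(pkg_dir, "__init__.py"), "w").close()
open(os.path.join(pkg_dir, "users.py"), "w").close()

# Discover non-package modules inside the package directory,
# mirroring the iter_modules loop in __find_modules.
modules = {
    "pipelines." + info.name
    for info in pkgutil.iter_modules([pkg_dir])
    if not info.ispkg
}
print(sorted(modules))  # → ['pipelines.users']
```

Note that `iter_modules` skips `__init__.py` itself; the package name is added separately in the real helper.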
CLI arguments, the `Migrate` runner, and the `apply` command (continuation of the same file):

```python
PATH = typer.Argument(
    ..., help="Full or relative path to where feature set pipelines are being defined.",
)

GENERATE_LOGS = typer.Option(
    False, help="To generate the logs in local file 'logging.json'."
)

DEBUG_MODE = typer.Option(
    False,
    help="To view the queries resulting from the migration, DON'T apply the migration.",
)


class Migrate:
    """Execute migration operations in a Database based on pipeline Writer.

    Attributes:
        pipelines: list of Feature Set Pipelines to use to migration.
    """

    def __init__(self, pipelines: Set[FeatureSetPipeline],) -> None:
        self.pipelines = pipelines

    def _send_logs_to_s3(self, file_local: bool, debug_mode: bool) -> None:
        """Send all migration logs to S3."""
        file_name = "../logging.json"

        if not file_local and os.path.exists(file_name):
            s3_client = boto3.client("s3")

            timestamp = datetime.datetime.now()

            if debug_mode:
                object_name = (
                    f"logs/migrate-debug-mode/"
                    f"{timestamp.strftime('%Y-%m-%d')}"
                    f"/logging-{timestamp.strftime('%H:%M:%S')}.json"
                )
            else:
                object_name = (
                    f"logs/migrate/"
                    f"{timestamp.strftime('%Y-%m-%d')}"
                    f"/logging-{timestamp.strftime('%H:%M:%S')}.json"
                )
            bucket = environment.get_variable("FEATURE_STORE_S3_BUCKET")

            try:
                s3_client.upload_file(
                    file_name,
                    bucket,
                    object_name,
                    ExtraArgs={"ACL": "bucket-owner-full-control"},
                )
            except ClientError:
                raise

            os.remove(file_name)
        elif os.path.exists(file_name):
            print("Logs written to ../logging.json")
        else:
            print("No logs were generated.")

    def run(self, generate_logs: bool = False, debug_mode: bool = False) -> None:
        """Construct and apply the migrations."""
        for pipeline in self.pipelines:
            for writer in pipeline.sink.writers:
                db = writer.db_config.database
                if db == "cassandra":
                    migration = ALLOWED_DATABASE[db]
                    migration.apply_migration(pipeline.feature_set, writer, debug_mode)
                else:
                    logger.warning(f"Butterfree not supporting {db} Migrations yet.")

        self._send_logs_to_s3(generate_logs, debug_mode)


@app.command("apply")
def migrate(
    path: str = PATH, generate_logs: bool = GENERATE_LOGS, debug_mode: bool = DEBUG_MODE
) -> Set[FeatureSetPipeline]:
    """Scan and run database migrations for feature set pipelines defined under PATH.

    Butterfree will scan a given path for classes that inherit from its
    FeatureSetPipeline and create dry instances of it to extract schema and writer
    information. By doing this, Butterfree can compare all defined feature set schemas
    to their current state on each sink being used.

    All pipelines must be under python modules inside path, so we can dynamically
    import and instantiate them.
    """
    pipe_set = __fs_objects(path)
    Migrate(pipe_set).run(generate_logs, debug_mode)
    return pipe_set
```
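`_send_logs_to_s3` builds a dated S3 object key, switching its prefix on `debug_mode`, before uploading with boto3. A stdlib sketch of just that key-naming scheme (no S3 call; the `log_object_name` helper is hypothetical, extracted here only to show the layout):

```python
import datetime

def log_object_name(debug_mode: bool, now: datetime.datetime) -> str:
    # Mirrors the key layout used in _send_logs_to_s3:
    #   logs/<mode>/<YYYY-MM-DD>/logging-<HH:MM:SS>.json
    prefix = "migrate-debug-mode" if debug_mode else "migrate"
    return (
        f"logs/{prefix}/"
        f"{now.strftime('%Y-%m-%d')}"
        f"/logging-{now.strftime('%H:%M:%S')}.json"
    )

stamp = datetime.datetime(2021, 4, 30, 12, 5, 9)
print(log_object_name(False, stamp))  # → logs/migrate/2021-04-30/logging-12:05:09.json
```

Grouping keys by date prefix keeps each day's migration logs listable under one S3 "folder"; note that `:` in the time portion is legal in S3 keys but may need URL-encoding in console links.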