
Commit

Merge remote-tracking branch 'origin/momix-integration' into momix-integration
linuxpizzacats committed Jun 15, 2023
2 parents 120ef0a + ffd4870 commit cbf86ee
Showing 7 changed files with 46 additions and 23 deletions.
18 changes: 9 additions & 9 deletions scripts/core.py
@@ -147,13 +147,13 @@ def run_bbha_experiment(
     dir_path = os.path.join(current_script_dir_name, app_folder)
     create_folder_with_permissions(dir_path)

-    best_metric_with_all_features = f'{metric_description} with all the features'
-    best_metric_in_runs_key = f'Best {metric_description} (in {number_of_independent_runs} runs)'
-    res_csv = pd.DataFrame(columns=['dataset', 'Improved BBHA', 'Model',
+    best_metric_with_all_features = 'best_metric_with_all_features'
+    best_metric_in_runs_key = 'best_metric'
+    res_csv = pd.DataFrame(columns=['dataset', 'improved', 'model',
                                     best_metric_with_all_features,
                                     best_metric_in_runs_key,
-                                    f'Features with best {metric_description} (in {number_of_independent_runs} runs)',
-                                    f'CPU execution time ({number_of_independent_runs} runs) in seconds'])
+                                    'features',
+                                    'execution_time'])

     # Gets survival data
     x, y = read_survival_data(molecules_dataset, clinical_dataset)
@@ -296,12 +296,12 @@ def run_bbha_experiment(

     experiment_results_dict = {
         'dataset': molecules_dataset,
-        'Improved BBHA': 1 if run_improved else 0,
-        'Model': model_name,
+        'improved': 1 if run_improved else 0,
+        'model': model_name,
         best_metric_with_all_features: round(all_features_concordance_index, 4),
         best_metric_in_runs_key: round(best_metric, 4),
-        f'Features with best {metric_description} (in {number_of_independent_runs} runs)': ' | '.join(final_subset),
-        f'CPU execution time ({number_of_independent_runs} runs) in seconds': independent_run_time
+        'features': ' | '.join(final_subset),
+        'execution_time': independent_run_time
     }

     # Some extra reporting
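The renaming above swaps the verbose CSV headers (which embedded the metric description and the number of runs) for short machine-friendly keys. As a rough illustration only, one result row under the new schema could be built like this; the dataset name, gene subset and numbers are invented, not taken from the repository:

import pandas as pd

# Columns match the new keys defined in run_bbha_experiment
columns = ['dataset', 'improved', 'model', 'best_metric_with_all_features',
           'best_metric', 'features', 'execution_time']

example_row = {
    'dataset': 'breast_cancer_molecules',  # hypothetical dataset name
    'improved': 1,                         # 1 = improved BBHA variant was used
    'model': 'svm',
    'best_metric_with_all_features': 0.7123,
    'best_metric': 0.7541,
    'features': 'TP53 | BRCA1 | EGFR',     # subset joined with ' | ', as in the code above
    'execution_time': 182.4                # seconds
}

res_csv = pd.DataFrame([example_row], columns=columns)
print(res_csv.to_csv(index=False))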
3 changes: 1 addition & 2 deletions scripts/main.py
@@ -222,7 +222,6 @@ def main():
     parameters_description = f'{params.number_of_clusters}_clusters_{params.clustering_algorithm}_algorithm'

     # Spark settings
-    app_name = f"BBHA_{time.time()}".replace('.', '_')
     sc = SparkContext()
     sc.setLogLevel("ERROR")

@@ -237,7 +236,7 @@

     # Runs normal Feature Selection experiment using BBHA
     run_bbha_experiment(
-        app_name=app_name,
+        app_name=params.app_name,
         use_load_balancer=params.use_load_balancer and params.model == 'svm',  # TODO: implement for the RF and Cox Regression
         more_is_better=True,  # params.model != 'clustering', # CoxRegression is evaluated by the log likelihood. If it is lower, better! TODO: parametrize the clustering metric
         svm_kernel=svm_kernel,
2 changes: 1 addition & 1 deletion scripts/metaheuristics.py
@@ -177,7 +177,7 @@ def get_best_spark(
     by the star that computes the best fitness (from now, Black Hole)
     """
     # Converts to a Numpy array to discard the star's index
-    workers_results_np = np.array(workers_results)
+    workers_results_np = np.array(workers_results, dtype=object)
     workers_results_np_aux = np.array(
         [np.array(a_list) for a_list in workers_results_np[:, 1]])  # Creates Numpy arrays from lists
     if more_is_better:
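The added dtype=object is what lets NumPy build an array out of worker results that mix scalars and lists: since NumPy 1.24, creating an array from such inhomogeneous ("ragged") sequences raises a ValueError, and older releases only emitted a VisibleDeprecationWarning. A minimal sketch with an invented tuple layout (index, subset, fitness); the real structure of workers_results may differ:

import numpy as np

# Toy worker results: each tuple mixes scalars with a list, which NumPy
# refuses to coerce into a regular ndarray without an explicit object dtype.
workers_results = [
    (0, [1, 0, 1], 0.71),
    (1, [1, 1, 0], 0.68),
]

# np.array(workers_results) without dtype=object raises ValueError on NumPy >= 1.24
workers_results_np = np.array(workers_results, dtype=object)

# Column 1 holds the per-star lists, which can then be turned into arrays
subsets = [np.array(a_list) for a_list in workers_results_np[:, 1]]
print(subsets)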
6 changes: 6 additions & 0 deletions scripts/parameters.py
@@ -5,6 +5,10 @@

 class Parameters:
     """Gets all the parameters from CLI."""
+    # App name, a folder with the same name will be created with the results
+    app_name: str
+
     # URL to connect to the master
     master_connection_url: str

     # To use a Broadcast value instead of a pd.DataFrame
@@ -73,6 +77,7 @@ def __init__(self):
         parser = ArgumentParser()

         # General parameters
+        parser.add_argument("--app-name", dest='app_name', help="App name; a folder with this name will be created for the results", type=str)
         parser.add_argument("--master", dest='master', help="URL to connect to the master", type=str,
                             default="spark://master-node:7077")
         parser.add_argument("--molecules-dataset", dest='molecules_dataset',
@@ -144,6 +149,7 @@ def __init__(self):
         args = parser.parse_args()

         # Assigns parameters
+        self.app_name = args.app_name
         self.master_connection_url = args.master
         self.svm_kernel = args.svm_kernel
         self.svm_optimizer = args.svm_optimizer
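With the new --app-name flag, the application name is chosen by the caller instead of being derived from time.time() in main.py, so the results folder gets a predictable name. A simplified, hypothetical sketch of how the flag is parsed; only the two arguments relevant here are reproduced, and the example value is invented:

from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument("--app-name", dest='app_name', type=str,
                    help="App name; a folder with this name will hold the results")
parser.add_argument("--master", dest='master', type=str,
                    default="spark://master-node:7077",
                    help="URL to connect to the master")

# Equivalent to what Parameters.app_name now exposes to main.py
args = parser.parse_args(["--app-name", "bbha_breast_cancer"])  # hypothetical name
print(args.app_name)  # -> bbha_breast_cancer
print(args.master)    # -> spark://master-node:7077 (default)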
12 changes: 7 additions & 5 deletions src/app.py
@@ -1,7 +1,10 @@
 import os
 import emr
 import requests
-from flask import Flask, url_for, request, make_response, abort,redirect
+import logging
+import validations
+from flask import Flask, url_for, request, make_response, abort


 # BioAPI version
 VERSION = '0.1.2'
@@ -17,10 +20,9 @@ def index():
 @app.post("/job")
 def schedule_job():
     request_data = request.get_json()
-    if request_data is None or \
-            "name" not in request_data or \
-            "algorithm" not in request_data or \
-            "entrypoint_arguments" not in request_data:
+    if not validations.schedule_request_is_valid(request_data):
+        logging.warning('Invalid data received:')
+        logging.warning(request_data)
         abort(400)

     emr_response = emr.schedule(
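After this change, a POST to /job that is missing any of the required keys, or whose entrypoint_arguments do not include app-name, is logged and rejected with HTTP 400 instead of reaching EMR. A hedged client-side sketch; the host, port and argument values are assumptions, not taken from the repository:

import requests

payload = {
    "name": "bbha_breast_cancer",  # hypothetical job name forwarded to EMR
    "algorithm": "BBHA",
    "entrypoint_arguments": [
        {"name": "app-name", "value": "bbha_breast_cancer"},  # required by the new validation
        {"name": "master", "value": "spark://master-node:7077"},
    ],
}

# Hypothetical host/port; adjust to wherever the Flask app is served
response = requests.post("http://localhost:5000/job", json=payload)
print(response.status_code)

# Dropping "app-name" from entrypoint_arguments would now return 400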
10 changes: 4 additions & 6 deletions src/emr.py
@@ -12,19 +12,17 @@ class Algorithms(Enum):
     BBHA = 1


-def schedule(name: str, algorithm: Algorithms,
+def schedule(job_name: str, algorithm: Algorithms,
              entrypoint_arguments: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
     """
     TODO: document
-    :param name:
+    :param job_name:
     :param algorithm:
     :param entrypoint_arguments:
     :return:
     """
-    # Replaces all the non-alphanumeric chars from the job name to respect the [\.\-_/#A-Za-z0-9]+ regex
-    job_name = ''.join(e for e in name if e.isalnum() or e in ['.', '-', '_', '/', '#'])

     args = _get_args(job_name, algorithm, entrypoint_arguments)

     client = boto3.client('emr-containers')
     response = None

@@ -154,7 +152,7 @@ def _get_args(name: str, algorithm: Algorithms, entrypoint_args=None) -> Dict[st
         prefix = os.getenv('ENTRYPOINT_ARGS_KEY_PREFIX', "--")
         for element in entrypoint_args:
             clean_entrypoint_args.append(prefix + element["name"])
-            clean_entrypoint_args.append(element["value"])
+            clean_entrypoint_args.append(str(element["value"]))
         entrypoint_args = clean_entrypoint_args

     return {
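Casting element["value"] to str matters because the EMR on EKS API (boto3's emr-containers client) expects the Spark entrypoint arguments as a list of strings, while values arriving from the JSON payload may be numbers or booleans. A minimal sketch of the flattening step with an invented payload; the real _get_args also assembles the rest of the job driver:

import os

# Illustrative payload: the numeric value arrives as a JSON number, not a string
entrypoint_args = [
    {"name": "app-name", "value": "bbha_breast_cancer"},
    {"name": "number-of-independent-runs", "value": 5},  # hypothetical flag
]

prefix = os.getenv('ENTRYPOINT_ARGS_KEY_PREFIX', "--")
clean_entrypoint_args = []
for element in entrypoint_args:
    clean_entrypoint_args.append(prefix + element["name"])
    clean_entrypoint_args.append(str(element["value"]))  # cast so the EMR client accepts it

print(clean_entrypoint_args)
# ['--app-name', 'bbha_breast_cancer', '--number-of-independent-runs', '5']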
18 changes: 18 additions & 0 deletions src/validations.py
@@ -0,0 +1,18 @@
+from typing import Any, Dict, List
+
+
+def key_is_in_entrypoint_arguments(key: str, arguments: List[Dict[str, Any]]) -> bool:
+    """Checks if a key is in the entrypoint arguments."""
+    for arg in arguments:
+        if arg['name'] == key:
+            return True
+    return False
+
+
+def schedule_request_is_valid(request_data: Dict[str, Any]) -> bool:
+    """Checks that all the parameters needed to schedule a task to EMR are valid."""
+    return request_data is not None and \
+        "name" in request_data and \
+        "algorithm" in request_data and \
+        "entrypoint_arguments" in request_data and \
+        key_is_in_entrypoint_arguments("app-name", request_data['entrypoint_arguments'])
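A quick usage sketch of the new helpers; the payloads are invented, only the key names come from the code above:

import validations  # the new src/validations.py module

valid_request = {
    "name": "bbha_breast_cancer",
    "algorithm": "BBHA",
    "entrypoint_arguments": [
        {"name": "app-name", "value": "bbha_breast_cancer"},
    ],
}

invalid_request = {
    "name": "bbha_breast_cancer",
    "algorithm": "BBHA",
    "entrypoint_arguments": [],  # missing the mandatory "app-name" argument
}

print(validations.schedule_request_is_valid(valid_request))    # True
print(validations.schedule_request_is_valid(invalid_request))  # False
print(validations.schedule_request_is_valid(None))             # False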
