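# train.py
# MLflow training entry point: loads the VERIS dataset, trains a classifier
# for the selected task/target, evaluates it, logs everything to MLflow and
# updates the Model Registry.
#
# Illustrative invocation (all argument values below are hypothetical and
# <target-column> is a placeholder for a valid target of the chosen task):
#   python train.py --task attribute --target <target-column> --algo LGBM \
#       --hyperparams '{"max_depth": 4}' --imputer default --train-size 0.8 \
#       --split-random-state 0 --n-folds 5 --pca 0 --explain false \
#       --merge true --shap-data-percentage 0.2 \
#       --shap-test-over-train-percentage 0.2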
import click
import json
import mlflow
import logging
from mlflow.models.signature import infer_signature
import numpy as np
import pandas as pd
from sklearn.metrics import recall_score, accuracy_score
from lightgbm import LGBMClassifier
import os
from dotenv import load_dotenv
from sklearn.pipeline import Pipeline
from explanation.explainers import run_explainer
import models
from evaluation import evaluate_cv, evaluate
from etl import etl
from preprocessing import OneHotTransformer, myPCA
from utils import (create_veris_csv, load_dataset, download_veris_csv,
                   check_y_statistics, train_test_split_and_log,
                   mlflow_register_model)
from inference import ModelOut, mlflow_serve_conda_env
from distutils import util
# Load environment variables (e.g. the MLflow tracking URI) from .env
load_dotenv()
MLFLOW_TRACKING_URI = os.environ.get('MLFLOW_TRACKING_URI')
# Parse default settings from training json
# with open('./train_config.json') as json_file:
#     default_arguments = json.load(json_file)
#     # str to bool
#     default_arguments["merge"] = util.strtobool(default_arguments["merge"])
#     default_arguments["explain"] = util.strtobool(default_arguments["explain"])
def train_evaluate(X, y, estimator, train_size, n_folds,
split_random_state, n_components, explain,
shap_data_percentage, shap_test_over_train_percentage):
"""
Performs a training pipeline and evaluation for soft classification
given a dataset y|X, the classifier model and other training parameters
Parameters
----------
X: Pandas DataFrame
Input dataset
y: Pandas Series
Output vector (probabilistic values)
estimator: sklearn classifier
train_size: float
The portion of the training set 0 < train_size <= 1
If train_size == 1 cross validation is performed instead of
train / test split.
n_folds: int
Number of folds in case of cross validation (in case train_size == 1)
split_random_state: int
The random state for train / test split or CV
n_components: int
Components of the PCA step of the pipeline. If 0 then PCA
is not performed
explain: boolean
Whether to use SHAP for explanations
shap_data_percentage: float
shap_test_over_train_percentage: float
Returns
----------
Dict
Dictionary of all evaluation metrics
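
    Examples
    ----------
    Illustrative call; the values below are hypothetical and LGBMClassifier
    stands in for any of the supported estimators:

        train_evaluate(X, y, LGBMClassifier(), train_size=0.8, n_folds=5,
                       split_random_state=0, n_components=0, explain=False,
                       shap_data_percentage=0.2,
                       shap_test_over_train_percentage=0.2)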
"""
logging.info("Checking capability of training")
error_tags = check_y_statistics(y)
mlflow.set_tags(error_tags)
# skip loop if y is small
error = error_tags['error_class']
if error != '-':
raise(ValueError(f"{error}, preferrably seek another way of learning!\n"))
else:
logging.info("Passed!\n")
if train_size < 1:
logging.info("Splitting dataset\n")
# shuffle is set to False to keep date order
X_train, X_test, y_train, y_test = \
train_test_split_and_log(X, y,
train_size,
split_random_state,
shuffle=False)
else:
X_train = X
y_train = y
logging.info("Preprocessing, training & evaluation\n")
pipeline = Pipeline(steps=[
('one_hot_encoder', OneHotTransformer(
feature_name='victim.industry.name',
feature_labels=X['victim.industry.name'].unique().tolist())),
('PCA', myPCA(n_components)),
('classifier', estimator)
])
if np.allclose(train_size, 1): # CV evaluation for train_size = 1
metrix_dict = evaluate_cv(pipeline, X_train, y_train,
n_folds, split_random_state)
# Refit model to log it afterwards
model = pipeline.fit(X_train, y_train)
else: # evaluation for train_size < 1 (no cross validation)
model = pipeline.fit(X_train, y_train)
y_pred_proba = model.predict_proba(X_test)
y_pred = model.predict(X_test)
metrix_dict = evaluate(y_test, y_pred, y_pred_proba)
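    # ModelOut (defined in inference.py) is assumed to wrap the fitted
    # pipeline in an mlflow.pyfunc interface for serving.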
    # Infer the model signature and log the model
    signature = infer_signature(X, model.predict_proba(X.iloc[0:2]))
    mlflow.pyfunc.log_model(artifact_path="model",
                            python_model=ModelOut(model=pipeline),
                            code_path=['inference.py'],
                            conda_env=mlflow_serve_conda_env,
                            signature=signature,
                            input_example=X.head(1))
# Log parameters of used estimator
mlflow.log_params(estimator.get_params())
    # Log metrics
mlflow.log_metrics(metrix_dict)
# explainers
if explain and train_size < 1:
try:
X_shap_test, model_shap = \
run_explainer(estimator,
X_train,
X_test,
y_train,
y_test,
pipeline,
data_percentage=shap_data_percentage,
test_percentage=shap_test_over_train_percentage
)
            # check whether an explanation was produced for the current
            # estimator; if not, the logging calls below fail and the
            # explanation step is skipped
            logging.info("Explainer finished")
            # log explanation
            mlflow.shap.log_explanation(model_shap.predict,
                                        X_shap_test,
                                        artifact_path="model/explanation")
            mlflow.log_artifacts("explain_plots", artifact_path="model/explanation")
        except TypeError:
            # no explanation available for this estimator; skip logging
            pass
return metrix_dict
@click.command()
@click.option("--task", "-t",
type=click.Choice(
['attribute',
'asset.variety',
'asset.assets.variety.S',
'asset.assets.variety.M',
'asset.assets.variety.U',
'asset.assets.variety.P',
'asset.assets.variety.T',
'action',
'action.error.variety',
'action.hacking.variety',
'action.misuse.variety',
'action.physical.variety',
'action.malware.variety',
'action.social.variety']),
# default=default_arguments['task'],
help="Learning task"
)
@click.option("--target", "-tt",
type=str,
# default=default_arguments['target'],
help="Specific target variable. Omit this option to get list of options according to task"
)
@click.option("--algo", "-a",
type=click.Choice(
['SVM',
'RF',
'LR',
'GNB',
'LGBM',
'KNN']),
multiple=False,
# default=default_arguments['algo'],
help="Algorithm"
)
@click.option("--hyperparams", "-h",
type=str,
# default=default_arguments["hyperparams"],
help=""" "Hyperapameters of algorithm. e.g. '{"C": 1, "gamma": 0.1}' """
)
@click.option("--imputer", "-i",
type=click.Choice(['dropnan', 'default']),
# default=default_arguments["imputer"],
help="Imputation strategy"
)
@click.option("--train-size", "-ts",
type=float,
# default=default_arguments['train_size'],
help="Training set size. If equals 1 then cross validation is performed to evaluate the models"
)
@click.option("--split-random-state", "-rs",
type=int,
# default=default_arguments['split_random_state'],
help="Random state for splitting train / test or cv"
)
@click.option("--n-folds", "-f",
type=int,
# default=default_arguments['n_folds'],
help="Number of folds for CV if there training set is all dataset"
)
@click.option("--pca", "-p",
type=int,
# default=default_arguments['pca'],
help="Number of PCA components. 0 means no PCA"
)
@click.option("--explain", "-e",
type=str,
# default=default_arguments["explain"],
help="Whether to use SHAP for explanations. Requires train_size < 1 and it is generally a slow process. \
Accepted values: ['y', 'yes', 't', 'true', 'on'] \
and ['n', 'no', 'f', false', 'off']"
)
@click.option("--merge", "-m",
type=str,
# default=default_arguments["merge"],
help="Whether to merge Brute force, SQL injection and DoS columns for hacking \
and malware cases. Accepted values: ['y', 'yes', 't', 'true', 'on'] \
and ['n', 'no', 'f', false', 'off']"
)
@click.option("--train-size", "-ts",
type=float,
# default=default_arguments['train_size'],
help="Training set size. If equals 1 then cross validation is performed to evaluate the models"
)
@click.option("--shap-data-percentage", "-sdp",
type=float,
# default=default_arguments['train_size'],
help="Dataset fraction to be used with SHAP for explanations"
)
@click.option("--shap-test-over-train-percentage", "-sdp",
type=float,
# default=default_arguments['train_size'],
help="Training set fraction to be used as test with SHAP for explanations"
)
def train_evaluate_register(task,
target,
algo,
hyperparams,
imputer,
merge,
train_size,
split_random_state,
n_folds,
pca,
explain,
shap_data_percentage,
shap_test_over_train_percentage):
"""
Performs all training, evaluation and model registration with MLflow.
If parameters are unset values from train_config.json are imported
Parameters
----------
See click argument parser
Returns
----------
"""
# Convert some str parameters to boolean (MLproject does not permit boolean)
explain = util.strtobool(explain)
merge = util.strtobool(merge)
# Load hyperparam str as json
hyperparams = json.loads(hyperparams)
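    # e.g. --hyperparams '{"C": 1, "gamma": 0.1}' as shown in the option help;
    # keys must be valid keyword arguments of the chosen estimator's constructor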
# Set tracking URI
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
logging.info("Current tracking uri: {}".format(mlflow.get_tracking_uri()))
if np.allclose(train_size, 1) and n_folds < 2:
mlflow.set_tags({'error class': 'train < 1 & n_folds < 2'})
        raise ValueError('\nSelect train_size < 1 or n_folds >= 2.\n')
# Activate probability for svm
if algo == 'SVM':
hyperparams['probability'] = True
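    # Look up the estimator class registered under this family name in
    # models.models (assumed to be a list of {'family_name': ..., 'class': ...}
    # dicts) and instantiate it with the parsed hyperparameters.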
estimator = [i for i in models.models
if i['family_name']==algo][0]['class'](**hyperparams)
# Load data
#veris_df = download_veris_csv()
#veris_df = load_dataset()
veris_df = create_veris_csv()
# Start training workflow and logs
with mlflow.start_run(run_name=target):
# ETL
X, y, predictors, target = etl(veris_df,
task,
target,
merge)
logging.info(f'\nTraining {algo} on: {target}\n')
# Tags
mlflow.set_tag("mlflow.runName", f'{target}')
mlflow.set_tags({'target': target,
'predictors': predictors,
'n_samples': len(y),
'class_balance': sum(y)/len(y),
'imputer': imputer,
'merge': merge
}
)
# Train
train_evaluate(X,
y,
estimator,
train_size,
n_folds,
split_random_state,
pca,
explain,
shap_data_percentage,
shap_test_over_train_percentage)
# Update Model Registry if model is better than current
mlflow_register_model(model_name=target)
return
if __name__ == '__main__':
train_evaluate_register()