Commit b6bee24

Refactor CPMAnalysis class and prepare permutation test via SLURM

NilsWinter committed Aug 2, 2024
1 parent ff9868c

Showing 11 changed files with 195 additions and 139 deletions.
2 changes: 1 addition & 1 deletion cpm/__init__.py
@@ -1 +1 @@
-from cpm.cpm_analysis import CPMAnalysis
+from cpm.cpm_analysis import CPMRegression
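
The rename propagates to the package namespace, so downstream imports change with it. A minimal sketch, assuming the package is importable as cpm:

    # old import, gone after this commit:
    # from cpm import CPMAnalysis
    from cpm import CPMRegression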
130 changes: 102 additions & 28 deletions cpm/cpm_analysis.py
@@ -1,44 +1,85 @@
import os
+import typer
+import pickle
from typing import Union

import numpy as np
import pandas as pd

-from sklearn.model_selection import BaseCrossValidator, BaseShuffleSplit
+from sklearn.model_selection import BaseCrossValidator, BaseShuffleSplit, KFold

from cpm.models import LinearCPMModel
-from cpm.edge_selection import UnivariateEdgeSelection
-from cpm.utils import (score_regression, score_regression_models, regression_metrics,
+from cpm.edge_selection import UnivariateEdgeSelection, PThreshold
+from cpm.utils import (score_regression_models, regression_metrics,
train_test_split, vector_to_upper_triangular_matrix)
from cpm.fold import compute_inner_folds


-class CPMAnalysis:
+class CPMRegression:
def __init__(self,
results_directory: str,
-                 cv: Union[BaseCrossValidator, BaseShuffleSplit],
-                 cv_edge_selection: Union[BaseCrossValidator, BaseShuffleSplit],
-                 edge_selection: UnivariateEdgeSelection,
-                 estimate_model_increments: bool = True,
-                 add_edge_filter: bool = True):
+                 cv: Union[BaseCrossValidator, BaseShuffleSplit] = KFold(n_splits=10, shuffle=True, random_state=42),
+                 cv_edge_selection: Union[BaseCrossValidator, BaseShuffleSplit] = KFold(n_splits=10, shuffle=True, random_state=42),
+                 edge_selection: UnivariateEdgeSelection = UnivariateEdgeSelection(edge_statistic=['pearson'],
+                                                                                   edge_selection=[PThreshold(threshold=[0.05],
+                                                                                                              correction=[None])]),
+                 add_edge_filter: bool = True,
+                 n_permutations: int = 0):
self.results_directory = results_directory
self.cv = cv
self.inner_cv = cv_edge_selection
self.edge_selection = edge_selection
-        self.estimate_model_increments = estimate_model_increments
self.add_edge_filter = add_edge_filter
+        self.n_permutations = n_permutations

self.results_outer_cv = None
self.results_inner_cv = list()

-        os.makedirs(results_directory, exist_ok=True)
+    def save_configuration(self, config_filename: str):
+        with open(os.path.splitext(config_filename)[0] + '.pkl', 'wb') as file:
+            pickle.dump({'cv': self.cv,
+                         'inner_cv': self.inner_cv,
+                         'edge_selection': self.edge_selection,
+                         'add_edge_filter': self.add_edge_filter,
+                         'n_permutations': self.n_permutations}, file)

-    def fit(self,
-            X: Union[pd.DataFrame, np.ndarray],
-            y: Union[pd.Series, pd.DataFrame, np.ndarray],
-            covariates: Union[pd.Series, pd.DataFrame, np.ndarray],
-            save_memory: bool = False):
-        os.makedirs(self.results_directory, exist_ok=True)
+    def load_configuration(self, results_directory, config_filename):
+        with open(config_filename, 'rb') as file:
+            loaded_config = pickle.load(file)
+
+        self.results_directory = results_directory
+        self.cv = loaded_config['cv']
+        self.inner_cv = loaded_config['inner_cv']
+        self.edge_selection = loaded_config['edge_selection']
+        self.add_edge_filter = loaded_config['add_edge_filter']
+        self.n_permutations = loaded_config['n_permutations']
+
+    def estimate(self,
+                 X: Union[pd.DataFrame, np.ndarray],
+                 y: Union[pd.Series, pd.DataFrame, np.ndarray],
+                 covariates: Union[pd.Series, pd.DataFrame, np.ndarray]):
+
+        np.random.seed(42)
+
+        # estimate models
+        self._estimate(X=X, y=y, covariates=covariates, perm_run=0)
+
+        # run permutation test
+        for perm_id in range(1, self.n_permutations + 1):
+            self._estimate(X=X, y=y, covariates=covariates, perm_run=perm_id)
+
+    def _estimate(self,
+                  X: Union[pd.DataFrame, np.ndarray],
+                  y: Union[pd.Series, pd.DataFrame, np.ndarray],
+                  covariates: Union[pd.Series, pd.DataFrame, np.ndarray],
+                  perm_run: int = 0):
+        np.random.seed(42)
+        if perm_run > 0:
+            y = np.random.permutation(y)
+            current_results_directory = os.path.join(self.results_directory, 'permutation', f'{perm_run}')
+        else:
+            current_results_directory = self.results_directory
+        os.makedirs(current_results_directory, exist_ok=True)
+        self._write_info()

n_outer_folds = self.cv.n_splits
@@ -56,7 +97,7 @@ def fit(self,
for outer_fold, (train, test) in enumerate(self.cv.split(X, y)):
print(f"Running fold {outer_fold}")
fold_dir = os.path.join(self.results_directory, f'fold_{outer_fold}')
-            if not save_memory:
+            if not perm_run:
os.makedirs(fold_dir, exist_ok=True)

X_train, X_test, y_train, y_test, cov_train, cov_test = train_test_split(train, test, X, y, covariates)
@@ -77,7 +118,7 @@ def fit(self,
# aggregate over folds to calculate mean and std
agg_results = inner_cv_results.groupby(['network', 'param_id', 'model'])[regression_metrics].agg(['mean', 'std'])

-            if not save_memory:
+            if not perm_run:
inner_cv_results.to_csv(os.path.join(fold_dir, 'inner_cv_results.csv'))
agg_results.to_csv(os.path.join(fold_dir, 'inner_cv_results_mean_std.csv'))

@@ -119,24 +160,24 @@ def fit(self,

cv_results = self._calculate_model_increments(cv_results=cv_results, metrics=regression_metrics)

-        cv_results.to_csv(os.path.join(self.results_directory, 'cv_results.csv'))
+        cv_results.to_csv(os.path.join(current_results_directory, 'cv_results.csv'))

self.results_outer_cv = cv_results

agg_results = cv_results.groupby(['network', 'model'])[regression_metrics].agg(['mean', 'std'])
-        agg_results.to_csv(os.path.join(self.results_directory, 'cv_results_mean_std.csv'), float_format='%.4f')
+        agg_results.to_csv(os.path.join(current_results_directory, 'cv_results_mean_std.csv'), float_format='%.4f')

-        if not save_memory:
-            predictions.to_csv(os.path.join(self.results_directory, 'predictions.csv'))
+        if not perm_run:
+            predictions.to_csv(os.path.join(current_results_directory, 'predictions.csv'))

for sign, edges in [('positive', positive_edges), ('negative', negative_edges)]:
-            np.save(os.path.join(self.results_directory, f'{sign}_edges.npy'), vector_to_upper_triangular_matrix(edges[0]))
+            np.save(os.path.join(current_results_directory, f'{sign}_edges.npy'), vector_to_upper_triangular_matrix(edges[0]))

weights_edges = np.sum(edges, axis=0) / edges.shape[0]
overlap_edges = weights_edges == 1

-            np.save(os.path.join(self.results_directory, f'weights_{sign}_edges.npy'), vector_to_upper_triangular_matrix(weights_edges))
-            np.save(os.path.join(self.results_directory, f'overlap_{sign}_edges.npy'), vector_to_upper_triangular_matrix(overlap_edges))
+            np.save(os.path.join(current_results_directory, f'weights_{sign}_edges.npy'), vector_to_upper_triangular_matrix(weights_edges))
+            np.save(os.path.join(current_results_directory, f'overlap_{sign}_edges.npy'), vector_to_upper_triangular_matrix(overlap_edges))

return agg_results

@@ -222,7 +263,7 @@ def permutation_test(self,
y_perm = np.random.permutation(y)
self.results_directory = os.path.join(original_results_directory, 'permutation', f'{i}')
if not os.path.exists(self.results_directory):
-                self.fit(X, y_perm, covariates, save_memory=True)
+                self.estimate(X, y_perm, covariates, save_memory=True)
res = pd.read_csv(os.path.join(self.results_directory, 'cv_results_mean_std.csv'), header=[0, 1], index_col=[0, 1])
res = res.loc[:, res.columns.get_level_values(1) == 'mean']
res.columns = res.columns.droplevel(1)
@@ -329,4 +370,37 @@ def calculate_p_values(self, true_results, perms):
p_values_df = pd.concat(p_values).reset_index(drop=True)
p_values_df = p_values_df.set_index(['network', 'model'])

-        return p_values_df
+        return p_values_df
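
For orientation, the usual convention for permutation-based p-values is the smoothed empirical rate of permuted runs scoring at least as well as the unpermuted model. A hedged sketch of that convention (not the exact calculate_p_values implementation; the comparison direction must respect whether a metric is a score or an error):

    import numpy as np

    def permutation_p_value(true_metric, perm_metrics, higher_is_better=True):
        # count permuted runs at least as extreme as the true result;
        # the +1 smoothing keeps p > 0 for a finite number of permutations
        perm_metrics = np.asarray(perm_metrics)
        if higher_is_better:
            n_extreme = np.sum(perm_metrics >= true_metric)
        else:
            n_extreme = np.sum(perm_metrics <= true_metric)
        return (n_extreme + 1) / (perm_metrics.size + 1)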


+def main(results_directory: str = typer.Option(...,
+                                               exists=True,
+                                               file_okay=False,
+                                               dir_okay=True,
+                                               writable=True,
+                                               readable=True,
+                                               resolve_path=True,
+                                               help="Define results folder for analysis"),
+         data_directory: str = typer.Option(
+             ...,
+             exists=True,
+             file_okay=False,
+             dir_okay=True,
+             writable=True,
+             readable=True,
+             resolve_path=True,
+             help="Path to input data containing targets.csv and data.csv."),
+         config_file: str = typer.Option(..., help="Absolute path to config file"),
+         perm_run: int = typer.Option(default=0, help="Current permutation run.")):
+
+    X = np.load(os.path.join(data_directory, 'X.npy'))
+    y = np.load(os.path.join(data_directory, 'y.npy'))
+    covariates = np.load(os.path.join(data_directory, 'covariates.npy'))
+
+    cpm = CPMRegression(results_directory=results_directory)
+    cpm.load_configuration(results_directory=results_directory, config_filename=config_file)
+    cpm._estimate(X=X, y=y, covariates=covariates, perm_run=perm_run)
+
+
+if __name__ == "__main__":
+    typer.run(main)
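
Together, save_configuration/load_configuration and this typer entry point let every permutation run execute as an independent process, which is exactly what a SLURM job array needs. A hypothetical end-to-end sketch (paths, file names, and the submission command are illustrative, not part of the commit):

    import os
    from cpm import CPMRegression

    results_dir = '/scratch/my_project/results'  # hypothetical path
    data_dir = '/scratch/my_project/data'        # must hold X.npy, y.npy, covariates.npy
    os.makedirs(results_dir, exist_ok=True)      # the CLI requires an existing folder

    # Configure the analysis once and pickle the configuration for the workers.
    cpm = CPMRegression(results_directory=results_dir, n_permutations=1000)
    cpm.save_configuration(config_filename=os.path.join(results_dir, 'config'))

    # Each SLURM array task then runs a single permutation, e.g.
    #   python -m cpm.cpm_analysis \
    #       --results-directory /scratch/my_project/results \
    #       --data-directory /scratch/my_project/data \
    #       --config-file /scratch/my_project/results/config.pkl \
    #       --perm-run $SLURM_ARRAY_TASK_ID
    # submitted with something like: sbatch --array=0-1000 run_cpm.sh

Permutation run 0 is the unpermuted model; runs 1..n_permutations write into results_directory/permutation/<perm_run>.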
84 changes: 11 additions & 73 deletions cpm/edge_selection.py
@@ -1,47 +1,19 @@
import pandas as pd
import numpy as np
-from scipy.stats import ttest_1samp, pearsonr, t, rankdata
-from pingouin import partial_corr
+from scipy.stats import ttest_1samp, t, rankdata
from typing import Union

from sklearn.base import BaseEstimator
from sklearn.utils.metaestimators import _BaseComposition
from sklearn.model_selection import ParameterGrid
from sklearn.linear_model import LinearRegression
import statsmodels.stats.multitest as multitest


-class BaseEdgeSelector(BaseEstimator):
-    def transform(self):
-        pass


def one_sample_t_test(x):
# use two-sided for correlations (functional connectome) or one-sided positive for NOS etc (structural connectome)
_, p_value = ttest_1samp(x, popmean=0, nan_policy='omit', alternative='two-sided')
return p_value


-def partial_correlation(X, y, covariates, method: str = 'pearson'):
-    p_values = list()
-
-    df = pd.concat([pd.DataFrame(X, columns=[f"x{s}" for s in range(X.shape[1])]), pd.DataFrame({'y': y})], axis=1)
-    cov_names = list()
-    for c in range(covariates.shape[1]):
-        df[f'cov{c}'] = covariates[:, c]
-        cov_names.append(f'cov{c}')
-
-    for xi in range(X.shape[1]):
-        res = partial_corr(data=df, x=f'x{xi}', y='y', covar=cov_names, method=method)['p-val'].iloc[0]
-        p_values.append(res)
-    return np.asarray(p_values)


-def pearson_correlation(x, Y):
-    correlations = np.apply_along_axis(lambda col: pearsonr(x, col), 0, Y)
-    return correlations[0, :], correlations[1, :]


def compute_t_and_p_values(correlations, df):
# Calculate t-statistics
t_stats = correlations * np.sqrt(df / (1 - correlations ** 2))
@@ -132,6 +104,11 @@ def semi_partial_correlation_spearman(x, Y, Z):
return semi_partial_correlation(x, Y, Z, rank=True)


+class BaseEdgeSelector(BaseEstimator):
+    def select(self, r, p):
+        pass


class PThreshold(BaseEdgeSelector):
def __init__(self, threshold: Union[float, list] = 0.05, correction: Union[str, list] = None):
"""
@@ -170,45 +147,6 @@ def __init__(self, k: Union[int, list] = None):
self.k = k


-class CPMPipeline(_BaseComposition):
-    def __init__(self, elements):
-        self.elements = elements
-        self.current_config = None
-
-    def get_params(self, deep=True):
-        """Get parameters for this estimator.
-        Parameters
-        ----------
-        deep : boolean, optional
-            If True, will return the parameters for this estimator and
-            contained subobjects that are estimators.
-        Returns
-        -------
-        params : mapping of string to any
-            Parameter names mapped to their values.
-        """
-        return self._get_params('elements', deep=deep)
-
-    def set_params(self, **kwargs):
-        """Set the parameters of this estimator.
-        Valid parameter keys can be listed with ``get_params()``.
-        Returns
-        -------
-        self
-        """
-        if self.current_config is not None and len(self.current_config) > 0:
-            if kwargs is not None and len(kwargs) == 0:
-                raise ValueError("Pipeline cannot set parameters to elements with an emtpy dictionary. Old values persist")
-        self.current_config = kwargs
-        self._set_params('elements', **kwargs)
-
-        return self
-
-    @property
-    def named_steps(self):
-        return dict(self.elements)
-

class UnivariateEdgeSelection(BaseEstimator):
def __init__(self,
edge_statistic: Union[str, list] = 'pearson',
@@ -229,13 +167,13 @@ def _generate_config_grid(self):
return ParameterGrid(grid_elements)

def fit_transform(self, X, y=None, covariates=None):
-        r_edges, p_edges = self._correlation(X=X, y=y, covariates=covariates)
+        r_edges, p_edges = self.compute_edge_statistics(X=X, y=y, covariates=covariates)
edges = self.edge_selection.select(r=r_edges, p=p_edges)
return edges

-    def _correlation(self, X: Union[pd.DataFrame, np.ndarray],
-                     y: Union[pd.Series, pd.DataFrame, np.ndarray],
-                     covariates: Union[pd.Series, pd.DataFrame, np.ndarray]):
+    def compute_edge_statistics(self, X: Union[pd.DataFrame, np.ndarray],
+                                y: Union[pd.Series, pd.DataFrame, np.ndarray],
+                                covariates: Union[pd.Series, pd.DataFrame, np.ndarray]):
if self.edge_statistic == 'pearson':
r_edges, p_edges = pearson_correlation_with_pvalues(y, X)
elif self.edge_statistic == 'spearman':
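
The renamed compute_edge_statistics is also the public seam for custom setups. A minimal sketch of wiring an explicit edge-selection strategy into the analysis, mirroring the new constructor defaults (the results path and the second threshold are illustrative):

    from cpm import CPMRegression
    from cpm.edge_selection import UnivariateEdgeSelection, PThreshold

    # Univariate Pearson screening; two candidate p-thresholds span a small
    # hyperparameter grid that the inner CV can choose from.
    edge_selection = UnivariateEdgeSelection(
        edge_statistic=['pearson'],
        edge_selection=[PThreshold(threshold=[0.05, 0.01], correction=[None])])

    cpm = CPMRegression(results_directory='results_demo',
                        edge_selection=edge_selection)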
6 changes: 3 additions & 3 deletions cpm/fold.py
@@ -1,6 +1,6 @@
import pandas as pd

-from cpm.models import LinearCPMModel
+from cpm.models import LinearCPMModel, NetworkDict, ModelDict
from cpm.utils import score_regression_models, train_test_split


@@ -17,8 +17,8 @@ def compute_inner_folds(X, y, covariates, cv, edge_selection, param, param_id):
y_pred = model.predict(X_test, cov_test)
metrics = score_regression_models(y_true=y_test, y_pred=y_pred)

-    for model_type in ['full', 'covariates', 'connectome', 'residuals']:
-        for network in ['positive', 'negative', 'both']:
+    for model_type in ModelDict().keys():
+        for network in NetworkDict().keys():
res = metrics[model_type][network]
res['model'] = model_type
res['network'] = network
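
Judging from the hard-coded lists they replace, ModelDict and NetworkDict presumably centralize the model-variant and network names in cpm.models. A hypothetical minimal equivalent, purely for illustration (not the actual cpm.models code):

    class ModelDict(dict):
        def __init__(self):
            # one slot per fitted model variant
            super().__init__(full=None, covariates=None, connectome=None, residuals=None)

    class NetworkDict(dict):
        def __init__(self):
            # positive edges, negative edges, and both combined
            super().__init__(positive=None, negative=None, both=None)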
4 changes: 2 additions & 2 deletions cpm/reporting/nichord_plot.py
@@ -3,7 +3,7 @@
from nichord.convert import convert_matrix
from nichord.chord import plot_chord
from nichord.combine import plot_and_combine
-from cpm.simulate_data import simulate_data
+from cpm.simulate_data import simulate_regression_data
from nilearn.datasets import fetch_atlas_schaefer_2018
import numpy as np
import matplotlib.pyplot as plt
@@ -63,7 +63,7 @@ def vector_to_matrix_3d(vector_2d, shape):
return matrix_3d


-X, y, covariates = simulate_data(n_features=4950, n_informative_features=100)
+X, y, covariates = simulate_regression_data(n_features=4950, n_informative_features=100)
atlas = fetch_atlas_schaefer_2018(n_rois=100)
networks = [str(roi).split("_")[-2] for roi in list(atlas["labels"])]
regions = [str(roi).split("b'7Networks_")[1] for roi in list(atlas["labels"])]
16 changes: 8 additions & 8 deletions cpm/simulate_data.py
@@ -3,14 +3,14 @@
from sklearn.preprocessing import StandardScaler


-def simulate_data(n_samples: int = 500,
-                  n_features: int = 4950,
-                  n_informative_features: int = 200,
-                  n_covariates: int = 5,
-                  noise_level: float = 0.1,
-                  feature_effect_size: float = 0.2, # Reduced effect size for features
-                  covariate_effect_size: float = 5.0,
-                  random_state: int = 42):
+def simulate_regression_data(n_samples: int = 500,
+                             n_features: int = 4950,
+                             n_informative_features: int = 200,
+                             n_covariates: int = 5,
+                             noise_level: float = 0.1,
+                             feature_effect_size: float = 0.2, # Reduced effect size for features
+                             covariate_effect_size: float = 5.0,
+                             random_state: int = 42):
"""
Simulates data with specified parameters for regression problems.
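
The rename is mechanical, so call sites only swap the function name, as in nichord_plot.py above. A quick usage sketch with the defaults from the signature (the printed shapes are the expected ones, assuming y comes back as a flat vector):

    from cpm.simulate_data import simulate_regression_data

    # 500 samples, 4950 edge features (a 100-node connectome), 5 covariates
    X, y, covariates = simulate_regression_data(n_samples=500,
                                                n_features=4950,
                                                n_covariates=5,
                                                random_state=42)
    print(X.shape, y.shape, covariates.shape)  # expected: (500, 4950) (500,) (500, 5)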