From 63df414823ac45f5b41dde507f250313b1fa7984 Mon Sep 17 00:00:00 2001 From: JD Russo Date: Wed, 12 Jan 2022 15:07:13 -0800 Subject: [PATCH 1/6] Merging version 1.1 (#15) * Bugfix for aggregate clustering not getting iters_to_use * Remove pip-style dependency for Ray and add local package install to conda * Bugfix to validation iterations used * Update WESTPA dependency to dev version * Add adaptive FPT calculation * Added new plot, flux profile vs committor colored by pcoord, and corrected some plotting options. * Compatibility for empty models (i.e. failures from block-validation) in plot output Co-authored-by: John Russo --- environment.yml | 4 + msm_we/fpt.py | 281 ++++++++++++++++++++++++++++++++++++++++++++--- msm_we/msm_we.py | 113 +++++++++++++++++-- setup.py | 3 +- 4 files changed, 369 insertions(+), 32 deletions(-) diff --git a/environment.yml b/environment.yml index 37ad3cd..25a51d6 100644 --- a/environment.yml +++ b/environment.yml @@ -17,3 +17,7 @@ dependencies: - sphinx_rtd_theme - tqdm - scipy + - pip + - pip: + - ray + - . diff --git a/msm_we/fpt.py b/msm_we/fpt.py index 6979d8c..d1901fb 100644 --- a/msm_we/fpt.py +++ b/msm_we/fpt.py @@ -4,6 +4,7 @@ """ import numpy as np from copy import deepcopy +import tqdm.auto as tqdm import msm_we.utils as utils from msm_we.utils import Interval @@ -12,7 +13,13 @@ class DirectFPT: @classmethod def mean_fpts( - cls, trajectories, stateA=None, stateB=None, discrete=True, n_variables=None, lag_time=1, + cls, + trajectories, + stateA=None, + stateB=None, + discrete=True, + n_variables=None, + lag_time=1, ): """Empirical mean first passage times (MFPTs) calculation (no model involved) by tracing the trajectories. Notice the difference between @@ -57,7 +64,9 @@ def mean_fpts( multiplied by the lag_time used (not the physical units). """ - passage_timesAB, passage_timesBA, tb_values = cls.fpts(trajectories, stateA, stateB, discrete, n_variables, lag_time) + passage_timesAB, passage_timesBA, tb_values = cls.fpts( + trajectories, stateA, stateB, discrete, n_variables, lag_time + ) n_AB = len(passage_timesAB) n_BA = len(passage_timesBA) @@ -90,7 +99,13 @@ def mean_fpts( @classmethod def fpts( - cls, trajectories, stateA=None, stateB=None, discrete=True, n_variables=None, lag_time=1, + cls, + trajectories, + stateA=None, + stateB=None, + discrete=True, + n_variables=None, + lag_time=1, ): """Empirical first passage times (FPTs) calculation (no model involved) by tracing the trajectories. 
IMPORTANT: Notice the difference in notation @@ -136,7 +151,9 @@ def fpts( """ if (stateA is None) or (stateB is None): - raise Exception("The final and initial states have " "to be defined to compute the MFPT") + raise Exception( + "The final and initial states have " "to be defined to compute the MFPT" + ) if not discrete: """ @@ -144,7 +161,9 @@ def fpts( is a set of continuous trajectories """ if n_variables is None: - raise Exception("In continuous trajectories the number of " "variables is needed") + raise Exception( + "In continuous trajectories the number of " "variables is needed" + ) stateA = Interval(stateA, n_variables) stateB = Interval(stateB, n_variables) @@ -207,7 +226,9 @@ def mean_fpts(cls, tmatrix, stateA, stateB, lag_time=1): pass @classmethod - def directional_mfpt(cls, transition_matrix, stateA, stateB, ini_probs=None, lag_time=1): + def directional_mfpt( + cls, transition_matrix, stateA, stateB, ini_probs=None, lag_time=1 + ): """Computes the mean-first passage time in a single direction using a recursive procedure This method is useful when there is no B->A ensemble but only A->B transitions, @@ -333,7 +354,9 @@ def mfpts_matrix(cls, transition_matrix, lag_time=1): temp_values = [] for i in range(size): - temp_values.append(cls.mfpts_to_target_microstate(transition_matrix, i, lag_time)) + temp_values.append( + cls.mfpts_to_target_microstate(transition_matrix, i, lag_time) + ) mfpt_m = np.array(temp_values).T # to nummpy array and transposed return mfpt_m @@ -420,9 +443,18 @@ def max_commute_time(cls, matrix_of_mfpts): @classmethod def fpt_distribution( - cls, t_matrix, initial_state, final_state, initial_distrib, - min_power=1, max_power=12, max_n_lags=100, lag_time=1, dt=1.0, - clean_recycling=False, logscale=False + cls, + t_matrix, + initial_state, + final_state, + initial_distrib, + min_power=1, + max_power=12, + max_n_lags=100, + lag_time=1, + dt=1.0, + clean_recycling=False, + logscale=False, ): """Calculated distribution of first passage times from transition matrix @@ -515,7 +547,14 @@ def fpt_distribution( list_of_pdfs[istateIndex, 0] = tmatrix[ini_state[istateIndex], f_state] cls.calc_fmatrix( - Fmatrix, tmatrix, prevFmatrix, list_of_pdfs, lag_list, ini_state, istateIndex, f_state, + Fmatrix, + tmatrix, + prevFmatrix, + list_of_pdfs, + lag_list, + ini_state, + istateIndex, + f_state, ) # Nomalize the FPT distribution and output @@ -529,20 +568,222 @@ def fpt_distribution( # For logscale the dts at different t are different, we need to let FPT(t) # absorb them. Otherwise we have to use dt in variable size to calculate mean # value such as integration of t*dt*FPT(t). 
- dens_list = [[0, 0]] + [[lag_list[0]*dt2, density[0]*lag_list[0]/dt2]] + dens_list = [[0, 0]] + [[lag_list[0] * dt2, density[0] * lag_list[0] / dt2]] for i in range(1, len(lag_list)): - dens_list += [[lag_list[i]*dt2, density[i]*(lag_list[i]-lag_list[i-1])/dt2]] + dens_list += [ + [ + lag_list[i] * dt2, + density[i] * (lag_list[i] - lag_list[i - 1]) / dt2, + ] + ] density_vs_t = np.array(dens_list) else: - density_vs_t = np.array([[0, 0]] + - [[(i+1) * dt2, dens / dt2] for i, dens in zip(lag_list, density)]) + density_vs_t = np.array( + [[0, 0]] + + [[(i + 1) * dt2, dens / dt2] for i, dens in zip(lag_list, density)] + ) # normalized to 1 density_vs_t[:, 1] /= sum(density_vs_t[:, 1]) return density_vs_t + @staticmethod + def adaptive_fpt_distribution( + Tmatrix, + initial_states, + initial_state_probs, + target_states, + tau=1, + increment=5, + fine_increment=1.2, + relevant_thresh=1e-4, + max_steps=int(1e6), + max_time=np.inf, + explicit_renormalization=False, + verbose=False, + ): + """ + Adaptively computes a first-passage time distribution. + + Starting at t=tau, compute the probability flowing into the target at t. + Then, increment t by multiplying it by the coarse increment. + When relevant_thresh probability has entered the target state, step back to the previous coarse state, and + swap over to incrementing with the fine increment. + This allows you to efficiently sweep log-space. + + Procedurally, this starts probability in specified `initial_states` according to `initial_state_probs`, and then + propagates that probability through the transition matrix. + The FPT distribution is measured by tracking new probability entering the target state at each time. + + Note that absorbing boundary conditions are stripped from the transition matrix -- if this is not done, then + the result is like a probability CDF, not a probability distribution. + + Parameters + ---------- + Tmatrix: array-like + Transition matrix + + initial_states: array-like of ints + List of initial states to start probability in + + initial_state_probs: array-like + Probability distribution across the initial states. + + target_states: array-like + Target states for MFPT. + + tau + increment: float + Multiplicative increment for coarse steps + fine_increment: float + Multiplicative increment for fine steps, once the minimum probability in the target has been reached. + relevant_thresh: float + Amount of probability that must be in the target before switching to fine increments. + max_steps: int + Maximum number of steps to run + max_time: float + Maximum time to run to + explicit_renormalization: bool + Whether to explicitly renormalize the transition matrix. This should not be necessary -- if it is, there's + probably some numerical instability you should be careful of. + verbose: bool + Produce verbose text output. 
+ + Returns + ------- + FPT distribution, + probability distribution at each time, + last step index, + times at which FPT distribution was evaluated + """ + + n_states = len(Tmatrix) + + all_probabilities = np.full(shape=(max_steps + 1, n_states), fill_value=np.nan) + + # The initial probability vector is zero except in the origin states, + # which have their relative probabilities + initial_probability = np.zeros(n_states) + initial_probability[initial_states] = initial_state_probs + initial_probability /= sum(initial_probability) + + all_probabilities[0] = initial_probability + + # Make the target states absorbing + non_recycling_matrix = Tmatrix.copy() + non_recycling_matrix[target_states, :] = 0.0 + for target in target_states: + non_recycling_matrix[target, target] = 1.0 + + # Track the probability that flowed into the target at each time + probs = np.zeros(shape=max_steps) + probs[0] = 0.0 + + # At each one of our timesteps, track the amount of flux that entered the target + last_step = 1 + + get_next_step = lambda x: x * increment + in_relevant_region = False + + steps = [1] + + with tqdm.tqdm(total=1) as pbar: + + for i in range(max_steps - 1): + + this_step = int(get_next_step(last_step)) + if this_step <= last_step: + this_step = int(last_step + 1) + + matrix_next = np.linalg.matrix_power(non_recycling_matrix, this_step) + + if explicit_renormalization: + matrix_next = matrix_next / np.sum(matrix_next, axis=1) + + probability = initial_probability @ matrix_next + + if explicit_renormalization: + probability /= sum(probability) + + # Check if we're just starting to get any probability + if ( + i > 0 + and not in_relevant_region + and (sum(probability[target_states]) - sum(probs[: i + 1])) + > relevant_thresh + ): + if verbose: + print( + f"*** Entered relevant region at step {this_step}. " + f"Swapping to fine-grained, and taking a step back to {this_step / increment}." + ) + # If so, then change our increment to finer resolution + # TODO: Would be cool to do something like as the probability increases, + # continue scaling down to some minimum increment + in_relevant_region = True + this_step /= increment + + steps.append(this_step) + all_probabilities[i + 1] = all_probabilities[i] + probs[i + 1] = probs[i] + + get_next_step = lambda x: x * fine_increment + + if verbose: + print( + f"Current time is {this_step}, time step will be {get_next_step(this_step)}" + ) + + continue + + steps.append(this_step) + + all_probabilities[i + 1] = probability + + # The amount that flowed INTO the target is the probability that's flowed in since the last t + if i == 0: + # In the first iteration, all the probability into the target just got there + probs[i + 1] = sum(probability[target_states]) + else: + # After the first, it's the amount that's there now minus the total amount that entered up until now + probs[i + 1] = sum(probability[target_states]) - sum(probs[: i + 1]) + + pbar.update(probs[i + 1]) + + # Check if we're done (i.e., all our probability has flowed into the target, none left.) + if np.isclose(sum(probs), 1): + # if np.isclose(probs[i+1], 1): + print( + f"*** All probability reached the target at time {this_step}" + ) + break + + if this_step > max_time: + print( + "*** Max steps reached, before all probability flowed into target." + ) + break + + last_step = this_step + + print(f"Finished in {i} steps") + print( + f"By the last time, {sum(probs[:i])} probability has reached the target. 
(This should be 1!)" + ) + + times = np.array(steps, dtype=float) * float(tau) + return probs[: i + 2], all_probabilities[: i + 2], i, times + @classmethod def calc_fmatrix( - cls, Fmatrix, tmatrix, prevFmatrix, list_of_pdfs, lag_list, ini_state, istateIndex, f_state, + cls, + Fmatrix, + tmatrix, + prevFmatrix, + list_of_pdfs, + lag_list, + ini_state, + istateIndex, + f_state, ): # Calculate FPT distribution from a the recursive formula, Eq. 3 in the paper below: # E. Suarez, A. J. Pratt, L. T. Chong, D. M. Zuckerman, Protein Science 26, 67-78 (2016). @@ -551,9 +792,13 @@ def calc_fmatrix( if time_index == 0: tmatrix_new = np.linalg.matrix_power(tmatrix, time) else: - tmatrix_new = np.linalg.matrix_power(tmatrix, time-lag_list[time_index-1]) + tmatrix_new = np.linalg.matrix_power( + tmatrix, time - lag_list[time_index - 1] + ) Fmatrix = np.dot(tmatrix_new, prevFmatrix - np.diag(np.diag(prevFmatrix))) - list_of_pdfs[istateIndex, time_index] = Fmatrix[ini_state[istateIndex], f_state] + list_of_pdfs[istateIndex, time_index] = Fmatrix[ + ini_state[istateIndex], f_state + ] prevFmatrix = Fmatrix diff --git a/msm_we/msm_we.py b/msm_we/msm_we.py index 6954108..877b834 100644 --- a/msm_we/msm_we.py +++ b/msm_we/msm_we.py @@ -649,7 +649,7 @@ def initialize( log.warning("No tau provided, defaulting to 1.") tau = 1.0 - self.tau = tau + self.tau = float(tau) # This is really only used for nAtoms self.set_topology(refPDBfile) @@ -1277,17 +1277,22 @@ def do_block_validation( block_iterations[-1][-1] = block_iterations[-1][-1] - 1 # Get the iterations corresponding to each group - validation_iterations = [ - range( - start_idx + 1, - cross_validation_blocks, - cross_validation_blocks // cross_validation_groups, - ) + group_blocks = [ + range(start_idx, cross_validation_blocks, cross_validation_groups,) for start_idx in range(cross_validation_groups) ] + validation_iterations = [] + for group in range(cross_validation_groups): + group_iterations = [] + + for block in group_blocks[group]: + group_iterations.extend(range(*block_iterations[block])) + + validation_iterations.append(group_iterations) + # You're looking at this massive try block and judging me -- but don't worry. # The purpose of this is just to catch ANY error, and preface it with an explicit heads-up that it's coming # from the block validation. This is useful because errors may crop up only in the block-validation, and it @@ -1332,6 +1337,7 @@ def do_block_validation( raise e # Store the validation models, in case you want to analyze them. 
+ self.validation_iterations = validation_iterations self.validation_models = validation_models def load_iter_data(self, n_iter: int): @@ -2957,6 +2963,7 @@ def cluster_coordinates( streaming=streaming, first_cluster_iter=first_cluster_iter, use_ray=use_ray, + iters_to_use=iters_to_use, **_cluster_args, ) @@ -5688,6 +5695,81 @@ def get_flux_committor(self): self.Jq = J.squeeze() / self.tau # sys.stdout.write("%s " % i) + def plot_flux_committor_pcoordcolor( + self, nwin=1, ax=None, pcoord_to_use=0, **_plot_args, + ): + + _models = [self] + _model_labels = ["main_model"] + + plot_args = { + "linewidth": 2, + "s": 50, + "marker": ">", + "cmap": plt.cm.rainbow.reversed(), + "alpha": 0.7, + } + + plot_args.update(_plot_args) + + if ax is None: + fig = plt.figure(figsize=(10, 4)) + ax = fig.add_subplot(111) + + for i, (_model, _label) in enumerate(zip(_models[::-1], _model_labels[::-1])): + + if _model is None: + continue + + if not hasattr(_model, "q"): + log.warning( + f"Committors have not yet been generated for {_label}, generating now." + ) + _model.get_committor() + + if not hasattr(_model, "Jq"): + log.warning( + f"Committor-fluxes have not yet been generated for {_label}, generating now." + ) + _model.get_flux_committor() + + n_bins = _model.targetRMSD_centers.shape[0] + Jq_avg = _model.Jq.copy() + Jq_std = np.zeros_like(Jq_avg) + + q_avg = np.zeros_like(Jq_avg) + + indq = np.argsort(np.squeeze(1.0 - _model.q)) + for _i in range(n_bins - 1, nwin - 1, -1): + iav = _i - nwin + ind = range(_i - nwin, _i) + Jq_avg[iav] = np.mean(_model.Jq[ind]) + Jq_std[iav] = np.std(_model.Jq[ind]) + q_avg[iav] = np.mean(_model.q[indq[ind]]) + + indPlus = np.where(Jq_avg > 0.0) + + lines = ax.scatter( + q_avg[indPlus], + np.squeeze(Jq_avg[indPlus]), + c=_model.targetRMSD_centers[indPlus, pcoord_to_use], + label=f"{_label} flux toward target", + **plot_args, + ) + + print("Plotting committor") + ax.figure.colorbar(lines, label=f"Progress Coordinate {pcoord_to_use}") + + ax.set_xlim([-0.1, 1.1]) + + ax.set_title("Full-data model") + ax.set_yscale("log") + ax.set_xlabel("Pseudocommittor") + ax.set_ylabel("Flux (weight/second)") + self.print_pseudocommittor_warning() + + return ax, lines + def plot_flux_committor( self, nwin=1, @@ -5736,6 +5818,9 @@ def plot_flux_committor( for i, (_model, _label) in enumerate(zip(_models, _model_labels)): + if _model is None: + continue + if not hasattr(_model, "q"): log.warning( f"Committors have not yet been generated for {_label}, generating now." 
@@ -5794,9 +5879,10 @@ def plot_flux_committor( ) ax.set_yscale("log") - ax.set_xscale("log") + ax.set_xscale("linear") + ax.set_xlim([-0.1, 1.1]) ax.set_xlabel("Pseudocommittor") - ax.set_ylabel("Flux (weight/second") + ax.set_ylabel("Flux (weight/second)") self.print_pseudocommittor_warning() if own_ax: @@ -5872,7 +5958,7 @@ def plot_flux( # Draw the basis/target boundaries in this pcoord [ ax.axvline( - bound, color="r", linestyle="--", label=["", "Target bound"][i == 0] + bound, color="r", linestyle="--", label=["", "Target boundary"][i == 0] ) for i, bound in enumerate(self.target_pcoord_bounds[pcoord_to_use, :]) ] @@ -5881,13 +5967,16 @@ def plot_flux( bound, color="b", linestyle="--", - label=["", "Basis/Source bound"][i == 0], + label=["", "Basis/Source boundary"][i == 0], ) for i, bound in enumerate(self.basis_pcoord_bounds[pcoord_to_use, :]) ] for i, (_model, _label) in enumerate(zip(_models, _model_labels)): + if _model is None: + continue + if not hasattr(_model, "J"): log.warning( f"Fluxes have not yet been generated for {_label}, generating now." @@ -5928,7 +6017,7 @@ def plot_flux( ax.set_yscale("log") ax.set_xlabel(f"Pcoord {pcoord_to_use}") - ax.set_ylabel("Flux (weight/second") + ax.set_ylabel("Flux (weight/second)") if own_ax: ax.legend(bbox_to_anchor=(1.01, 1.0), loc="upper left") diff --git a/setup.py b/setup.py index ba7ce53..0eccd73 100644 --- a/setup.py +++ b/setup.py @@ -10,9 +10,8 @@ with open("HISTORY.rst") as history_file: history = history_file.read() -# Ray must be installed through pip, not conda # WESTPA-2.0 with westpa.analysis is not yet in Conda -requirements = ["ray", "westpa>=v2.0b5"] +requirements = ["westpa>=v2.0.dev1"] setup_requirements = [ "pytest-runner", From 53111857f8709f28c18d3592709766d47f51cdbc Mon Sep 17 00:00:00 2001 From: She Zhang Date: Fri, 4 Nov 2022 12:06:59 -0600 Subject: [PATCH 2/6] first attempt to implement ray actors --- msm_we/_hamsm/_clustering.py | 270 ++++++++++++++++++----------------- 1 file changed, 137 insertions(+), 133 deletions(-) diff --git a/msm_we/_hamsm/_clustering.py b/msm_we/_hamsm/_clustering.py index bcdbe6f..616a69b 100644 --- a/msm_we/_hamsm/_clustering.py +++ b/msm_we/_hamsm/_clustering.py @@ -22,6 +22,131 @@ SUPPORTED_MAPPERS = {RectilinearBinMapper, VoronoiBinMapper} +@ray.remote +class ClusteringActor: + def __init__(self, model, kmeans_model, iteration, processCoordinates): + self.model = model + self.kmeans_model = kmeans_model + self.iteration = iteration + self.processCoordinates = processCoordinates + + def discretize(self): + model = self.model + kmeans_model = self.kmeans_model + iteration = self.iteration + processCoordinates = self.processCoordinates + # model_id, kmeans_model_id, iteration, processCoordinates_id = arg + + # self = ray.get(model_id) + # kmeans_model = ray.get(kmeans_model_id) + # processCoordinates = ray.get(processCoordinates_id) + # self = model + + # Need to do this so the model's transformation array is writable -- otherwise predict chokes + # with 'buffer source array is read-only'. 
+ kmeans_model = deepcopy(kmeans_model) + + iter_coords = model.get_iter_coordinates(iteration) + + # If there are no coords for this iteration, return None + if iter_coords.shape[0] == 0: + return None, 0, iteration + + # Otherwise, apply the k-means model and discretize + transformed_coords = model.coordinates.transform( + processCoordinates(iter_coords) + ) + dtrajs = kmeans_model.predict(transformed_coords) + + return dtrajs, 1, iteration + + def stratified_discretize(self): + model = self.model + kmeans_model = self.kmeans_model + iteration = self.iteration + processCoordinates = self.processCoordinates + # model_id, kmeans_model_id, iteration, processCoordinates_id = arg + + # import sys + # import westpa.core.binning + + # sys.modules["westpa.binning"] = sys.modules["westpa.core.binning"] + # This is silly -- I need to import westpa.core.binning so it's loaded into sys.modules but the linter + # complains that it's unused... so, use it. + # log.debug(f"Loaded {westpa.core.binning}") + + # self = ray.get(model_id) + # kmeans_model = ray.get(kmeans_model_id) + # processCoordinates = ray.get(processCoordinates_id) + self = model + + # Need to do this so the model's transformation array is writable -- otherwise predict chokes + # with 'buffer source array is read-only'. + kmeans_model = deepcopy(kmeans_model) + kmeans_model.model = self + + # for i, cluster_model in enumerate(kmeans_model.cluster_models): + # print(f"Model {i}: \t ", end=" ") + # try: + # print(cluster_model.cluster_centers_) + # except AttributeError: + # print("No cluster centers!") + + # iter_coords = self.get_iter_coordinates(iteration) + + kmeans_model.model.load_iter_data(iteration) + kmeans_model.model.get_transition_data_lag0() + # print(f"After loading coordPairList in iter {iteration}, shape is {kmeans_model.model.coordPairList.shape}") + parent_coords, child_coords = ( + kmeans_model.model.coordPairList[..., 0], + self.coordPairList[..., 1], + ) + + # If there are no coords for this iteration, return None + if child_coords.shape[0] == 0: + return None, 0, iteration + + # Otherwise, apply the k-means model and discretize + transformed_parent_coords = kmeans_model.model.coordinates.transform( + processCoordinates(parent_coords) + ) + transformed_child_coords = kmeans_model.model.coordinates.transform( + processCoordinates(child_coords) + ) + + try: + kmeans_model.processing_from = True + try: + parent_dtrajs = kmeans_model.predict(transformed_parent_coords) + except IndexError as e: + + print("Problem ===== ") + print( + f"Parent pcoords are shape {kmeans_model.model.pcoord0List.shape}" + ) + print(f"Parent coords are shape {transformed_parent_coords.shape}") + print(f"Child pcoords are shape {kmeans_model.model.pcoord1List.shape}") + print(f"Child coords are shape {transformed_child_coords.shape}") + print("===== ") + + raise e + + kmeans_model.processing_from = False + child_dtrajs = kmeans_model.predict(transformed_child_coords) + except AttributeError as e: + log.error("Cluster center was not initialized and not remapped") + log.error(kmeans_model.we_remap) + raise e + # TODO: Remap to nearest visited + + return ( + (parent_dtrajs, child_dtrajs), + 1, + iteration, + kmeans_model.target_bins, + kmeans_model.basis_bins, + ) + class ClusteringMixin: n_clusters = None clusters = None @@ -108,34 +233,9 @@ def do_discretization(self: "modelWE", arg): return dtrajs, used_iters @ray.remote - def do_ray_discretization( - model: "modelWE", kmeans_model, iteration, processCoordinates - ): + def 
do_ray_discretization(actor: ClusteringActor): + return ray.get(actor.discretize.remote()) - # model_id, kmeans_model_id, iteration, processCoordinates_id = arg - - # self = ray.get(model_id) - # kmeans_model = ray.get(kmeans_model_id) - # processCoordinates = ray.get(processCoordinates_id) - # self = model - - # Need to do this so the model's transformation array is writable -- otherwise predict chokes - # with 'buffer source array is read-only'. - kmeans_model = deepcopy(kmeans_model) - - iter_coords = model.get_iter_coordinates(iteration) - - # If there are no coords for this iteration, return None - if iter_coords.shape[0] == 0: - return None, 0, iteration - - # Otherwise, apply the k-means model and discretize - transformed_coords = model.coordinates.transform( - processCoordinates(iter_coords) - ) - dtrajs = kmeans_model.predict(transformed_coords) - - return dtrajs, 1, iteration def cluster_coordinates( self: "modelWE", @@ -415,10 +515,9 @@ def cluster_aggregated( # Submit all the discretization tasks to the cluster task_ids = [] - model_id = ray.put(self) - cluster_model_id = ray.put(cluster_model) - process_coordinates_id = ray.put(self.processCoordinates) - + cluster_actor = ClusteringActor.remote( + self, cluster_model, iteration, self.processCoordinates + ) # max_inflight = 50 for iteration in tqdm.tqdm( range(1, self.maxIter), desc="Submitting discretization tasks" @@ -429,9 +528,7 @@ def cluster_aggregated( # num_ready = iteration - max_inflight # ray.wait(task_ids, num_returns=num_ready) - _id = self.do_ray_discretization.remote( - model_id, cluster_model_id, iteration, process_coordinates_id - ) + _id = self.do_ray_discretization.remote(cluster_actor) task_ids.append(_id) # As they're completed, add them to dtrajs @@ -1157,28 +1254,20 @@ def launch_ray_discretization(self: "modelWE", progress_bar=None): else: log.debug("Using cached model for discretization") - model_id = ray.put(self.pre_discretization_model) - clusters = deepcopy(self.clusters) # It's set inside do_stratified_ray_discretization, though I could do it in either place. 
clusters.model = None # self.pre_discretization_model - cluster_model_id = ray.put(clusters) - - process_coordinates_id = ray.put(self.processCoordinates) + cluster_actor = ClusteringActor.remote( + self, clusters, iteration, self.processCoordinates + ) # max_inflight = 50 with ProgressBar(progress_bar) as progress_bar: submit_task = progress_bar.add_task( description="Submitting discretization tasks", total=self.maxIter - 1 ) for iteration in range(1, self.maxIter): - _id = self.do_stratified_ray_discretization.remote( - model_id, - cluster_model_id, - iteration, - process_coordinates_id - # self, self.clusters, iteration, self.processCoordinates - ) + _id = self.do_stratified_ray_discretization.remote(cluster_actor) task_ids.append(_id) progress_bar.update(submit_task, advance=1) @@ -1222,8 +1311,6 @@ def launch_ray_discretization(self: "modelWE", progress_bar=None): del results del finished - del model_id - del cluster_model_id # Remove all empty elements from dtrajs and assign to self.dtrajs self.dtrajs = [dtraj for dtraj in dtrajs if dtraj is not None] @@ -1233,91 +1320,8 @@ def launch_ray_discretization(self: "modelWE", progress_bar=None): log.debug("Discretization complete") @ray.remote - def do_stratified_ray_discretization( - model: "modelWE", kmeans_model, iteration, processCoordinates - ): - - # model_id, kmeans_model_id, iteration, processCoordinates_id = arg - - # import sys - # import westpa.core.binning - - # sys.modules["westpa.binning"] = sys.modules["westpa.core.binning"] - # This is silly -- I need to import westpa.core.binning so it's loaded into sys.modules but the linter - # complains that it's unused... so, use it. - # log.debug(f"Loaded {westpa.core.binning}") - - # self = ray.get(model_id) - # kmeans_model = ray.get(kmeans_model_id) - # processCoordinates = ray.get(processCoordinates_id) - self = model - - # Need to do this so the model's transformation array is writable -- otherwise predict chokes - # with 'buffer source array is read-only'. 
- kmeans_model = deepcopy(kmeans_model) - kmeans_model.model = self - - # for i, cluster_model in enumerate(kmeans_model.cluster_models): - # print(f"Model {i}: \t ", end=" ") - # try: - # print(cluster_model.cluster_centers_) - # except AttributeError: - # print("No cluster centers!") - - # iter_coords = self.get_iter_coordinates(iteration) - - kmeans_model.model.load_iter_data(iteration) - kmeans_model.model.get_transition_data_lag0() - # print(f"After loading coordPairList in iter {iteration}, shape is {kmeans_model.model.coordPairList.shape}") - parent_coords, child_coords = ( - kmeans_model.model.coordPairList[..., 0], - self.coordPairList[..., 1], - ) - - # If there are no coords for this iteration, return None - if child_coords.shape[0] == 0: - return None, 0, iteration - - # Otherwise, apply the k-means model and discretize - transformed_parent_coords = kmeans_model.model.coordinates.transform( - processCoordinates(parent_coords) - ) - transformed_child_coords = kmeans_model.model.coordinates.transform( - processCoordinates(child_coords) - ) - - try: - kmeans_model.processing_from = True - try: - parent_dtrajs = kmeans_model.predict(transformed_parent_coords) - except IndexError as e: - - print("Problem ===== ") - print( - f"Parent pcoords are shape {kmeans_model.model.pcoord0List.shape}" - ) - print(f"Parent coords are shape {transformed_parent_coords.shape}") - print(f"Child pcoords are shape {kmeans_model.model.pcoord1List.shape}") - print(f"Child coords are shape {transformed_child_coords.shape}") - print("===== ") - - raise e - - kmeans_model.processing_from = False - child_dtrajs = kmeans_model.predict(transformed_child_coords) - except AttributeError as e: - log.error("Cluster center was not initialized and not remapped") - log.error(kmeans_model.we_remap) - raise e - # TODO: Remap to nearest visited - - return ( - (parent_dtrajs, child_dtrajs), - 1, - iteration, - kmeans_model.target_bins, - kmeans_model.basis_bins, - ) + def do_stratified_ray_discretization(actor: ClusteringActor): + return ray.get(actor.stratified_discretize.remote()) @staticmethod def find_nearest_bin(bin_mapper, bin_idx, filled_bins): From a68dd0773e059d87d9d7545ef0274d7bcbec5e7d Mon Sep 17 00:00:00 2001 From: She Zhang Date: Fri, 4 Nov 2022 14:37:04 -0600 Subject: [PATCH 3/6] minor assertion bug fix --- msm_we/_hamsm/_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/msm_we/_hamsm/_data.py b/msm_we/_hamsm/_data.py index a376f19..4e9c3f9 100644 --- a/msm_we/_hamsm/_data.py +++ b/msm_we/_hamsm/_data.py @@ -609,7 +609,7 @@ def load_iter_coordinates(self: "modelWE"): f"Attempting to obtain coordinates from west_file {west_file}, iteration {self.n_iter}" ) # TODO: This should probably generically be -1, not 1, to deal with variable-length augmentation. 
- assert cur_iter_coords.shape[1] > 1, ( + assert coords.shape[1] > 1, ( "Augmented coords only have 1 point in them -- " "need at least start & end for transitions" ) From 05ba702bdc06f3a1af5d63c1120cc0ee913bb0a0 Mon Sep 17 00:00:00 2001 From: She Zhang Date: Fri, 4 Nov 2022 16:09:44 -0600 Subject: [PATCH 4/6] bug fixes --- msm_we/_hamsm/_clustering.py | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/msm_we/_hamsm/_clustering.py b/msm_we/_hamsm/_clustering.py index 616a69b..cf9af85 100644 --- a/msm_we/_hamsm/_clustering.py +++ b/msm_we/_hamsm/_clustering.py @@ -24,16 +24,14 @@ @ray.remote class ClusteringActor: - def __init__(self, model, kmeans_model, iteration, processCoordinates): + def __init__(self, model, kmeans_model, processCoordinates): self.model = model self.kmeans_model = kmeans_model - self.iteration = iteration self.processCoordinates = processCoordinates - def discretize(self): + def discretize(self, iteration): model = self.model kmeans_model = self.kmeans_model - iteration = self.iteration processCoordinates = self.processCoordinates # model_id, kmeans_model_id, iteration, processCoordinates_id = arg @@ -60,10 +58,9 @@ def discretize(self): return dtrajs, 1, iteration - def stratified_discretize(self): + def stratified_discretize(self, iteration): model = self.model kmeans_model = self.kmeans_model - iteration = self.iteration processCoordinates = self.processCoordinates # model_id, kmeans_model_id, iteration, processCoordinates_id = arg @@ -233,8 +230,8 @@ def do_discretization(self: "modelWE", arg): return dtrajs, used_iters @ray.remote - def do_ray_discretization(actor: ClusteringActor): - return ray.get(actor.discretize.remote()) + def do_ray_discretization(actor: ClusteringActor, iteration): + return ray.get(actor.discretize.remote(iteration)) def cluster_coordinates( @@ -516,7 +513,7 @@ def cluster_aggregated( task_ids = [] cluster_actor = ClusteringActor.remote( - self, cluster_model, iteration, self.processCoordinates + self, cluster_model, self.processCoordinates ) # max_inflight = 50 for iteration in tqdm.tqdm( @@ -528,7 +525,7 @@ def cluster_aggregated( # num_ready = iteration - max_inflight # ray.wait(task_ids, num_returns=num_ready) - _id = self.do_ray_discretization.remote(cluster_actor) + _id = self.do_ray_discretization.remote(cluster_actor, iteration) task_ids.append(_id) # As they're completed, add them to dtrajs @@ -1259,7 +1256,7 @@ def launch_ray_discretization(self: "modelWE", progress_bar=None): clusters.model = None # self.pre_discretization_model cluster_actor = ClusteringActor.remote( - self, clusters, iteration, self.processCoordinates + self, clusters, self.processCoordinates ) # max_inflight = 50 with ProgressBar(progress_bar) as progress_bar: @@ -1267,7 +1264,7 @@ def launch_ray_discretization(self: "modelWE", progress_bar=None): description="Submitting discretization tasks", total=self.maxIter - 1 ) for iteration in range(1, self.maxIter): - _id = self.do_stratified_ray_discretization.remote(cluster_actor) + _id = self.do_stratified_ray_discretization.remote(cluster_actor, iteration) task_ids.append(_id) progress_bar.update(submit_task, advance=1) @@ -1320,8 +1317,8 @@ def launch_ray_discretization(self: "modelWE", progress_bar=None): log.debug("Discretization complete") @ray.remote - def do_stratified_ray_discretization(actor: ClusteringActor): - return ray.get(actor.stratified_discretize.remote()) + def do_stratified_ray_discretization(actor: ClusteringActor, iteration): + return 
ray.get(actor.stratified_discretize.remote(iteration)) @staticmethod def find_nearest_bin(bin_mapper, bin_idx, filled_bins): From 1e8bf70ce54ebb57f5c5ad49698b961405876798 Mon Sep 17 00:00:00 2001 From: She Zhang Date: Tue, 8 Nov 2022 11:49:26 -0700 Subject: [PATCH 5/6] create more actors --- msm_we/_hamsm/_clustering.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/msm_we/_hamsm/_clustering.py b/msm_we/_hamsm/_clustering.py index cf9af85..e8a89da 100644 --- a/msm_we/_hamsm/_clustering.py +++ b/msm_we/_hamsm/_clustering.py @@ -511,10 +511,12 @@ def cluster_aggregated( # Submit all the discretization tasks to the cluster task_ids = [] + n_actors = int(ray.available_resources().get("CPU", 1)) - cluster_actor = ClusteringActor.remote( + cluster_actors = [ClusteringActor.remote( self, cluster_model, self.processCoordinates - ) + ) for _ in range(n_actors)] + # max_inflight = 50 for iteration in tqdm.tqdm( range(1, self.maxIter), desc="Submitting discretization tasks" @@ -525,6 +527,7 @@ def cluster_aggregated( # num_ready = iteration - max_inflight # ray.wait(task_ids, num_returns=num_ready) + cluster_actor = cluster_actors[(iteration - 1) % n_actors] _id = self.do_ray_discretization.remote(cluster_actor, iteration) task_ids.append(_id) @@ -1240,6 +1243,7 @@ def launch_ray_discretization(self: "modelWE", progress_bar=None): """ self.check_connect_ray() + n_actors = int(ray.available_resources().get("CPU", 1)) self.dtrajs = [] @@ -1255,15 +1259,16 @@ def launch_ray_discretization(self: "modelWE", progress_bar=None): # It's set inside do_stratified_ray_discretization, though I could do it in either place. clusters.model = None # self.pre_discretization_model - cluster_actor = ClusteringActor.remote( + cluster_actors = [ClusteringActor.remote( self, clusters, self.processCoordinates - ) + ) for _ in range(n_actors)] # max_inflight = 50 with ProgressBar(progress_bar) as progress_bar: submit_task = progress_bar.add_task( description="Submitting discretization tasks", total=self.maxIter - 1 ) for iteration in range(1, self.maxIter): + cluster_actor = cluster_actors[(iteration - 1) % n_actors] _id = self.do_stratified_ray_discretization.remote(cluster_actor, iteration) task_ids.append(_id) progress_bar.update(submit_task, advance=1) From 4a74bc45951d7dce9b809cb4f6fb842bcd579a0b Mon Sep 17 00:00:00 2001 From: She Zhang Date: Tue, 8 Nov 2022 15:42:37 -0700 Subject: [PATCH 6/6] version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index b607074..0fa8069 100644 --- a/setup.py +++ b/setup.py @@ -65,6 +65,6 @@ tests_require=test_requirements, extras_require=EXTRAS_REQUIRE, url="https://github.com/jdrusso/msm_we", - version="0.1.27", + version="0.1.28a1", zip_safe=False, )
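For context, a standalone sketch of the actor-pool pattern these commits move to: heavy, read-only state is held once per actor instead of being shipped with every task, one actor is created per available CPU, and per-iteration work is dispatched round-robin. The class and variable names below are illustrative only and are not part of the msm_we API; only `ray` is assumed to be installed.

    import ray

    ray.init(ignore_reinit_error=True)


    @ray.remote
    class Discretizer:
        """Holds heavy, read-only state once per actor rather than per task."""

        def __init__(self, payload):
            self.payload = payload

        def discretize(self, iteration):
            # Stand-in for the per-iteration clustering/discretization work.
            return iteration, sum(self.payload)


    # One actor per available CPU, mirroring
    #   n_actors = int(ray.available_resources().get("CPU", 1))
    n_actors = int(ray.available_resources().get("CPU", 1))
    actors = [Discretizer.remote(list(range(1000))) for _ in range(n_actors)]

    # Round-robin dispatch over the pool, as in cluster_actors[(iteration - 1) % n_actors]
    futures = [actors[(it - 1) % n_actors].discretize.remote(it) for it in range(1, 11)]
    print(ray.get(futures))

    ray.shutdown()

Compared with the earlier approach of `ray.put`-ing the model and k-means objects and passing the object refs to every remote task, this keeps the deserialized state resident in each actor process for the lifetime of the run, so only the iteration index crosses the wire per task.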