diff --git a/msm_we/_hamsm/_clustering.py b/msm_we/_hamsm/_clustering.py
index bcdbe6f..e8a89da 100644
--- a/msm_we/_hamsm/_clustering.py
+++ b/msm_we/_hamsm/_clustering.py
@@ -22,6 +22,128 @@
 SUPPORTED_MAPPERS = {RectilinearBinMapper, VoronoiBinMapper}
 
 
+@ray.remote
+class ClusteringActor:
+    def __init__(self, model, kmeans_model, processCoordinates):
+        self.model = model
+        self.kmeans_model = kmeans_model
+        self.processCoordinates = processCoordinates
+
+    def discretize(self, iteration):
+        model = self.model
+        kmeans_model = self.kmeans_model
+        processCoordinates = self.processCoordinates
+        # model_id, kmeans_model_id, iteration, processCoordinates_id = arg
+
+        # self = ray.get(model_id)
+        # kmeans_model = ray.get(kmeans_model_id)
+        # processCoordinates = ray.get(processCoordinates_id)
+        # self = model
+
+        # Need to do this so the model's transformation array is writable -- otherwise predict chokes
+        # with 'buffer source array is read-only'.
+        kmeans_model = deepcopy(kmeans_model)
+
+        iter_coords = model.get_iter_coordinates(iteration)
+
+        # If there are no coords for this iteration, return None
+        if iter_coords.shape[0] == 0:
+            return None, 0, iteration
+
+        # Otherwise, apply the k-means model and discretize
+        transformed_coords = model.coordinates.transform(
+            processCoordinates(iter_coords)
+        )
+        dtrajs = kmeans_model.predict(transformed_coords)
+
+        return dtrajs, 1, iteration
+
+    def stratified_discretize(self, iteration):
+        model = self.model
+        kmeans_model = self.kmeans_model
+        processCoordinates = self.processCoordinates
+        # model_id, kmeans_model_id, iteration, processCoordinates_id = arg
+
+        # import sys
+        # import westpa.core.binning
+
+        # sys.modules["westpa.binning"] = sys.modules["westpa.core.binning"]
+        # This is silly -- I need to import westpa.core.binning so it's loaded into sys.modules but the linter
+        # complains that it's unused... so, use it.
+        # log.debug(f"Loaded {westpa.core.binning}")
+
+        # self = ray.get(model_id)
+        # kmeans_model = ray.get(kmeans_model_id)
+        # processCoordinates = ray.get(processCoordinates_id)
+        self = model
+
+        # Need to do this so the model's transformation array is writable -- otherwise predict chokes
+        # with 'buffer source array is read-only'.
+        kmeans_model = deepcopy(kmeans_model)
+        kmeans_model.model = self
+
+        # for i, cluster_model in enumerate(kmeans_model.cluster_models):
+        #     print(f"Model {i}: \t ", end=" ")
+        #     try:
+        #         print(cluster_model.cluster_centers_)
+        #     except AttributeError:
+        #         print("No cluster centers!")
+
+        # iter_coords = self.get_iter_coordinates(iteration)
+
+        kmeans_model.model.load_iter_data(iteration)
+        kmeans_model.model.get_transition_data_lag0()
+        # print(f"After loading coordPairList in iter {iteration}, shape is {kmeans_model.model.coordPairList.shape}")
+        parent_coords, child_coords = (
+            kmeans_model.model.coordPairList[..., 0],
+            self.coordPairList[..., 1],
+        )
+
+        # If there are no coords for this iteration, return None
+        if child_coords.shape[0] == 0:
+            return None, 0, iteration
+
+        # Otherwise, apply the k-means model and discretize
+        transformed_parent_coords = kmeans_model.model.coordinates.transform(
+            processCoordinates(parent_coords)
+        )
+        transformed_child_coords = kmeans_model.model.coordinates.transform(
+            processCoordinates(child_coords)
+        )
+
+        try:
+            kmeans_model.processing_from = True
+            try:
+                parent_dtrajs = kmeans_model.predict(transformed_parent_coords)
+            except IndexError as e:
+
+                print("Problem ===== ")
+                print(
+                    f"Parent pcoords are shape {kmeans_model.model.pcoord0List.shape}"
+                )
+                print(f"Parent coords are shape {transformed_parent_coords.shape}")
+                print(f"Child pcoords are shape {kmeans_model.model.pcoord1List.shape}")
+                print(f"Child coords are shape {transformed_child_coords.shape}")
+                print("===== ")
+
+                raise e
+
+            kmeans_model.processing_from = False
+            child_dtrajs = kmeans_model.predict(transformed_child_coords)
+        except AttributeError as e:
+            log.error("Cluster center was not initialized and not remapped")
+            log.error(kmeans_model.we_remap)
+            raise e
+        # TODO: Remap to nearest visited
+
+        return (
+            (parent_dtrajs, child_dtrajs),
+            1,
+            iteration,
+            kmeans_model.target_bins,
+            kmeans_model.basis_bins,
+        )
+
 class ClusteringMixin:
     n_clusters = None
     clusters = None
@@ -108,34 +230,9 @@ def do_discretization(self: "modelWE", arg):
         return dtrajs, used_iters
 
     @ray.remote
-    def do_ray_discretization(
-        model: "modelWE", kmeans_model, iteration, processCoordinates
-    ):
-
-        # model_id, kmeans_model_id, iteration, processCoordinates_id = arg
+    def do_ray_discretization(actor: ClusteringActor, iteration):
+        return ray.get(actor.discretize.remote(iteration))
 
-        # self = ray.get(model_id)
-        # kmeans_model = ray.get(kmeans_model_id)
-        # processCoordinates = ray.get(processCoordinates_id)
-        # self = model
-
-        # Need to do this so the model's transformation array is writable -- otherwise predict chokes
-        # with 'buffer source array is read-only'.
-        kmeans_model = deepcopy(kmeans_model)
-
-        iter_coords = model.get_iter_coordinates(iteration)
-
-        # If there are no coords for this iteration, return None
-        if iter_coords.shape[0] == 0:
-            return None, 0, iteration
-
-        # Otherwise, apply the k-means model and discretize
-        transformed_coords = model.coordinates.transform(
-            processCoordinates(iter_coords)
-        )
-        dtrajs = kmeans_model.predict(transformed_coords)
-
-        return dtrajs, 1, iteration
 
     def cluster_coordinates(
         self: "modelWE",
@@ -414,10 +511,11 @@ def cluster_aggregated(
 
         # Submit all the discretization tasks to the cluster
         task_ids = []
+        n_actors = int(ray.available_resources().get("CPU", 1))
 
-        model_id = ray.put(self)
-        cluster_model_id = ray.put(cluster_model)
-        process_coordinates_id = ray.put(self.processCoordinates)
+        cluster_actors = [ClusteringActor.remote(
+            self, cluster_model, self.processCoordinates
+        ) for _ in range(n_actors)]
 
         # max_inflight = 50
         for iteration in tqdm.tqdm(
@@ -429,9 +527,8 @@
             # num_ready = iteration - max_inflight
             # ray.wait(task_ids, num_returns=num_ready)
 
-            _id = self.do_ray_discretization.remote(
-                model_id, cluster_model_id, iteration, process_coordinates_id
-            )
+            cluster_actor = cluster_actors[(iteration - 1) % n_actors]
+            _id = self.do_ray_discretization.remote(cluster_actor, iteration)
 
             task_ids.append(_id)
         # As they're completed, add them to dtrajs
@@ -1146,6 +1243,7 @@ def launch_ray_discretization(self: "modelWE", progress_bar=None):
         """
 
         self.check_connect_ray()
+        n_actors = int(ray.available_resources().get("CPU", 1))
 
         self.dtrajs = []
 
@@ -1157,28 +1255,21 @@
         else:
             log.debug("Using cached model for discretization")
 
-        model_id = ray.put(self.pre_discretization_model)
-
         clusters = deepcopy(self.clusters)
         # It's set inside do_stratified_ray_discretization, though I could do it in either place.
         clusters.model = None  # self.pre_discretization_model
-        cluster_model_id = ray.put(clusters)
-
-        process_coordinates_id = ray.put(self.processCoordinates)
+        cluster_actors = [ClusteringActor.remote(
+            self, clusters, self.processCoordinates
+        ) for _ in range(n_actors)]
 
         # max_inflight = 50
         with ProgressBar(progress_bar) as progress_bar:
             submit_task = progress_bar.add_task(
                 description="Submitting discretization tasks", total=self.maxIter - 1
             )
             for iteration in range(1, self.maxIter):
-                _id = self.do_stratified_ray_discretization.remote(
-                    model_id,
-                    cluster_model_id,
-                    iteration,
-                    process_coordinates_id
-                    # self, self.clusters, iteration, self.processCoordinates
-                )
+                cluster_actor = cluster_actors[(iteration - 1) % n_actors]
+                _id = self.do_stratified_ray_discretization.remote(cluster_actor, iteration)
 
                 task_ids.append(_id)
                 progress_bar.update(submit_task, advance=1)
@@ -1222,8 +1313,6 @@
                 del results
             del finished
 
-        del model_id
-        del cluster_model_id
 
         # Remove all empty elements from dtrajs and assign to self.dtrajs
         self.dtrajs = [dtraj for dtraj in dtrajs if dtraj is not None]
@@ -1233,91 +1322,8 @@
         log.debug("Discretization complete")
 
     @ray.remote
-    def do_stratified_ray_discretization(
-        model: "modelWE", kmeans_model, iteration, processCoordinates
-    ):
-
-        # model_id, kmeans_model_id, iteration, processCoordinates_id = arg
-
-        # import sys
-        # import westpa.core.binning
-
-        # sys.modules["westpa.binning"] = sys.modules["westpa.core.binning"]
-        # This is silly -- I need to import westpa.core.binning so it's loaded into sys.modules but the linter
-        # complains that it's unused... so, use it.
-        # log.debug(f"Loaded {westpa.core.binning}")
-
-        # self = ray.get(model_id)
-        # kmeans_model = ray.get(kmeans_model_id)
-        # processCoordinates = ray.get(processCoordinates_id)
-        self = model
-
-        # Need to do this so the model's transformation array is writable -- otherwise predict chokes
-        # with 'buffer source array is read-only'.
-        kmeans_model = deepcopy(kmeans_model)
-        kmeans_model.model = self
-
-        # for i, cluster_model in enumerate(kmeans_model.cluster_models):
-        #     print(f"Model {i}: \t ", end=" ")
-        #     try:
-        #         print(cluster_model.cluster_centers_)
-        #     except AttributeError:
-        #         print("No cluster centers!")
-
-        # iter_coords = self.get_iter_coordinates(iteration)
-
-        kmeans_model.model.load_iter_data(iteration)
-        kmeans_model.model.get_transition_data_lag0()
-        # print(f"After loading coordPairList in iter {iteration}, shape is {kmeans_model.model.coordPairList.shape}")
-        parent_coords, child_coords = (
-            kmeans_model.model.coordPairList[..., 0],
-            self.coordPairList[..., 1],
-        )
-
-        # If there are no coords for this iteration, return None
-        if child_coords.shape[0] == 0:
-            return None, 0, iteration
-
-        # Otherwise, apply the k-means model and discretize
-        transformed_parent_coords = kmeans_model.model.coordinates.transform(
-            processCoordinates(parent_coords)
-        )
-        transformed_child_coords = kmeans_model.model.coordinates.transform(
-            processCoordinates(child_coords)
-        )
-
-        try:
-            kmeans_model.processing_from = True
-            try:
-                parent_dtrajs = kmeans_model.predict(transformed_parent_coords)
-            except IndexError as e:
-
-                print("Problem ===== ")
-                print(
-                    f"Parent pcoords are shape {kmeans_model.model.pcoord0List.shape}"
-                )
-                print(f"Parent coords are shape {transformed_parent_coords.shape}")
-                print(f"Child pcoords are shape {kmeans_model.model.pcoord1List.shape}")
-                print(f"Child coords are shape {transformed_child_coords.shape}")
-                print("===== ")
-
-                raise e
-
-            kmeans_model.processing_from = False
-            child_dtrajs = kmeans_model.predict(transformed_child_coords)
-        except AttributeError as e:
-            log.error("Cluster center was not initialized and not remapped")
-            log.error(kmeans_model.we_remap)
-            raise e
-        # TODO: Remap to nearest visited
-
-        return (
-            (parent_dtrajs, child_dtrajs),
-            1,
-            iteration,
-            kmeans_model.target_bins,
-            kmeans_model.basis_bins,
-        )
+    def do_stratified_ray_discretization(actor: ClusteringActor, iteration):
+        return ray.get(actor.stratified_discretize.remote(iteration))
 
     @staticmethod
     def find_nearest_bin(bin_mapper, bin_idx, filled_bins):
diff --git a/msm_we/_hamsm/_data.py b/msm_we/_hamsm/_data.py
index a376f19..4e9c3f9 100644
--- a/msm_we/_hamsm/_data.py
+++ b/msm_we/_hamsm/_data.py
@@ -609,7 +609,7 @@ def load_iter_coordinates(self: "modelWE"):
                 f"Attempting to obtain coordinates from west_file {west_file}, iteration {self.n_iter}"
             )
             # TODO: This should probably generically be -1, not 1, to deal with variable-length augmentation.
-            assert cur_iter_coords.shape[1] > 1, (
+            assert coords.shape[1] > 1, (
                 "Augmented coords only have 1 point in them -- "
                 "need at least start & end for transitions"
             )
diff --git a/setup.py b/setup.py
index b607074..0fa8069 100644
--- a/setup.py
+++ b/setup.py
@@ -65,6 +65,6 @@
     tests_require=test_requirements,
     extras_require=EXTRAS_REQUIRE,
     url="https://github.com/jdrusso/msm_we",
-    version="0.1.27",
+    version="0.1.28a1",
     zip_safe=False,
 )
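
A minimal, self-contained sketch of the actor-dispatch pattern this patch adopts: heavy, read-only state (the model, the k-means object, processCoordinates) is held once per Ray actor instead of being re-shipped to every task via ray.put, and iterations are handed out round-robin via cluster_actors[(iteration - 1) % n_actors]. This is illustrative only; ExampleActor, heavy_state, and process are placeholder names, not msm_we identifiers.

import ray

ray.init(ignore_reinit_error=True)


@ray.remote
class ExampleActor:
    def __init__(self, heavy_state):
        # Heavy, read-only state is deserialized once when the actor starts
        # and reused by every call, rather than shipped with each task.
        self.heavy_state = heavy_state

    def process(self, iteration):
        # Stand-in for discretize() / stratified_discretize(): any per-iteration work.
        return iteration, len(self.heavy_state)


heavy_state = list(range(100_000))
n_actors = int(ray.available_resources().get("CPU", 1))
actors = [ExampleActor.remote(heavy_state) for _ in range(n_actors)]

# Round-robin dispatch, mirroring cluster_actors[(iteration - 1) % n_actors].
task_ids = [
    actors[(iteration - 1) % n_actors].process.remote(iteration)
    for iteration in range(1, 11)
]
print(ray.get(task_ids)[:3])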