Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch from Workers to Actors #40

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 138 additions & 132 deletions msm_we/_hamsm/_clustering.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,128 @@
SUPPORTED_MAPPERS = {RectilinearBinMapper, VoronoiBinMapper}


@ray.remote
class ClusteringActor:
def __init__(self, model, kmeans_model, processCoordinates):
self.model = model
self.kmeans_model = kmeans_model
self.processCoordinates = processCoordinates

def discretize(self, iteration):
model = self.model
kmeans_model = self.kmeans_model
processCoordinates = self.processCoordinates
# model_id, kmeans_model_id, iteration, processCoordinates_id = arg

# self = ray.get(model_id)
# kmeans_model = ray.get(kmeans_model_id)
# processCoordinates = ray.get(processCoordinates_id)
# self = model

# Need to do this so the model's transformation array is writable -- otherwise predict chokes
# with 'buffer source array is read-only'.
kmeans_model = deepcopy(kmeans_model)

iter_coords = model.get_iter_coordinates(iteration)

# If there are no coords for this iteration, return None
if iter_coords.shape[0] == 0:
return None, 0, iteration

# Otherwise, apply the k-means model and discretize
transformed_coords = model.coordinates.transform(
processCoordinates(iter_coords)
)
dtrajs = kmeans_model.predict(transformed_coords)

return dtrajs, 1, iteration

def stratified_discretize(self, iteration):
model = self.model
kmeans_model = self.kmeans_model
processCoordinates = self.processCoordinates
# model_id, kmeans_model_id, iteration, processCoordinates_id = arg

# import sys
# import westpa.core.binning

# sys.modules["westpa.binning"] = sys.modules["westpa.core.binning"]
# This is silly -- I need to import westpa.core.binning so it's loaded into sys.modules but the linter
# complains that it's unused... so, use it.
# log.debug(f"Loaded {westpa.core.binning}")

# self = ray.get(model_id)
# kmeans_model = ray.get(kmeans_model_id)
# processCoordinates = ray.get(processCoordinates_id)
self = model

# Need to do this so the model's transformation array is writable -- otherwise predict chokes
# with 'buffer source array is read-only'.
kmeans_model = deepcopy(kmeans_model)
kmeans_model.model = self

# for i, cluster_model in enumerate(kmeans_model.cluster_models):
# print(f"Model {i}: \t ", end=" ")
# try:
# print(cluster_model.cluster_centers_)
# except AttributeError:
# print("No cluster centers!")

# iter_coords = self.get_iter_coordinates(iteration)

kmeans_model.model.load_iter_data(iteration)
kmeans_model.model.get_transition_data_lag0()
# print(f"After loading coordPairList in iter {iteration}, shape is {kmeans_model.model.coordPairList.shape}")
parent_coords, child_coords = (
kmeans_model.model.coordPairList[..., 0],
self.coordPairList[..., 1],
)

# If there are no coords for this iteration, return None
if child_coords.shape[0] == 0:
return None, 0, iteration

# Otherwise, apply the k-means model and discretize
transformed_parent_coords = kmeans_model.model.coordinates.transform(
processCoordinates(parent_coords)
)
transformed_child_coords = kmeans_model.model.coordinates.transform(
processCoordinates(child_coords)
)

try:
kmeans_model.processing_from = True
try:
parent_dtrajs = kmeans_model.predict(transformed_parent_coords)
except IndexError as e:

print("Problem ===== ")
print(
f"Parent pcoords are shape {kmeans_model.model.pcoord0List.shape}"
)
print(f"Parent coords are shape {transformed_parent_coords.shape}")
print(f"Child pcoords are shape {kmeans_model.model.pcoord1List.shape}")
print(f"Child coords are shape {transformed_child_coords.shape}")
print("===== ")

raise e

kmeans_model.processing_from = False
child_dtrajs = kmeans_model.predict(transformed_child_coords)
except AttributeError as e:
log.error("Cluster center was not initialized and not remapped")
log.error(kmeans_model.we_remap)
raise e
# TODO: Remap to nearest visited

return (
(parent_dtrajs, child_dtrajs),
1,
iteration,
kmeans_model.target_bins,
kmeans_model.basis_bins,
)

class ClusteringMixin:
n_clusters = None
clusters = None
Expand Down Expand Up @@ -108,34 +230,9 @@ def do_discretization(self: "modelWE", arg):
return dtrajs, used_iters

@ray.remote
def do_ray_discretization(
model: "modelWE", kmeans_model, iteration, processCoordinates
):

# model_id, kmeans_model_id, iteration, processCoordinates_id = arg
def do_ray_discretization(actor: ClusteringActor, iteration):
return ray.get(actor.discretize.remote(iteration))

# self = ray.get(model_id)
# kmeans_model = ray.get(kmeans_model_id)
# processCoordinates = ray.get(processCoordinates_id)
# self = model

# Need to do this so the model's transformation array is writable -- otherwise predict chokes
# with 'buffer source array is read-only'.
kmeans_model = deepcopy(kmeans_model)

iter_coords = model.get_iter_coordinates(iteration)

# If there are no coords for this iteration, return None
if iter_coords.shape[0] == 0:
return None, 0, iteration

# Otherwise, apply the k-means model and discretize
transformed_coords = model.coordinates.transform(
processCoordinates(iter_coords)
)
dtrajs = kmeans_model.predict(transformed_coords)

return dtrajs, 1, iteration

def cluster_coordinates(
self: "modelWE",
Expand Down Expand Up @@ -414,10 +511,11 @@ def cluster_aggregated(

# Submit all the discretization tasks to the cluster
task_ids = []
n_actors = int(ray.available_resources().get("CPU", 1))

model_id = ray.put(self)
cluster_model_id = ray.put(cluster_model)
process_coordinates_id = ray.put(self.processCoordinates)
cluster_actors = [ClusteringActor.remote(
self, cluster_model, self.processCoordinates
) for _ in range(n_actors)]

# max_inflight = 50
for iteration in tqdm.tqdm(
Expand All @@ -429,9 +527,8 @@ def cluster_aggregated(
# num_ready = iteration - max_inflight
# ray.wait(task_ids, num_returns=num_ready)

_id = self.do_ray_discretization.remote(
model_id, cluster_model_id, iteration, process_coordinates_id
)
cluster_actor = cluster_actors[(iteration - 1) % n_actors]
_id = self.do_ray_discretization.remote(cluster_actor, iteration)
task_ids.append(_id)

# As they're completed, add them to dtrajs
Expand Down Expand Up @@ -1146,6 +1243,7 @@ def launch_ray_discretization(self: "modelWE", progress_bar=None):
"""

self.check_connect_ray()
n_actors = int(ray.available_resources().get("CPU", 1))

self.dtrajs = []

Expand All @@ -1157,28 +1255,21 @@ def launch_ray_discretization(self: "modelWE", progress_bar=None):
else:
log.debug("Using cached model for discretization")

model_id = ray.put(self.pre_discretization_model)

clusters = deepcopy(self.clusters)
# It's set inside do_stratified_ray_discretization, though I could do it in either place.
clusters.model = None # self.pre_discretization_model
cluster_model_id = ray.put(clusters)

process_coordinates_id = ray.put(self.processCoordinates)

cluster_actors = [ClusteringActor.remote(
self, clusters, self.processCoordinates
) for _ in range(n_actors)]
# max_inflight = 50
with ProgressBar(progress_bar) as progress_bar:
submit_task = progress_bar.add_task(
description="Submitting discretization tasks", total=self.maxIter - 1
)
for iteration in range(1, self.maxIter):
_id = self.do_stratified_ray_discretization.remote(
model_id,
cluster_model_id,
iteration,
process_coordinates_id
# self, self.clusters, iteration, self.processCoordinates
)
cluster_actor = cluster_actors[(iteration - 1) % n_actors]
_id = self.do_stratified_ray_discretization.remote(cluster_actor, iteration)
task_ids.append(_id)
progress_bar.update(submit_task, advance=1)

Expand Down Expand Up @@ -1222,8 +1313,6 @@ def launch_ray_discretization(self: "modelWE", progress_bar=None):

del results
del finished
del model_id
del cluster_model_id

# Remove all empty elements from dtrajs and assign to self.dtrajs
self.dtrajs = [dtraj for dtraj in dtrajs if dtraj is not None]
Expand All @@ -1233,91 +1322,8 @@ def launch_ray_discretization(self: "modelWE", progress_bar=None):
log.debug("Discretization complete")

@ray.remote
def do_stratified_ray_discretization(
model: "modelWE", kmeans_model, iteration, processCoordinates
):

# model_id, kmeans_model_id, iteration, processCoordinates_id = arg

# import sys
# import westpa.core.binning

# sys.modules["westpa.binning"] = sys.modules["westpa.core.binning"]
# This is silly -- I need to import westpa.core.binning so it's loaded into sys.modules but the linter
# complains that it's unused... so, use it.
# log.debug(f"Loaded {westpa.core.binning}")

# self = ray.get(model_id)
# kmeans_model = ray.get(kmeans_model_id)
# processCoordinates = ray.get(processCoordinates_id)
self = model

# Need to do this so the model's transformation array is writable -- otherwise predict chokes
# with 'buffer source array is read-only'.
kmeans_model = deepcopy(kmeans_model)
kmeans_model.model = self

# for i, cluster_model in enumerate(kmeans_model.cluster_models):
# print(f"Model {i}: \t ", end=" ")
# try:
# print(cluster_model.cluster_centers_)
# except AttributeError:
# print("No cluster centers!")

# iter_coords = self.get_iter_coordinates(iteration)

kmeans_model.model.load_iter_data(iteration)
kmeans_model.model.get_transition_data_lag0()
# print(f"After loading coordPairList in iter {iteration}, shape is {kmeans_model.model.coordPairList.shape}")
parent_coords, child_coords = (
kmeans_model.model.coordPairList[..., 0],
self.coordPairList[..., 1],
)

# If there are no coords for this iteration, return None
if child_coords.shape[0] == 0:
return None, 0, iteration

# Otherwise, apply the k-means model and discretize
transformed_parent_coords = kmeans_model.model.coordinates.transform(
processCoordinates(parent_coords)
)
transformed_child_coords = kmeans_model.model.coordinates.transform(
processCoordinates(child_coords)
)

try:
kmeans_model.processing_from = True
try:
parent_dtrajs = kmeans_model.predict(transformed_parent_coords)
except IndexError as e:

print("Problem ===== ")
print(
f"Parent pcoords are shape {kmeans_model.model.pcoord0List.shape}"
)
print(f"Parent coords are shape {transformed_parent_coords.shape}")
print(f"Child pcoords are shape {kmeans_model.model.pcoord1List.shape}")
print(f"Child coords are shape {transformed_child_coords.shape}")
print("===== ")

raise e

kmeans_model.processing_from = False
child_dtrajs = kmeans_model.predict(transformed_child_coords)
except AttributeError as e:
log.error("Cluster center was not initialized and not remapped")
log.error(kmeans_model.we_remap)
raise e
# TODO: Remap to nearest visited

return (
(parent_dtrajs, child_dtrajs),
1,
iteration,
kmeans_model.target_bins,
kmeans_model.basis_bins,
)
def do_stratified_ray_discretization(actor: ClusteringActor, iteration):
return ray.get(actor.stratified_discretize.remote(iteration))

@staticmethod
def find_nearest_bin(bin_mapper, bin_idx, filled_bins):
Expand Down
2 changes: 1 addition & 1 deletion msm_we/_hamsm/_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ def load_iter_coordinates(self: "modelWE"):
f"Attempting to obtain coordinates from west_file {west_file}, iteration {self.n_iter}"
)
# TODO: This should probably generically be -1, not 1, to deal with variable-length augmentation.
assert cur_iter_coords.shape[1] > 1, (
assert coords.shape[1] > 1, (
"Augmented coords only have 1 point in them -- "
"need at least start & end for transitions"
)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,6 @@
tests_require=test_requirements,
extras_require=EXTRAS_REQUIRE,
url="https://github.com/jdrusso/msm_we",
version="0.1.27",
version="0.1.28a1",
zip_safe=False,
)