Remove old 'part' parallelism and further quieten down logging
apontzen committed Dec 15, 2023
1 parent 82e2b58 commit a63ac88
Showing 2 changed files with 28 additions and 31 deletions.
23 changes: 10 additions & 13 deletions tangos/parallel_tasks/__init__.py
@@ -84,22 +84,19 @@ def launch(function, args=None, backend_kwargs=None):

     return result
 
-def distributed(file_list, proc=None, of=None, allow_resume=False, resumption_id=None):
-    """Distribute a list of tasks between all nodes"""
-    if type(file_list) == set:
-        file_list = list(file_list)
+def distributed(items, allow_resume=False, resumption_id=None):
+    """Return an iterator that consumes the items, distributed across all processors
+    (i.e. each item is consumed by only one processor, in a dynamic way).
+
+    Optionally, if allow_resume is True, then the iterator will resume from the last point it reached
+    provided argv and the stack trace are unchanged. If resumption_id is not None, then
+    the stack trace is ignored and only resumption_id needs to match."""
+
+    if type(items) == set:
+        items = list(items)
 
     if _backend_name=='null':
-        if proc is None:
-            proc = 1
-            of = 1
-        i = (len(file_list) * (proc - 1)) // of
-        j = (len(file_list) * proc) // of - 1
-        assert proc <= of and proc > 0
-        if proc == of:
-            j += 1
-        return file_list[i:j + 1]
+        return items
     else:
         from . import jobs
         return jobs.parallel_iterate(file_list, allow_resume, resumption_id)
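Note: the new `distributed()` above replaces the manual `proc`/`of` slicing with a dynamically scheduled iterator. A minimal usage sketch, assuming the `launch()` signature shown in the hunk header; the worker function, backend name and item list are illustrative placeholders, not part of this commit:

```python
# Hypothetical driver script illustrating the new distributed() API.
from tangos import parallel_tasks


def process_all(filenames):
    # Each item is consumed by exactly one processor, assigned dynamically;
    # with the 'null' backend the full list is simply returned and processed here.
    for filename in parallel_tasks.distributed(filenames, allow_resume=True):
        print("processing", filename)


if __name__ == "__main__":
    parallel_tasks.use('multiprocessing-3')   # assumed backend choice; adjust to your setup
    parallel_tasks.launch(process_all, args=(["step1", "step2", "step3"],))
```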
36 changes: 18 additions & 18 deletions tangos/tools/property_writer.py
@@ -131,9 +131,9 @@ def files(self):


    def _get_parallel_timestep_iterator(self):
-        if self.options.part is not None:
-            # In the case of a null backend with manual parallelism, pass the specified part specification
-            ma_files = parallel_tasks.distributed(self.files, proc=self.options.part[0], of=self.options.part[1])
+        if parallel_tasks.backend is None:
+            # Go sequentially
+            ma_files = self.files
        elif self.options.load_mode is not None and self.options.load_mode.startswith('server'):
            # In the case of loading from a centralised server, each node works on the _same_ timestep --
            # parallelism is then implemented at the halo level
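For context, only the first branch of this method changes. A simplified sketch of the overall dispatch it performs; the final `else` branch is collapsed in the diff above and is assumed here to hand the timesteps to `parallel_tasks.distributed`:

```python
# Simplified, standalone sketch of the timestep-iterator choice; the final branch
# is an assumption about the collapsed code, not shown in this diff.
from tangos import parallel_tasks


def get_timestep_iterator(files, backend, load_mode):
    if backend is None:
        # No parallel backend: process timesteps sequentially.
        return files
    elif load_mode is not None and load_mode.startswith('server'):
        # Server-style loading: every rank walks the same timesteps;
        # parallelism happens at the halo level instead.
        return files
    else:
        # Assumed: timesteps are farmed out dynamically, one per rank.
        return parallel_tasks.distributed(files)
```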
@@ -183,8 +183,8 @@ def _compile_inclusion_criterion(self):
        else:
            self._include = None
 
-    def _log_one_process(self, *args):
-        if parallel_tasks.backend is None or parallel_tasks.backend.rank()==1:
+    def _log_once_per_timestep(self, *args):
+        if parallel_tasks.backend is None or parallel_tasks.backend.rank()==1 or self.options.load_mode is None:
            logger.info(*args)
 
    def _summarise_timing(self):
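The rename from `_log_one_process` to `_log_once_per_timestep` also adds the `load_mode is None` condition: when no load mode is set, each rank iterates over its own timesteps and must emit its own per-timestep messages; only with a server-style load mode (all ranks sharing one timestep) is output restricted to rank 1. A condensed sketch of the predicate, using hypothetical standalone names for illustration:

```python
# Illustrative predicate (not part of the commit): when should this rank log?
def should_log(backend_present, rank, load_mode):
    # No parallel backend, or this is rank 1, or each rank owns its own timestep.
    return (not backend_present) or rank == 1 or load_mode is None

# With a server-style load mode, only rank 1 reports per-timestep progress:
assert should_log(True, 1, "server") is True
assert should_log(True, 2, "server") is False
assert should_log(True, 2, None) is True
```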
@@ -223,8 +223,8 @@ def _build_halo_list(self, db_timestep):

        # perform filtering:
        halos = [halo_i for halo_i, include_i in zip(halos, inclusion) if include_i]
-        self._log_one_process("User-specified inclusion criterion excluded %d of %d halos",
-                              len(inclusion)-len(halos),len(inclusion))
+        self._log_once_per_timestep("User-specified inclusion criterion excluded %d of %d halos",
+                                    len(inclusion) - len(halos), len(inclusion))
 
        return halos

@@ -477,7 +477,7 @@ def run_halo_calculation(self, db_halo, existing_properties):


    def run_timestep_calculation(self, db_timestep):
-        logger.info("Processing %r", db_timestep)
+        self._log_once_per_timestep("Processing %r", db_timestep)
        self._property_calculator_instances = properties.instantiate_classes(db_timestep.simulation,
                                                                              self.options.properties,
                                                                              explain=self.options.explain_classes)
@@ -491,24 +491,24 @@ def run_timestep_calculation(self, db_timestep):

self._existing_properties_all_halos = self._build_existing_properties_all_halos(db_halos)

self._log_one_process("Successfully gathered existing properties; calculating halo properties now...")
self._log_once_per_timestep("Successfully gathered existing properties; calculating halo properties now...")

self._log_one_process(" %d halos to consider; %d calculation routines for each of them, resulting in %d properties per halo",
len(db_halos), len(self._property_calculator_instances),
sum([1 if isinstance(x.names, str) else len(x.names) for x in self._property_calculator_instances])
)
self._log_once_per_timestep(" %d halos to consider; %d calculation routines for each of them, resulting in %d properties per halo",
len(db_halos), len(self._property_calculator_instances),
sum([1 if isinstance(x.names, str) else len(x.names) for x in self._property_calculator_instances])
)

self._log_one_process(" The property modules are:")
self._log_once_per_timestep(" The property modules are:")
for x in self._property_calculator_instances:
x_type = type(x)
self._log_one_process(f" {x_type.__module__}.{x_type.__qualname__}")
self._log_once_per_timestep(f" {x_type.__module__}.{x_type.__qualname__}")

for db_halo, existing_properties in \
self._get_parallel_halo_iterator(list(zip(db_halos, self._existing_properties_all_halos))):
self._existing_properties_this_halo = existing_properties
self.run_halo_calculation(db_halo, existing_properties)

logger.info("Done with %r",db_timestep)
self._log_once_per_timestep("Done with %r", db_timestep)
self._unload_timestep()

self.tracker.report_to_log_or_server(logger)
@@ -528,8 +528,8 @@ def _add_prerequisites_to_calculator_instances(self, db_timestep):
        for r in requirements:
            if r not in will_calculate:
                new_instance = properties.instantiate_class(db_timestep.simulation, r)
-                self._log_one_process("Missing prerequisites - added class %r",type(new_instance))
-                self._log_one_process(" providing properties %r",new_instance.names)
+                self._log_once_per_timestep("Missing prerequisites - added class %r", type(new_instance))
+                self._log_once_per_timestep(" providing properties %r", new_instance.names)
                self._property_calculator_instances = [new_instance]+self._property_calculator_instances
                self._add_prerequisites_to_calculator_instances(db_timestep) # everything has changed; start afresh
                break
