Skip to content

Commit

Permalink
More progress
Browse files Browse the repository at this point in the history
  • Loading branch information
ramo-j committed Jan 6, 2025
1 parent adf45eb commit 3ef11fb
Show file tree
Hide file tree
Showing 2 changed files with 401 additions and 84 deletions.
109 changes: 55 additions & 54 deletions dftimewolf/lib/containers/manager.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
"""A ContainerManager class."""


import dataclasses
import threading
import typing
from typing import cast, Sequence, Type

from dftimewolf.lib.containers import interface



@dataclasses.dataclass
class _MODULE:
  """Bookkeeping record the container manager keeps for one recipe module.

  Attributes:
    name: The module's name.
    dependencies: Names of the modules this module depends on.
    storage: Containers stored against this module.
    completed: Whether the module has been marked as completed.
  """
  name: str
  dependencies: list[str] = dataclasses.field(default_factory=list)
  storage: list[interface.AttributeContainer] = dataclasses.field(
      default_factory=list)
  completed: bool = False


class ContainerManager():
"""A ContainerManager.
Expand All @@ -25,54 +36,39 @@ class ContainerManager():
def __init__(self):
"""Initialise a ContainerManager."""
self._mutex = threading.Lock()
self._dependencies: dict[str, # Module name
list[str] # Module names the module depends on
] = None
self._storage: dict[str, # Module name
list[interface.AttributeContainer] # A list of containers generated by the module # pylint: disable=line-too-long
] = {}
# self._storage: dict[str, # Module name
# dict[str, # Container type name
# list[interface.AttributeContainer]] # A list of containers generated by the module of the type # pylint: disable=line-too-long
# ] = {}

def ParseRecipe(self, recipe: dict[str, typing.Any]):
self._modules: dict[str, _MODULE] = None

def ParseRecipe(self, recipe: dict[str, typing.Any]) -> None:
  """Parses a recipe to build the dependency graph.

  Any module table built by an earlier call is discarded and rebuilt.

  Args:
    recipe: The recipe to parse. Its 'preflights' and 'modules' lists are
        read; each entry is named by 'runtime_name' (falling back to 'name')
        and may list the modules it depends on under 'wants'.

  Raises:
    RuntimeError: If an entry has no name.
  """
  self._modules = {}

  for module in recipe.get('preflights', []) + recipe.get('modules', []):
    name = module.get('runtime_name', module.get('name', None))
    if not name:
      raise RuntimeError("Name not set for module in recipe")

    # Every module is listed among its own dependencies, so dependency walks
    # over this list also reach the module's own stored containers.
    self._modules[name] = _MODULE(
        name=name, dependencies=module.get('wants', []) + [name])

def StoreContainer(self,
source_module: str,
container: interface.AttributeContainer):
container: interface.AttributeContainer) -> None:
"""Adds a container to storage for later retrieval.
Args:
source_module: The module that generated the container.
container: The container to store.
"""
if not self._dependencies:
if not self._modules:
raise RuntimeError("Container manager has not parsed a recipe yet")

with self._mutex:
if source_module not in self._storage:
# self._storage[source_module] = {}
# if container.CONTAINER_TYPE not in self._storage[source_module]:
# self._storage[source_module][container.CONTAINER_TYPE] = []
# self._storage[source_module][container.CONTAINER_TYPE].append(container)
self._storage[source_module] = []
self._storage[source_module].append(container)
self._modules[source_module].storage.append(container)


def GetContainers(self,
requesting_module: str,
container_class: Type[interface.AttributeContainer],
pop: bool = False,
metadata_filter_key: str | None = None,
metadata_filter_value: typing.Any = None
) -> Sequence[interface.AttributeContainer]:
Expand All @@ -84,54 +80,59 @@ def GetContainers(self,
Args:
requesting_module: The module requesting the containers.
container_class: The type of container to retrieve.
pop: True to remove requested containers from storage.
metadata_filter_key: An optional metadata key to use to filter.
metadata_filter_value: An optional metadata value to use to filter.
Returns:
A sequence of containers that match the various filters.
"""
if not self._dependencies:
if not self._modules:
raise RuntimeError("Container manager has not parsed a recipe yet")
if bool(metadata_filter_key) ^ bool(metadata_filter_value):
raise RuntimeError('Must specify both key and value for attribute filter')

with self._mutex:
ret_val = []

for dependency in self._dependencies[requesting_module]:
containers = self._storage.get(dependency, []) # All the containers generated by the dependency
self._storage[dependency] = [] # Remove all containers
for dependency in self._modules[requesting_module].dependencies:
containers = self._modules[dependency].storage

for c in containers:
if c.CONTAINER_TYPE != container_class.CONTAINER_TYPE:
# Not the type we want - put it back
self._storage[dependency].append(c)
if (c.CONTAINER_TYPE != container_class.CONTAINER_TYPE or
(metadata_filter_key and
c.metadata.get(metadata_filter_key) != metadata_filter_value)):
continue

if (metadata_filter_key and
c.metadata.get(metadata_filter_key) != metadata_filter_value):
# Doesn't match metadat filter - put it back
self._storage[dependency].append(c)
continue

# if we get this far, we want to deliver the container to the caller.
ret_val.append(c)

if not pop:
# The caller wants it left in the state anyway - put it back.
self._storage[dependency].append(c)

return cast(Sequence[interface.AttributeContainer], ret_val)

# def DedupeContainers(self,
# container_class: Type[interface.AttributeContainer]):
# """Dedupe containers.
#
# Unsure if this is needed at the time of writing, but the functionality
# exists in what this manager will replace.
# """
# if not self._dependencies:
# raise RuntimeError("Container manager has not parsed a recipe yet")
#
# raise NotImplementedError()

def CompleteModule(self, module_name: str) -> None:
"""Mark a module as completed in storage.
Containers can consume large amounts of memory. Marking a module as
completed tells the container manager that containers no longer needed can
be removed from storage to free up that memory.
Args:
module_name: The module that has completed running.
"""
with self._mutex:
self._modules[module_name].completed = True

# If all modules `module_name` is a dependency for are marked completed,
# then containers it generated are no longer needed.
for key in self._modules:
if self._CheckDependenciesCompletion(key):
for c in self._modules[key].storage:
del c
self._modules[key].storage = []

def _CheckDependenciesCompletion(self, module_name: str) -> bool:
"""For a module, checks if other modules that depend on are complete."""
for key in self._modules:
if module_name in self._modules[key].dependencies:
if not self._modules[key].completed:
return False
return True
Loading

0 comments on commit 3ef11fb

Please sign in to comment.