Skip to content

Commit

Permalink
feat(memory): Add SharedMemory mechanism for agent requests
Browse files Browse the repository at this point in the history
Implemented memory request handling:
Agent sends memory requests (save/load).
RRScheduler processes the requests.
SharedMemory performs the actual operations.
Responses are returned to the Agent.
Note: The mechanism is complete, but Agent usage integration is pending.
  • Loading branch information
XiangZhang-zx committed Nov 17, 2024
1 parent a68b0b4 commit 01d1736
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 28 deletions.
107 changes: 97 additions & 10 deletions aios/memory/shared_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,103 @@
# multi-agent systems

from aios.memory.base import BaseMemory
from threading import Lock
import pickle
import os

'''
TODO: implement the shared memory mechanism
'''
class SharedMemory(BaseMemory):
def __init__(self):
pass
def __init__(self, storage_path="shared_memory"):
"""Initialize shared memory
Args:
storage_path: Path for storing shared memory data
"""
self.storage_path = storage_path
self._memory = {} # Dictionary for memory data
self._lock = Lock() # Thread lock for synchronization

# Create storage directory if not exists
if not os.path.exists(storage_path):
os.makedirs(storage_path)

def save(self, key, value, agent_id=None):
"""Save data to shared memory
Args:
key: Key for the data
value: Data to be saved
agent_id: Optional agent identifier
"""
with self._lock:
# Use agent_id as namespace if specified
if agent_id:
if agent_id not in self._memory:
self._memory[agent_id] = {}
self._memory[agent_id][key] = value
else:
self._memory[key] = value

# Persist to disk
self._save_to_disk()

def save(self):
pass

def load(self):
pass
def load(self, key, agent_id=None):
"""Load data from shared memory
Args:
key: Key of data to load
agent_id: Optional agent identifier
Returns:
Loaded data, or None if not exists
"""
with self._lock:
# Load from disk if file exists
self._load_from_disk()

try:
if agent_id:
return self._memory.get(agent_id, {}).get(key)
return self._memory.get(key)
except KeyError:
return None

def _save_to_disk(self):
"""Persist memory data to disk"""
file_path = os.path.join(self.storage_path, "shared_memory.pkl")
with open(file_path, "wb") as f:
pickle.dump(self._memory, f)

def _load_from_disk(self):
"""Load data from disk to memory"""
file_path = os.path.join(self.storage_path, "shared_memory.pkl")
if os.path.exists(file_path):
with open(file_path, "rb") as f:
self._memory = pickle.load(f)

def clear(self, agent_id=None):
"""Clear shared memory data
Args:
agent_id: Optional agent identifier, if specified only clear data for this agent
"""
with self._lock:
if agent_id:
self._memory.pop(agent_id, None)
else:
self._memory.clear()
self._save_to_disk()

def get_all(self, agent_id=None):
"""Get all shared memory data
Args:
agent_id: Optional agent identifier, if specified only return data for this agent
Returns:
Dictionary containing all data
"""
with self._lock:
self._load_from_disk()
if agent_id:
return self._memory.get(agent_id, {})
return self._memory
40 changes: 22 additions & 18 deletions aios/scheduler/rr_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class RRScheduler:
def __init__(
self,
llm,
# memory_manager,
log_mode,
get_llm_request: LLMRequestQueueGetMessage,
get_memory_request: MemoryRequestQueueGetMessage,
Expand All @@ -41,21 +40,20 @@ def __init__(
self.get_memory_request = get_memory_request
self.get_storage_request = get_storage_request
self.get_tool_request = get_tool_request
self.active = False # start/stop the scheduler
self.active = False
self.log_mode = log_mode
self.logger = self.setup_logger()
# self.thread = Thread(target=self.run)
self.request_processors = {
"llm_syscall_processor": Thread(target=self.run_llm_request),
"mem_syscall_processor": Thread(target=self.run_memory_request),
"sto_syscall_processor": Thread(target=self.run_storage_request),
"tool_syscall_processor": Thread(target=self.run_tool_request)
# "memory_request_processor": Thread(self.run_memory_request)
}
self.llm = llm
self.time_limit = 5
self.simple_context_manager = SimpleContextManager()
# self.memory_manager = memory_manager
# Initialize shared memory
self.shared_memory = SharedMemory()

def start(self):
"""start the scheduler"""
Expand Down Expand Up @@ -109,33 +107,39 @@ def run_llm_request(self):
def run_memory_request(self):
while self.active:
try:
# wait at a fixed time interval, if there is nothing received in the time interval, it will raise Empty
agent_request = self.get_memory_request()

agent_request.set_status("executing")
self.logger.log(
f"{agent_request.agent_name} is executing. \n", "execute"
)
self.logger.log(f"{agent_request.agent_name} is executing memory operation.\n", "execute")
agent_request.set_start_time(time.time())

response = self.memory_manager.address_request(agent_request)
# Handle different types of memory operations
if agent_request.operation == "save":
self.shared_memory.save(
agent_request.key,
agent_request.value,
agent_request.agent_id
)
response = {"status": "success", "operation": "save"}
elif agent_request.operation == "load":
value = self.shared_memory.load(
agent_request.key,
agent_request.agent_id
)
response = {"status": "success", "operation": "load", "value": value}

agent_request.set_response(response)

# self.llm.address_request(agent_request)

agent_request.event.set()
agent_request.set_status("done")
agent_request.set_end_time(time.time())

self.logger.log(
f"Current request of {agent_request.agent_name} is done. Thread ID is {agent_request.get_pid()}\n",
"done",
f"Memory request of {agent_request.agent_name} is done. Thread ID is {agent_request.get_pid()}\n",
"done"
)
# wait at a fixed time interval, if there is nothing received in the time interval, it will raise Empty

except Empty:
pass

except Exception:
traceback.print_exc()

Expand Down

0 comments on commit 01d1736

Please sign in to comment.