forked from agiresearch/AIOS
-
Notifications
You must be signed in to change notification settings - Fork 0
/
simulator.py
104 lines (78 loc) · 2.25 KB
/
simulator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import os
import sys
import json
from src.command_parser import (
PunctuationParser,
ChatGPTParser
)
from src.command_executor import (
Executor
)
from src.scheduler.fifo_scheduler import FIFOScheduler
from src.scheduler.rr_scheduler import RRScheduler
from src.utils.utils import (
parse_global_args,
)
from src.agents.agent_factory import AgentFactory
from src.agents.agent_process import AgentProcessFactory
import warnings
from src.llm_kernel import llms
import threading
terminate_signal = threading.Event()
def main():
warnings.filterwarnings("ignore")
parser = parse_global_args()
args = parser.parse_args()
llm_name = args.llm_name
max_gpu_memory = args.max_gpu_memory
eval_device = args.eval_device
max_new_tokens = args.max_new_tokens
scheduler_log_mode = args.scheduler_log_mode
agent_log_mode = args.agent_log_mode
llm = llms.LLMKernel(
llm_name,
max_gpu_memory,
eval_device,
max_new_tokens
)
# start the scheduler
# scheduler = FIFOScheduler(
# llm = llm,
# log_mode = scheduler_log_mode
# )
scheduler = RRScheduler(
llm = llm,
log_mode = scheduler_log_mode
)
agent_process_factory = AgentProcessFactory()
agent_factory = AgentFactory(
llm = llm,
agent_process_queue = scheduler.agent_process_queue,
agent_process_factory = agent_process_factory,
agent_log_mode = agent_log_mode
)
parser = PunctuationParser(
llm = llm
)
executor = Executor(agent_factory=agent_factory)
scheduler.start()
while True:
try:
# Read a command line input
command_line = input(f"[{llm_name}]>")
if command_line.strip().lower() == "exit":
print("Exiting...")
# agent_factory.terminate_signal.set()
break
# Parse command
tokens = parser.parse(command_line)
# Execute the command
executor.execute(tokens)
except KeyboardInterrupt:
# Handle Ctrl+C gracefully
print("\nUse 'exit' to quit the shell.")
except EOFError:
pass
scheduler.stop()
if __name__ == "__main__":
main()