This repository has been archived by the owner on Sep 11, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__main__.py
151 lines (116 loc) · 3.96 KB
/
__main__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import SimulationFramework as sim
import threading
import random
import queue
from time import sleep
from Projection import hackathon_projector
# One FIFO queue of pending actions per vehicle, keyed by license plate.
# The functions in this file put two-element lists ``[callable, args]`` in.
action_queues = {
    license_plate: queue.Queue()
    for license_plate in sim.all_license_plates()
}

# Charging stations as returned by the simulation framework; check_position()
# passes each value of this mapping to hackathon_projector.distance().
charging_stations = sim.get_all_charging_stations()
class QueueWorker(threading.Thread):
    """
    Thread that executes the actions pending in one queue and then exits.

    Every queued action is a two-element sequence ``[callable, args]`` and is
    run through ``execute_action``. The worker never waits for new work: it
    terminates as soon as it finds the queue empty.
    """

    def __init__(self, queue: queue.Queue):
        """
        :param queue: queue of pending ``[callable, args]`` actions to drain
        """
        super().__init__()
        self.queue = queue

    def run(self):
        """
        Execute queued actions in FIFO order until the queue is empty.

        An exception raised by an action is not caught here; it ends this
        worker and leaves the remaining actions in the queue.
        """
        while True:
            # Several workers may drain the same queue at once. Checking
            # empty() and then calling a blocking get() would hang forever
            # if another worker took the last item in between, so fetch
            # without blocking and stop on queue.Empty instead.
            try:
                action = self.queue.get_nowait()
            except queue.Empty:
                return
            if __debug__:
                print('Executing action {} with args {}.'.format(
                    action[0].__name__, action[1]
                ))
            execute_action(action)
def execute_action(action):
    """
    Run one queued action by calling its callable with its arguments.

    :param action: indexable pair; item 0 is the callable, item 1 is the
        sequence of positional arguments unpacked into the call
    :return: None, the result of the callable is discarded
    """
    callback = action[0]
    arguments = action[1]
    callback(*arguments)
def app_opening_periodic(propability: float):
    """
    Randomly queue an app-opening per vehicle, then start the workers.

    :param propability: threshold compared against ``random.random()``; a
        vehicle gets a ``sim.create_app_opening`` action queued when the
        drawn number is below it
    :return: None
    """
    for plate in sim.all_license_plates():
        selected = random.random() < propability
        if selected:
            opening = [sim.create_app_opening, [plate]]
            action_queues[plate].put(opening)
    start_workers()
def drain_fuel_periodic(min_drain: float, max_drain: float):
    """
    Queue a random fuel drain for every vehicle, then start the workers.

    :param min_drain: first bound handed to ``random.uniform``
    :param max_drain: second bound handed to ``random.uniform``
    :return: None
    """
    for plate in sim.all_license_plates():
        pending = action_queues[plate]
        # each vehicle gets its own independently drawn amount
        amount = random.uniform(min_drain, max_drain)
        pending.put([sim.drain_fuel, [plate, amount]])
    start_workers()
    if __debug__:
        message = 'drained fuel [{:.4f}, {:.4f}] from vehicles'
        print(message.format(min_drain, max_drain))
def kill_battery_periodic(propability: float):
    """
    Randomly queue a 12V battery failure per vehicle, then start the workers.

    :param propability: threshold compared against ``random.random()``; a
        vehicle gets a ``sim.kill_12v_battery`` action queued when the drawn
        number is below it
    :return: None
    """
    for plate in sim.all_license_plates():
        selected = random.random() < propability
        if not selected:
            continue
        action_queues[plate].put([sim.kill_12v_battery, [plate]])
        if __debug__:
            # printed when the action is queued, not once it has been run
            print('Killed battery of {}.'.format(plate))
    start_workers()
def initialize_simulation():
    """
    Queue the reset actions for every vehicle, then start the workers.

    Per vehicle, in this order: set the fuel level to 100, revive the 12V
    battery and delete the vehicle's tasks.

    :return: None
    """
    for plate in sim.all_license_plates():
        pending = action_queues[plate]
        reset_actions = (
            [sim.set_fuel_level, [plate, 100]],
            [sim.revive_12v_battery, [plate]],
            [sim.delete_tasks_for_vehicle, [plate]],
        )
        for action in reset_actions:
            pending.put(action)
    start_workers()
    if __debug__:
        print('Initialized Simulation.')
def check_position():
    """
    Queue a negative fuel drain for vehicles near a charging station.

    For every vehicle/station pair whose ``hackathon_projector.distance`` is
    at most 200, a ``sim.drain_fuel`` action with an amount of -5 is queued
    for that vehicle. Afterwards the workers are started.

    :return: None
    """
    for plate in sim.all_license_plates():
        vehicle_pos = sim.get_lon_lat_of_vehicle(plate)
        for station_pos in charging_stations.values():
            gap = hackathon_projector.distance(vehicle_pos, station_pos)
            # NOTE(review): unit of the 200 threshold is not visible here
            # (presumably metres) - confirm against hackathon_projector.
            in_range = gap <= 200
            if in_range:
                # one action per station in range; a negative drain
                # presumably recharges the vehicle - TODO confirm
                action_queues[plate].put([sim.drain_fuel, [plate, -5]])
    start_workers()
def check_delivery():
    """
    Placeholder for the delivery check; it currently does nothing.

    :return: None
    """
    # TODO: get app opening tasks for every vehicle and check if customer
    # has been picked up or delivered
    return None
def start_workers():
    """
    Start a QueueWorker thread for every vehicle that has pending actions.

    Workers exit as soon as their queue is empty, so this has to be called
    again whenever new actions have been queued.

    :return: None
    """
    for plate in sim.all_license_plates():
        pending = action_queues[plate]
        # A worker started on an empty queue returns immediately, so do not
        # pay for creating a thread when the vehicle has nothing queued.
        if pending.empty():
            continue
        QueueWorker(pending).start()
def is_finished_simulation() -> bool:
    """
    Tell whether the simulation loop should stop.

    :return: always False for now
    """
    # TODO: define a condition which actually makes sense
    finished = False
    return finished
def simulation_step(step_frequency: float=5.0):
    """
    Perform one simulation step on fmm.

    Queues the periodic fuel drain, the random battery failures and the
    random app-openings, each scaled by the duration of one step.

    :param step_frequency: number of simulation steps per second
    :return: None
    """
    # seconds that pass between two simulation steps
    seconds_per_step = 1.0 / step_frequency

    # drain bounds: 0.5 to 1.5 hundredths of the step duration
    lowest_drain = 0.5 * seconds_per_step / 100.0
    highest_drain = 1.5 * seconds_per_step / 100.0
    drain_fuel_periodic(lowest_drain, highest_drain)

    # kill battery about once per hour
    battery_kill_chance = seconds_per_step / 3600.0
    kill_battery_periodic(battery_kill_chance)

    # app-opening every minute
    app_opening_chance = seconds_per_step / 60.0
    app_opening_periodic(app_opening_chance)
def main():
    """
    Initialize the simulation and run steps until it is finished.

    Every iteration starts ``check_position`` in its own thread, performs
    one simulation step and then sleeps for the duration of one step.

    :return: None
    """
    step_frequency = 1.0
    step_duration = 1.0 / step_frequency
    initialize_simulation()
    # simulation loop
    while True:
        if is_finished_simulation():
            break
        position_checker = threading.Thread(target=check_position)
        position_checker.start()
        simulation_step(step_frequency)
        sleep(step_duration)
# Run the simulation only when this file is executed as a script.
if __name__ == '__main__':
    main()