Keeping a StateMachine alive #420
Unanswered
JamieAronson1998
asked this question in
Q&A
Replies: 2 comments 4 replies
-
Code: class GpsStateMachine(StateMachine):
idle = State(initial=True)
awake = State(enter="entering_awake", exit="leaving_awake")
invisible_awake = State(enter="entering_invisible_awake", exit="leaving_invisible_awake")
awake_to_invis = awake.to(invisible_awake, cond='ready_awake_to_invis')
invis_to_awake = invisible_awake.to(awake, cond='ready_invis_to_awake')
get_gps_data = invisible_awake.to.itself()
update_vehicle_state = awake_to_invis | invis_to_awake
start = idle.to(awake)
def __init__(self):
super().__init__()
self._client = MqttClient(self, "GpsClient", "localhost", 1883)
self._gps_recevier = GpsReceiver("localhost", "2947")
self.awake_loop_thread = Thread(target=self.awake_loop)
self.invis_awake_loop_thread = Thread(target=self.invis_awake_loop)
self.vehicle_state = 0
self.running_awake_loop_thread = False
self.running_invis_awake_loop_thread = False
print("Finished constructing GPS State Machine")
# self.start()
def before_update_vehicle_state(self, event: str, source: State, target: State, message: str = ""):
message = ". " + message if message else ""
return f"Running {event} from {source.id} to {target.id}{message}"
def ready_invis_to_awake(self, new_vehicle_state):
return self.vehicle_state == 0 and new_vehicle_state == '1'
def ready_awake_to_invis(self, new_vehicle_state):
return self.vehicle_state == 1 and new_vehicle_state == '0'
def entering_awake(self):
print("Entering State awake. Starting awake_loop_thread")
self.vehicle_state = 1
self.running_awake_loop_thread = True
self.awake_loop_thread.start()
def leaving_awake(self):
print("Leaving State awake. Closing awake_loop_thread")
self.running_awake_loop_thread = False
self.awake_loop_thread.join()
def entering_invisible_awake(self):
print("Entering State invisible_awake. Starting invisible_awake_thread")
self.vehicle_state = 0
self.running_invis_awake_loop_thread = True
self.invis_awake_loop_thread.start()
def leaving_invisible_awake(self):
print("Leaving State invisible_awake. Closing invisible_awake_thread")
self.running_invis_awake_loop_thread = False
self.invis_awake_loop_thread.join()
def on_update_vehicle_state(self):
pass
def on_get_gps_data(self):
print("Got request for GPS data")
payload = self._gps_recevier.get_gps_message()
if(payload != None):
self._client.publish_message(payload, "vehicle/gps_data")
def awake_loop(self):
while self.running_awake_loop_thread:
print("Looping awake_loop")
payload = self._gps_recevier.get_gps_message()
if(payload != None):
self._client.publish_message(payload, "vehicle/gps_data")
time.sleep(1)
def invis_awake_loop(self):
while self.running_invis_awake_loop_thread:
print("Looping invis_awake_loop") |
Beta Was this translation helpful? Give feedback.
1 reply
-
Ho @JamieAronson1998 , how are you? For the "alive", what did you mean, running on a background thread? The instance is keep alive as any other regular Python Object. If you want it to "run forever ", the user of the library should implement something like (pseudo code):
This helps? I didn't get the second part, can you elaborate more with examples? |
Beta Was this translation helpful? Give feedback.
3 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
Hi all. 2 questions and apologies if they are silly ones.
I am trying to understand what the best practice is for keeping a state machine instance 'alive'. After instantiating, I want the instance to keep running.
The second question is related to the first.. What is the best bay to create a loop for a particular state? ie when in a particular state, some code executes repeatedly and when in another state, another piece of code executes repeatedly.
I ask this because I have worked with an FMS in C++ that had this and would be looking for similar behaviour.
Thanks
Beta Was this translation helpful? Give feedback.
All reactions