A.K.03/Manual/KITT_Simulator/serial_simulator.py
Nicholas Stănescu ee4d929042 done
2025-11-19 19:57:14 +01:00

136 lines
6.0 KiB
Python

import time
import threading
import numpy as np
import logging
from Manual.KITT_Simulator.shared_state import SharedState
from Manual.KITT_Simulator.gui import GUI
from Manual.KITT_Simulator.dynamics_simulator import Dynamics
# try:
# from KITT_Simulator.shared_state import SharedState
# from KITT_Simulator.gui import GUI
# from KITT_Simulator.dynamics_simulator import Dynamics
# except ImportError:
# from shared_state import SharedState
# from GUI_notebook import GUI
# from dynamics_simulator import Dynamics
# else:
# raise ImportError("Could not import the required modules.")
class Serial:
"""Class to simulate serial communication with the car."""
def __init__(self, port, baudrate, rtscts=True, x=240, y=30, theta=np.pi/2, update_freq=20):
"""Initializes the Serial class with port, baudrate, and initial state, and starts the dynamics thread."""
self.port = port
self.baudrate = baudrate
self.rtscts = rtscts
self.update_freq = update_freq
self.send_buffer = [] # Buffer for storing commands to be sent
self.state = SharedState(x, y, theta)
self.dynamics = Dynamics(self.state)
self.gui = GUI(self.state) # Initialize the GUI
self.gui.display()
self.update_thread = threading.Thread(target=self.run_dynamics)
self.update_thread.daemon = True
self.stop_thread = threading.Event()
self.update_thread.start()
def write(self, command):
"""Writes a command to the serial port and updates the shared state accordingly."""
with self.state._lock:
logging.debug(f"Received command: {command}")
match command[0:1]:
case b'M':
motor_command = int(command[1:-1])
self.state.motor_command = motor_command
case b'D':
servo_command = int(command[1:-1])
self.state.servo_command = servo_command
case b'A':
if int(command[1:-1]) == 1:
self.state.beacon = True
elif int(command[1:-1]) == 0:
self.state.beacon = False
case b'S':
left_distance = self.state.dist_L
right_distance = self.state.dist_R
voltage = 11.5
audio_code = 0xABCDEF00
carrier_frequency = 5678
bit_frequency = 1234
repetition_count = 1337
match command[1:2]:
case b'd':
status = "USL{}\nUSR{}\n\x04".format(left_distance, right_distance)
case b'v':
status = """VBATT{:.2f}V\n\x04""".format(voltage)
case _:
status = '**************************\n'
status += '* Audio Beacon: {}\n'.format('on' if self.state.beacon else 'off')
status += '* c: {:#x}\n'.format(audio_code)
status += '* f_c: {}\n'.format(carrier_frequency)
status += '* f_b: {}\n'.format(bit_frequency)
status += '* c_r: {}\n'.format(repetition_count)
status += '**************************\n'
status += '* PWM:\n'
status += '* Dir. {}\n'.format(self.state.servo_command)
status += '* Mot. {}\n'.format(self.state.motor_command)
status += '**************************\n'
status += '* Sensors:\n'
status += '* Dist. L {} R {}\n'.format(left_distance, right_distance)
status += '* V_batt {} V\n'.format(voltage)
status += '**************************\n\x04'
self.send_buffer.append(status)
case b'V':
version = '*******************************\n' \
'* EPO-4 *\n' \
'*******************************\n' \
'* KITT Simulator Rev. 0.1 *\n' \
'* Audio Firmware Rev. 0.0 *\n' \
'*******************************\n' \
'* Author: M. Rom B.Sc. *\n' \
'* Date: Sep 10, 2024 *\n' \
'*******************************\n\x04'
self.send_buffer.append(version)
def read_until(self, end):
"""Reads from the send buffer until a specified end character is found."""
return self.send_buffer.pop(0).encode() # Return the first element in the send buffer
def run_dynamics(self):
"""Continuously updates the state using dynamics every 50ms."""
update_freq = self.update_freq # Desired updates per second
update_interval = 1 / update_freq # Time per update in seconds
while not self.stop_thread.is_set():
start_time = time.time() # Start time of this loop
self.dynamics.update_state()
self.gui.update()
elapsed_time = time.time() - start_time # Time taken for this update loop
sleep_time = update_interval - elapsed_time
if sleep_time > 0:
time.sleep(sleep_time) # Sleep only if we have time remaining in the frame
else:
print("WARNING: Dynamics thread running too slow. Consider reducing the update frequency.")
def close(self):
"""Closes the serial connection."""
if self.update_thread.is_alive():
self.stop_thread.set()
self.update_thread.join()
SharedState.reset()
def __del__(self):
"""Destructor to clean up when the Serial object is deleted."""
self.close()