A.K.03/Manual/KITT_Simulator/sounddevice_simulator.py
Nicholas Stănescu ee4d929042 done
2025-11-19 19:57:14 +01:00

148 lines
6.3 KiB
Python

import numpy as np
import matplotlib.pyplot as plt
try:
from Manual.KITT_Simulator.shared_state import SharedState
except:
from shared_state import SharedState
# try:
# from KITT_Simulator.KITT_Control.Localization import Localization
# except:
# from KITT_Control.Localization import Localization
# pass
class sounddevice:
def __init__(self, x=240, y=30, theta=np.pi/2):
self.state = SharedState(x , y, theta)
self.state.beacon = True # Turn on the beacon for testing!!
def get_device_count(self):
return 5
def get_device_info_by_index(self, index):
example_output = [
{'index': 0, 'structVersion': 2, 'name': 'iPhone Microphone', 'hostApi': 0, 'maxInputChannels': 1, 'maxOutputChannels': 0, 'defaultLowInputLatency': 0.12841666666666668, 'defaultLowOutputLatency': 0.01, 'defaultHighInputLatency': 0.13775, 'defaultHighOutputLatency': 0.1, 'defaultSampleRate': 48000.0},
{'index': 1, 'structVersion': 2, 'name': 'Scarlett 18i20', 'hostApi': 0, 'maxInputChannels': 18, 'maxOutputChannels': 18, 'defaultLowInputLatency': 0.01, 'defaultLowOutputLatency': 0.0045578231292517, 'defaultHighInputLatency': 0.1, 'defaultHighOutputLatency': 0.014716553287981859, 'defaultSampleRate': 48000.0},
{'index': 2, 'structVersion': 2, 'name': 'MacBook Pro Microphone', 'hostApi': 0, 'maxInputChannels': 1, 'maxOutputChannels': 0, 'defaultLowInputLatency': 0.03285416666666666, 'defaultLowOutputLatency': 0.01, 'defaultHighInputLatency': 0.0421875, 'defaultHighOutputLatency': 0.1, 'defaultSampleRate': 48000.0},
{'index': 3, 'structVersion': 2, 'name': 'MacBook Pro Speakers', 'hostApi': 0, 'maxInputChannels': 0, 'maxOutputChannels': 2, 'defaultLowInputLatency': 0.01, 'defaultLowOutputLatency': 0.018708333333333334, 'defaultHighInputLatency': 0.1, 'defaultHighOutputLatency': 0.028041666666666666, 'defaultSampleRate': 48000.0},
{'index': 4, 'structVersion': 2, 'name': 'Microsoft Teams Audio', 'hostApi': 0, 'maxInputChannels': 2, 'maxOutputChannels': 2, 'defaultLowInputLatency': 0.01, 'defaultLowOutputLatency': 0.0013333333333333333, 'defaultHighInputLatency': 0.1, 'defaultHighOutputLatency': 0.010666666666666666, 'defaultSampleRate': 48000.0},
]
return example_output[index]
def open(self, input_device_index, channels, format, rate, input):
self.stream = Stream(self.state)
return self.stream
class Stream:
def __init__(self, state):
self.state = state
self.pulse = []
self.silence = []
# Load the recording
self.load_recordings()
self.Fs = 44100 # Sampling frequency
self.num_pulses = 3
self.speed_of_sound = 34300 # Speed of sound in cm/s
def load_recordings(self):
# Load the recordings from files
try:
with open("KITT_Simulator/simulator_data/pulses_recording.txt", "r") as f:
line = f.readline()
self.pulse = np.array([int(i) for i in line.strip().split()[1:]], dtype=np.float32)
with open("KITT_Simulator/simulator_data/silence.txt", "r") as f:
self.silence = np.array([int(i) for i in f.readline().strip().split()[1:]], dtype=np.float32)
except:
with open("simulator_data/pulses_recording.txt", "r") as f:
line = f.readline()
self.pulse = np.array([int(i) for i in line.strip().split()[1:]], dtype=np.float32)
with open("simulator_data/silence.txt", "r") as f:
self.silence = np.array([int(i) for i in f.readline().strip().split()[1:]], dtype=np.float32)
def read(self, num_frames):
overflow = False
distances = self.__dist()
distances = [dist * self.Fs / self.speed_of_sound for dist in distances] # Convert distances to samples
R1 = self.pulse[1000-int(distances[0]):]
R2 = self.pulse[1000-int(distances[1]):]
R3 = self.pulse[1000-int(distances[2]):]
R4 = self.pulse[1000-int(distances[3]):]
R5 = self.pulse[1000-int(distances[4]):]
R1 = R1[:33000]
R2 = R2[:33000]
R3 = R3[:33000]
R4 = R4[:33000]
R5 = R5[:33000]
buffer = self.__interleave_and_prepare_buffer(R1, R2, R3, R4, R5)
return buffer, overflow
def __dist(self):
# Calculate distances from the car to each microphone
mic_coor = np.array([[0, 0], [0, 460], [460, 460], [460, 0], [230, 0]])
car_coor = np.array([self.state.x, self.state.y])
D1 = np.linalg.norm(mic_coor[0] - car_coor)
D2 = np.linalg.norm(mic_coor[1] - car_coor)
D3 = np.linalg.norm(mic_coor[2] - car_coor)
D4 = np.linalg.norm(mic_coor[3] - car_coor)
D5 = np.linalg.norm(mic_coor[4] - car_coor)
return D1, D2, D3, D4, D5
@staticmethod
def __interleave_and_prepare_buffer(*arrays):
# Interleave arrays and prepare buffer
length = min(len(arr) for arr in arrays)
arrays = [arr[:length] for arr in arrays]
interleaved_list = np.vstack(arrays).reshape((-1,), order='F')
buffer = interleaved_list.astype(np.float32)
return buffer
if __name__ == "__main__":
sounddevice_handle = sounddevice(x=400, y=140, theta=np.pi/2)
for i in range(sounddevice_handle.get_device_count()):
device_info = sounddevice_handle.get_device_info_by_index(i)
print(i, device_info['name'])
device_index = int(input('Enter device index: '))
Fs = 44100
stream = sounddevice_handle.open(
input_device_index=device_index, channels=5, format='float32', rate=Fs, input=True
)
samples,_ = stream.read(Fs * 6)
data = np.frombuffer(samples, dtype='float32')
recording = np.array([data[0::5], data[1::5], data[2::5], data[3::5], data[4::5]]).T
plt.plot(recording)
plt.title("Recording")
plt.show()
# Proceed with localization using the corrected recordings
pulse_dict = {}
with open("KITT_Simulator/simulator_data/pulses_recording.txt", "r") as f:
for line in f:
line = line.strip().split()
pulse_dict[int(line[0][:-1])] = [int(i) for i in line[1:]]
refSignal = pulse_dict[30][16000:19500]
# localization = Localization(recording, refSignal, 3, Fs)
# Get the localized position
# x_car, y_car = localization.localizations
# print(f"Estimated position: x={x_car}, y={y_car}")