# Mirror of https://gitlab.ewi.tudelft.nl/ee2l1/2025-2026/A.K.03.git
# Synced 2025-12-12 16:00:56 +01:00 (149 lines, 6.4 KiB, Python)
import time as time
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
|
|
try:
|
|
from KITT_Simulator.shared_state import SharedState
|
|
except:
|
|
from shared_state import SharedState
|
|
|
|
# try:
|
|
# from KITT_Simulator.KITT_Control.Localization import Localization
|
|
# except:
|
|
# from KITT_Control.Localization import Localization
|
|
# pass
|
|
|
|
class sounddevice:
    """Simulated audio backend that hands out a simulated microphone stream.

    The method names (``get_device_count``, ``get_device_info_by_index``,
    ``open``) follow a PyAudio-style interface -- presumably so code written
    against the simulator can later run against real hardware; confirm with
    the calling code.

    The instance owns a ``SharedState`` describing the car pose; the stream
    returned by :meth:`open` reads the car position from that same object.
    """

    # Canned device table returned by the device-query methods.  This is the
    # single source of truth: the device count is derived from it.
    _DEVICES = (
        {'index': 0, 'structVersion': 2, 'name': 'iPhone Microphone', 'hostApi': 0, 'maxInputChannels': 1, 'maxOutputChannels': 0, 'defaultLowInputLatency': 0.12841666666666668, 'defaultLowOutputLatency': 0.01, 'defaultHighInputLatency': 0.13775, 'defaultHighOutputLatency': 0.1, 'defaultSampleRate': 48000.0},
        {'index': 1, 'structVersion': 2, 'name': 'Scarlett 18i20', 'hostApi': 0, 'maxInputChannels': 18, 'maxOutputChannels': 18, 'defaultLowInputLatency': 0.01, 'defaultLowOutputLatency': 0.0045578231292517, 'defaultHighInputLatency': 0.1, 'defaultHighOutputLatency': 0.014716553287981859, 'defaultSampleRate': 48000.0},
        {'index': 2, 'structVersion': 2, 'name': 'MacBook Pro Microphone', 'hostApi': 0, 'maxInputChannels': 1, 'maxOutputChannels': 0, 'defaultLowInputLatency': 0.03285416666666666, 'defaultLowOutputLatency': 0.01, 'defaultHighInputLatency': 0.0421875, 'defaultHighOutputLatency': 0.1, 'defaultSampleRate': 48000.0},
        {'index': 3, 'structVersion': 2, 'name': 'MacBook Pro Speakers', 'hostApi': 0, 'maxInputChannels': 0, 'maxOutputChannels': 2, 'defaultLowInputLatency': 0.01, 'defaultLowOutputLatency': 0.018708333333333334, 'defaultHighInputLatency': 0.1, 'defaultHighOutputLatency': 0.028041666666666666, 'defaultSampleRate': 48000.0},
        {'index': 4, 'structVersion': 2, 'name': 'Microsoft Teams Audio', 'hostApi': 0, 'maxInputChannels': 2, 'maxOutputChannels': 2, 'defaultLowInputLatency': 0.01, 'defaultLowOutputLatency': 0.0013333333333333333, 'defaultHighInputLatency': 0.1, 'defaultHighOutputLatency': 0.010666666666666666, 'defaultSampleRate': 48000.0},
    )

    def __init__(self, x=240, y=30, theta=np.pi/2):
        """Create the shared car state at pose (x, y, theta).

        Args:
            x: Initial x position of the car (presumably cm -- confirm
                against SharedState).
            y: Initial y position of the car (same unit as ``x``).
            theta: Initial heading passed through to SharedState.
        """
        self.state = SharedState(x, y, theta)
        self.state.beacon = True  # Turn on the beacon for testing!!

    def get_device_count(self):
        """Return the number of simulated audio devices."""
        return len(self._DEVICES)

    def get_device_info_by_index(self, index):
        """Return the info dictionary of the simulated device at *index*.

        A fresh copy is returned on every call, so callers may modify the
        result without affecting later queries.

        Raises:
            IndexError: if *index* is outside the device table.
        """
        return dict(self._DEVICES[index])

    def open(self, input_device_index, channels, format, rate, input):
        """Open and return a simulated input stream.

        All arguments are accepted for interface compatibility only; none of
        them influences the stream that is created.  The stream is also kept
        on ``self.stream``.
        """
        self.stream = Stream(self.state)
        return self.stream
|
|
|
|
class Stream:
    """Simulated five-channel microphone stream driven by the car position.

    Each channel is a window cut from one pre-recorded pulse train.  The
    window of a microphone starts earlier in the recording the farther the
    car is from it, so the pulse shows up later in that channel.
    """

    # Directories searched, in order, for the recording files.  Which one
    # exists depends on the working directory the program is started from.
    _DATA_DIRS = ("KITT_Simulator/simulator_data", "simulator_data")

    # A microphone whose propagation delay is d samples reads the recording
    # from sample index (_PULSE_OFFSET - d).
    # NOTE(review): presumably the recording has about this much lead-in
    # before the first pulse -- confirm against the data file.
    _PULSE_OFFSET = 1000

    # Maximum number of frames (samples per channel) returned by one read().
    _FRAMES_PER_READ = 33000

    # Microphone coordinates, in the same unit as state.x / state.y
    # (presumably cm, matching speed_of_sound -- confirm).
    _MIC_POSITIONS = ((0, 0), (0, 460), (460, 460), (460, 0), (230, 0))

    def __init__(self, state):
        """Bind the stream to *state* and load the recordings from disk.

        Args:
            state: Object exposing the car position as ``x`` and ``y``.
        """
        self.state = state

        self.pulse = []
        self.silence = []
        # Load the recording
        self.load_recordings()

        self.Fs = 44100  # Sampling frequency
        self.num_pulses = 3
        self.speed_of_sound = 34300  # Speed of sound in cm/s

    @staticmethod
    def _read_first_record(path):
        """Return the first line of *path* as a float32 array.

        The first whitespace-separated token of the line is skipped (it is a
        label, not a sample); the remaining tokens must be integers.
        """
        with open(path, "r") as f:
            tokens = f.readline().strip().split()[1:]
        return np.array([int(tok) for tok in tokens], dtype=np.float32)

    def load_recordings(self):
        """Load ``self.pulse`` and ``self.silence`` from the recording files.

        The directories in ``_DATA_DIRS`` are tried in order.  Only a
        file-system error (missing file, wrong working directory, ...) moves
        on to the next directory; a malformed file raises immediately rather
        than being hidden by the fallback.

        Raises:
            OSError: if the files cannot be opened in any directory (the
                error of the last directory tried is raised).
            ValueError: if a sample token is not an integer.
        """
        last_error = None
        for data_dir in self._DATA_DIRS:
            try:
                pulse = self._read_first_record(data_dir + "/pulses_recording.txt")
                silence = self._read_first_record(data_dir + "/silence.txt")
            except OSError as err:
                last_error = err
                continue
            self.pulse, self.silence = pulse, silence
            return
        raise last_error

    def read(self, num_frames):
        """Return one simulated recording for the current car position.

        Args:
            num_frames: Accepted for interface compatibility but not used;
                every call returns up to ``_FRAMES_PER_READ`` frames.

        Returns:
            A ``(buffer, overflow)`` tuple.  ``buffer`` is a 1-D float32
            array with the five channels interleaved sample by sample
            (ch0, ch1, ..., ch4, ch0, ...).  ``overflow`` is always False.
        """
        overflow = False

        channels = []
        for dist in self.__dist():
            # Convert the distance to a propagation delay in samples.
            delay = int(dist * self.Fs / self.speed_of_sound)
            # Clamp at 0: a negative index would wrap around to the tail of
            # the recording and shorten every channel to a few samples.
            start = max(0, self._PULSE_OFFSET - delay)
            channels.append(self.pulse[start:start + self._FRAMES_PER_READ])

        buffer = self.__interleave_and_prepare_buffer(*channels)
        return buffer, overflow

    def __dist(self):
        """Return the distances from the car to each of the five microphones."""
        car_coor = np.array([self.state.x, self.state.y])
        return tuple(
            np.linalg.norm(np.array(mic) - car_coor) for mic in self._MIC_POSITIONS
        )

    @staticmethod
    def __interleave_and_prepare_buffer(*arrays):
        """Interleave equal-rank 1-D arrays into one flat float32 buffer.

        All arrays are first truncated to the length of the shortest one.
        """
        length = min(len(arr) for arr in arrays)
        arrays = [arr[:length] for arr in arrays]

        # Column-major flattening of the stacked rows yields
        # a0, b0, c0, ..., a1, b1, c1, ...
        interleaved_list = np.vstack(arrays).reshape((-1,), order='F')
        buffer = interleaved_list.astype(np.float32)
        return buffer
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: list the simulated devices, record once from the
    # simulated stream, plot the five channels and load the reference pulse.
    sounddevice_handle = sounddevice(x=400, y=140, theta=np.pi/2)

    for i in range(sounddevice_handle.get_device_count()):
        device_info = sounddevice_handle.get_device_info_by_index(i)
        print(i, device_info['name'])

    device_index = int(input('Enter device index: '))
    Fs = 44100

    stream = sounddevice_handle.open(
        input_device_index=device_index, channels=5, format='float32', rate=Fs, input=True
    )

    samples, _ = stream.read(Fs * 6)
    data = np.frombuffer(samples, dtype='float32')

    # De-interleave into one column per microphone channel.
    recording = np.column_stack([data[ch::5] for ch in range(5)])

    plt.plot(recording)
    plt.title("Recording")
    plt.show()

    # Proceed with localization using the corrected recordings
    # The file location depends on the working directory, so try the same
    # two locations that Stream.load_recordings accepts.
    pulse_dict = {}
    for pulse_path in (
        "KITT_Simulator/simulator_data/pulses_recording.txt",
        "simulator_data/pulses_recording.txt",
    ):
        try:
            pulse_file = open(pulse_path, "r")
        except FileNotFoundError:
            continue
        with pulse_file as f:
            for line in f:
                line = line.strip().split()
                if not line:
                    continue  # tolerate blank lines in the data file
                # First token is a numeric label followed by one trailing
                # character, which is stripped before parsing.
                pulse_dict[int(line[0][:-1])] = [int(i) for i in line[1:]]
        break
    else:
        raise FileNotFoundError(
            "pulses_recording.txt not found in KITT_Simulator/simulator_data or simulator_data"
        )

    refSignal = pulse_dict[30][16000:19500]

    # localization = Localization(recording, refSignal, 3, Fs)

    # Get the localized position
    # x_car, y_car = localization.localizations
    # print(f"Estimated position: x={x_car}, y={y_car}")