From f4442a05f5a62fafa91edb5bf26f3bb5187751fc Mon Sep 17 00:00:00 2001 From: Malik Ali Date: Fri, 5 Dec 2025 09:06:25 +0100 Subject: [PATCH] Cleaned up localization code 2 --- student_code/my_firstborn.py | 125 +++++++++++++++++++++++++---------- 1 file changed, 91 insertions(+), 34 deletions(-) diff --git a/student_code/my_firstborn.py b/student_code/my_firstborn.py index e3e9a3d..f244daf 100644 --- a/student_code/my_firstborn.py +++ b/student_code/my_firstborn.py @@ -1,69 +1,126 @@ -import numpy as np # For convolution function +import numpy as np # For working with numpy arrays import matplotlib.pyplot as plt # For plotting for tests from scipy.io import wavfile # For reading .wav files for testing -from scipy.signal import find_peaks # For filter function -from scipy.fft import fft, ifft # For fft and ifft +from scipy.signal import find_peaks # For cropping the microphone recordings +from scipy.fft import fft, ifft # For channel estimation from scipy.optimize import least_squares # For estimating KITT's location def recording_crop_normalize(recordings, ref_mic): # Finding the last peak in the recording of the chosen reference microphone ref_sig = recordings[:,ref_mic] - ref_peaks = find_peaks(ref_sig, height= 0.5*np.max(ref_sig)) + ref_peaks, _ = find_peaks(ref_sig, height= 0.5*max(abs(ref_sig))) ref_peak = ref_peaks[-1] # Cropping all recordings to show only the peaks around the reference peak start = ref_peak - 1500 end = ref_peak + 1500 - recordings_cropped = recordings[start:end] + recordings = recordings[start:end] # Normalizing all recordings after they are cropped - amplitude, sample, mic = recordings_cropped.shape - recordings_cropped_normalized = np.zeros((amplitude, sample, mic)) + samples, mic = recordings.shape + recordings_cropped_normalized = np.zeros((samples, mic)) for i in range(mic): - recordings_cropped_normalized[:, i] = recordings_cropped[:, i]/max(recordings_cropped[:, i]) - return recordings_cropped_normalized + recordings_cropped_normalized[:, i] = recordings[:, i]/max(abs(recordings[:, i])) + recordings = recordings_cropped_normalized + return recordings -def ch3(x, y, epsilon): - # Find both x (recording) and y (reference recording) in the frequency domain - padded_length = max(len(x), len(y)) - X = fft(x, padded_length-len(x)) - Y = fft(y, padded_length-len(y)) +def channel_estimation(recording, reference_recording, epsilon): + # Finding both the recording and the reference recording in the frequency domain + padded_length = max(len(recording), len(reference_recording)) + rec_freq = fft(recording, padded_length-len(recording)) + ref_rec_freq = fft(reference_recording, padded_length-len(reference_recording)) - # Perform the deconvolution in the frequency domain - H = (Y*np.conj(X))/(np.abs(X)**2+epsilon) + # Performing the deconvolution in the frequency domain + ch_est_freq = (ref_rec_freq*np.conj(rec_freq))/(np.abs(rec_freq)**2+epsilon) - # Find the channel estimation the time domain and centres it - channel_estimate = np.real(ifft(H)) + # Finding the channel estimation in the time domain and centre it + channel_estimate = np.real(ifft(ch_est_freq)) channel_estimate = np.fft.fftshift(channel_estimate) return channel_estimate -def distance_calc(channel_estimate, sample_frequency): +def distance_calc(channel_estimate, sampling_rate): # Finding the location of the peak in the channel estimate relative to the reference peak center = len(channel_estimate)//2 - peak = np.argmax(np.abs(channel_estimate)) + peak = np.argmax(abs(channel_estimate)) sample_range = peak - center - # Calculating the Time Difference of Arrival (TDOA) from found peak location and using it together with the speed of sound to calculate the distance - time_dif = sample_range / sample_frequency - distance = time_dif * 34300 # cm + # Calculating the distance using the Time Difference of Arrival (TDOA) from found peak location + time_dif = sample_range/sampling_rate + distance = time_dif * 34300 # cm return distance -def location_estimation(mic_coords, ref_mic, distances, start_point = [230,230,0]): - # Using the location of the reference microphone as the refence point - ref_point = mic_coords[ref_mic] - other_indices = [i for i in range(mic_coords.shape[0]) if i != ref_mic] +def location_estimation(mic_locations, ref_mic, distances, start_point = None): + # Choose a start point which serves as your starting "guessed" location. If no start point is given, choose the middle of the field as the start point. + if start_point is None: + start_point = [230,230,0] - # Generating the residuals function that is to be minimized - def residuals_function(point): - point = np.array([point[0],point[1],0]) + # Using the location of the reference microphone as the refence point + ref_point = mic_locations[ref_mic] + other_indices = [i for i in range(mic_locations.shape[0]) if i != ref_mic] + + # Generating the residuals function that is to be minimized. This residual is the difference between the "guessed" location and the location calculated from the microphone recordings + def residuals_function(guess): + guess = np.array([guess[0],guess[1],0]) residuals = [] for i, idx in enumerate(other_indices): - mic = mic_coords[idx] - residual = (np.linalg.norm(point-mic) - np.linalg.norm(point-ref_point)) - distances[i] + mic = mic_locations[idx] + residual = (np.linalg.norm(guess-mic) - np.linalg.norm(guess-ref_point)) - distances[i] residuals.append(residual) return residuals - # Uses the least squares method to minimize the difference between the estimated location and the location calculated from the microphone recordings. + # Using the least squares method to minimize the residuals function location = least_squares(residuals_function, start_point, bounds = ([0,0,-1],[460,460,1])) - return location.x \ No newline at end of file + return location.x + +def localization(recordings, sampling_rate): + # Choosing a reference microphone. 0 is mic 1; 4 is mic 5 + ref_mic = 4 + + # Normalize and crop the recordings + recordings = recording_crop_normalize(recordings, ref_mic) + + # Finding the channel estimates between each recording and the reference recording + epsilon = 0.0001 + channel_estimates = [] + recording, mic = recordings.shape + for i in range(mic): + if i == ref_mic: + continue + else: + channel_estimates.append(channel_estimation(recordings[:, i], recordings[:, ref_mic], epsilon)) + + # Finding the distances that correspond to the Time Difference of Arrival (TDOA) for each channel estimate + distances = [] + for i in range(len(channel_estimates)): + distances.append(distance_calc(channel_estimates[i], sampling_rate)) + + # Estimating the location using the least squares method + mic_locations = np.array([ + [0, 0, 25], # mic 1 cm + [0, 460, 25], # mic 2 cm + [460, 460, 25], # mic 3 cm + [460, 0, 25], # mic 4 cm + [0, 230, 55] # mic 5 cm + ]) + location_estimate = location_estimation(mic_locations, ref_mic, distances) + return location_estimate + +# Test +if __name__ == "__main__": + # Coordinates of the recordings + record_x = [64, 82, 109, 143, 150, 178, 232] + record_y = [40, 399, 76, 296, 185, 439, 275] + + # Generating the filenames + filenames = [] + for i in range(len(record_x)): + real_x = record_x[i] + real_y = record_y[i] + filenames.append(f"../files/Student Recordings/record_x{real_x}_y{real_y}.wav") + + for i in range(len(filenames)): + sampling_rate, recordings = wavfile.read(filenames[i]) + print(f"\nRecording {i+1}: {filenames[i]}") + location_estimate = localization(recordings, sampling_rate) + print("Estimated source position:", location_estimate) \ No newline at end of file