Refactored start measurement method.

This commit is contained in:
jupfi 2023-12-10 08:55:11 +01:00
parent 222e5fe252
commit 16aabe1ecc

View file

@ -16,94 +16,194 @@ class LimeNQRController(BaseSpectrometerController):
super().__init__(module)
def start_measurement(self):
logger.debug(
"Starting measurement with spectrometer: %s", self.module.model.name
)
# Now we request the pulse sequence set in the pulse programmer module
pulse_sequence = self.module.model.pulse_programmer.model.pulse_sequence
logger.debug("Pulse sequence is: %s", pulse_sequence.to_json())
self.log_start_message()
try:
from .contrib.limr import limr
self_path = Path(__file__).parent
driver_path = str(self_path / "contrib/pulseN_test_USB.cpp")
lime = limr(driver_path)
except ImportError as e:
logger.error("Error while importing limr. %s", e)
except Exception as e:
logger.error("Error while loading pulseN_test_USB.cpp: %s", e)
lime.noi = -1 # No initialisation
lime.nrp = 1 # Numer of repetitions
lime = self.update_settings(lime)
lime = self.translate_pulse_sequence(lime)
lime.nav = self.module.model.averages
for key in sorted(lime.parsinp):
val = getattr(lime, key)
if val != []:
# logger.debug("Attribute: %s, Value: %s, Descr.: %s" % key, val, lime.parsinp[key][1])
logger.debug(key + ": " + str(val) + " " + lime.parsinp[key][1])
# Create temp folder for .hdf files
temp_dir = tempfile.TemporaryDirectory()
path = Path(temp_dir.name)
logger.debug("Created temporary directory: %s", path)
lime.spt = path
lime.fpa = "temp"
# Write to statusbar
self.module.nqrduck_signal.emit("statusbar_message", "Started Measurement")
logger.debug("Starting measurement")
lime.run()
logger.debug("Reading hdf file")
lime.readHDF()
rx_begin, rx_stop = self.translate_rx_event(lime)
logger.debug("RX event starts at: %s and ends at: %s", rx_begin, rx_stop)
# evaluation range, defines: blanking time and window length
evran = [rx_begin, rx_stop]
# np.where sometimes does not work out, so it is put in a try except
# always check the console for errors
try:
evidx = np.where((lime.HDF.tdx > evran[0]) & (lime.HDF.tdx < evran[1]))[0]
except:
logger.error("Error while reading the measurement data")
self.module.nqrduck_signal.emit("measurement_error", "Error with measurement data. Did you set an RX event?")
lime = self.initialize_lime()
if not lime:
return -1
# time domain x and y data
tdx = lime.HDF.tdx[evidx] - lime.HDF.tdx[evidx][0]
tdy = lime.HDF.tdy[evidx] / lime.nav
self.setup_lime_parameters(lime)
self.setup_temporary_storage(lime)
fft_shift = self.module.model.get_setting_by_name(self.module.model.FFT_SHIFT).value
self.emit_status_message("Started Measurement")
if fft_shift:
fft_shift = self.module.model.if_frequency
if not self.perform_measurement(lime):
self.emit_status_message("Measurement failed")
self.emit_measurement_error("Error with measurement data. Did you set an RX event?")
return -1
measurement_data = self.process_measurement_results(lime)
if measurement_data:
self.emit_measurement_data(measurement_data)
self.emit_status_message("Finished Measurement")
else:
fft_shift = 0
self.emit_measurement_error("Measurement failed. Unable to retrieve data.")
measurement_data = Measurement(
tdx,
tdy,
self.module.model.target_frequency,
frequency_shift=fft_shift,
)
def log_start_message(self):
"""This method logs a message when the measurement is started."""
logger.debug("Starting measurement with spectrometer: %s", self.module.model.name)
# Emit the data to the nqrduck core
def initialize_lime(self):
"""This method initializes the limr object that is used to communicate with the pulseN driver."""
try:
from .contrib.limr import limr
driver_path = str(Path(__file__).parent / "contrib/pulseN_test_USB.cpp")
return limr(driver_path)
except ImportError as e:
logger.error("Error while importing limr: %s", e)
except Exception as e:
logger.error("Error while initializing Lime driver: %s", e)
return None
def setup_lime_parameters(self, lime):
"""This method sets the parameters of the limr object according to the settings set in the spectrometer module.
Args:
lime (limr): The limr object that is used to communicate with the pulseN driver
"""
lime.noi = -1
lime.nrp = 1
lime = self.update_settings(lime)
lime = self.translate_pulse_sequence(lime)
lime.nav = self.module.model.averages
self.log_lime_parameters(lime)
def setup_temporary_storage(self, lime):
"""This method sets up the temporary storage for the measurement data.
Args:
lime (limr): The limr object that is used to communicate with the pulseN driver
"""
temp_dir = tempfile.TemporaryDirectory()
logger.debug("Created temporary directory at: %s", temp_dir.name)
lime.spt = Path(temp_dir.name) # Temporary storage path
lime.fpa = "temp" # Temporary filename prefix or related config
def perform_measurement(self, lime):
"""This method executes the measurement procedure.
Args:
lime (limr): The limr object that is used to communicate with the pulseN driver
Returns:
bool: True if the measurement was successful, False otherwise
"""
logger.debug("Running the measurement procedure")
try:
lime.run()
lime.readHDF()
return True
except Exception as e:
logger.error("Failed to execute the measurement: %s", e)
return False
def process_measurement_results(self, lime):
"""This method processes the measurement results and returns a Measurement object.
Args:
lime (limr): The limr object that is used to communicate with the pulseN driver
Returns:
Measurement: The measurement data
"""
rx_begin, rx_stop = self.translate_rx_event(lime)
if rx_begin is None or rx_stop is None:
return None
logger.debug("RX event begins at: %sµs and ends at: %sµs", rx_begin, rx_stop)
return self.calculate_measurement_data(lime, rx_begin, rx_stop)
def calculate_measurement_data(self, lime, rx_begin, rx_stop):
"""This method calculates the measurement data from the limr object.
Args:
lime (limr): The limr object that is used to communicate with the pulseN driver
rx_begin (float): The start time of the RX event in µs
rx_stop (float): The stop time of the RX event in µs
Returns:
Measurement: The measurement data
"""
try:
evidx = self.find_evaluation_range_indices(lime, rx_begin, rx_stop)
tdx, tdy = self.extract_measurement_data(lime, evidx)
fft_shift = self.get_fft_shift()
return Measurement(tdx, tdy, self.module.model.target_frequency, frequency_shift=fft_shift)
except Exception as e:
logger.error("Error processing measurement result: %s", e)
return None
def find_evaluation_range_indices(self, lime, rx_begin, rx_stop):
"""This method finds the indices of the evaluation range in the measurement data.
Args:
lime (limr): The limr object that is used to communicate with the pulseN driver
rx_begin (float): The start time of the RX event in µs
rx_stop (float): The stop time of the RX event in µs
Returns:
list: The indices of the evaluation range in the measurement data"""
return np.where((lime.HDF.tdx > rx_begin) & (lime.HDF.tdx < rx_stop))[0]
def extract_measurement_data(self, lime, indices):
"""This method extracts the measurement data from the limr object.
Args:
lime (limr): The limr object that is used to communicate with the pulseN driver
indices (list): The indices of the evaluation range in the measurement data
Returns:
tuple: A tuple containing the time vector and the measurement data
"""
tdx = lime.HDF.tdx[indices] - lime.HDF.tdx[indices][0]
tdy = lime.HDF.tdy[indices] / lime.nav
return tdx, tdy
def get_fft_shift(self):
"""This method returns the FFT shift value from the settings.
Returns:
int: The FFT shift value"""
fft_shift_enabled = self.module.model.get_setting_by_name(self.module.model.FFT_SHIFT).value
return self.module.model.if_frequency if fft_shift_enabled else 0
def emit_measurement_data(self, measurement_data):
"""This method emits the measurement data to the GUI.
Args:
measurement_data (Measurement): The measurement data
"""
logger.debug("Emitting measurement data")
self.module.nqrduck_signal.emit("statusbar_message", "Finished Measurement")
self.module.nqrduck_signal.emit("measurement_data", measurement_data)
def emit_status_message(self, message):
"""This method emits a status message to the GUI.
Args:
message (str): The status message
"""
self.module.nqrduck_signal.emit("statusbar_message", message)
def emit_measurement_error(self, error_message):
"""This method emits a measurement error to the GUI.
Args:
error_message (str): The error message
"""
logger.error(error_message)
self.module.nqrduck_signal.emit("measurement_error", error_message)
def log_lime_parameters(self, lime):
"""This method logs the parameters of the limr object.
Args:
lime (limr): The limr object that is used to communicate with the pulseN driver
"""
for key in sorted(lime.parsinp):
val = getattr(lime, key, [])
if val:
logger.debug(f"{key}: {val} {lime.parsinp[key][1]}")
def update_settings(self, lime):
"""This method sets the parameters of the limr object according to the settings set in the spectrometer module.
@ -118,6 +218,7 @@ class LimeNQRController(BaseSpectrometerController):
self.module.model.name,
)
lime.t3d = [0, 0, 0, 0]
# I don't like this code
for category in self.module.model.settings.keys():
for setting in self.module.model.settings[category]:
logger.debug("Setting %s has value %s", setting.name, setting.value)