From 039e9fe2d976b61100153e68bd432a1274ed6368 Mon Sep 17 00:00:00 2001 From: jupfi Date: Mon, 11 Dec 2023 08:03:02 +0100 Subject: [PATCH] Changed communication for serial connections to signal and slots. --- src/nqrduck_autotm/controller.py | 256 +++++++++++++++++++++++++------ src/nqrduck_autotm/model.py | 119 ++++++++++---- src/nqrduck_autotm/view.py | 15 +- 3 files changed, 306 insertions(+), 84 deletions(-) diff --git a/src/nqrduck_autotm/controller.py b/src/nqrduck_autotm/controller.py index 14e80a0..16c98c5 100644 --- a/src/nqrduck_autotm/controller.py +++ b/src/nqrduck_autotm/controller.py @@ -16,6 +16,20 @@ logger = logging.getLogger(__name__) class AutoTMController(ModuleController): BAUDRATE = 115200 + def on_loading(self): + """This method is called when the module is loaded. + It sets up the serial connection and connects the signals and slots. + """ + logger.debug("Setting up serial connection") + self.find_devices() + + # Connect signals + self.module.model.serial_data_received.connect(self.process_frequency_sweep_data) + self.module.model.serial_data_received.connect(self.process_measurement_data) + self.module.model.serial_data_received.connect(self.process_calibration_data) + self.module.model.serial_data_received.connect(self.print_info) + self.module.model.serial_data_received.connect(self.read_position_data) + @pyqtSlot(str, object) def process_signals(self, key: str, value: object) -> None: logger.debug("Received signal: %s", key) @@ -149,51 +163,60 @@ class AutoTMController(ModuleController): self.module.model.clear_data_points() self.module.view.create_frequency_sweep_spinner_dialog() - def process_frequency_sweep_data(self, text): + @pyqtSlot(str) + def process_frequency_sweep_data(self, text : str) -> None: """This method is called when data is received from the serial connection during a frequency sweep. It processes the data and adds it to the model. """ - text = text[1:].split("r") - frequency = float(text[0]) - return_loss, phase = map(float, text[1].split("p")) - self.module.model.add_data_point(frequency, return_loss, phase) + if text.startswith("f") and self.module.view.frequency_sweep_spinner.isVisible(): + text = text[1:].split("r") + frequency = float(text[0]) + return_loss, phase = map(float, text[1].split("p")) + self.module.model.add_data_point(frequency, return_loss, phase) - def process_measurement_data(self): + @pyqtSlot(str) + def process_measurement_data(self, text : str) -> None: """This method is called when data is received from the serial connection during a measurement. It processes the data and adds it to the model. """ - logger.debug("Measurement finished") - self.module.model.measurement = S11Data( - self.module.model.data_points.copy() - ) - self.finish_frequency_sweep() + if self.module.model.active_calibration is None and text.startswith("r"): + logger.debug("Measurement finished") + self.module.model.measurement = S11Data( + self.module.model.data_points.copy() + ) + self.finish_frequency_sweep() - def process_calibration_data(self, calibration_type): + @pyqtSlot(str) + def process_calibration_data(self, text : str) -> None: """This method is called when data is received from the serial connection during a calibration. It processes the data and adds it to the model. Args: calibration_type (str): The type of calibration that is being performed. """ - logger.debug(f"{calibration_type.capitalize()} calibration finished") - setattr(self.module.model, f"{calibration_type}_calibration", - S11Data(self.module.model.data_points.copy())) - self.module.model.active_calibration = None - self.module.view.frequency_sweep_spinner.hide() + if text.startswith("r") and self.module.model.active_calibration in ["short", "open", "load"]: + calibration_type = self.module.model.active_calibration + logger.debug(f"{calibration_type.capitalize()} calibration finished") + setattr(self.module.model, f"{calibration_type}_calibration", + S11Data(self.module.model.data_points.copy())) + self.module.model.active_calibration = None + self.module.view.frequency_sweep_spinner.hide() - def process_voltage_sweep_result(self, text): + @pyqtSlot(str) + def process_voltage_sweep_result(self, text : str) -> None: """This method is called when data is received from the serial connection during a voltage sweep. It processes the data and adds it to the model. Args: text (str): The data received from the serial connection. """ - text = text[1:].split("t") - matching_voltage, tuning_voltage = map(float, text) - LUT = self.module.model.LUT - logger.debug("Received voltage sweep result: %s %s", matching_voltage, tuning_voltage) - LUT.add_voltages(matching_voltage, tuning_voltage) - self.continue_or_finish_voltage_sweep(LUT) + if text.startswith("v"): + text = text[1:].split("t") + matching_voltage, tuning_voltage = map(float, text) + LUT = self.module.model.LUT + logger.debug("Received voltage sweep result: %s %s", matching_voltage, tuning_voltage) + LUT.add_voltages(matching_voltage, tuning_voltage) + self.continue_or_finish_voltage_sweep(LUT) def finish_frequency_sweep(self): """This method is called when a frequency sweep is finished. @@ -244,37 +267,60 @@ class AutoTMController(ModuleController): self.module.view.add_info_text(f"Voltage sweep finished in {duration:.2f} seconds") self.module.nqrduck_signal.emit("LUT_finished", LUT) + @pyqtSlot(str) + def print_info(self, text : str) -> None: + """This method is called when data is received from the serial connection. + It prints the data to the info text box. + + Args: + text (str): The data received from the serial connection. + """ + if text.startswith("i"): + text = text[1:] + self.module.view.add_info_text(text) + elif text.startswith("e"): + text = text[1:] + self.module.view.add_error_text(text) + + @pyqtSlot(str) + def read_position_data(self, text : str) -> None: + """This method is called when data is received from the serial connection.""" + if text.startswith("p"): + # Format is pm + text = text[1:].split("m") + tuning_position, matching_position = map(int, text) + self.module.model.tuning_stepper.position = tuning_position + self.module.model.matching_stepper.position = matching_position + self.module.model.tuning_stepper.homed = True + self.module.model.matching_stepper.homed = True + logger.debug("Tuning position: %s, Matching position: %s", tuning_position, matching_position) + self.module.view.on_active_stepper_changed() + def on_ready_read(self) -> None: """This method is called when data is received from the serial connection.""" serial = self.module.model.serial + if self.module.model.waiting_for_reflection: + logger.debug("Waiting for reflection data") + return + while serial.canReadLine(): text = serial.readLine().data().decode().rstrip("\r\n") - # logger.debug("Received data: %s", text) + logger.debug("Received data: %s", text) - if text.startswith("f") and self.module.view.frequency_sweep_spinner.isVisible(): - self.process_frequency_sweep_data(text) - elif text.startswith("r"): - if self.module.model.active_calibration is None: - self.process_measurement_data() - elif self.module.model.active_calibration in ["short", "open", "load"]: - self.process_calibration_data(self.module.model.active_calibration) - elif text.startswith("i"): - self.module.view.add_info_text("ATM Info: " + text[1:]) - elif text.startswith("e"): - self.module.view.add_info_text("ATM Error: " + text[1:]) - elif text.startswith("v"): - self.process_voltage_sweep_result(text) - elif text.startswith("p"): - # Format is pm - text = text[1:].split("m") - tuning_position, matching_position = map(int, text) - self.module.model.tuning_stepper.position = tuning_position - self.module.model.matching_stepper.position = matching_position - self.module.model.tuning_stepper.homed = True - self.module.model.matching_stepper.homed = True - logger.debug("Tuning position: %s, Matching position: %s", tuning_position, matching_position) - self.module.view.on_active_stepper_changed() + self.module.model.serial_data_received.emit(text) + def process_reflection_data(self, text): + """This method is called when data is received from the serial connection. + It processes the data and adds it to the model. + + Args: + text (str): The data received from the serial connection. + """ + text = text[1:] + return_loss, phase = map(float, text.split("p")) + self.module.model.last_reflection = (return_loss, phase) + + ### Calibration Stuff ### def on_short_calibration( self, start_frequency: float, stop_frequency: float @@ -423,6 +469,8 @@ class AutoTMController(ModuleController): self.module.model.open_calibration = S11Data.from_json(data["open"]) self.module.model.load_calibration = S11Data.from_json(data["load"]) + ### Voltage Control ### + def set_voltages(self, tuning_voltage: str, matching_voltage: str) -> None: """This method is called when the set voltages button is pressed. It writes the specified tuning and matching voltage to the serial connection. @@ -471,7 +519,9 @@ class AutoTMController(ModuleController): # Emit nqrduck signal that T&M was successful self.module.nqrduck_signal.emit("confirm_tune_and_match", None) - def generate_lut( + ### Electrical Lookup Table ### + + def generate_electrical_lut( self, start_frequency: str, stop_frequency: str, @@ -613,6 +663,8 @@ class AutoTMController(ModuleController): logger.error("Could not send command. %s", e) self.module.view.add_error_text("Could not send command. %s" % e) + ### Stepper Motor Control ### + def homing(self) -> None: """This method is used to send the command 'h' to the atm system. This command is used to home the stepper motors of the atm system. @@ -693,6 +745,8 @@ class AutoTMController(ModuleController): confirmation = self.send_stepper_command(actual_steps, stepper) return confirmation + ### Position Saving and Loading ### + def load_positions(self, path : str) -> None: """Load the saved positions from a json file. @@ -751,7 +805,10 @@ class AutoTMController(ModuleController): logger.debug("Deleting position: %s", position) self.module.model.delete_saved_position(position) - def generate_mech_lut(self, start_frequency: str, stop_frequency: str, frequency_step: str) -> None: + + #### Mechanical tuning and matching #### + + def generate_mechanical_lut(self, start_frequency: str, stop_frequency: str, frequency_step: str) -> None: """Generate a lookup table for the specified frequency range and voltage resolution. Args: @@ -759,14 +816,111 @@ class AutoTMController(ModuleController): stop_frequency (str): The stop frequency in Hz. frequency_step (str): The frequency step in Hz. """ - logger.debug("Generating mech LUT") + try: + start_frequency = start_frequency.replace(",", ".") + stop_frequency = stop_frequency.replace(",", ".") + frequency_step = frequency_step.replace(",", ".") + start_frequency = float(start_frequency) + stop_frequency = float(stop_frequency) + frequency_step = float(frequency_step) + except ValueError: + error = "Could not generate LUT. Start frequency, stop frequency, frequency step must be floats" + logger.error(error) + self.module.view.add_info_text(error) + return + + if ( + start_frequency < 0 + or stop_frequency < 0 + or frequency_step < 0 + ): + error = "Could not generate LUT. Start frequency, stop frequency, frequency step must be positive" + logger.error(error) + self.module.view.add_info_text(error) + return + + if start_frequency > stop_frequency: + error = "Could not generate LUT. Start frequency must be smaller than stop frequency" + logger.error(error) + self.module.view.add_info_text(error) + return + + # - 0.1 is to prevent float errors + if frequency_step - 0.1 > (stop_frequency - start_frequency): + error = "Could not generate LUT. Frequency step must be smaller than the frequency range" + logger.error(error) + self.module.view.add_info_text(error) + return + + logger.debug( + "Generating LUT from %s MHz to %s MHz with a frequency step of %s MHz", + start_frequency, + stop_frequency, + frequency_step, + ) # We create the lookup table LUT = MechanicalLookupTable( start_frequency, stop_frequency, frequency_step ) + # Lock GUI + # self.module.view.create_mech_LUT_spinner_dialog() + + self.start_next_mechTM(LUT) + + + def start_next_mechTM(self, LUT): + """Start the next mechanical tuning and matching sweep.""" + + next_frequency = LUT.get_next_frequency() + LUT.started_frequency = next_frequency + logger.debug("Starting next mechanical tuning and matching:") + + # Now we vary the tuning capacitor position and matching capacitor position + # Step size tuner: + TUNER_STEP_SIZE = 10 + + # Step size matcher: + MATCHER_STEP_SIZE = 50 + + # We read the first reflection + reflection = self.read_reflection(next_frequency) + logger.debug("Reflection: %s", reflection) + + def read_reflection(self, frequency): + """Read the reflection at the specified frequency.""" + # We send the command to the atm system + self.module.model.waiting_for_reflection = True + + command = f"r{frequency}" + try: + + confirmation = self.send_command(command) + if confirmation: + if self.module.model.serial.waitForReadyRead(1000): + #if self.module.model.serial.canReadLine(): + text = self.module.model.serial.readLine().data().decode("utf-8") + if text: + logger.debug("Received reflection: %s", text) + #self.module.model.waiting_for_reflection = False + #return text + #else: + time.sleep(0.1) + else: + logger.error("Could not read reflection. No confirmation received") + self.module.view.add_error_text("Could not read reflection. No confirmation received") + self.module.model.waiting_for_reflection = False + return None + except Exception as e: + logger.error("Could not read reflection. %s", e) + self.module.view.add_error_text("Could not read reflection. %s" % e) + self.module.model.waiting_for_reflection = False + return None + + + diff --git a/src/nqrduck_autotm/model.py b/src/nqrduck_autotm/model.py index 10bc75a..37efb78 100644 --- a/src/nqrduck_autotm/model.py +++ b/src/nqrduck_autotm/model.py @@ -177,33 +177,6 @@ class LookupTable: # This is the frequency at which the tuning and matching process was started self.started_frequency = None - - def is_incomplete(self) -> bool: - """This method returns True if the lookup table is incomplete, - i.e. if there are frequencies for which no the tuning or matching voltage is none. - - Returns: - bool: True if the lookup table is incomplete, False otherwise. - """ - return any( - [ - tuning_voltage is None or matching_voltage is None - for tuning_voltage, matching_voltage in self.data.values() - ] - ) - - def get_next_frequency(self) -> float: - """This method returns the next frequency for which the tuning and matching voltage is not yet set. - - Returns: - float: The next frequency for which the tuning and matching voltage is not yet set. - """ - - for frequency, (tuning_voltage, matching_voltage) in self.data.items(): - if tuning_voltage is None or matching_voltage is None: - return frequency - - return None def get_entry_number(self, frequency: float) -> int: """This method returns the entry number of the given frequency. @@ -286,10 +259,98 @@ class ElectricalLookupTable(LookupTable): entry_number = self.get_entry_number(frequency) key = list(self.data.keys())[entry_number] return self.data[key] + + def is_incomplete(self) -> bool: + """This method returns True if the lookup table is incomplete, + i.e. if there are frequencies for which no the tuning or matching voltage is none. + + Returns: + bool: True if the lookup table is incomplete, False otherwise. + """ + return any( + [ + tuning_voltage is None or matching_voltage is None + for tuning_voltage, matching_voltage in self.data.values() + ] + ) + + def get_next_frequency(self) -> float: + """This method returns the next frequency for which the tuning and matching voltage is not yet set. + + Returns: + float: The next frequency for which the tuning and matching voltage is not yet set. + """ + + for frequency, (tuning_voltage, matching_voltage) in self.data.items(): + if tuning_voltage is None or matching_voltage is None: + return frequency + + return None class MechanicalLookupTable(LookupTable): + # Hmm duplicate code TYPE = "Mechanical" - pass + + + def __init__(self, start_frequency: float, stop_frequency: float, frequency_step: float) -> None: + super().__init__(start_frequency, stop_frequency, frequency_step) + self.init_positions() + + def init_positions(self) -> None: + """Initialize the lookup table with default values.""" + for frequency in np.arange( + self.start_frequency, self.stop_frequency, self.frequency_step + ): + self.started_frequency = frequency + self.add_positions(None, None) + + def add_positions(self, tuning_position: int, matching_position: int) -> None: + """Add a tuning and matching position for the last started frequency to the lookup table. + + Args: + tuning_position (int): The tuning position for the given frequency. + matching_position (int): The matching position for the given frequency.""" + self.data[self.started_frequency] = (tuning_position, matching_position) + + def get_positions(self, frequency: float) -> tuple: + """Get the tuning and matching position for the given frequency. + + Args: + frequency (float): The frequency for which the tuning and matching position should be returned. + + Returns: + tuple: The tuning and matching position for the given frequency. + """ + entry_number = self.get_entry_number(frequency) + key = list(self.data.keys())[entry_number] + return self.data[key] + + def is_incomplete(self) -> bool: + """This method returns True if the lookup table is incomplete, + i.e. if there are frequencies for which no the tuning or matching position is none. + + Returns: + bool: True if the lookup table is incomplete, False otherwise. + """ + return any( + [ + tuning_position is None or matching_position is None + for tuning_position, matching_position in self.data.values() + ] + ) + + def get_next_frequency(self) -> float: + """This method returns the next frequency for which the tuning and matching position is not yet set. + + Returns: + float: The next frequency for which the tuning and matching position is not yet set. + """ + + for frequency, (tuning_position, matching_position) in self.data.items(): + if tuning_position is None or matching_position is None: + return frequency + + return None class AutoTMModel(ModuleModel): available_devices_changed = pyqtSignal(list) @@ -297,6 +358,7 @@ class AutoTMModel(ModuleModel): data_points_changed = pyqtSignal(list) active_stepper_changed = pyqtSignal(Stepper) saved_positions_changed = pyqtSignal(list) + serial_data_received = pyqtSignal(str) short_calibration_finished = pyqtSignal(S11Data) open_calibration_finished = pyqtSignal(S11Data) @@ -318,6 +380,7 @@ class AutoTMModel(ModuleModel): self.el_lut = None self.mech_lut = None + self.waiting_for_reflection = False @property def available_devices(self): diff --git a/src/nqrduck_autotm/view.py b/src/nqrduck_autotm/view.py index 2d5ad33..c2932c8 100644 --- a/src/nqrduck_autotm/view.py +++ b/src/nqrduck_autotm/view.py @@ -64,18 +64,18 @@ class AutoTMView(ModuleView): ) ) - # On clicking of the generateLUTButton call the generate_lut method + # On clicking of the generateLUTButton call the generate_mechanical_lut method self._ui_form.generateLUTButton.clicked.connect( - lambda: self.module.controller.generate_lut( + lambda: self.module.controller.generate_electrical_lut( self._ui_form.startfrequencyBox.text(), self._ui_form.stopfrequencyBox.text(), self._ui_form.frequencystepBox.text(), ) ) - # On clicking of the generateLUTButton call the generate_lut method - self._ui_form.generateLUTButton.clicked.connect( - lambda: self.module.controller.generate_mech_lut( + # On clicking of the generateLUTButton call the generate_electrical_lut method + self._ui_form.mechLUTButton.clicked.connect( + lambda: self.module.controller.generate_mechanical_lut( self._ui_form.startfrequencyBox.text(), self._ui_form.stopfrequencyBox.text(), self._ui_form.frequencystepBox.text(), @@ -357,6 +357,11 @@ class AutoTMView(ModuleView): self.el_LUT_spinner = self.LoadingSpinner("Generating electrical LUT ...", self) self.el_LUT_spinner.show() + def create_mech_LUT_spinner_dialog(self) -> None: + """Creates a mechanical LUT spinner dialog.""" + self.mech_LUT_spinner = self.LoadingSpinner("Generating mechanical LUT ...", self) + self.mech_LUT_spinner.show() + def view_el_lut(self) -> None: """Creates a new Dialog that shows the currently active electrical LUT.""" logger.debug("View LUT")