From 0e2905b895c0e180edec293a314d5fe3f8b0d5c7 Mon Sep 17 00:00:00 2001 From: jupfi Date: Thu, 7 Dec 2023 15:34:46 +0100 Subject: [PATCH] Refactored Serial communication. --- src/nqrduck_autotm/controller.py | 209 ++++++++++++++++--------------- 1 file changed, 110 insertions(+), 99 deletions(-) diff --git a/src/nqrduck_autotm/controller.py b/src/nqrduck_autotm/controller.py index 1e558de..2777ad7 100644 --- a/src/nqrduck_autotm/controller.py +++ b/src/nqrduck_autotm/controller.py @@ -130,111 +130,122 @@ class AutoTMController(ModuleController): self.module.model.clear_data_points() self.module.view.create_frequency_sweep_spinner_dialog() + def process_frequency_sweep_data(self, text): + """This method is called when data is received from the serial connection during a frequency sweep. + It processes the data and adds it to the model. + """ + text = text[1:].split("r") + frequency = float(text[0]) + return_loss, phase = map(float, text[1].split("p")) + self.module.model.add_data_point(frequency, return_loss, phase) + + def process_measurement_data(self): + """This method is called when data is received from the serial connection during a measurement. + It processes the data and adds it to the model. + """ + logger.debug("Measurement finished") + self.module.model.measurement = S11Data( + self.module.model.data_points.copy() + ) + self.finish_frequency_sweep() + + def process_calibration_data(self, calibration_type): + """This method is called when data is received from the serial connection during a calibration. + It processes the data and adds it to the model. + + Args: + calibration_type (str): The type of calibration that is being performed. + """ + logger.debug(f"{calibration_type.capitalize()} calibration finished") + setattr(self.module.model, f"{calibration_type}_calibration", + S11Data(self.module.model.data_points.copy())) + self.module.model.active_calibration = None + self.module.view.frequency_sweep_spinner.hide() + + def process_voltage_sweep_result(self, text): + """This method is called when data is received from the serial connection during a voltage sweep. + It processes the data and adds it to the model. + + Args: + text (str): The data received from the serial connection. + """ + text = text[1:].split("t") + matching_voltage, tuning_voltage = map(float, text) + LUT = self.module.model.LUT + logger.debug("Received voltage sweep result: %s %s", matching_voltage, tuning_voltage) + LUT.add_voltages(matching_voltage, tuning_voltage) + self.continue_or_finish_voltage_sweep(LUT) + + def finish_frequency_sweep(self): + """This method is called when a frequency sweep is finished. + It hides the frequency sweep spinner dialog and adds the data to the model. + """ + self.module.view.frequency_sweep_spinner.hide() + self.module.model.frequency_sweep_stop = time.time() + duration = self.module.model.frequency_sweep_stop - self.module.model.frequency_sweep_start + self.module.view.add_info_text(f"Frequency sweep finished in {duration:.2f} seconds") + + def continue_or_finish_voltage_sweep(self, LUT): + """This method is called when a voltage sweep is finished. + It checks if the voltage sweep is finished or if the next voltage sweep should be started. + + Args: + LUT (LookupTable): The lookup table that is being generated. + """ + if LUT.is_incomplete(): + # Start the next voltage sweep + self.start_next_voltage_sweep(LUT) + else: + # Finish voltage sweep + self.finish_voltage_sweep(LUT) + + def start_next_voltage_sweep(self, LUT): + """This method is called when a voltage sweep is finished. + It starts the next voltage sweep. + + Args: + LUT (LookupTable): The lookup table that is being generated. + """ + next_frequency = LUT.get_next_frequency() + command = f"s{next_frequency}" + LUT.started_frequency = next_frequency + logger.debug("Starting next voltage sweep: %s", command) + self.send_command(command) + + def finish_voltage_sweep(self, LUT): + """This method is called when a voltage sweep is finished. + It hides the voltage sweep spinner dialog and adds the data to the model. + + Args: + LUT (LookupTable): The lookup table that is being generated.""" + logger.debug("Voltage sweep finished") + self.module.view.el_LUT_spinner.hide() + self.module.model.voltage_sweep_stop = time.time() + duration = self.module.model.voltage_sweep_stop - self.module.model.voltage_sweep_start + self.module.view.add_info_text(f"Voltage sweep finished in {duration:.2f} seconds") + self.module.nqrduck_signal.emit("LUT_finished", LUT) + def on_ready_read(self) -> None: """This method is called when data is received from the serial connection.""" serial = self.module.model.serial while serial.canReadLine(): - text = serial.readLine().data().decode() - text = text.rstrip("\r\n") + text = serial.readLine().data().decode().rstrip("\r\n") # logger.debug("Received data: %s", text) - # If the text starts with 'f' and the frequency sweep spinner is visible we know that the data is a data point - # then we have the data for the return loss and the phase at a certain frequency - if ( - text.startswith("f") - and self.module.view.frequency_sweep_spinner.isVisible() - ): - text = text[1:].split("r") - frequency = float(text[0]) - return_loss, phase = map(float, text[1].split("p")) - self.module.model.add_data_point(frequency, return_loss, phase) - # If the text starts with 'r' and no calibration is active we know that the data is a measurement - elif text.startswith("r") and self.module.model.active_calibration == None: - logger.debug("Measurement finished") - self.module.model.measurement = S11Data( - self.module.model.data_points.copy() - ) - self.module.view.frequency_sweep_spinner.hide() - self.module.model.frequency_sweep_stop = time.time() - self.module.view.add_info_text( - "Frequency sweep finished in %.2f seconds" - % ( - self.module.model.frequency_sweep_stop - - self.module.model.frequency_sweep_start - ) - ) - # If the text starts with 'r' and a short calibration is active we know that the data is a short calibration - elif ( - text.startswith("r") and self.module.model.active_calibration == "short" - ): - logger.debug("Short calibration finished") - self.module.model.short_calibration = S11Data( - self.module.model.data_points.copy() - ) - self.module.model.active_calibration = None - self.module.view.frequency_sweep_spinner.hide() - # If the text starts with 'r' and an open calibration is active we know that the data is an open calibration - elif ( - text.startswith("r") and self.module.model.active_calibration == "open" - ): - logger.debug("Open calibration finished") - self.module.model.open_calibration = S11Data( - self.module.model.data_points.copy() - ) - self.module.model.active_calibration = None - self.module.view.frequency_sweep_spinner.hide() - # If the text starts with 'r' and a load calibration is active we know that the data is a load calibration - elif ( - text.startswith("r") and self.module.model.active_calibration == "load" - ): - logger.debug("Load calibration finished") - self.module.model.load_calibration = S11Data( - self.module.model.data_points.copy() - ) - self.module.model.active_calibration = None - self.module.view.frequency_sweep_spinner.hide() - # If the text starts with 'i' we know that the data is an info message - elif text.startswith("i"): - text = "ATM Info: " + text[1:] - self.module.view.add_info_text(text) - # If the text starts with 'e' we know that the data is an error message - elif text.startswith("e"): - text = "ATM Error: " + text[1:] - self.module.view.add_info_text(text) - # If the text starts with 'v' we know that the data is a voltage sweep result - elif text.startswith("v"): - text = text[1:] - text = text.split("t") - matching_voltage = float(text[0]) - tuning_voltage = float(text[1]) - # Now we add the datapoint to the current LUT - LUT = self.module.model.LUT - logger.debug( - "Received voltage sweep result: %s %s", - matching_voltage, - tuning_voltage, - ) - LUT.add_voltages(matching_voltage, tuning_voltage) - # Start the next voltage sweep if there are more voltages to sweep - if LUT.is_incomplete(): - next_frequency = LUT.get_next_frequency() - command = "s%s" % next_frequency - LUT.started_frequency = next_frequency - logger.debug("Starting next voltage sweep: %s", command) - self.send_command(command) - else: - logger.debug("Voltage sweep finished") - self.module.view.el_LUT_spinner.hide() - self.module.model.voltage_sweep_stop = time.time() - self.module.view.add_info_text( - "Voltage sweep finished in %.2f seconds" - % ( - self.module.model.voltage_sweep_stop - - self.module.model.voltage_sweep_start - ) - ) - # Now we emit the signal that the LUT is finished with the LUT as an argument - self.module.nqrduck_signal.emit("LUT_finished", LUT) + if text.startswith("f") and self.module.view.frequency_sweep_spinner.isVisible(): + self.process_frequency_sweep_data(text) + elif text.startswith("r"): + if self.module.model.active_calibration is None: + self.process_measurement_data() + elif self.module.model.active_calibration in ["short", "open", "load"]: + self.process_calibration_data(self.module.model.active_calibration) + elif text.startswith("i"): + self.module.view.add_info_text("ATM Info: " + text[1:]) + elif text.startswith("e"): + self.module.view.add_info_text("ATM Error: " + text[1:]) + elif text.startswith("v"): + self.process_voltage_sweep_result(text) + def on_short_calibration( self, start_frequency: float, stop_frequency: float