mirror of
https://github.com/nqrduck/nqrduck-autotm.git
synced 2024-11-09 11:40:02 +00:00
Refactored Serial communication.
This commit is contained in:
parent
44cce2c5da
commit
0e2905b895
1 changed files with 110 additions and 99 deletions
|
@ -130,111 +130,122 @@ class AutoTMController(ModuleController):
|
||||||
self.module.model.clear_data_points()
|
self.module.model.clear_data_points()
|
||||||
self.module.view.create_frequency_sweep_spinner_dialog()
|
self.module.view.create_frequency_sweep_spinner_dialog()
|
||||||
|
|
||||||
|
def process_frequency_sweep_data(self, text):
|
||||||
|
"""This method is called when data is received from the serial connection during a frequency sweep.
|
||||||
|
It processes the data and adds it to the model.
|
||||||
|
"""
|
||||||
|
text = text[1:].split("r")
|
||||||
|
frequency = float(text[0])
|
||||||
|
return_loss, phase = map(float, text[1].split("p"))
|
||||||
|
self.module.model.add_data_point(frequency, return_loss, phase)
|
||||||
|
|
||||||
|
def process_measurement_data(self):
|
||||||
|
"""This method is called when data is received from the serial connection during a measurement.
|
||||||
|
It processes the data and adds it to the model.
|
||||||
|
"""
|
||||||
|
logger.debug("Measurement finished")
|
||||||
|
self.module.model.measurement = S11Data(
|
||||||
|
self.module.model.data_points.copy()
|
||||||
|
)
|
||||||
|
self.finish_frequency_sweep()
|
||||||
|
|
||||||
|
def process_calibration_data(self, calibration_type):
|
||||||
|
"""This method is called when data is received from the serial connection during a calibration.
|
||||||
|
It processes the data and adds it to the model.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
calibration_type (str): The type of calibration that is being performed.
|
||||||
|
"""
|
||||||
|
logger.debug(f"{calibration_type.capitalize()} calibration finished")
|
||||||
|
setattr(self.module.model, f"{calibration_type}_calibration",
|
||||||
|
S11Data(self.module.model.data_points.copy()))
|
||||||
|
self.module.model.active_calibration = None
|
||||||
|
self.module.view.frequency_sweep_spinner.hide()
|
||||||
|
|
||||||
|
def process_voltage_sweep_result(self, text):
|
||||||
|
"""This method is called when data is received from the serial connection during a voltage sweep.
|
||||||
|
It processes the data and adds it to the model.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
text (str): The data received from the serial connection.
|
||||||
|
"""
|
||||||
|
text = text[1:].split("t")
|
||||||
|
matching_voltage, tuning_voltage = map(float, text)
|
||||||
|
LUT = self.module.model.LUT
|
||||||
|
logger.debug("Received voltage sweep result: %s %s", matching_voltage, tuning_voltage)
|
||||||
|
LUT.add_voltages(matching_voltage, tuning_voltage)
|
||||||
|
self.continue_or_finish_voltage_sweep(LUT)
|
||||||
|
|
||||||
|
def finish_frequency_sweep(self):
|
||||||
|
"""This method is called when a frequency sweep is finished.
|
||||||
|
It hides the frequency sweep spinner dialog and adds the data to the model.
|
||||||
|
"""
|
||||||
|
self.module.view.frequency_sweep_spinner.hide()
|
||||||
|
self.module.model.frequency_sweep_stop = time.time()
|
||||||
|
duration = self.module.model.frequency_sweep_stop - self.module.model.frequency_sweep_start
|
||||||
|
self.module.view.add_info_text(f"Frequency sweep finished in {duration:.2f} seconds")
|
||||||
|
|
||||||
|
def continue_or_finish_voltage_sweep(self, LUT):
|
||||||
|
"""This method is called when a voltage sweep is finished.
|
||||||
|
It checks if the voltage sweep is finished or if the next voltage sweep should be started.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
LUT (LookupTable): The lookup table that is being generated.
|
||||||
|
"""
|
||||||
|
if LUT.is_incomplete():
|
||||||
|
# Start the next voltage sweep
|
||||||
|
self.start_next_voltage_sweep(LUT)
|
||||||
|
else:
|
||||||
|
# Finish voltage sweep
|
||||||
|
self.finish_voltage_sweep(LUT)
|
||||||
|
|
||||||
|
def start_next_voltage_sweep(self, LUT):
|
||||||
|
"""This method is called when a voltage sweep is finished.
|
||||||
|
It starts the next voltage sweep.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
LUT (LookupTable): The lookup table that is being generated.
|
||||||
|
"""
|
||||||
|
next_frequency = LUT.get_next_frequency()
|
||||||
|
command = f"s{next_frequency}"
|
||||||
|
LUT.started_frequency = next_frequency
|
||||||
|
logger.debug("Starting next voltage sweep: %s", command)
|
||||||
|
self.send_command(command)
|
||||||
|
|
||||||
|
def finish_voltage_sweep(self, LUT):
|
||||||
|
"""This method is called when a voltage sweep is finished.
|
||||||
|
It hides the voltage sweep spinner dialog and adds the data to the model.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
LUT (LookupTable): The lookup table that is being generated."""
|
||||||
|
logger.debug("Voltage sweep finished")
|
||||||
|
self.module.view.el_LUT_spinner.hide()
|
||||||
|
self.module.model.voltage_sweep_stop = time.time()
|
||||||
|
duration = self.module.model.voltage_sweep_stop - self.module.model.voltage_sweep_start
|
||||||
|
self.module.view.add_info_text(f"Voltage sweep finished in {duration:.2f} seconds")
|
||||||
|
self.module.nqrduck_signal.emit("LUT_finished", LUT)
|
||||||
|
|
||||||
def on_ready_read(self) -> None:
|
def on_ready_read(self) -> None:
|
||||||
"""This method is called when data is received from the serial connection."""
|
"""This method is called when data is received from the serial connection."""
|
||||||
serial = self.module.model.serial
|
serial = self.module.model.serial
|
||||||
while serial.canReadLine():
|
while serial.canReadLine():
|
||||||
text = serial.readLine().data().decode()
|
text = serial.readLine().data().decode().rstrip("\r\n")
|
||||||
text = text.rstrip("\r\n")
|
|
||||||
# logger.debug("Received data: %s", text)
|
# logger.debug("Received data: %s", text)
|
||||||
# If the text starts with 'f' and the frequency sweep spinner is visible we know that the data is a data point
|
|
||||||
# then we have the data for the return loss and the phase at a certain frequency
|
|
||||||
if (
|
|
||||||
text.startswith("f")
|
|
||||||
and self.module.view.frequency_sweep_spinner.isVisible()
|
|
||||||
):
|
|
||||||
text = text[1:].split("r")
|
|
||||||
frequency = float(text[0])
|
|
||||||
return_loss, phase = map(float, text[1].split("p"))
|
|
||||||
self.module.model.add_data_point(frequency, return_loss, phase)
|
|
||||||
# If the text starts with 'r' and no calibration is active we know that the data is a measurement
|
|
||||||
elif text.startswith("r") and self.module.model.active_calibration == None:
|
|
||||||
logger.debug("Measurement finished")
|
|
||||||
self.module.model.measurement = S11Data(
|
|
||||||
self.module.model.data_points.copy()
|
|
||||||
)
|
|
||||||
self.module.view.frequency_sweep_spinner.hide()
|
|
||||||
self.module.model.frequency_sweep_stop = time.time()
|
|
||||||
self.module.view.add_info_text(
|
|
||||||
"Frequency sweep finished in %.2f seconds"
|
|
||||||
% (
|
|
||||||
self.module.model.frequency_sweep_stop
|
|
||||||
- self.module.model.frequency_sweep_start
|
|
||||||
)
|
|
||||||
)
|
|
||||||
# If the text starts with 'r' and a short calibration is active we know that the data is a short calibration
|
|
||||||
elif (
|
|
||||||
text.startswith("r") and self.module.model.active_calibration == "short"
|
|
||||||
):
|
|
||||||
logger.debug("Short calibration finished")
|
|
||||||
self.module.model.short_calibration = S11Data(
|
|
||||||
self.module.model.data_points.copy()
|
|
||||||
)
|
|
||||||
self.module.model.active_calibration = None
|
|
||||||
self.module.view.frequency_sweep_spinner.hide()
|
|
||||||
# If the text starts with 'r' and an open calibration is active we know that the data is an open calibration
|
|
||||||
elif (
|
|
||||||
text.startswith("r") and self.module.model.active_calibration == "open"
|
|
||||||
):
|
|
||||||
logger.debug("Open calibration finished")
|
|
||||||
self.module.model.open_calibration = S11Data(
|
|
||||||
self.module.model.data_points.copy()
|
|
||||||
)
|
|
||||||
self.module.model.active_calibration = None
|
|
||||||
self.module.view.frequency_sweep_spinner.hide()
|
|
||||||
# If the text starts with 'r' and a load calibration is active we know that the data is a load calibration
|
|
||||||
elif (
|
|
||||||
text.startswith("r") and self.module.model.active_calibration == "load"
|
|
||||||
):
|
|
||||||
logger.debug("Load calibration finished")
|
|
||||||
self.module.model.load_calibration = S11Data(
|
|
||||||
self.module.model.data_points.copy()
|
|
||||||
)
|
|
||||||
self.module.model.active_calibration = None
|
|
||||||
self.module.view.frequency_sweep_spinner.hide()
|
|
||||||
# If the text starts with 'i' we know that the data is an info message
|
|
||||||
elif text.startswith("i"):
|
|
||||||
text = "ATM Info: " + text[1:]
|
|
||||||
self.module.view.add_info_text(text)
|
|
||||||
# If the text starts with 'e' we know that the data is an error message
|
|
||||||
elif text.startswith("e"):
|
|
||||||
text = "ATM Error: " + text[1:]
|
|
||||||
self.module.view.add_info_text(text)
|
|
||||||
# If the text starts with 'v' we know that the data is a voltage sweep result
|
|
||||||
elif text.startswith("v"):
|
|
||||||
text = text[1:]
|
|
||||||
text = text.split("t")
|
|
||||||
matching_voltage = float(text[0])
|
|
||||||
tuning_voltage = float(text[1])
|
|
||||||
# Now we add the datapoint to the current LUT
|
|
||||||
LUT = self.module.model.LUT
|
|
||||||
logger.debug(
|
|
||||||
"Received voltage sweep result: %s %s",
|
|
||||||
matching_voltage,
|
|
||||||
tuning_voltage,
|
|
||||||
)
|
|
||||||
LUT.add_voltages(matching_voltage, tuning_voltage)
|
|
||||||
|
|
||||||
# Start the next voltage sweep if there are more voltages to sweep
|
if text.startswith("f") and self.module.view.frequency_sweep_spinner.isVisible():
|
||||||
if LUT.is_incomplete():
|
self.process_frequency_sweep_data(text)
|
||||||
next_frequency = LUT.get_next_frequency()
|
elif text.startswith("r"):
|
||||||
command = "s%s" % next_frequency
|
if self.module.model.active_calibration is None:
|
||||||
LUT.started_frequency = next_frequency
|
self.process_measurement_data()
|
||||||
logger.debug("Starting next voltage sweep: %s", command)
|
elif self.module.model.active_calibration in ["short", "open", "load"]:
|
||||||
self.send_command(command)
|
self.process_calibration_data(self.module.model.active_calibration)
|
||||||
else:
|
elif text.startswith("i"):
|
||||||
logger.debug("Voltage sweep finished")
|
self.module.view.add_info_text("ATM Info: " + text[1:])
|
||||||
self.module.view.el_LUT_spinner.hide()
|
elif text.startswith("e"):
|
||||||
self.module.model.voltage_sweep_stop = time.time()
|
self.module.view.add_info_text("ATM Error: " + text[1:])
|
||||||
self.module.view.add_info_text(
|
elif text.startswith("v"):
|
||||||
"Voltage sweep finished in %.2f seconds"
|
self.process_voltage_sweep_result(text)
|
||||||
% (
|
|
||||||
self.module.model.voltage_sweep_stop
|
|
||||||
- self.module.model.voltage_sweep_start
|
|
||||||
)
|
|
||||||
)
|
|
||||||
# Now we emit the signal that the LUT is finished with the LUT as an argument
|
|
||||||
self.module.nqrduck_signal.emit("LUT_finished", LUT)
|
|
||||||
|
|
||||||
def on_short_calibration(
|
def on_short_calibration(
|
||||||
self, start_frequency: float, stop_frequency: float
|
self, start_frequency: float, stop_frequency: float
|
||||||
|
|
Loading…
Reference in a new issue