Changed communication for serial connections to signal and slots.

This commit is contained in:
jupfi 2023-12-11 08:03:02 +01:00
parent 850cf9e034
commit 039e9fe2d9
3 changed files with 306 additions and 84 deletions

View file

@ -16,6 +16,20 @@ logger = logging.getLogger(__name__)
class AutoTMController(ModuleController): class AutoTMController(ModuleController):
BAUDRATE = 115200 BAUDRATE = 115200
def on_loading(self):
"""This method is called when the module is loaded.
It sets up the serial connection and connects the signals and slots.
"""
logger.debug("Setting up serial connection")
self.find_devices()
# Connect signals
self.module.model.serial_data_received.connect(self.process_frequency_sweep_data)
self.module.model.serial_data_received.connect(self.process_measurement_data)
self.module.model.serial_data_received.connect(self.process_calibration_data)
self.module.model.serial_data_received.connect(self.print_info)
self.module.model.serial_data_received.connect(self.read_position_data)
@pyqtSlot(str, object) @pyqtSlot(str, object)
def process_signals(self, key: str, value: object) -> None: def process_signals(self, key: str, value: object) -> None:
logger.debug("Received signal: %s", key) logger.debug("Received signal: %s", key)
@ -149,45 +163,54 @@ class AutoTMController(ModuleController):
self.module.model.clear_data_points() self.module.model.clear_data_points()
self.module.view.create_frequency_sweep_spinner_dialog() self.module.view.create_frequency_sweep_spinner_dialog()
def process_frequency_sweep_data(self, text): @pyqtSlot(str)
def process_frequency_sweep_data(self, text : str) -> None:
"""This method is called when data is received from the serial connection during a frequency sweep. """This method is called when data is received from the serial connection during a frequency sweep.
It processes the data and adds it to the model. It processes the data and adds it to the model.
""" """
if text.startswith("f") and self.module.view.frequency_sweep_spinner.isVisible():
text = text[1:].split("r") text = text[1:].split("r")
frequency = float(text[0]) frequency = float(text[0])
return_loss, phase = map(float, text[1].split("p")) return_loss, phase = map(float, text[1].split("p"))
self.module.model.add_data_point(frequency, return_loss, phase) self.module.model.add_data_point(frequency, return_loss, phase)
def process_measurement_data(self): @pyqtSlot(str)
def process_measurement_data(self, text : str) -> None:
"""This method is called when data is received from the serial connection during a measurement. """This method is called when data is received from the serial connection during a measurement.
It processes the data and adds it to the model. It processes the data and adds it to the model.
""" """
if self.module.model.active_calibration is None and text.startswith("r"):
logger.debug("Measurement finished") logger.debug("Measurement finished")
self.module.model.measurement = S11Data( self.module.model.measurement = S11Data(
self.module.model.data_points.copy() self.module.model.data_points.copy()
) )
self.finish_frequency_sweep() self.finish_frequency_sweep()
def process_calibration_data(self, calibration_type): @pyqtSlot(str)
def process_calibration_data(self, text : str) -> None:
"""This method is called when data is received from the serial connection during a calibration. """This method is called when data is received from the serial connection during a calibration.
It processes the data and adds it to the model. It processes the data and adds it to the model.
Args: Args:
calibration_type (str): The type of calibration that is being performed. calibration_type (str): The type of calibration that is being performed.
""" """
if text.startswith("r") and self.module.model.active_calibration in ["short", "open", "load"]:
calibration_type = self.module.model.active_calibration
logger.debug(f"{calibration_type.capitalize()} calibration finished") logger.debug(f"{calibration_type.capitalize()} calibration finished")
setattr(self.module.model, f"{calibration_type}_calibration", setattr(self.module.model, f"{calibration_type}_calibration",
S11Data(self.module.model.data_points.copy())) S11Data(self.module.model.data_points.copy()))
self.module.model.active_calibration = None self.module.model.active_calibration = None
self.module.view.frequency_sweep_spinner.hide() self.module.view.frequency_sweep_spinner.hide()
def process_voltage_sweep_result(self, text): @pyqtSlot(str)
def process_voltage_sweep_result(self, text : str) -> None:
"""This method is called when data is received from the serial connection during a voltage sweep. """This method is called when data is received from the serial connection during a voltage sweep.
It processes the data and adds it to the model. It processes the data and adds it to the model.
Args: Args:
text (str): The data received from the serial connection. text (str): The data received from the serial connection.
""" """
if text.startswith("v"):
text = text[1:].split("t") text = text[1:].split("t")
matching_voltage, tuning_voltage = map(float, text) matching_voltage, tuning_voltage = map(float, text)
LUT = self.module.model.LUT LUT = self.module.model.LUT
@ -244,27 +267,25 @@ class AutoTMController(ModuleController):
self.module.view.add_info_text(f"Voltage sweep finished in {duration:.2f} seconds") self.module.view.add_info_text(f"Voltage sweep finished in {duration:.2f} seconds")
self.module.nqrduck_signal.emit("LUT_finished", LUT) self.module.nqrduck_signal.emit("LUT_finished", LUT)
def on_ready_read(self) -> None: @pyqtSlot(str)
"""This method is called when data is received from the serial connection.""" def print_info(self, text : str) -> None:
serial = self.module.model.serial """This method is called when data is received from the serial connection.
while serial.canReadLine(): It prints the data to the info text box.
text = serial.readLine().data().decode().rstrip("\r\n")
# logger.debug("Received data: %s", text)
if text.startswith("f") and self.module.view.frequency_sweep_spinner.isVisible(): Args:
self.process_frequency_sweep_data(text) text (str): The data received from the serial connection.
elif text.startswith("r"): """
if self.module.model.active_calibration is None: if text.startswith("i"):
self.process_measurement_data() text = text[1:]
elif self.module.model.active_calibration in ["short", "open", "load"]: self.module.view.add_info_text(text)
self.process_calibration_data(self.module.model.active_calibration)
elif text.startswith("i"):
self.module.view.add_info_text("ATM Info: " + text[1:])
elif text.startswith("e"): elif text.startswith("e"):
self.module.view.add_info_text("ATM Error: " + text[1:]) text = text[1:]
elif text.startswith("v"): self.module.view.add_error_text(text)
self.process_voltage_sweep_result(text)
elif text.startswith("p"): @pyqtSlot(str)
def read_position_data(self, text : str) -> None:
"""This method is called when data is received from the serial connection."""
if text.startswith("p"):
# Format is p<tuning_position>m<matching_position> # Format is p<tuning_position>m<matching_position>
text = text[1:].split("m") text = text[1:].split("m")
tuning_position, matching_position = map(int, text) tuning_position, matching_position = map(int, text)
@ -275,6 +296,31 @@ class AutoTMController(ModuleController):
logger.debug("Tuning position: %s, Matching position: %s", tuning_position, matching_position) logger.debug("Tuning position: %s, Matching position: %s", tuning_position, matching_position)
self.module.view.on_active_stepper_changed() self.module.view.on_active_stepper_changed()
def on_ready_read(self) -> None:
"""This method is called when data is received from the serial connection."""
serial = self.module.model.serial
if self.module.model.waiting_for_reflection:
logger.debug("Waiting for reflection data")
return
while serial.canReadLine():
text = serial.readLine().data().decode().rstrip("\r\n")
logger.debug("Received data: %s", text)
self.module.model.serial_data_received.emit(text)
def process_reflection_data(self, text):
"""This method is called when data is received from the serial connection.
It processes the data and adds it to the model.
Args:
text (str): The data received from the serial connection.
"""
text = text[1:]
return_loss, phase = map(float, text.split("p"))
self.module.model.last_reflection = (return_loss, phase)
### Calibration Stuff ###
def on_short_calibration( def on_short_calibration(
self, start_frequency: float, stop_frequency: float self, start_frequency: float, stop_frequency: float
@ -423,6 +469,8 @@ class AutoTMController(ModuleController):
self.module.model.open_calibration = S11Data.from_json(data["open"]) self.module.model.open_calibration = S11Data.from_json(data["open"])
self.module.model.load_calibration = S11Data.from_json(data["load"]) self.module.model.load_calibration = S11Data.from_json(data["load"])
### Voltage Control ###
def set_voltages(self, tuning_voltage: str, matching_voltage: str) -> None: def set_voltages(self, tuning_voltage: str, matching_voltage: str) -> None:
"""This method is called when the set voltages button is pressed. """This method is called when the set voltages button is pressed.
It writes the specified tuning and matching voltage to the serial connection. It writes the specified tuning and matching voltage to the serial connection.
@ -471,7 +519,9 @@ class AutoTMController(ModuleController):
# Emit nqrduck signal that T&M was successful # Emit nqrduck signal that T&M was successful
self.module.nqrduck_signal.emit("confirm_tune_and_match", None) self.module.nqrduck_signal.emit("confirm_tune_and_match", None)
def generate_lut( ### Electrical Lookup Table ###
def generate_electrical_lut(
self, self,
start_frequency: str, start_frequency: str,
stop_frequency: str, stop_frequency: str,
@ -613,6 +663,8 @@ class AutoTMController(ModuleController):
logger.error("Could not send command. %s", e) logger.error("Could not send command. %s", e)
self.module.view.add_error_text("Could not send command. %s" % e) self.module.view.add_error_text("Could not send command. %s" % e)
### Stepper Motor Control ###
def homing(self) -> None: def homing(self) -> None:
"""This method is used to send the command 'h' to the atm system. """This method is used to send the command 'h' to the atm system.
This command is used to home the stepper motors of the atm system. This command is used to home the stepper motors of the atm system.
@ -693,6 +745,8 @@ class AutoTMController(ModuleController):
confirmation = self.send_stepper_command(actual_steps, stepper) confirmation = self.send_stepper_command(actual_steps, stepper)
return confirmation return confirmation
### Position Saving and Loading ###
def load_positions(self, path : str) -> None: def load_positions(self, path : str) -> None:
"""Load the saved positions from a json file. """Load the saved positions from a json file.
@ -751,7 +805,10 @@ class AutoTMController(ModuleController):
logger.debug("Deleting position: %s", position) logger.debug("Deleting position: %s", position)
self.module.model.delete_saved_position(position) self.module.model.delete_saved_position(position)
def generate_mech_lut(self, start_frequency: str, stop_frequency: str, frequency_step: str) -> None:
#### Mechanical tuning and matching ####
def generate_mechanical_lut(self, start_frequency: str, stop_frequency: str, frequency_step: str) -> None:
"""Generate a lookup table for the specified frequency range and voltage resolution. """Generate a lookup table for the specified frequency range and voltage resolution.
Args: Args:
@ -759,13 +816,110 @@ class AutoTMController(ModuleController):
stop_frequency (str): The stop frequency in Hz. stop_frequency (str): The stop frequency in Hz.
frequency_step (str): The frequency step in Hz. frequency_step (str): The frequency step in Hz.
""" """
logger.debug("Generating mech LUT") try:
start_frequency = start_frequency.replace(",", ".")
stop_frequency = stop_frequency.replace(",", ".")
frequency_step = frequency_step.replace(",", ".")
start_frequency = float(start_frequency)
stop_frequency = float(stop_frequency)
frequency_step = float(frequency_step)
except ValueError:
error = "Could not generate LUT. Start frequency, stop frequency, frequency step must be floats"
logger.error(error)
self.module.view.add_info_text(error)
return
if (
start_frequency < 0
or stop_frequency < 0
or frequency_step < 0
):
error = "Could not generate LUT. Start frequency, stop frequency, frequency step must be positive"
logger.error(error)
self.module.view.add_info_text(error)
return
if start_frequency > stop_frequency:
error = "Could not generate LUT. Start frequency must be smaller than stop frequency"
logger.error(error)
self.module.view.add_info_text(error)
return
# - 0.1 is to prevent float errors
if frequency_step - 0.1 > (stop_frequency - start_frequency):
error = "Could not generate LUT. Frequency step must be smaller than the frequency range"
logger.error(error)
self.module.view.add_info_text(error)
return
logger.debug(
"Generating LUT from %s MHz to %s MHz with a frequency step of %s MHz",
start_frequency,
stop_frequency,
frequency_step,
)
# We create the lookup table # We create the lookup table
LUT = MechanicalLookupTable( LUT = MechanicalLookupTable(
start_frequency, stop_frequency, frequency_step start_frequency, stop_frequency, frequency_step
) )
# Lock GUI
# self.module.view.create_mech_LUT_spinner_dialog()
self.start_next_mechTM(LUT)
def start_next_mechTM(self, LUT):
"""Start the next mechanical tuning and matching sweep."""
next_frequency = LUT.get_next_frequency()
LUT.started_frequency = next_frequency
logger.debug("Starting next mechanical tuning and matching:")
# Now we vary the tuning capacitor position and matching capacitor position
# Step size tuner:
TUNER_STEP_SIZE = 10
# Step size matcher:
MATCHER_STEP_SIZE = 50
# We read the first reflection
reflection = self.read_reflection(next_frequency)
logger.debug("Reflection: %s", reflection)
def read_reflection(self, frequency):
"""Read the reflection at the specified frequency."""
# We send the command to the atm system
self.module.model.waiting_for_reflection = True
command = f"r{frequency}"
try:
confirmation = self.send_command(command)
if confirmation:
if self.module.model.serial.waitForReadyRead(1000):
#if self.module.model.serial.canReadLine():
text = self.module.model.serial.readLine().data().decode("utf-8")
if text:
logger.debug("Received reflection: %s", text)
#self.module.model.waiting_for_reflection = False
#return text
#else:
time.sleep(0.1)
else:
logger.error("Could not read reflection. No confirmation received")
self.module.view.add_error_text("Could not read reflection. No confirmation received")
self.module.model.waiting_for_reflection = False
return None
except Exception as e:
logger.error("Could not read reflection. %s", e)
self.module.view.add_error_text("Could not read reflection. %s" % e)
self.module.model.waiting_for_reflection = False
return None

View file

@ -178,33 +178,6 @@ class LookupTable:
# This is the frequency at which the tuning and matching process was started # This is the frequency at which the tuning and matching process was started
self.started_frequency = None self.started_frequency = None
def is_incomplete(self) -> bool:
"""This method returns True if the lookup table is incomplete,
i.e. if there are frequencies for which no the tuning or matching voltage is none.
Returns:
bool: True if the lookup table is incomplete, False otherwise.
"""
return any(
[
tuning_voltage is None or matching_voltage is None
for tuning_voltage, matching_voltage in self.data.values()
]
)
def get_next_frequency(self) -> float:
"""This method returns the next frequency for which the tuning and matching voltage is not yet set.
Returns:
float: The next frequency for which the tuning and matching voltage is not yet set.
"""
for frequency, (tuning_voltage, matching_voltage) in self.data.items():
if tuning_voltage is None or matching_voltage is None:
return frequency
return None
def get_entry_number(self, frequency: float) -> int: def get_entry_number(self, frequency: float) -> int:
"""This method returns the entry number of the given frequency. """This method returns the entry number of the given frequency.
@ -287,9 +260,97 @@ class ElectricalLookupTable(LookupTable):
key = list(self.data.keys())[entry_number] key = list(self.data.keys())[entry_number]
return self.data[key] return self.data[key]
def is_incomplete(self) -> bool:
"""This method returns True if the lookup table is incomplete,
i.e. if there are frequencies for which no the tuning or matching voltage is none.
Returns:
bool: True if the lookup table is incomplete, False otherwise.
"""
return any(
[
tuning_voltage is None or matching_voltage is None
for tuning_voltage, matching_voltage in self.data.values()
]
)
def get_next_frequency(self) -> float:
"""This method returns the next frequency for which the tuning and matching voltage is not yet set.
Returns:
float: The next frequency for which the tuning and matching voltage is not yet set.
"""
for frequency, (tuning_voltage, matching_voltage) in self.data.items():
if tuning_voltage is None or matching_voltage is None:
return frequency
return None
class MechanicalLookupTable(LookupTable): class MechanicalLookupTable(LookupTable):
# Hmm duplicate code
TYPE = "Mechanical" TYPE = "Mechanical"
pass
def __init__(self, start_frequency: float, stop_frequency: float, frequency_step: float) -> None:
super().__init__(start_frequency, stop_frequency, frequency_step)
self.init_positions()
def init_positions(self) -> None:
"""Initialize the lookup table with default values."""
for frequency in np.arange(
self.start_frequency, self.stop_frequency, self.frequency_step
):
self.started_frequency = frequency
self.add_positions(None, None)
def add_positions(self, tuning_position: int, matching_position: int) -> None:
"""Add a tuning and matching position for the last started frequency to the lookup table.
Args:
tuning_position (int): The tuning position for the given frequency.
matching_position (int): The matching position for the given frequency."""
self.data[self.started_frequency] = (tuning_position, matching_position)
def get_positions(self, frequency: float) -> tuple:
"""Get the tuning and matching position for the given frequency.
Args:
frequency (float): The frequency for which the tuning and matching position should be returned.
Returns:
tuple: The tuning and matching position for the given frequency.
"""
entry_number = self.get_entry_number(frequency)
key = list(self.data.keys())[entry_number]
return self.data[key]
def is_incomplete(self) -> bool:
"""This method returns True if the lookup table is incomplete,
i.e. if there are frequencies for which no the tuning or matching position is none.
Returns:
bool: True if the lookup table is incomplete, False otherwise.
"""
return any(
[
tuning_position is None or matching_position is None
for tuning_position, matching_position in self.data.values()
]
)
def get_next_frequency(self) -> float:
"""This method returns the next frequency for which the tuning and matching position is not yet set.
Returns:
float: The next frequency for which the tuning and matching position is not yet set.
"""
for frequency, (tuning_position, matching_position) in self.data.items():
if tuning_position is None or matching_position is None:
return frequency
return None
class AutoTMModel(ModuleModel): class AutoTMModel(ModuleModel):
available_devices_changed = pyqtSignal(list) available_devices_changed = pyqtSignal(list)
@ -297,6 +358,7 @@ class AutoTMModel(ModuleModel):
data_points_changed = pyqtSignal(list) data_points_changed = pyqtSignal(list)
active_stepper_changed = pyqtSignal(Stepper) active_stepper_changed = pyqtSignal(Stepper)
saved_positions_changed = pyqtSignal(list) saved_positions_changed = pyqtSignal(list)
serial_data_received = pyqtSignal(str)
short_calibration_finished = pyqtSignal(S11Data) short_calibration_finished = pyqtSignal(S11Data)
open_calibration_finished = pyqtSignal(S11Data) open_calibration_finished = pyqtSignal(S11Data)
@ -318,6 +380,7 @@ class AutoTMModel(ModuleModel):
self.el_lut = None self.el_lut = None
self.mech_lut = None self.mech_lut = None
self.waiting_for_reflection = False
@property @property
def available_devices(self): def available_devices(self):

View file

@ -64,18 +64,18 @@ class AutoTMView(ModuleView):
) )
) )
# On clicking of the generateLUTButton call the generate_lut method # On clicking of the generateLUTButton call the generate_mechanical_lut method
self._ui_form.generateLUTButton.clicked.connect( self._ui_form.generateLUTButton.clicked.connect(
lambda: self.module.controller.generate_lut( lambda: self.module.controller.generate_electrical_lut(
self._ui_form.startfrequencyBox.text(), self._ui_form.startfrequencyBox.text(),
self._ui_form.stopfrequencyBox.text(), self._ui_form.stopfrequencyBox.text(),
self._ui_form.frequencystepBox.text(), self._ui_form.frequencystepBox.text(),
) )
) )
# On clicking of the generateLUTButton call the generate_lut method # On clicking of the generateLUTButton call the generate_electrical_lut method
self._ui_form.generateLUTButton.clicked.connect( self._ui_form.mechLUTButton.clicked.connect(
lambda: self.module.controller.generate_mech_lut( lambda: self.module.controller.generate_mechanical_lut(
self._ui_form.startfrequencyBox.text(), self._ui_form.startfrequencyBox.text(),
self._ui_form.stopfrequencyBox.text(), self._ui_form.stopfrequencyBox.text(),
self._ui_form.frequencystepBox.text(), self._ui_form.frequencystepBox.text(),
@ -357,6 +357,11 @@ class AutoTMView(ModuleView):
self.el_LUT_spinner = self.LoadingSpinner("Generating electrical LUT ...", self) self.el_LUT_spinner = self.LoadingSpinner("Generating electrical LUT ...", self)
self.el_LUT_spinner.show() self.el_LUT_spinner.show()
def create_mech_LUT_spinner_dialog(self) -> None:
"""Creates a mechanical LUT spinner dialog."""
self.mech_LUT_spinner = self.LoadingSpinner("Generating mechanical LUT ...", self)
self.mech_LUT_spinner.show()
def view_el_lut(self) -> None: def view_el_lut(self) -> None:
"""Creates a new Dialog that shows the currently active electrical LUT.""" """Creates a new Dialog that shows the currently active electrical LUT."""
logger.debug("View LUT") logger.debug("View LUT")