diff --git a/src/nqrduck_spectrometer_limenqr/controller.py b/src/nqrduck_spectrometer_limenqr/controller.py index 6d04f4a..40eb859 100644 --- a/src/nqrduck_spectrometer_limenqr/controller.py +++ b/src/nqrduck_spectrometer_limenqr/controller.py @@ -276,158 +276,191 @@ class LimeNQRController(BaseSpectrometerController): return lime def translate_pulse_sequence(self, lime): - """This method sets the parameters of the limr object according to the pulse sequence set in the pulse programmer module# - This is only relevant for the tx pulse parameters. General settings are set in the update_settings method and the rx event is - handled in the translate_rx_event method. - + """This method translates the pulse sequence to the limr object. + Args: lime (limr): The limr object that is used to communicate with the pulseN driver - - Returns: - limr: The updated limr object """ - events = self.module.model.pulse_programmer.model.pulse_sequence.events + events = self.fetch_pulse_sequence_events() for event in events: - logger.debug("Event %s has parameters: %s", event.name, event.parameters) + self.log_event_details(event) for parameter in event.parameters.values(): - logger.debug( - "Parameter %s has options: %s", parameter.name, parameter.options - ) + self.log_parameter_details(parameter) - if ( - parameter.name == self.module.model.TX - and parameter.get_option_by_name(TXPulse.RELATIVE_AMPLITUDE).value - > 0 - ): - pulse_shape = parameter.get_option_by_name( - TXPulse.TX_PULSE_SHAPE - ).value - pulse_amplitude = abs(pulse_shape.get_pulse_amplitude(event.duration)) + if self.is_translatable_tx_parameter(parameter): + pulse_shape, pulse_amplitude = self.prepare_pulse_amplitude(event, parameter) + pulse_amplitude, modulated_phase = self.modulate_pulse_amplitude(pulse_amplitude, event, lime) - # Apply the relative amplitude - pulse_amplitude *= parameter.get_option_by_name(TXPulse.RELATIVE_AMPLITUDE).value - - # Calculate the number of samples - num_samples = int(float(event.duration) * lime.sra) - - # Create the time vector for the pulse duration - tdx = np.linspace(0, float(event.duration), num_samples, endpoint=False) - - # Create the full complex exponential for modulation - # This represents a rotating vector (phasor) at your IF - shift_signal = np.exp(1j * 2 * np.pi * self.module.model.if_frequency * tdx) - - # pulse_amplitude is your desired pulse envelope, defined earlier - # Let's assume that pulse_amplitude is a real-valued vector with values corresponding to the amplitude of each sample - - # Apply the shift by multiplying with the complex exponential - pulse_complex = pulse_amplitude * shift_signal - - # Calculate amplitude and phase from the complex signal - modulated_amplitude = np.abs(pulse_complex) - modulated_phase = np.angle(pulse_complex) # This returns the phase in radians - - # For SDRs that require phase between 0 and 2*pi - modulated_phase = np.unwrap(modulated_phase) # To correct discontinuities - modulated_phase = (modulated_phase + 2 * np.pi) % (2 * np.pi) # Shift to [0, 2*pi] range - - # Apply the shift by multiplying the time domain signal - pulse_amplitude = (modulated_amplitude) - - # Clip the pulse amplitude to a minimum and maximum value of -0.99 and 0.99 - # this is kind of ugly but it prevents some kind of issue with the pulse clipping - # I'm not sure why this happens but it seems to be related to the pulse shape - # rectangular pulses seem to be the most effected by this - pulse_amplitude = np.clip(pulse_amplitude, -0.99, 0.99) - - if len(lime.pfr) == 0: - # Add the TX pulse to the pulse frequency list (lime.pfr) - lime.pfr = [ - float(self.module.model.if_frequency) - for i in range(len(pulse_amplitude)) - ] - # Add the duration of the TX pulse to the pulse duration list (lime.pdr) - lime.pdr = [ - float(pulse_shape.resolution) - for i in range(len(pulse_amplitude)) - ] - # Add the TX pulse amplitude to the pulse amplitude list (lime.pam) - lime.pam = list(pulse_amplitude) - # Add the pulse offset to the pulse offset list (lime.pof) - # This leads to a default offset of 300 samples for the first pulse - lime.pof = [self.module.model.OFFSET_FIRST_PULSE] - lime.pof += [ - int(pulse_shape.resolution * Decimal(lime.sra)) - for i in range(len(pulse_amplitude) -1) - ] - lime.pph = list(modulated_phase) - # Add the TX pulse phase to the pulse phase list (lime.pph) -> not yet implemented + if not lime.pfr: # If the pulse frequency list is empty + self.initialize_pulse_lists(lime, pulse_amplitude, pulse_shape, modulated_phase) else: - logger.debug("Adding TX pulse to existing pulse sequence") - lime.pfr += [ - float(self.module.model.if_frequency) - for i in range(len(pulse_amplitude)) - ] - - lime.pdr += [ - float(pulse_shape.resolution) - for i in range(len(pulse_amplitude)) - ] - - # Setting pulse amplitude - lime.pam += list(pulse_amplitude) - # Setting pulse phase - lime.pph += list(modulated_phase) - # Get the length of the previous event without a tx pulse - blank = [] - previous_events = events[: events.index(event)] - # Firstuful this is ugly as hell and needs to be refactored - # Secondly this just sets the pulse offsets. - for prev_event in previous_events[::-1]: - logger.debug( - "Previous event: %s with duration: %s", - prev_event.name, - prev_event.duration, - ) - for parameter in prev_event.parameters.values(): - if ( - parameter.name == self.module.model.TX - and parameter.get_option_by_name( - TXPulse.RELATIVE_AMPLITUDE - ).value - == 0 - ): - blank.append(float(prev_event.duration)) - elif ( - parameter.name == self.module.model.TX - and parameter.get_option_by_name( - TXPulse.RELATIVE_AMPLITUDE - ).value - > 0 - ): - break - else: - continue - break - - logger.debug("Found blanks: %s", blank) - - prev_duration = lime.pdr[-2] + sum(blank) - - logger.debug("Setting pulse offset to: %s", prev_duration) - lime.pof.append(int(np.ceil(prev_duration * lime.sra))) - lime.pof += [ - int(float(pulse_shape.resolution) * lime.sra) - for i in range(len(pulse_amplitude) - 1) - ] - - # The last event is the repetition time event - lime.trp = float(event.duration) + self.extend_pulse_lists(lime, pulse_amplitude, pulse_shape, modulated_phase) + self.calculate_and_set_offsets(lime, pulse_shape, events, event, pulse_amplitude) + # Set repetition time event as last event's duration and update number of pulses + lime.trp = float(event.duration) lime.npu = len(lime.pfr) return lime + # Helper functions below: + + def fetch_pulse_sequence_events(self): + """This method fetches the pulse sequence events from the pulse programmer module. + + Returns: + list: The pulse sequence events""" + return self.module.model.pulse_programmer.model.pulse_sequence.events + + def log_event_details(self, event): + logger.debug("Event %s has parameters: %s", event.name, event.parameters) + + def log_parameter_details(self, parameter): + logger.debug("Parameter %s has options: %s", parameter.name, parameter.options) + + def is_translatable_tx_parameter(self, parameter): + """This method checks if a parameter a pulse with a transmit pulse shape (amplitude nonzero) + + Args: + parameter (Parameter): The parameter to check""" + return (parameter.name == self.module.model.TX and + parameter.get_option_by_name(TXPulse.RELATIVE_AMPLITUDE).value > 0) + + def prepare_pulse_amplitude(self, event, parameter): + """This method prepares the pulse amplitude for the limr object. + + Args: + event (Event): The event that contains the parameter + parameter (Parameter): The parameter that contains the pulse shape and amplitude + + Returns: + tuple: A tuple containing the pulse shape and the pulse amplitude""" + pulse_shape = parameter.get_option_by_name(TXPulse.TX_PULSE_SHAPE).value + pulse_amplitude = abs(pulse_shape.get_pulse_amplitude(event.duration)) * \ + parameter.get_option_by_name(TXPulse.RELATIVE_AMPLITUDE).value + pulse_amplitude = np.clip(pulse_amplitude, -0.99, 0.99) + return pulse_shape, pulse_amplitude + + def modulate_pulse_amplitude(self, pulse_amplitude, event, lime): + """This method modulates the pulse amplitude for the limr object. We need to do this to have the pulse at IF frequency instead of LO frequency. + + Args: + pulse_amplitude (float): The pulse amplitude + event (Event): The event that contains the parameter + lime (limr): The limr object that is used to communicate with the pulseN driver + + Returns: + tuple: A tuple containing the modulated pulse amplitude and the modulated phase + """ + num_samples = int(float(event.duration) * lime.sra) + tdx = np.linspace(0, float(event.duration), num_samples, endpoint=False) + shift_signal = np.exp(1j * 2 * np.pi * self.module.model.if_frequency * tdx) + pulse_complex = pulse_amplitude * shift_signal + modulated_amplitude = np.abs(pulse_complex) + modulated_phase = self.unwrap_phase(np.angle(pulse_complex)) + return modulated_amplitude, modulated_phase + + def unwrap_phase(self, phase): + """This method unwraps the phase of the pulse. + + Args: + phase (float): The phase of the pulse""" + return (np.unwrap(phase) + 2 * np.pi) % (2 * np.pi) + + def initialize_pulse_lists(self, lime, pulse_amplitude, pulse_shape, modulated_phase): + """This method initializes the pulse lists of the limr object. + + Args: + lime (limr): The limr object that is used to communicate with the pulseN driver + pulse_amplitude (float): The pulse amplitude + pulse_shape (PulseShape): The pulse shape + modulated_phase (float): The modulated phase + """ + lime.pfr = [float(self.module.model.if_frequency)] * len(pulse_amplitude) + lime.pdr = [float(pulse_shape.resolution)] * len(pulse_amplitude) + lime.pam = list(pulse_amplitude) + lime.pof = ([self.module.model.OFFSET_FIRST_PULSE] + + [int(pulse_shape.resolution * Decimal(lime.sra))] * (len(pulse_amplitude) - 1)) + lime.pph = list(modulated_phase) + + def extend_pulse_lists(self, lime, pulse_amplitude, pulse_shape, modulated_phase): + """This method extends the pulse lists of the limr object. + + Args: + lime (limr): The limr object that is used to communicate with the pulseN driver + pulse_amplitude (float): The pulse amplitude + pulse_shape (PulseShape): The pulse shape + modulated_phase (float): The modulated phase + """ + lime.pfr.extend([float(self.module.model.if_frequency)] * len(pulse_amplitude)) + lime.pdr.extend([float(pulse_shape.resolution)] * len(pulse_amplitude)) + lime.pam.extend(list(pulse_amplitude)) + lime.pph.extend(list(modulated_phase)) + + def calculate_and_set_offsets(self, lime, pulse_shape, events, current_event, pulse_amplitude): + """This method calculates and sets the offsets for the limr object. + + Args: + lime (limr): The limr object that is used to communicate with the pulseN driver + pulse_shape (PulseShape): The pulse shape + events (list): The pulse sequence events + current_event (Event): The current event + pulse_amplitude (float): The pulse amplitude + """ + blank_durations = self.get_blank_durations_before_event(events, current_event) + + # Calculate the total time that has passed before the current event + total_blank_duration = sum(blank_durations) + # Calculate the offset for the current pulse + # The first pulse offset is already set, so calculate subsequent ones + offset_for_current_pulse = int(np.ceil(total_blank_duration * lime.sra)) + + # Offset for the current pulse should be added only once + lime.pof.append(offset_for_current_pulse) + + # Set the offset for the remaining samples of the current pulse (excluding the first sample) + # We subtract 1 because we have already set the offset for the current pulse's first sample + offset_per_sample = int(float(pulse_shape.resolution) * lime.sra) + lime.pof.extend([offset_per_sample] * (len(pulse_amplitude) - 1)) + + def get_blank_durations_before_event(self, events, current_event): + """This method returns the blank durations before the current event. + + Args: + events (list): The pulse sequence events + current_event (Event): The current event + + Returns: + list: The blank durations before the current event + """ + blank_durations = [] + previous_events_without_tx_pulse = self.get_previous_events_without_tx_pulse(events, current_event) + for event in previous_events_without_tx_pulse: + blank_durations.append(float(event.duration)) + return blank_durations + + def get_previous_events_without_tx_pulse(self, events, current_event): + """This method returns the previous events without a transmit pulse. + + Args: + events (list): The pulse sequence events + current_event (Event): The current event + + Returns: + list: The previous events without a transmit pulse + """ + index = events.index(current_event) + previous_events = events[:index] + result = [] + for event in reversed(previous_events): + translatable = any(self.is_translatable_tx_parameter(param) for param in event.parameters.values()) + if not translatable: + result.append(event) + else: + break + return reversed(result) # Reversed to maintain the original order if needed elsewhere + + def translate_rx_event(self, lime): """This method translates the RX event of the pulse sequence to the limr object.