Refactored translation of pulse sequence.

This commit is contained in:
jupfi 2023-12-10 09:55:12 +01:00
parent 16aabe1ecc
commit 67ae7e5ca5

View file

@ -276,158 +276,191 @@ class LimeNQRController(BaseSpectrometerController):
return lime return lime
def translate_pulse_sequence(self, lime): def translate_pulse_sequence(self, lime):
"""This method sets the parameters of the limr object according to the pulse sequence set in the pulse programmer module# """This method translates the pulse sequence to the limr object.
This is only relevant for the tx pulse parameters. General settings are set in the update_settings method and the rx event is
handled in the translate_rx_event method.
Args: Args:
lime (limr): The limr object that is used to communicate with the pulseN driver lime (limr): The limr object that is used to communicate with the pulseN driver
Returns:
limr: The updated limr object
""" """
events = self.module.model.pulse_programmer.model.pulse_sequence.events events = self.fetch_pulse_sequence_events()
for event in events: for event in events:
logger.debug("Event %s has parameters: %s", event.name, event.parameters) self.log_event_details(event)
for parameter in event.parameters.values(): for parameter in event.parameters.values():
logger.debug( self.log_parameter_details(parameter)
"Parameter %s has options: %s", parameter.name, parameter.options
)
if ( if self.is_translatable_tx_parameter(parameter):
parameter.name == self.module.model.TX pulse_shape, pulse_amplitude = self.prepare_pulse_amplitude(event, parameter)
and parameter.get_option_by_name(TXPulse.RELATIVE_AMPLITUDE).value pulse_amplitude, modulated_phase = self.modulate_pulse_amplitude(pulse_amplitude, event, lime)
> 0
):
pulse_shape = parameter.get_option_by_name(
TXPulse.TX_PULSE_SHAPE
).value
pulse_amplitude = abs(pulse_shape.get_pulse_amplitude(event.duration))
# Apply the relative amplitude if not lime.pfr: # If the pulse frequency list is empty
pulse_amplitude *= parameter.get_option_by_name(TXPulse.RELATIVE_AMPLITUDE).value self.initialize_pulse_lists(lime, pulse_amplitude, pulse_shape, modulated_phase)
# Calculate the number of samples
num_samples = int(float(event.duration) * lime.sra)
# Create the time vector for the pulse duration
tdx = np.linspace(0, float(event.duration), num_samples, endpoint=False)
# Create the full complex exponential for modulation
# This represents a rotating vector (phasor) at your IF
shift_signal = np.exp(1j * 2 * np.pi * self.module.model.if_frequency * tdx)
# pulse_amplitude is your desired pulse envelope, defined earlier
# Let's assume that pulse_amplitude is a real-valued vector with values corresponding to the amplitude of each sample
# Apply the shift by multiplying with the complex exponential
pulse_complex = pulse_amplitude * shift_signal
# Calculate amplitude and phase from the complex signal
modulated_amplitude = np.abs(pulse_complex)
modulated_phase = np.angle(pulse_complex) # This returns the phase in radians
# For SDRs that require phase between 0 and 2*pi
modulated_phase = np.unwrap(modulated_phase) # To correct discontinuities
modulated_phase = (modulated_phase + 2 * np.pi) % (2 * np.pi) # Shift to [0, 2*pi] range
# Apply the shift by multiplying the time domain signal
pulse_amplitude = (modulated_amplitude)
# Clip the pulse amplitude to a minimum and maximum value of -0.99 and 0.99
# this is kind of ugly but it prevents some kind of issue with the pulse clipping
# I'm not sure why this happens but it seems to be related to the pulse shape
# rectangular pulses seem to be the most effected by this
pulse_amplitude = np.clip(pulse_amplitude, -0.99, 0.99)
if len(lime.pfr) == 0:
# Add the TX pulse to the pulse frequency list (lime.pfr)
lime.pfr = [
float(self.module.model.if_frequency)
for i in range(len(pulse_amplitude))
]
# Add the duration of the TX pulse to the pulse duration list (lime.pdr)
lime.pdr = [
float(pulse_shape.resolution)
for i in range(len(pulse_amplitude))
]
# Add the TX pulse amplitude to the pulse amplitude list (lime.pam)
lime.pam = list(pulse_amplitude)
# Add the pulse offset to the pulse offset list (lime.pof)
# This leads to a default offset of 300 samples for the first pulse
lime.pof = [self.module.model.OFFSET_FIRST_PULSE]
lime.pof += [
int(pulse_shape.resolution * Decimal(lime.sra))
for i in range(len(pulse_amplitude) -1)
]
lime.pph = list(modulated_phase)
# Add the TX pulse phase to the pulse phase list (lime.pph) -> not yet implemented
else: else:
logger.debug("Adding TX pulse to existing pulse sequence") self.extend_pulse_lists(lime, pulse_amplitude, pulse_shape, modulated_phase)
lime.pfr += [ self.calculate_and_set_offsets(lime, pulse_shape, events, event, pulse_amplitude)
float(self.module.model.if_frequency)
for i in range(len(pulse_amplitude))
]
lime.pdr += [ # Set repetition time event as last event's duration and update number of pulses
float(pulse_shape.resolution)
for i in range(len(pulse_amplitude))
]
# Setting pulse amplitude
lime.pam += list(pulse_amplitude)
# Setting pulse phase
lime.pph += list(modulated_phase)
# Get the length of the previous event without a tx pulse
blank = []
previous_events = events[: events.index(event)]
# Firstuful this is ugly as hell and needs to be refactored
# Secondly this just sets the pulse offsets.
for prev_event in previous_events[::-1]:
logger.debug(
"Previous event: %s with duration: %s",
prev_event.name,
prev_event.duration,
)
for parameter in prev_event.parameters.values():
if (
parameter.name == self.module.model.TX
and parameter.get_option_by_name(
TXPulse.RELATIVE_AMPLITUDE
).value
== 0
):
blank.append(float(prev_event.duration))
elif (
parameter.name == self.module.model.TX
and parameter.get_option_by_name(
TXPulse.RELATIVE_AMPLITUDE
).value
> 0
):
break
else:
continue
break
logger.debug("Found blanks: %s", blank)
prev_duration = lime.pdr[-2] + sum(blank)
logger.debug("Setting pulse offset to: %s", prev_duration)
lime.pof.append(int(np.ceil(prev_duration * lime.sra)))
lime.pof += [
int(float(pulse_shape.resolution) * lime.sra)
for i in range(len(pulse_amplitude) - 1)
]
# The last event is the repetition time event
lime.trp = float(event.duration) lime.trp = float(event.duration)
lime.npu = len(lime.pfr) lime.npu = len(lime.pfr)
return lime return lime
# Helper functions below:
def fetch_pulse_sequence_events(self):
"""This method fetches the pulse sequence events from the pulse programmer module.
Returns:
list: The pulse sequence events"""
return self.module.model.pulse_programmer.model.pulse_sequence.events
def log_event_details(self, event):
logger.debug("Event %s has parameters: %s", event.name, event.parameters)
def log_parameter_details(self, parameter):
logger.debug("Parameter %s has options: %s", parameter.name, parameter.options)
def is_translatable_tx_parameter(self, parameter):
"""This method checks if a parameter a pulse with a transmit pulse shape (amplitude nonzero)
Args:
parameter (Parameter): The parameter to check"""
return (parameter.name == self.module.model.TX and
parameter.get_option_by_name(TXPulse.RELATIVE_AMPLITUDE).value > 0)
def prepare_pulse_amplitude(self, event, parameter):
"""This method prepares the pulse amplitude for the limr object.
Args:
event (Event): The event that contains the parameter
parameter (Parameter): The parameter that contains the pulse shape and amplitude
Returns:
tuple: A tuple containing the pulse shape and the pulse amplitude"""
pulse_shape = parameter.get_option_by_name(TXPulse.TX_PULSE_SHAPE).value
pulse_amplitude = abs(pulse_shape.get_pulse_amplitude(event.duration)) * \
parameter.get_option_by_name(TXPulse.RELATIVE_AMPLITUDE).value
pulse_amplitude = np.clip(pulse_amplitude, -0.99, 0.99)
return pulse_shape, pulse_amplitude
def modulate_pulse_amplitude(self, pulse_amplitude, event, lime):
"""This method modulates the pulse amplitude for the limr object. We need to do this to have the pulse at IF frequency instead of LO frequency.
Args:
pulse_amplitude (float): The pulse amplitude
event (Event): The event that contains the parameter
lime (limr): The limr object that is used to communicate with the pulseN driver
Returns:
tuple: A tuple containing the modulated pulse amplitude and the modulated phase
"""
num_samples = int(float(event.duration) * lime.sra)
tdx = np.linspace(0, float(event.duration), num_samples, endpoint=False)
shift_signal = np.exp(1j * 2 * np.pi * self.module.model.if_frequency * tdx)
pulse_complex = pulse_amplitude * shift_signal
modulated_amplitude = np.abs(pulse_complex)
modulated_phase = self.unwrap_phase(np.angle(pulse_complex))
return modulated_amplitude, modulated_phase
def unwrap_phase(self, phase):
"""This method unwraps the phase of the pulse.
Args:
phase (float): The phase of the pulse"""
return (np.unwrap(phase) + 2 * np.pi) % (2 * np.pi)
def initialize_pulse_lists(self, lime, pulse_amplitude, pulse_shape, modulated_phase):
"""This method initializes the pulse lists of the limr object.
Args:
lime (limr): The limr object that is used to communicate with the pulseN driver
pulse_amplitude (float): The pulse amplitude
pulse_shape (PulseShape): The pulse shape
modulated_phase (float): The modulated phase
"""
lime.pfr = [float(self.module.model.if_frequency)] * len(pulse_amplitude)
lime.pdr = [float(pulse_shape.resolution)] * len(pulse_amplitude)
lime.pam = list(pulse_amplitude)
lime.pof = ([self.module.model.OFFSET_FIRST_PULSE] +
[int(pulse_shape.resolution * Decimal(lime.sra))] * (len(pulse_amplitude) - 1))
lime.pph = list(modulated_phase)
def extend_pulse_lists(self, lime, pulse_amplitude, pulse_shape, modulated_phase):
"""This method extends the pulse lists of the limr object.
Args:
lime (limr): The limr object that is used to communicate with the pulseN driver
pulse_amplitude (float): The pulse amplitude
pulse_shape (PulseShape): The pulse shape
modulated_phase (float): The modulated phase
"""
lime.pfr.extend([float(self.module.model.if_frequency)] * len(pulse_amplitude))
lime.pdr.extend([float(pulse_shape.resolution)] * len(pulse_amplitude))
lime.pam.extend(list(pulse_amplitude))
lime.pph.extend(list(modulated_phase))
def calculate_and_set_offsets(self, lime, pulse_shape, events, current_event, pulse_amplitude):
"""This method calculates and sets the offsets for the limr object.
Args:
lime (limr): The limr object that is used to communicate with the pulseN driver
pulse_shape (PulseShape): The pulse shape
events (list): The pulse sequence events
current_event (Event): The current event
pulse_amplitude (float): The pulse amplitude
"""
blank_durations = self.get_blank_durations_before_event(events, current_event)
# Calculate the total time that has passed before the current event
total_blank_duration = sum(blank_durations)
# Calculate the offset for the current pulse
# The first pulse offset is already set, so calculate subsequent ones
offset_for_current_pulse = int(np.ceil(total_blank_duration * lime.sra))
# Offset for the current pulse should be added only once
lime.pof.append(offset_for_current_pulse)
# Set the offset for the remaining samples of the current pulse (excluding the first sample)
# We subtract 1 because we have already set the offset for the current pulse's first sample
offset_per_sample = int(float(pulse_shape.resolution) * lime.sra)
lime.pof.extend([offset_per_sample] * (len(pulse_amplitude) - 1))
def get_blank_durations_before_event(self, events, current_event):
"""This method returns the blank durations before the current event.
Args:
events (list): The pulse sequence events
current_event (Event): The current event
Returns:
list: The blank durations before the current event
"""
blank_durations = []
previous_events_without_tx_pulse = self.get_previous_events_without_tx_pulse(events, current_event)
for event in previous_events_without_tx_pulse:
blank_durations.append(float(event.duration))
return blank_durations
def get_previous_events_without_tx_pulse(self, events, current_event):
"""This method returns the previous events without a transmit pulse.
Args:
events (list): The pulse sequence events
current_event (Event): The current event
Returns:
list: The previous events without a transmit pulse
"""
index = events.index(current_event)
previous_events = events[:index]
result = []
for event in reversed(previous_events):
translatable = any(self.is_translatable_tx_parameter(param) for param in event.parameters.values())
if not translatable:
result.append(event)
else:
break
return reversed(result) # Reversed to maintain the original order if needed elsewhere
def translate_rx_event(self, lime): def translate_rx_event(self, lime):
"""This method translates the RX event of the pulse sequence to the limr object. """This method translates the RX event of the pulse sequence to the limr object.