Updated simulator with RX event.

This commit is contained in:
jupfi 2023-08-25 12:38:14 +02:00
parent 7f721d242a
commit 9cbe265694
2 changed files with 127 additions and 57 deletions

View file

@ -9,13 +9,14 @@ from nqr_blochsimulator.classes.simulation import Simulation
logger = logging.getLogger(__name__)
class SimulatorController(BaseSpectrometerController):
def __init__(self, module):
super().__init__(module)
def start_measurement(self):
"""This method is called when the start_measurement signal is received from the core.
It will becalled if the simulator is the active spectrometer.
It will becalled if the simulator is the active spectrometer.
This will start the simulation based on the settings and the pulse sequence.
"""
logger.debug("Starting simulation")
@ -30,11 +31,21 @@ class SimulatorController(BaseSpectrometerController):
simulation = self.get_simulation(sample, pulse_array)
result = abs(simulation.simulate())
tdx = np.linspace(0, float(self.calculate_simulation_length()), len(result)) * 1e6
tdx = (
np.linspace(0, float(self.calculate_simulation_length()), len(result)) * 1e6
)
rx_begin, rx_stop = self.translate_rx_event()
# If we have a RX event, we need to cut the result to the RX event
if rx_begin and rx_stop:
evidx = np.where((tdx > rx_begin) & (tdx < rx_stop))[0]
tdx = tdx[evidx]
result = result[evidx]
measurement_data = Measurement(
tdx,
result,
result / self.module.model.averages,
sample.resonant_frequency,
# frequency_shift=self.module.model.if_frequency,
)
@ -95,31 +106,32 @@ class SimulatorController(BaseSpectrometerController):
else:
logger.warning("Unknown sample setting: %s", samplesetting.name)
self.module.nqrduck_signal.emit(
"notification", ["Error", "Unknown sample setting: " + samplesetting.name]
"notification",
["Error", "Unknown sample setting: " + samplesetting.name],
)
return None
sample = Sample(
name = name,
density = density,
molar_mass = molar_mass,
resonant_frequency = resonant_frequency,
gamma = gamma,
nuclear_spin = nuclear_spin,
spin_factor = spin_factor,
powder_factor = powder_factor,
filling_factor = filling_factor,
T1 = T1,
T2 = T2,
T2_star = T2_star,
atom_density = atom_density,
sample_volume = sample_volume,
sample_length = sample_length,
sample_diameter = sample_diameter
name=name,
density=density,
molar_mass=molar_mass,
resonant_frequency=resonant_frequency,
gamma=gamma,
nuclear_spin=nuclear_spin,
spin_factor=spin_factor,
powder_factor=powder_factor,
filling_factor=filling_factor,
T1=T1,
T2=T2,
T2_star=T2_star,
atom_density=atom_density,
sample_volume=sample_volume,
sample_length=sample_length,
sample_diameter=sample_diameter,
)
return sample
def translate_pulse_sequence(self, dwell_time : float) -> PulseArray:
def translate_pulse_sequence(self, dwell_time: float) -> PulseArray:
"""This method translates the pulse sequence from the core to a PulseArray object needed for the simulation.
Args:
@ -147,29 +159,35 @@ class SimulatorController(BaseSpectrometerController):
pulse_shape = parameter.get_option_by_name(
TXPulse.TX_PULSE_SHAPE
).value
pulse_amplitude = abs(pulse_shape.get_pulse_amplitude(event.duration, resolution = dwell_time))
pulse_amplitude = abs(
pulse_shape.get_pulse_amplitude(
event.duration, resolution=dwell_time
)
)
amplitude_array.append(pulse_amplitude)
elif (parameter.name == self.module.model.TX and parameter.get_option_by_name(TXPulse.RELATIVE_AMPLITUDE).value
== 0):
elif (
parameter.name == self.module.model.TX
and parameter.get_option_by_name(TXPulse.RELATIVE_AMPLITUDE).value
== 0
):
# If we have a wait, we need to add it to the pulse array
amplitude_array.append(np.zeros(int(event.duration / dwell_time)))
amplitude_array = np.concatenate(amplitude_array)
# This has not yet been implemented right now the phase is always 0
phase_array = np.zeros(len(amplitude_array))
pulse_array = PulseArray(
pulseamplitude = amplitude_array,
pulsephase = phase_array,
dwell_time = float(dwell_time)
pulseamplitude=amplitude_array,
pulsephase=phase_array,
dwell_time=float(dwell_time),
)
return pulse_array
def get_simulation(self, sample : Sample, pulse_array : PulseArray) -> Simulation:
def get_simulation(self, sample: Sample, pulse_array: PulseArray) -> Simulation:
"""This method creates a simulation object based on the settings and the pulse sequence.
Args:
@ -183,24 +201,29 @@ class SimulatorController(BaseSpectrometerController):
noise = float(model.get_setting_by_name(model.NOISE).value)
simulation = Simulation(
sample = sample,
pulse = pulse_array,
number_isochromats = int(model.get_setting_by_name(model.NUMBER_ISOCHROMATS).value),
initial_magnetization = float(model.get_setting_by_name(model.INITIAL_MAGNETIZATION).value),
gradient = float(model.get_setting_by_name(model.GRADIENT).value),
noise = float(model.get_setting_by_name(model.NOISE).value),
length_coil = float(model.get_setting_by_name(model.LENGTH_COIL).value),
diameter_coil = float(model.get_setting_by_name(model.DIAMETER_COIL).value),
number_turns = float(model.get_setting_by_name(model.NUMBER_TURNS).value),
power_amplifier_power =float( model.get_setting_by_name(model.POWER_AMPLIFIER_POWER).value),
gain = float(model.get_setting_by_name(model.GAIN).value),
temperature = float(model.get_setting_by_name(model.TEMPERATURE).value),
averages = int(model.averages),
loss_TX = float(model.get_setting_by_name(model.LOSS_TX).value),
loss_RX = float(model.get_setting_by_name(model.LOSS_RX).value)
sample=sample,
pulse=pulse_array,
number_isochromats=int(
model.get_setting_by_name(model.NUMBER_ISOCHROMATS).value
),
initial_magnetization=float(
model.get_setting_by_name(model.INITIAL_MAGNETIZATION).value
),
gradient=float(model.get_setting_by_name(model.GRADIENT).value),
noise=float(model.get_setting_by_name(model.NOISE).value),
length_coil=float(model.get_setting_by_name(model.LENGTH_COIL).value),
diameter_coil=float(model.get_setting_by_name(model.DIAMETER_COIL).value),
number_turns=float(model.get_setting_by_name(model.NUMBER_TURNS).value),
power_amplifier_power=float(
model.get_setting_by_name(model.POWER_AMPLIFIER_POWER).value
),
gain=float(model.get_setting_by_name(model.GAIN).value),
temperature=float(model.get_setting_by_name(model.TEMPERATURE).value),
averages=int(model.averages),
loss_TX=float(model.get_setting_by_name(model.LOSS_TX).value),
loss_RX=float(model.get_setting_by_name(model.LOSS_RX).value),
)
return simulation
def calculate_dwelltime(self) -> float:
"""This method calculates the dwell time based on the settings and the pulse sequence.
@ -208,11 +231,13 @@ class SimulatorController(BaseSpectrometerController):
Returns:
float: The dwell time in seconds.
"""
n_points = int(self.module.model.get_setting_by_name(self.module.model.NUMBER_POINTS).value)
n_points = int(
self.module.model.get_setting_by_name(self.module.model.NUMBER_POINTS).value
)
simulation_length = self.calculate_simulation_length()
dwell_time = simulation_length / n_points
return dwell_time
def calculate_simulation_length(self) -> float:
"""This method calculates the simulation length based on the settings and the pulse sequence.
@ -225,16 +250,59 @@ class SimulatorController(BaseSpectrometerController):
simulation_length += event.duration
return simulation_length
def set_frequency(self, value : str) -> None:
""" This method is called when the set_frequency signal is received from the core.
def translate_rx_event(self) -> tuple:
"""This method translates the RX event of the pulse sequence to the limr object.
Returns:
tuple: A tuple containing the start and stop time of the RX event in µs"""
# This is a correction factor for the RX event. The offset of the first pulse is 2.2µs longer than from the specified samples.
events = self.module.model.pulse_programmer.model.pulse_sequence.events
previous_events_duration = 0
offset = 0
rx_duration = 0
for event in events:
logger.debug("Event %s has parameters: %s", event.name, event.parameters)
for parameter in event.parameters.values():
logger.debug(
"Parameter %s has options: %s", parameter.name, parameter.options
)
if (
parameter.name == self.module.model.RX
and parameter.get_option_by_name(RXReadout.RX).value
):
# Get the length of all previous events
previous_events = events[: events.index(event)]
previous_events_duration = sum(
[event.duration for event in previous_events]
)
rx_duration = event.duration
rx_begin = float(previous_events_duration)
if rx_duration:
rx_stop = rx_begin + float(rx_duration)
return rx_begin * 1e6, rx_stop * 1e6
else:
return None, None
def set_frequency(self, value: str) -> None:
"""This method is called when the set_frequency signal is received from the core.
For the simulator this just prints a warning that the simulator is selected.
"""
self.module.nqrduck_signal.emit(
"notification", ["Warning", "Could not set averages to because the simulator is selected as active spectrometer "]
"notification",
[
"Warning",
"Could not set averages to because the simulator is selected as active spectrometer ",
],
)
def set_averages(self, value : str) -> None:
""" This method is called when the set_averages signal is received from the core.
def set_averages(self, value: str) -> None:
"""This method is called when the set_averages signal is received from the core.
It sets the averages in the model used for the simulation.
Args:

View file

@ -102,6 +102,8 @@ class SimulatorModel(BaseSpectrometerModel):
# self.add_pulse_parameter_option(self.GATE, Gate)
self.add_pulse_parameter_option(self.RX, RXReadout)
self.averages = 1
# Try to load the pulse programmer module
try:
from nqrduck_pulseprogrammer.pulseprogrammer import pulse_programmer