Started implementation of single freq. measurement

This commit is contained in:
jupfi 2023-07-17 11:30:31 +02:00
parent ae2aff25a2
commit ee971974cf

View file

@ -6,32 +6,36 @@ from pathlib import Path
import numpy as np import numpy as np
from nqrduck.module.module_controller import ModuleController from nqrduck.module.module_controller import ModuleController
from nqrduck_spectrometer.base_spectrometer_controller import BaseSpectrometerController from nqrduck_spectrometer.base_spectrometer_controller import BaseSpectrometerController
from nqrduck_spectrometer.measurement import Measurement
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class LimeNQRController(BaseSpectrometerController): class LimeNQRController(BaseSpectrometerController):
def __init__(self, module): def __init__(self, module):
super().__init__(module) super().__init__(module)
def start_measurement(self): def start_measurement(self):
logger.debug("Starting measurement with spectrometer: %s", self.module.model.name) logger.debug(
"Starting measurement with spectrometer: %s", self.module.model.name
)
# Now we request the pulse sequence set in the pulse programmer module # Now we request the pulse sequence set in the pulse programmer module
pulse_sequence = self.module.model.pulse_programmer.model.pulse_sequence pulse_sequence = self.module.model.pulse_programmer.model.pulse_sequence
logger.debug("Pulse sequence is: %s", pulse_sequence.dump_sequence_data()) logger.debug("Pulse sequence is: %s", pulse_sequence.dump_sequence_data())
try: try:
from .contrib.limr import limr from .contrib.limr import limr
self_path = Path(__file__).parent
driver_path = str(self_path / "contrib/pulseN_test_USB.cpp") self_path = Path(__file__).parent
lime = limr(driver_path) driver_path = str(self_path / "contrib/pulseN_test_USB.cpp")
lime = limr(driver_path)
except ImportError as e: except ImportError as e:
logger.error("Error while importing limr. %s", e) logger.error("Error while importing limr. %s", e)
except Exception as e: except Exception as e:
logger.error("Error while loading pulseN_test_USB.cpp: %s", e) logger.error("Error while loading pulseN_test_USB.cpp: %s", e)
lime.noi = -1 # No initialisation
lime.noi = -1 # No initialisation lime.nrp = 1 # Numer of repetitions
lime.nrp = 1 # Numer of repetitions
lime = self.update_settings(lime) lime = self.update_settings(lime)
lime = self.translate_pulse_sequence(lime) lime = self.translate_pulse_sequence(lime)
@ -41,7 +45,7 @@ class LimeNQRController(BaseSpectrometerController):
if val != []: if val != []:
# logger.debug("Attribute: %s, Value: %s, Descr.: %s" % key, val, lime.parsinp[key][1]) # logger.debug("Attribute: %s, Value: %s, Descr.: %s" % key, val, lime.parsinp[key][1])
logger.debug(key + ": " + str(val) + " " + lime.parsinp[key][1]) logger.debug(key + ": " + str(val) + " " + lime.parsinp[key][1])
# Create temp folder for .hdf files # Create temp folder for .hdf files
temp_dir = tempfile.TemporaryDirectory() temp_dir = tempfile.TemporaryDirectory()
path = Path(temp_dir.name) path = Path(temp_dir.name)
@ -58,44 +62,32 @@ class LimeNQRController(BaseSpectrometerController):
rx_begin, rx_stop = self.translate_rx_event(lime) rx_begin, rx_stop = self.translate_rx_event(lime)
logger.debug("RX event starts at: %s and ends at: %s", rx_begin, rx_stop) logger.debug("RX event starts at: %s and ends at: %s", rx_begin, rx_stop)
#evaluation range, defines: blanking time and window length # evaluation range, defines: blanking time and window length
evran = [rx_begin, rx_stop] evran = [rx_begin, rx_stop]
#np.where sometimes does not work out, so it is put in a try except # np.where sometimes does not work out, so it is put in a try except
#always check the console for errors # always check the console for errors
try: try:
evidx = np.where( (lime.HDF.tdx > evran[0]) & (lime.HDF.tdx < evran[1]) )[0] evidx = np.where((lime.HDF.tdx > evran[0]) & (lime.HDF.tdx < evran[1]))[0]
except: except:
print("error due to np.where evaluation!") logger.error("Error while reading the measurement data")
return -1
#time domain x and y data
# time domain x and y data
tdx = lime.HDF.tdx[evidx] tdx = lime.HDF.tdx[evidx]
tdy = lime.HDF.tdy[evidx] tdy = lime.HDF.tdy[evidx] / lime.nav
#correcting a offset in the time domain by subtracting the mean measurement_data = Measurement(tdx, tdy, self.module.model.target_frequency)
tdy_mean = tdy[:,0]-np.mean(tdy)
#fft of the corrected time domain data
fdy1 = fftshift(fft(tdy_mean,axis=0),axes=0)
#fft freq and fft shift is here used to scale the x axis (frequency axis) # Emit the data to the nqrduck core
fdx1 = fftfreq(len(fdy1))*lime.sra/1e6 logger.debug("Emitting measurement data")
fdx1 = fftshift(fdx1) self.module.nqrduck_signal.emit("single_measurement", measurement_data)
#scaling factor which converts the y axis (usually a proportional number of points) into uV
fac_p_to_uV = 447651/1e6
#tdy_mean = tdy_mean/l.nav/fac_p_to_uV/RX_gainfactor
plt.figure(1);
plt.plot(tdx,tdy_mean/lime.nav)
plt.xlabel("t in µs")
plt.ylabel("Amplitude / points")
plt.show()
def update_settings(self, lime): def update_settings(self, lime):
logger.debug("Updating settings for spectrometer: %s for measurement", self.module.model.name) logger.debug(
"Updating settings for spectrometer: %s for measurement",
self.module.model.name,
)
lime.t3d = [0, 0, 0, 0] lime.t3d = [0, 0, 0, 0]
for category in self.module.model.settings.keys(): for category in self.module.model.settings.keys():
for setting in self.module.model.settings[category]: for setting in self.module.model.settings[category]:
@ -109,7 +101,9 @@ class LimeNQRController(BaseSpectrometerController):
lime.sra = setting.get_setting() lime.sra = setting.get_setting()
# Careful this doesn't only set the IF frequency but the local oscillator frequency # Careful this doesn't only set the IF frequency but the local oscillator frequency
elif setting.name == self.module.model.IF_FREQUENCY: elif setting.name == self.module.model.IF_FREQUENCY:
lime.lof = self.module.model.target_frequency - setting.get_setting() lime.lof = (
self.module.model.target_frequency - setting.get_setting()
)
self.module.model.if_frequency = setting.get_setting() self.module.model.if_frequency = setting.get_setting()
elif setting.name == self.module.model.ACQUISITION_TIME: elif setting.name == self.module.model.ACQUISITION_TIME:
lime.tac = setting.get_setting() lime.tac = setting.get_setting()
@ -152,17 +146,17 @@ class LimeNQRController(BaseSpectrometerController):
lime.rgq = setting.get_setting() lime.rgq = setting.get_setting()
elif setting.name == self.module.model.RX_PHASE_ADJUSTMENT: elif setting.name == self.module.model.RX_PHASE_ADJUSTMENT:
lime.rpc = setting.get_setting() lime.rpc = setting.get_setting()
return lime return lime
def translate_pulse_sequence(self, lime): def translate_pulse_sequence(self, lime):
"""This method sets the parameters of the limr object according to the pulse sequence set in the pulse programmer module# """This method sets the parameters of the limr object according to the pulse sequence set in the pulse programmer module#
This is only relevant for the tx pulse parameters. General settings are set in the update_settings method and the rx event is This is only relevant for the tx pulse parameters. General settings are set in the update_settings method and the rx event is
handled in the translate_rx_event method. handled in the translate_rx_event method.
Args: Args:
lime (limr): The limr object that is used to communicate with the pulseN driver lime (limr): The limr object that is used to communicate with the pulseN driver
Returns: Returns:
limr: The updated limr object limr: The updated limr object
""" """
@ -171,10 +165,14 @@ class LimeNQRController(BaseSpectrometerController):
for event in events: for event in events:
logger.debug("Event %s has parameters: %s", event.name, event.parameters) logger.debug("Event %s has parameters: %s", event.name, event.parameters)
for parameter in event.parameters.values(): for parameter in event.parameters.values():
logger.debug("Parameter %s has options: %s", parameter.name, parameter.options) logger.debug(
"Parameter %s has options: %s", parameter.name, parameter.options
if parameter.name == self.module.model.TX and parameter.options["TX Amplitude"].value > 0: )
if (
parameter.name == self.module.model.TX
and parameter.options["TX Amplitude"].value > 0
):
if len(lime.pfr) == 0: if len(lime.pfr) == 0:
# Add the TX pulse to the pulse frequency list (lime.pfr) # Add the TX pulse to the pulse frequency list (lime.pfr)
lime.pfr = [float(self.module.model.if_frequency)] lime.pfr = [float(self.module.model.if_frequency)]
@ -188,63 +186,84 @@ class LimeNQRController(BaseSpectrometerController):
# Add the TX pulse phase to the pulse phase list (lime.pph) -> not yet implemented # Add the TX pulse phase to the pulse phase list (lime.pph) -> not yet implemented
else: else:
logger.debug("Adding TX pulse to existing pulse sequence") logger.debug("Adding TX pulse to existing pulse sequence")
logger.debug("Setting if frequency to: %s", self.module.model.if_frequency) logger.debug(
"Setting if frequency to: %s",
self.module.model.if_frequency,
)
lime.pfr.append(float(self.module.model.if_frequency)) lime.pfr.append(float(self.module.model.if_frequency))
logger.debug("Setting pulse duration to: %s", event.duration) logger.debug("Setting pulse duration to: %s", event.duration)
lime.pdr.append(float(event.duration)) lime.pdr.append(float(event.duration))
logger.debug("Setting pulse amplitude to: %s", parameter.options["TX Amplitude"].value) logger.debug(
"Setting pulse amplitude to: %s",
parameter.options["TX Amplitude"].value,
)
lime.pam.append(float(parameter.options["TX Amplitude"].value)) lime.pam.append(float(parameter.options["TX Amplitude"].value))
# Get the length of the previous event without a tx pulse # Get the length of the previous event without a tx pulse
blank = [] blank = []
previous_events = events[:events.index(event)] previous_events = events[: events.index(event)]
# Firstuful this is ugly as hell and needs to be refactored # Firstuful this is ugly as hell and needs to be refactored
# Secondly this just sets the pulse offsets. # Secondly this just sets the pulse offsets.
for prev_event in previous_events[::-1]: for prev_event in previous_events[::-1]:
logger.debug("Previous event: %s with duration: %s", prev_event.name, prev_event.duration) logger.debug(
"Previous event: %s with duration: %s",
prev_event.name,
prev_event.duration,
)
for parameter in prev_event.parameters.values(): for parameter in prev_event.parameters.values():
if parameter.name == self.module.model.TX and parameter.options["TX Amplitude"].value == 0: if (
parameter.name == self.module.model.TX
and parameter.options["TX Amplitude"].value == 0
):
blank.append(float(prev_event.duration)) blank.append(float(prev_event.duration))
elif parameter.name == self.module.model.TX and parameter.options["TX Amplitude"].value > 0: elif (
parameter.name == self.module.model.TX
and parameter.options["TX Amplitude"].value > 0
):
break break
else: else:
continue continue
break break
logger.debug("Found blanks: %s", blank) logger.debug("Found blanks: %s", blank)
prev_duration = lime.pdr[-2] + sum(blank) prev_duration = lime.pdr[-2] + sum(blank)
logger.debug("Setting pulse offset to: %s", prev_duration)
lime.pof.append(np.ceil(prev_duration * lime.sra))
logger.debug("Setting pulse offset to: %s", prev_duration)
lime.pof.append(np.ceil(prev_duration * lime.sra))
# The last event is the repetition time event # The last event is the repetition time event
lime.trp = float(event.duration) lime.trp = float(event.duration)
lime.npu = len(lime.pfr) lime.npu = len(lime.pfr)
return lime return lime
def translate_rx_event(self, lime): def translate_rx_event(self, lime):
# This is a correction factor for the RX event. The offset of the first pulse is 2.2µs longer than from the specified samples. # This is a correction factor for the RX event. The offset of the first pulse is 2.2µs longer than from the specified samples.
CORRECTION_FACTOR = 2.2e-6 CORRECTION_FACTOR = 2.2e-6
events = self.module.model.pulse_programmer.model.pulse_sequence.events events = self.module.model.pulse_programmer.model.pulse_sequence.events
previous_events_duration = []
for event in events: for event in events:
logger.debug("Event %s has parameters: %s", event.name, event.parameters) logger.debug("Event %s has parameters: %s", event.name, event.parameters)
for parameter in event.parameters.values(): for parameter in event.parameters.values():
logger.debug("Parameter %s has options: %s", parameter.name, parameter.options) logger.debug(
"Parameter %s has options: %s", parameter.name, parameter.options
if parameter.name == self.module.model.RX and parameter.options['RX'].state: )
if (
parameter.name == self.module.model.RX
and parameter.options["RX"].state
):
# Get the length of all previous events # Get the length of all previous events
previous_events = events[:events.index(event)] previous_events = events[: events.index(event)]
previous_events_duration = sum([event.duration for event in previous_events]) previous_events_duration = sum(
[event.duration for event in previous_events]
)
# Get the offset of the first pulse # Get the offset of the first pulse
offset = self.module.model.OFFSET_FIRST_PULSE * (1/lime.sra) offset = self.module.model.OFFSET_FIRST_PULSE * (1 / lime.sra)
rx_duration = event.duration rx_duration = event.duration
rx_begin = previous_events_duration + offset + CORRECTION_FACTOR rx_begin = previous_events_duration + offset + CORRECTION_FACTOR
rx_stop = rx_begin + rx_duration rx_stop = rx_begin + rx_duration
return rx_begin * 1e6, rx_stop * 1e6 return rx_begin * 1e6, rx_stop * 1e6