Merge pull request #3 from nqrduck/LimeBindings

Implemented communication via LimeDriverBinding repo.
This commit is contained in:
Julia Pfitzer 2024-02-09 15:16:55 +01:00 committed by GitHub
commit ec800d9fee
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 135 additions and 4308 deletions

View file

@ -24,6 +24,7 @@ classifiers = [
] ]
dependencies = [ dependencies = [
"limedriver @ git+https://github.com/nqrduck/LimeDriverBindings",
"nqrduck-spectrometer", "nqrduck-spectrometer",
"pyqt6", "pyqt6",
"h5py", "h5py",

View file

@ -1,6 +0,0 @@
# Information
These files are used for the control of the LimeSDR based spectrometer.
With permission from the author Andrin Doll, the files are included in this repository.
A. Doll; Pulsed and continuous-wave magnetic resonance spectroscopy using a low-cost software-defined radio. AIP Advances 1 November 2019; 9 (11): 115110. https://doi.org/10.1063/1.5127746

View file

@ -1,533 +0,0 @@
# -*- coding: utf-8 -*-
"""
Created on Fri Dec 7 10:46:20 2018
@author: andrin
Class that eases interfacing with the limesdr routines written in Cpp,
notably the pulse_test_USB and pulseN_test_USB routines, which runs a pulse sequence
according to passed arguments
The class allows for setting of the arguments as well as for parametric sweeps to
implement arbitrary sequences
Note for release: The communication between the python and the Cpp routine is very rudimentary, meaning using command line arguments that are parametrically read from the Cpp source.
Update Feb 2020: Slight changes to make it compatible with Python 3
"""
import subprocess # to call the program
import datetime # to generate timestamps for parsweeps
import h5py # to have organized data storage.....
import numpy as np # ...
import matplotlib.pyplot as plt
class limr():
def __init__(self, filename = './pulseN_USB.cpp'):
# check first for the filename provided
if filename[-3:] == 'cpp':
self.Csrc = filename
else:
self.Csrc = './pulseN_USB.cpp'
# the program to call
self.Cprog = self.Csrc[:-4]
fp = open(self.Csrc, 'r')
in_arg = {}
startpattern = 'struct Config2HDFattr_t HDFattr[]'
stoppattern = '};'
parsing = False
ii_oupargs = 0
for line in fp.readlines():
if (stoppattern in line) & parsing:
break
if parsing:
stripped = line.replace('\t','').replace('"','').strip('\n').strip(',').strip('{').strip('}')
splitted = stripped.split(',')
# remove irrrelevant stuff
rmvidx = range(4,len(splitted)-1)
for ii in range(len(rmvidx)):
splitted.pop(4)
if splitted[0] == '///':
splitted[0] = '//' + str(ii_oupargs)
ii_oupargs+=1
in_arg[splitted[0]] = splitted
in_arg[splitted[0]][0] = []
if startpattern in line:
parsing = True
fp.close()
self.parsinp = in_arg
for key in in_arg:
setattr(self, key, in_arg[key][0])
# initialize other variables
self.parvar = {}
self.parvar_cpl = {}
self.HDFfile = []
self.HDF = HDF()
self.segcount = 0
# print the arguments that have been set
def print_params(self, allel = False):
for key in sorted(self.parsinp):
val = getattr(self,key)
if (val != []) | (allel):
print('{:<5}: {:>50} {:<25}'.format(key, val, self.parsinp[key][1]))
# add parameter variation:
# key is the argument to vary
# idx the indices of values
# strt the starting point
# end the endpoint
# npts the dimension of the sweep
def parsweep(self, key, strt, end, npts, idx = 0):
if ~isinstance(idx,list): idx = [idx] # idx as list eases iteration
# check the key
try:
vals = getattr(self,key)
except:
print('Problem with sweep: Key ' + key + ' is not valid! See below for valid keys')
self.print_params(allel=True)
return
# check for existing val and for proper dimension. Dimension is a priori not known due to number of pulses that can be flexible
if (vals == []):
print('Problem with sweep: Initialize first a value to argument ' + key +'. I will try with assuming zero')
vals = 0;
if isinstance(vals, (list, np.ndarray)):
if len(vals) < max(idx):
print('Problem with sweep: ' + key + ' has only ' + str(len(vals)) + ' objects, while an index of ' + str(max(idx)) + ' was requested!')
return
startlist = [[vals[jj] for jj in range(len(vals))] for ii in range(npts)]
elif max(idx) > 0:
print('Problem with sweep: ' + key + ' is scalar, while an index of ' + str(max(idx)) + ' was requested!')
return
else:
startlist = [[vals] for ii in range(npts)]
# check if a parvar already exists for this key
if len(self.parvar) == 0:
self.parvar['sweeplist'] = startlist
elif not((key == self.parvar['key']) & (npts == self.parvar['dim'])):
self.parvar['sweeplist'] = startlist
self.parvar['key'] = key
self.parvar['dim'] = npts
if npts > 1:
incr = (end - strt)/(npts-1)
else:
incr = 0;
for ii_swp in range(npts):
for swp_idx in idx:
self.parvar['sweeplist'][ii_swp][swp_idx] = strt + ii_swp*incr
# add coupled parameter variation of another variable: (one variable is not enough... two neither, but better than one. A list of dicts would more general....)
# key is the argument to vary
# idx the indices of values
# strt the starting point
# end the endpoint
# npts the dimension of the sweep
def parsweep_cpl(self, key, strt, end, npts, idx = 0):
if ~isinstance(idx,list): idx = [idx] # idx as list eases iteration
# check the key
try:
vals = getattr(self,key)
except:
print('Problem with sweep: Key ' + key + ' is not valid! See below for valid keys')
self.print_params(allel=True)
return
# check for existing val and for proper dimension. Dimension is a priori not known due to number of pulses that can be flexible
if (vals == []):
print('Problem with sweep: Initialize first a value to argument ' + key +'. I will try with assuming zero')
vals = 0;
if isinstance(vals, (list, np.ndarray)):
if len(vals) < max(idx):
print('Problem with sweep: ' + key + ' has only ' + str(len(vals)) + ' objects, while an index of ' + str(max(idx)) + ' was requested!')
return
startlist = [[vals[jj] for jj in range(len(vals))] for ii in range(npts)]
elif max(idx) > 0:
print('Problem with sweep: ' + key + ' is scalar, while an index of ' + str(max(idx)) + ' was requested!')
return
else:
startlist = [[vals] for ii in range(npts)]
# check if a parvar already exists for this key
if len(self.parvar_cpl) == 0:
self.parvar_cpl['sweeplist'] = startlist
elif not((key == self.parvar_cpl['key']) & (npts == self.parvar_cpl['dim'])):
self.parvar_cpl['sweeplist'] = startlist
self.parvar_cpl['key'] = key
self.parvar_cpl['dim'] = npts
incr = (end - strt)/(npts-1)
for ii_swp in range(npts):
for swp_idx in idx:
self.parvar_cpl['sweeplist'][ii_swp][swp_idx] = strt + ii_swp*incr
def run(self, oup = True):
# check if there is a parvar or only a single
if len(self.parvar) == 0:
self.__run_single(oup)
else:
# store the value currently in the swept parameter
stdval = getattr(self, self.parvar['key'])
if len(self.parvar_cpl) != 0:
stdval2 = getattr(self, self.parvar_cpl['key'])
# handle the timestamp
stddatestr = getattr(self,'fst')
if (stddatestr == []):
setattr(self, 'fst', datetime.datetime.now().strftime("%Y%m%d_%H%M%S"))
# give it a useful name
stdfilepat = getattr(self,'fpa')
if (stdfilepat == []):
setattr(self, 'fpa', self.parvar['key'] + '_swp')
# actual iteration over the sweeplist
for ii in range(self.parvar['dim']):
setattr(self, self.parvar['key'], self.parvar['sweeplist'][ii])
if len(self.parvar_cpl) != 0: # as well as the coupled variable
setattr(self, self.parvar_cpl['key'], self.parvar_cpl['sweeplist'][ii])
self.__run_single(oup)
# save parvar info as attribute, which means that we need to detect the file
if getattr(self,'nos') != 0: # this one is suspicious...
if self.HDFfile == []:
self.HDFfile = self.__guess_savepath()
try:
# this is probably erroneous and was never recognized...! self.parvar is not a key/value pair
f = h5py.File(self.HDFfile, 'r+')
for key in self.parvar:
f.attrs.create(key, self.parvar[key])
f.close()
except:
print('Problem opening file ' + self.HDFfile)
setattr(self, self.parvar['key'], stdval) # set back to non-swept value
setattr(self, 'fst', stddatestr) # set back to non-swept value
setattr(self, 'fpa', stdfilepat) # set back to non-swept value
if len(self.parvar_cpl) != 0:
setattr(self, self.parvar_cpl['key'], stdval2) # set back to non-swept value
def readHDF(self, filename = ''):
if filename != '':
self.HDFfile = filename
self.HDF.load(self.HDFfile)
# helper functoin to guess the savepath from the file. This should not be called, since it should be obtained from the output of the program call
def __guess_savepath(self):
savepath = getattr(self,'spt')
if savepath == []: savepath = './asdf/' # not recommended here: knowledge about the standard directory in the cpp file.... could be parsed, but user will usually provide a folder to limr.spt
if savepath[-1] != '/': savepath += '/' # and that little fix since users seldomly put the '/' for the directory...
savepath = savepath + getattr(self,'fst') + '_' + getattr(self,'fpa') + '.h5'
return savepath
# run for one single constellation
def __run_single(self, oup = True):
terminated = False
while (terminated == False):
str2call= self.Cprog
for key in self.parsinp:
vals = getattr(self,key)
if (vals == []): continue # ignore arguments that are not set
str2call += ' -' + key # set the key and then the value/s
if isinstance(vals, (list, np.ndarray)):
for val in vals:
str2call += ' ' + str(val)
else:
str2call += ' ' + str(vals)
if oup: print(str2call)
p = subprocess.Popen(str2call.split(), shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT);
if getattr(self,'nos') != 0:
terminated = True
for line_b in p.stdout.readlines():
line = line_b.decode('utf-8').rstrip()
if oup: print(line),
if '.h5' in line:
self.HDFfile = line
terminated = True
if 'Unable to open device' in line:
terminated = True
if 'Muted output, exiting immediate' in line:
terminated = True
if self.Cprog + ': not found' in line:
terminated = True
if 'Devices found: 0' in line:
terminated = True
if 'Segmentation' in line:
self.segcount += 1
terminated = False
self.retval = p.wait()
if terminated == False:
print('RE-RUNNING DUE TO PROBLEM WITH SAVING!!!')
# class for accessing data of stored HDF5 file
class HDF():
def __init__(self, filename = ''):
# check first for the filename provided
if filename != '':
self.HDFsrc = filename
else:
self.HDFsrc = ''
# get data
self.__get_data()
# just an alias for __init__ that does load a specific file
def load(self, filename = ''):
self.__init__(filename)
# gets the data of the file
def __get_data(self):
if (self.HDFsrc == '') | (self.HDFsrc == []):
# initialize all as empty
self.tdy = []
self.tdx = []
self.attrs = []
self.parsoutp = {}
self.parvar = {}
else:
f = h5py.File(self.HDFsrc, 'r')
HDFkeys = list(f.keys())
for ii, HDFkey in enumerate(HDFkeys):
if ii == 0:
# initialize data array
dsize = f[HDFkey].shape
inddim = dsize[0]
self.tdy = np.zeros((int(dsize[1]/2), int(dsize[0] * len(HDFkeys))),dtype=np.complex_)
# initialize the output objects
self.attrs = [dynclass() for jj in range(len(HDFkeys))]
# get the attribute keys
self.parsoutp = {}
ii_oupargs = 0
for item in f[HDFkey].attrs.items():
itemname = item[0][5:]
itemarg = item[0][1:4]
if not ('///' in itemarg):
self.parsoutp[itemarg] = [ item[1], itemname]
else:
self.parsoutp['//'+str(ii_oupargs)] = [ item[1], itemname]
ii_oupargs+=1
# look for eventual parvar lists
self.parvar = {}
for item in f.attrs.items():
self.parvar[item[0]] = item[1]
# Get the data
data_raw = np.array(f[HDFkey])
try:
self.tdy[:,ii*inddim:(ii+1)*inddim] = np.transpose(np.float_(data_raw[:,::2])) + 1j*np.transpose(np.float_(data_raw[:,1::2]))
except:
pass
# Get the arguments
ii_oupargs = 0
for item in f[HDFkey].attrs.items():
itemname = item[0][5:]
itemarg = item[0][1:4]
if not ('///' in itemarg):
setattr(self.attrs[ii], itemarg, item[1])
else:
setattr(self.attrs[ii], '//'+str(ii_oupargs), item[1])
ii_oupargs+=1
f.close()
srate_MHz = getattr(self.attrs[0], 'sra')*1e-6
self.tdx = 1/srate_MHz*np.arange(self.tdy.shape[0])
# get an argument by matching the text description
def attr_by_txt(self, pattern):
for key in sorted(self.parsoutp):
if pattern in self.parsoutp[key][1]: # pattern match
attr = getattr(self.attrs[0], key)
try:
ouparr = np.zeros( ( len(attr), len(self.attrs)), attr.dtype)
except:
ouparr = np.zeros( ( 1, len(self.attrs)), attr.dtype)
for ii in np.arange(len(self.attrs)):
ouparr[:,ii] = getattr(self.attrs[ii], key)
return np.transpose(ouparr)
print('Problem obtaining the attribute from the description using the pattern ' + pattern + '!')
print('Valid descriptions are: ')
self.print_params()
# get an argument by key
def attr_by_key(self, key):
if key in dir(self.attrs[0]):
attr = getattr(self.attrs[0], key)
try:
ouparr = np.zeros( ( len(attr), len(self.attrs)), attr.dtype)
except:
ouparr = np.zeros( ( 1, len(self.attrs)), attr.dtype)
for ii in np.arange(len(self.attrs)):
ouparr[:,ii] = getattr(self.attrs[ii], key)
return np.transpose(ouparr)
print('Problem obtaining the attribute from key ' + key + '!')
print('Valid keys are: ')
self.print_params()
# print the arguments
def print_params(self, ouponly = False):
for key in sorted(self.parsoutp):
val = getattr(self.attrs[0], key)
if not('//' in key): # input argument?
if ouponly: continue;
print('{:<5}: {:>50} {:<25}'.format(key, val, self.parsoutp[key][1]))
def plot_dta(self, fignum = 1, stack = False, dtamax = 0.0):
if (fignum == 1) & stack: fignum = 2;
if self.tdy != []:
if dtamax == 0:
dtamax = np.max(np.max(abs(self.tdy),axis=0))
offset = 1.5*dtamax
plt.figure(fignum)
plt.clf()
if stack:
for ii in np.arange(self.tdy.shape[1]):
plt.plot(self.tdx, self.tdy[:,ii].real + ii* offset)
else:
plt.plot(self.tdx, self.tdy.real)
plt.xlabel('$t$ [$\mu$s]')
plt.ylabel('$y$ [Counts]')
# empty class to store dynamic attributes, basically for the attributes in HDF keys
class dynclass:
pass
# addendum that does not fit 100% into this class file, but is related
# class to control the E3631A via serial interface
import serial
import time
from os import listdir
class PSU():
def __init__(self):
self.GperV = 14.309
self.sleeptime = 0.4
devdir = '/dev/'
ttydevs = [f for f in listdir(devdir) if 'ttyUSB' in f]
# ttydev = devdir + [f for f in ttydevs if int(f[-1]) > 4][0]
ttydev = devdir + [f for f in ttydevs][0]
self.psu=serial.Serial(ttydev, stopbits=2, dsrdtr=True)
# read at the beginning to remove eventual junk
response = self.psu.read_all()
self.psu.write("*IDN?\r\n")
time.sleep(self.sleeptime)
response = self.psu.read_all()
if response == 'HEWLETT-PACKARD,E3631A,0,2.1-5.0-1.0\r\n':
print('Success in opening the HP PSU!')
else:
print('Fail!!!')
self.psu.write("INST:SEL P6V\r\n")
time.sleep(self.sleeptime)
self.psu.write("OUTP:STAT ON\r\n")
time.sleep(self.sleeptime)
self.psu.close()
def getVoltage(self):
if not self.psu.isOpen():
self.psu.open()
# read at the beginning to remove eventual junk
self.psu.read_all()
time.sleep(self.sleeptime)
self.psu.write("VOLT?\r\n")
time.sleep(self.sleeptime)
actval = float(self.psu.read_all())
self.psu.close()
return actval
def setVoltage(self, setval, dV = 0.02, ramptime = 0.1):
actval = self.getVoltage()
diff = setval - actval
dVsigned = dV * (-1 if diff < 0 else 1)
if not self.psu.isOpen():
self.psu.open()
while (abs(diff) > dV):
actval += dVsigned
diff -= dVsigned
self.psu.write("VOLT " + str(actval) + "\r\n")
time.sleep(ramptime)
self.psu.write("VOLT " + str(setval) + "\r\n")
time.sleep(ramptime)
self.psu.close()
def getField(self):
return self.getVoltage() * self.GperV
def setField(self, field):
return self.setVoltage(field / self.GperV)

File diff suppressed because it is too large Load diff

View file

@ -3,6 +3,10 @@ import tempfile
from pathlib import Path from pathlib import Path
import numpy as np import numpy as np
from decimal import Decimal from decimal import Decimal
from limedriver.binding import PyLimeConfig
from limedriver.hdf_reader import HDF
from nqrduck.module.module_controller import ModuleController from nqrduck.module.module_controller import ModuleController
from nqrduck_spectrometer.base_spectrometer_controller import BaseSpectrometerController from nqrduck_spectrometer.base_spectrometer_controller import BaseSpectrometerController
from nqrduck_spectrometer.measurement import Measurement from nqrduck_spectrometer.measurement import Measurement
@ -47,44 +51,49 @@ class LimeNQRController(BaseSpectrometerController):
def initialize_lime(self): def initialize_lime(self):
"""This method initializes the limr object that is used to communicate with the pulseN driver.""" """This method initializes the limr object that is used to communicate with the pulseN driver."""
try: try:
from .contrib.limr import limr # driver_path = str(Path(__file__).parent / "contrib/pulseN_test_USB.cpp")
driver_path = str(Path(__file__).parent / "contrib/pulseN_test_USB.cpp") n_pulses = self.get_number_of_pulses()
return limr(driver_path) lime = PyLimeConfig(n_pulses)
return lime
except ImportError as e: except ImportError as e:
logger.error("Error while importing limr: %s", e) logger.error("Error while importing limr: %s", e)
except Exception as e: except Exception as e:
logger.error("Error while initializing Lime driver: %s", e) logger.error("Error while initializing Lime driver: %s", e)
import traceback
traceback.print_exc()
return None return None
def setup_lime_parameters(self, lime): def setup_lime_parameters(self, lime):
"""This method sets the parameters of the limr object according to the settings set in the spectrometer module. """This method sets the parameters of the lime config according to the settings set in the spectrometer module.
Args: Args:
lime (limr): The limr object that is used to communicate with the pulseN driver lime (PyLimeConfig): The PyLimeConfig object that is used to communicate with the pulseN driver
""" """
lime.noi = -1 #lime.noi = -1
lime.nrp = 1 #
# lime.nrp = 1
lime.repetitions = 1
lime = self.update_settings(lime) lime = self.update_settings(lime)
lime = self.translate_pulse_sequence(lime) lime = self.translate_pulse_sequence(lime)
lime.nav = self.module.model.averages lime.averages = self.module.model.averages
self.log_lime_parameters(lime) self.log_lime_parameters(lime)
def setup_temporary_storage(self, lime): def setup_temporary_storage(self, lime):
"""This method sets up the temporary storage for the measurement data. """This method sets up the temporary storage for the measurement data.
Args: Args:
lime (limr): The limr object that is used to communicate with the pulseN driver lime (PyLimeConfig): The PyLimeConfig object that is used to communicate with the pulseN driver
""" """
temp_dir = tempfile.TemporaryDirectory() temp_dir = tempfile.TemporaryDirectory()
logger.debug("Created temporary directory at: %s", temp_dir.name) logger.debug("Created temporary directory at: %s", temp_dir.name)
lime.spt = Path(temp_dir.name) # Temporary storage path lime.save_path = str(Path(temp_dir.name)) + "/" # Temporary storage path
lime.fpa = "temp" # Temporary filename prefix or related config lime.file_pattern = "temp" # Temporary filename prefix or related config
def perform_measurement(self, lime): def perform_measurement(self, lime):
"""This method executes the measurement procedure. """This method executes the measurement procedure.
Args: Args:
lime (limr): The limr object that is used to communicate with the pulseN driver lime (PyLimeConfig): The PyLimeConfig object that is used to communicate with the pulseN driver
Returns: Returns:
bool: True if the measurement was successful, False otherwise bool: True if the measurement was successful, False otherwise
@ -92,7 +101,6 @@ class LimeNQRController(BaseSpectrometerController):
logger.debug("Running the measurement procedure") logger.debug("Running the measurement procedure")
try: try:
lime.run() lime.run()
lime.readHDF()
return True return True
except Exception as e: except Exception as e:
logger.error("Failed to execute the measurement: %s", e) logger.error("Failed to execute the measurement: %s", e)
@ -102,7 +110,7 @@ class LimeNQRController(BaseSpectrometerController):
"""This method processes the measurement results and returns a Measurement object. """This method processes the measurement results and returns a Measurement object.
Args: Args:
lime (limr): The limr object that is used to communicate with the pulseN driver lime (PyLimeConfig): The PyLimeConfig object that is used to communicate with the pulseN driver
Returns: Returns:
Measurement: The measurement data Measurement: The measurement data
@ -117,7 +125,7 @@ class LimeNQRController(BaseSpectrometerController):
"""This method calculates the measurement data from the limr object. """This method calculates the measurement data from the limr object.
Args: Args:
lime (limr): The limr object that is used to communicate with the pulseN driver lime (PyLimeConfig): The PyLimeConfig object that is used to communicate with the pulseN driver
rx_begin (float): The start time of the RX event in µs rx_begin (float): The start time of the RX event in µs
rx_stop (float): The stop time of the RX event in µs rx_stop (float): The stop time of the RX event in µs
@ -125,38 +133,41 @@ class LimeNQRController(BaseSpectrometerController):
Measurement: The measurement data Measurement: The measurement data
""" """
try: try:
evidx = self.find_evaluation_range_indices(lime, rx_begin, rx_stop) path = lime.get_path()
tdx, tdy = self.extract_measurement_data(lime, evidx) hdf = HDF(path)
evidx = self.find_evaluation_range_indices(hdf, rx_begin, rx_stop)
tdx, tdy = self.extract_measurement_data(lime, hdf, evidx)
fft_shift = self.get_fft_shift() fft_shift = self.get_fft_shift()
return Measurement(tdx, tdy, self.module.model.target_frequency, frequency_shift=fft_shift, IF_frequency=self.module.model.if_frequency) return Measurement(tdx, tdy, self.module.model.target_frequency, frequency_shift=fft_shift, IF_frequency=self.module.model.if_frequency)
except Exception as e: except Exception as e:
logger.error("Error processing measurement result: %s", e) logger.error("Error processing measurement result: %s", e)
return None return None
def find_evaluation_range_indices(self, lime, rx_begin, rx_stop): def find_evaluation_range_indices(self, hdf, rx_begin, rx_stop):
"""This method finds the indices of the evaluation range in the measurement data. """This method finds the indices of the evaluation range in the measurement data.
Args: Args:
lime (limr): The limr object that is used to communicate with the pulseN driver HDF (HDF): The HDF object that is used to read the measurement data
rx_begin (float): The start time of the RX event in µs rx_begin (float): The start time of the RX event in µs
rx_stop (float): The stop time of the RX event in µs rx_stop (float): The stop time of the RX event in µs
Returns: Returns:
list: The indices of the evaluation range in the measurement data""" list: The indices of the evaluation range in the measurement data"""
return np.where((lime.HDF.tdx > rx_begin) & (lime.HDF.tdx < rx_stop))[0] return np.where((hdf.tdx > rx_begin) & (hdf.tdx < rx_stop))[0]
def extract_measurement_data(self, lime, indices): def extract_measurement_data(self, lime, hdf, indices):
"""This method extracts the measurement data from the limr object. """This method extracts the measurement data from the limr object.
Args: Args:
lime (limr): The limr object that is used to communicate with the pulseN driver lime (PyLimeConfig): The PyLimeConfig object that is used to communicate with the pulseN driver
HDF (HDF): The HDF object that is used to read the measurement data
indices (list): The indices of the evaluation range in the measurement data indices (list): The indices of the evaluation range in the measurement data
Returns: Returns:
tuple: A tuple containing the time vector and the measurement data tuple: A tuple containing the time vector and the measurement data
""" """
tdx = lime.HDF.tdx[indices] - lime.HDF.tdx[indices][0] tdx = hdf.tdx[indices] - hdf.tdx[indices][0]
tdy = lime.HDF.tdy[indices] / lime.nav tdy = hdf.tdy[indices] / lime.averages
# flatten the tdy array # flatten the tdy array
tdy = tdy.flatten() tdy = tdy.flatten()
return tdx, tdy return tdx, tdy
@ -199,81 +210,79 @@ class LimeNQRController(BaseSpectrometerController):
"""This method logs the parameters of the limr object. """This method logs the parameters of the limr object.
Args: Args:
lime (limr): The limr object that is used to communicate with the pulseN driver lime (PyLimeConfig): The PyLimeConfig object that is used to communicate with the pulseN driver
""" """
for key in sorted(lime.parsinp): # for key, value in lime.__dict__.items():
val = getattr(lime, key, []) # logger.debug("Lime parameter %s has value %s", key, value)
if val: logger.debug("Lime parameter %s has value %s", "srate", lime.srate)
logger.debug(f"{key}: {val} {lime.parsinp[key][1]}")
def update_settings(self, lime): def update_settings(self, lime):
"""This method sets the parameters of the limr object according to the settings set in the spectrometer module. """This method sets the parameters of the limr object according to the settings set in the spectrometer module.
Args: Args:
lime (limr): The limr object that is used to communicate with the pulseN driver lime (PyLimeConfig): The PyLimeConfig object that is used to communicate with the pulseN driver
Returns: Returns:
limr: The updated limr object""" lime: The updated limr object"""
logger.debug( logger.debug(
"Updating settings for spectrometer: %s for measurement", "Updating settings for spectrometer: %s for measurement",
self.module.model.name, self.module.model.name,
) )
lime.t3d = [0, 0, 0, 0] lime.c3_tim = [0, 0, 0, 0]
# I don't like this code # I don't like this code
for category in self.module.model.settings.keys(): for category in self.module.model.settings.keys():
for setting in self.module.model.settings[category]: for setting in self.module.model.settings[category]:
logger.debug("Setting %s has value %s", setting.name, setting.value) logger.debug("Setting %s has value %s", setting.name, setting.value)
# Acquisiton settings # Acquisiton settings
if setting.name == self.module.model.SAMPLING_FREQUENCY: if setting.name == self.module.model.SAMPLING_FREQUENCY:
lime.sra = setting.get_setting() lime.srate = setting.get_setting()
# Careful this doesn't only set the IF frequency but the local oscillator frequency # Careful this doesn't only set the IF frequency but the local oscillator frequency
elif setting.name == self.module.model.IF_FREQUENCY: elif setting.name == self.module.model.IF_FREQUENCY:
lime.lof = ( lime.frq = self.module.model.target_frequency - setting.get_setting()
self.module.model.target_frequency - setting.get_setting()
)
self.module.model.if_frequency = setting.get_setting() self.module.model.if_frequency = setting.get_setting()
elif setting.name == self.module.model.ACQUISITION_TIME: elif setting.name == self.module.model.ACQUISITION_TIME:
lime.tac = setting.get_setting() lime.rectime_secs = setting.get_setting()
# Gate settings # Gate settings
elif setting.name == self.module.model.GATE_ENABLE: elif setting.name == self.module.model.GATE_ENABLE:
lime.t3d[0] = int(setting.value) lime.c3_tim[0] = int(setting.get_setting())
elif setting.name == self.module.model.GATE_PADDING_LEFT: elif setting.name == self.module.model.GATE_PADDING_LEFT:
lime.t3d[1] = int(setting.get_setting()) lime.c3_tim[1] = int(setting.get_setting())
elif setting.name == self.module.model.GATE_SHIFT: elif setting.name == self.module.model.GATE_SHIFT:
lime.t3d[2] = int(setting.get_setting()) lime.c3_tim[2] = int(setting.get_setting())
elif setting.name == self.module.model.GATE_PADDING_RIGHT: elif setting.name == self.module.model.GATE_PADDING_RIGHT:
lime.t3d[3] = int(setting.get_setting()) lime.c3_tim[3] = int(setting.get_setting())
# RX/TX settings # RX/TX settings
elif setting.name == self.module.model.TX_GAIN: elif setting.name == self.module.model.TX_GAIN:
lime.tgn = setting.get_setting() lime.TX_gain = setting.get_setting()
elif setting.name == self.module.model.RX_GAIN: elif setting.name == self.module.model.RX_GAIN:
lime.rgn = setting.get_setting() lime.RX_gain = setting.get_setting()
elif setting.name == self.module.model.RX_LPF_BW: elif setting.name == self.module.model.RX_LPF_BW:
lime.rlp = setting.get_setting() lime.RX_LPF = setting.get_setting()
elif setting.name == self.module.model.TX_LPF_BW: elif setting.name == self.module.model.TX_LPF_BW:
lime.tlp = setting.get_setting() lime.TX_LPF = setting.get_setting()
# Calibration settings # Calibration settings
elif setting.name == self.module.model.TX_I_DC_CORRECTION: elif setting.name == self.module.model.TX_I_DC_CORRECTION:
lime.tdi = setting.get_setting() lime.TX_IcorrDC = setting.get_setting()
elif setting.name == self.module.model.TX_Q_DC_CORRECTION: elif setting.name == self.module.model.TX_Q_DC_CORRECTION:
lime.tdq = setting.get_setting() lime.TX_QcorrDC = setting.get_setting()
# This stuff doesn"t seem to be implemented in the LimeDriver
elif setting.name == self.module.model.TX_I_GAIN_CORRECTION: elif setting.name == self.module.model.TX_I_GAIN_CORRECTION:
lime.tgi = setting.get_setting() pass
elif setting.name == self.module.model.TX_Q_GAIN_CORRECTION: elif setting.name == self.module.model.TX_Q_GAIN_CORRECTION:
lime.tgq = setting.get_setting() pass
elif setting.name == self.module.model.TX_PHASE_ADJUSTMENT: elif setting.name == self.module.model.TX_PHASE_ADJUSTMENT:
lime.tpc = setting.get_setting() pass
elif setting.name == self.module.model.RX_I_DC_CORRECTION: elif setting.name == self.module.model.RX_I_DC_CORRECTION:
lime.rdi = setting.get_setting() pass
elif setting.name == self.module.model.RX_Q_DC_CORRECTION: elif setting.name == self.module.model.RX_Q_DC_CORRECTION:
lime.rdq = setting.get_setting() pass
elif setting.name == self.module.model.RX_I_GAIN_CORRECTION: elif setting.name == self.module.model.RX_I_GAIN_CORRECTION:
lime.rgi = setting.get_setting() pass
elif setting.name == self.module.model.RX_Q_GAIN_CORRECTION: elif setting.name == self.module.model.RX_Q_GAIN_CORRECTION:
lime.rgq = setting.get_setting() pass
elif setting.name == self.module.model.RX_PHASE_ADJUSTMENT: elif setting.name == self.module.model.RX_PHASE_ADJUSTMENT:
lime.rpc = setting.get_setting() pass
return lime return lime
@ -281,10 +290,12 @@ class LimeNQRController(BaseSpectrometerController):
"""This method translates the pulse sequence to the limr object. """This method translates the pulse sequence to the limr object.
Args: Args:
lime (limr): The limr object that is used to communicate with the pulseN driver lime (PyLimeConfig): The PyLimeConfig object that is used to communicate with the pulseN driver
""" """
events = self.fetch_pulse_sequence_events() events = self.fetch_pulse_sequence_events()
first_pulse = True
for event in events: for event in events:
self.log_event_details(event) self.log_event_details(event)
for parameter in event.parameters.values(): for parameter in event.parameters.values():
@ -294,24 +305,56 @@ class LimeNQRController(BaseSpectrometerController):
pulse_shape, pulse_amplitude = self.prepare_pulse_amplitude(event, parameter) pulse_shape, pulse_amplitude = self.prepare_pulse_amplitude(event, parameter)
pulse_amplitude, modulated_phase = self.modulate_pulse_amplitude(pulse_amplitude, event, lime) pulse_amplitude, modulated_phase = self.modulate_pulse_amplitude(pulse_amplitude, event, lime)
if not lime.pfr: # If the pulse frequency list is empty if first_pulse: # If the pulse frequency list is empty
self.initialize_pulse_lists(lime, pulse_amplitude, pulse_shape, modulated_phase) pfr, pdr, pam, pof, pph = self.initialize_pulse_lists(lime, pulse_amplitude, pulse_shape, modulated_phase)
first_pulse = False
else: else:
self.extend_pulse_lists(lime, pulse_amplitude, pulse_shape, modulated_phase) pfr_ext, pdr_ext, pam_ext, pph_ext = self.extend_pulse_lists(lime, pulse_amplitude, pulse_shape, modulated_phase)
self.calculate_and_set_offsets(lime, pulse_shape, events, event, pulse_amplitude) pof_ext = self.calculate_and_set_offsets(lime, pulse_shape, events, event, pulse_amplitude)
pfr.extend(pfr_ext)
pdr.extend(pdr_ext)
pam.extend(pam_ext)
pof.extend(pof_ext)
pph.extend(pph_ext)
lime.p_frq = pfr
lime.p_dur = pdr
lime.p_amp = pam
lime.p_offs = pof
lime.p_pha = pph
# Set repetition time event as last event's duration and update number of pulses # Set repetition time event as last event's duration and update number of pulses
lime.trp = float(event.duration) lime.reptime_secs = float(event.duration)
lime.npu = len(lime.pfr) lime.Npulses = len(lime.p_frq)
return lime return lime
def get_number_of_pulses(self):
""" This method calculates the number of pulses in the pulse sequence before the LimeDriverBinding is initialized.
This makes sure it"s initialized with the correct size of the pulse lists.
Returns:
int: The number of pulses in the pulse sequence
"""
events = self.fetch_pulse_sequence_events()
num_pulses = 0
for event in events:
for parameter in event.parameters.values():
if self.is_translatable_tx_parameter(parameter):
_, pulse_amplitude = self.prepare_pulse_amplitude(event, parameter)
num_pulses += len(pulse_amplitude)
logger.debug("Number of pulses: %s", num_pulses)
return num_pulses
# Helper functions below: # Helper functions below:
def fetch_pulse_sequence_events(self): def fetch_pulse_sequence_events(self):
"""This method fetches the pulse sequence events from the pulse programmer module. """This method fetches the pulse sequence events from the pulse programmer module.
Returns: Returns:
list: The pulse sequence events""" list: The pulse sequence events
"""
return self.module.model.pulse_programmer.model.pulse_sequence.events return self.module.model.pulse_programmer.model.pulse_sequence.events
def log_event_details(self, event): def log_event_details(self, event):
@ -349,12 +392,13 @@ class LimeNQRController(BaseSpectrometerController):
Args: Args:
pulse_amplitude (float): The pulse amplitude pulse_amplitude (float): The pulse amplitude
event (Event): The event that contains the parameter event (Event): The event that contains the parameter
lime (limr): The limr object that is used to communicate with the pulseN driver lime (PyLimeConfig) : The PyLimeConfig object that is used to communicate with the pulseN driver
Returns: Returns:
tuple: A tuple containing the modulated pulse amplitude and the modulated phase tuple: A tuple containing the modulated pulse amplitude and the modulated phase
""" """
num_samples = int(float(event.duration) * lime.sra) # num_samples = int(float(event.duration) * lime.sra)
num_samples = int(float(event.duration) * lime.srate)
tdx = np.linspace(0, float(event.duration), num_samples, endpoint=False) tdx = np.linspace(0, float(event.duration), num_samples, endpoint=False)
shift_signal = np.exp(1j * 2 * np.pi * self.module.model.if_frequency * tdx) shift_signal = np.exp(1j * 2 * np.pi * self.module.model.if_frequency * tdx)
pulse_complex = pulse_amplitude * shift_signal pulse_complex = pulse_amplitude * shift_signal
@ -373,37 +417,42 @@ class LimeNQRController(BaseSpectrometerController):
"""This method initializes the pulse lists of the limr object. """This method initializes the pulse lists of the limr object.
Args: Args:
lime (limr): The limr object that is used to communicate with the pulseN driver lime (PyLimeConfig): The PyLimeConfig object that is used to communicate with the pulseN driver
pulse_amplitude (float): The pulse amplitude pulse_amplitude (float): The pulse amplitude
pulse_shape (PulseShape): The pulse shape pulse_shape (PulseShape): The pulse shape
modulated_phase (float): The modulated phase modulated_phase (float): The modulated phase
""" """
lime.pfr = [float(self.module.model.if_frequency)] * len(pulse_amplitude) pfr = [float(self.module.model.if_frequency)] * len(pulse_amplitude)
lime.pdr = [float(pulse_shape.resolution)] * len(pulse_amplitude) # We set the first len(pulse_amplitude) of the p_dur
lime.pam = list(pulse_amplitude) pdr = [float(pulse_shape.resolution)] * len(pulse_amplitude)
lime.pof = ([self.module.model.OFFSET_FIRST_PULSE] + pam = list(pulse_amplitude)
[int(pulse_shape.resolution * Decimal(lime.sra))] * (len(pulse_amplitude) - 1)) pof = ([self.module.model.OFFSET_FIRST_PULSE] +
lime.pph = list(modulated_phase) [int(pulse_shape.resolution * Decimal(lime.srate))] * (len(pulse_amplitude) - 1))
pph = list(modulated_phase)
return pfr, pdr, pam, pof, pph
def extend_pulse_lists(self, lime, pulse_amplitude, pulse_shape, modulated_phase): def extend_pulse_lists(self, lime, pulse_amplitude, pulse_shape, modulated_phase):
"""This method extends the pulse lists of the limr object. """This method extends the pulse lists of the limr object.
Args: Args:
lime (limr): The limr object that is used to communicate with the pulseN driver lime (PyLimeConfig): The PyLimeConfig object that is used to communicate with the pulseN driver
pulse_amplitude (float): The pulse amplitude pulse_amplitude (float): The pulse amplitude
pulse_shape (PulseShape): The pulse shape pulse_shape (PulseShape): The pulse shape
modulated_phase (float): The modulated phase modulated_phase (float): The modulated phase
""" """
lime.pfr.extend([float(self.module.model.if_frequency)] * len(pulse_amplitude)) pfr = ([float(self.module.model.if_frequency)] * len(pulse_amplitude))
lime.pdr.extend([float(pulse_shape.resolution)] * len(pulse_amplitude)) pdr = ([float(pulse_shape.resolution)] * len(pulse_amplitude))
lime.pam.extend(list(pulse_amplitude)) pam = (list(pulse_amplitude))
lime.pph.extend(list(modulated_phase)) pph = (list(modulated_phase))
return pfr, pdr, pam, pph
def calculate_and_set_offsets(self, lime, pulse_shape, events, current_event, pulse_amplitude): def calculate_and_set_offsets(self, lime, pulse_shape, events, current_event, pulse_amplitude):
"""This method calculates and sets the offsets for the limr object. """This method calculates and sets the offsets for the limr object.
Args: Args:
lime (limr): The limr object that is used to communicate with the pulseN driver lime (PyLimeConfig): The PyLimeConfig object that is used to communicate with the pulseN driver
pulse_shape (PulseShape): The pulse shape pulse_shape (PulseShape): The pulse shape
events (list): The pulse sequence events events (list): The pulse sequence events
current_event (Event): The current event current_event (Event): The current event
@ -415,15 +464,16 @@ class LimeNQRController(BaseSpectrometerController):
total_blank_duration = sum(blank_durations) total_blank_duration = sum(blank_durations)
# Calculate the offset for the current pulse # Calculate the offset for the current pulse
# The first pulse offset is already set, so calculate subsequent ones # The first pulse offset is already set, so calculate subsequent ones
offset_for_current_pulse = int(np.ceil(total_blank_duration * lime.sra)) offset_for_current_pulse = int(np.ceil(total_blank_duration * lime.srate))
# Offset for the current pulse should be added only once # Offset for the current pulse should be added only once
lime.pof.append(offset_for_current_pulse) pof = (offset_for_current_pulse)
# Set the offset for the remaining samples of the current pulse (excluding the first sample) # Set the offset for the remaining samples of the current pulse (excluding the first sample)
# We subtract 1 because we have already set the offset for the current pulse's first sample # We subtract 1 because we have already set the offset for the current pulse's first sample
offset_per_sample = int(float(pulse_shape.resolution) * lime.sra) offset_per_sample = int(float(pulse_shape.resolution) * lime.srate)
lime.pof.extend([offset_per_sample] * (len(pulse_amplitude) - 1)) pof.extend([offset_per_sample] * (len(pulse_amplitude) - 1))
return pof
def get_blank_durations_before_event(self, events, current_event): def get_blank_durations_before_event(self, events, current_event):
"""This method returns the blank durations before the current event. """This method returns the blank durations before the current event.
@ -467,7 +517,7 @@ class LimeNQRController(BaseSpectrometerController):
"""This method translates the RX event of the pulse sequence to the limr object. """This method translates the RX event of the pulse sequence to the limr object.
Args: Args:
lime (limr): The limr object that is used to communicate with the pulseN driver lime (PyLimeConfig): The PyLimeConfig object that is used to communicate with the pulseN driver
Returns: Returns:
tuple: A tuple containing the start and stop time of the RX event in µs tuple: A tuple containing the start and stop time of the RX event in µs
@ -527,7 +577,7 @@ class LimeNQRController(BaseSpectrometerController):
Returns: Returns:
float: The offset for the RX event float: The offset for the RX event
""" """
return self.module.model.OFFSET_FIRST_PULSE * (1 / lime.sra) return self.module.model.OFFSET_FIRST_PULSE * (1 / lime.srate)
def set_frequency(self, value: float): def set_frequency(self, value: float):