Started implementation of lime communication.

This commit is contained in:
jupfi 2023-07-14 15:08:36 +02:00
parent ecd779ed1f
commit 3ef32c2c53
3 changed files with 128 additions and 25 deletions

3
.gitignore vendored
View file

@ -23,4 +23,5 @@ venv/
# Contrib files without permission
src/nqrduck_spectrometer_limenqr/contrib/limr.py
src/nqrduck_spectrometer_limenqr/contrib/pulseN_test_USB.cpp
src/nqrduck_spectrometer_limenqr/contrib/pulseN_test_USB.cpp
src/nqrduck_spectrometer_limenqr/contrib/pulseN_test_USB

View file

@ -26,6 +26,8 @@ classifiers = [
dependencies = [
"nqrduck-spectrometer",
"pyqt6",
"h5py",
"serial",
]
[project.entry-points."nqrduck"]

View file

@ -1,4 +1,7 @@
import logging
import tempfile
from scipy.fftpack import fft, fftshift, fftfreq
import matplotlib.pyplot as plt
from pathlib import Path
import numpy as np
from nqrduck.module.module_controller import ModuleController
@ -26,75 +29,172 @@ class LimeNQRController(BaseSpectrometerController):
except Exception as e:
logger.error("Error while loading pulseN_test_USB.cpp: %s", e)
lime.noi = -1 # No initialisation
lime.nrp = 1 # Numer of repetitions
lime.tdi = -45 # TX I DC correction
lime.tdq = 0 # TX Q DC correction
lime.tgi = 2047 # TX I Gain correction
lime.tgq = 2039 # TX Q Gain correction
lime.tpc = 3 # TX phase adjustment
lime.rgi = 2047
lime.rgq = 2047
lime.rdi = 0
lime.rdq = 0
lime.rpc = 0
lime = self.update_settings(lime)
lime = self.translate_pulse_sequence(lime)
for key in sorted(lime.parsinp):
val = getattr(lime, key)
if val != []:
# logger.debug("Attribute: %s, Value: %s, Descr.: %s" % key, val, lime.parsinp[key][1])
logger.debug(key + ": " + str(val) + " " + lime.parsinp[key][1])
# Create temp folder for .hdf files
temp_dir = tempfile.TemporaryDirectory()
path = Path(temp_dir.name)
logger.debug("Created temporary directory: %s", path)
lime.spt = path
lime.fpa = "temp"
logger.debug("Starting measurement")
lime.run()
logger.debug("Reading hdf file")
lime.readHDF()
#evaluation range, defines: blanking time and window length
evran = [13, 55]
#np.where sometimes does not work out, so it is put in a try except
#always check the console for errors
try:
evidx = np.where( (lime.HDF.tdx > evran[0]) & (lime.HDF.tdx < evran[1]) )[0]
except:
print("error due to np.where evaluation!")
#time domain x and y data
tdx = lime.HDF.tdx[evidx]
tdy = lime.HDF.tdy[evidx]
#correcting a offset in the time domain by subtracting the mean
tdy_mean = tdy[:,0]-np.mean(tdy)
#fft of the corrected time domain data
fdy1 = fftshift(fft(tdy_mean,axis=0),axes=0)
#fft freq and fft shift is here used to scale the x axis (frequency axis)
fdx1 = fftfreq(len(fdy1))*lime.sra/1e6
fdx1 = fftshift(fdx1)
#scaling factor which converts the y axis (usually a proportional number of points) into uV
fac_p_to_uV = 447651/1e6
#tdy_mean = tdy_mean/l.nav/fac_p_to_uV/RX_gainfactor
plt.figure(1);
plt.plot(tdx,tdy_mean/lime.nav)
plt.xlabel("t in µs")
plt.ylabel("Amplitude / points")
plt.show()
def update_settings(self, lime):
logger.debug("Updating settings for spectrometer: %s for measurement", self.module.model.name)
l.t3d = [0, 0, 0, 0]
lime.t3d = [0, 0, 0, 0]
for category in self.module.model.settings.keys():
for setting in self.module.model.settings[category]:
logger.debug("Setting %s has value %s", setting.name, setting.value)
if setting.name == "RX Gain":
lime.rgn = setting.value
lime.rgn = setting.get_setting()
elif setting.name == "TX Gain":
lime.tgn = setting.value
lime.tgn = setting.get_setting()
elif setting.name == "Averages":
lime.nav = setting.value
lime.nav = int(setting.get_setting())
elif setting.name == "Sampling Frequency":
lime.sra = setting.value
lime.sra = setting.get_setting()
elif setting.name == "RX LPF BW":
lime.rlp = setting.value
lime.rlp = setting.get_setting()
elif setting.name == "TX LPF BW":
lime.tlp = setting.value
elif setting.name == "IF frequency":
lime.lof = self.target_frequency - setting.value
lime.tlp = setting.get_setting()
elif setting.name == "IF Frequency":
lime.lof = self.module.model.target_frequency - setting.get_setting()
elif setting.name == "Acquisition time":
lime.tac = 82e-6
elif setting.name == "Enable":
lime.t3d[0] = int(setting.value)
elif setting.name == "Gate padding left":
lime.t3d[1] = setting.value
lime.t3d[1] = int(setting.get_setting())
elif setting.name == "Gate shift":
lime.t3d[2] = setting.value
lime.t3d[2] = int(setting.get_setting())
elif setting.name == "Gate padding right":
lime.t3d[3] = setting.value
lime.t3d[3] = int(setting.get_setting())
elif setting.name == "Acquisition time":
lime.tac = setting.get_setting()
return lime
def translate_pulse_sequence(self, lime):
"""This method translates the pulse sequence into the format required by the lime spectrometer.
"""
events = self.module.model.pulse_programmer.model.pulse_sequence.events
for event in self.module.model.pulse_programmer.model.pulse_sequence.events.values():
for event in events:
logger.debug("Event %s has parameters: %s", event.name, event.parameters)
for parameter in event.parameters.values():
logger.debug("Parameter %s has options: %s", parameter.name, parameter.options)
if parameter.name == "TX":
if parameter.name == "TX" and parameter.options["TX Amplitude"].value > 0:
if len(lime.pfr) == 0:
# Add the TX pulse to the pulse frequency list (lime.pfr)
lime.pfr = [self.module.model.if_frequency]
lime.pfr = [float(self.module.model.if_frequency)]
# Add the duration of the TX pulse to the pulse duration list (lime.pdr)
lime.pdr = [event.duration]
lime.pdr = [float(event.duration)]
# Add the TX pulse amplitude to the pulse amplitude list (lime.pam)
lime.pam = [parameter.options["TX Amplitude"].value]
lime.pam = [float(parameter.options["TX Amplitude"].value)]
# Add the pulse offset to the pulse offset list (lime.pof)
# This leads to a default offset of 300 samples for the first pulse
lime.pof = [300]
# Add the TX pulse phase to the pulse phase list (lime.pph) -> not yet implemented
else:
lime.pfr.append(self.module.model.if_frequency)
lime.pdr.append(event.duration)
lime.pam.append(parameter.options["TX Amplitude"].value)
lime.pof.append(np.ceil(lime.pfr[-2] * lime.sra))
logger.debug("Adding TX pulse to existing pulse sequence")
logger.debug("Setting if frequency to: %s", self.module.model.if_frequency)
lime.pfr.append(float(self.module.model.if_frequency))
logger.debug("Setting pulse duration to: %s", event.duration)
lime.pdr.append(float(event.duration))
logger.debug("Setting pulse amplitude to: %s", parameter.options["TX Amplitude"].value)
lime.pam.append(float(parameter.options["TX Amplitude"].value))
# Get the length of the previous event without a tx pulse
blank = []
previous_events = events[:events.index(event)]
# Firstuful this is ugly as hell and needs to be refactored
# Secondly this just sets the pulse offsets.
for prev_event in previous_events[::-1]:
logger.debug("Previous event: %s with duration: %s", prev_event.name, prev_event.duration)
for parameter in prev_event.parameters.values():
if parameter.name == "TX" and parameter.options["TX Amplitude"].value == 0:
blank.append(float(prev_event.duration))
elif parameter.name == "TX" and parameter.options["TX Amplitude"].value > 0:
break
else:
continue
break
logger.debug("Found blanks: %s", blank)
# The acquisition time can be calculated from the buffer length of 4096 samples and the sampling frequency
# 82µs is the shortest possible acquisition time
prev_duration = lime.pdr[-2] + sum(blank)
logger.debug("Setting pulse offset to: %s", prev_duration)
lime.pof.append(np.ceil(prev_duration * lime.sra))
# The last event is the repetition time event
lime.trp = event.duration
lime.trp = float(event.duration)
lime.npu = len(lime.pfr)
return lime