kopia lustrzana https://github.com/NanoVNA-Saver/nanovna-saver
Refactored Calibration
Utilize scipy to implement spline interpolation between calibration data pointspull/205/head
rodzic
4045a18271
commit
3341564cb2
|
@ -17,13 +17,16 @@
|
|||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
import logging
|
||||
import cmath
|
||||
import math
|
||||
import os
|
||||
import re
|
||||
from collections import defaultdict, UserDict
|
||||
from typing import List
|
||||
|
||||
import numpy as np
|
||||
from scipy.interpolate import interp1d
|
||||
|
||||
from .RFTools import Datapoint
|
||||
from NanoVNASaver.RFTools import Datapoint
|
||||
|
||||
RXP_CAL_LINE = re.compile(r"""^\s*
|
||||
(?P<freq>\d+) \s+
|
||||
|
@ -37,38 +40,108 @@ RXP_CAL_LINE = re.compile(r"""^\s*
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def correct_delay(d: Datapoint, delay: float, reflect: bool = False):
|
||||
mult = 2 if reflect else 1
|
||||
corr_data = d.z * cmath.exp(
|
||||
complex(0, 1) * 2 * math.pi * d.freq * delay * -1 * mult)
|
||||
return Datapoint(d.freq, corr_data.real, corr_data.imag)
|
||||
|
||||
|
||||
class CalData(UserDict):
|
||||
def __init__(self):
|
||||
data = {
|
||||
"short": None,
|
||||
"open": None,
|
||||
"load": None,
|
||||
"through": None,
|
||||
"isolation": None,
|
||||
# the frequence
|
||||
"freq": 0,
|
||||
# 1 Port
|
||||
"e00": 0.0, # Directivity
|
||||
"e11": 0.0, # Port match
|
||||
"delta_e": 0.0, # Tracking
|
||||
# 2 port
|
||||
"e30": 0.0, # Port match
|
||||
"e10e32": 0.0, # Transmission
|
||||
}
|
||||
super().__init__(data)
|
||||
|
||||
def __str__(self):
|
||||
d = self.data
|
||||
s = (f'{d["freq"]}'
|
||||
f' {d["short"].re} {d["short"].im}'
|
||||
f' {d["open"].re} {d["open"].im}'
|
||||
f' {d["load"].re} {d["load"].im}')
|
||||
if d["through"] is not None:
|
||||
s += (f' {d["through"].re} {d["through"].im}'
|
||||
f' {d["isolation"].re} {d["isolation"].im}')
|
||||
return s
|
||||
|
||||
class CalDataSet:
|
||||
def __init__(self):
|
||||
self.data = defaultdict(CalData)
|
||||
|
||||
def insert(self, name: str, dp: Datapoint):
|
||||
if not name in self.data[dp.freq]:
|
||||
raise KeyError(name)
|
||||
self.data[dp.freq]["freq"] = dp.freq
|
||||
self.data[dp.freq][name] = dp
|
||||
|
||||
def frequencies(self) -> List[int]:
|
||||
return sorted(self.data.keys())
|
||||
|
||||
def get(self, freq: int) -> CalData:
|
||||
return self.data[freq]
|
||||
|
||||
def items(self):
|
||||
for item in self.data.items():
|
||||
yield item
|
||||
|
||||
def values(self):
|
||||
for freq in self.frequencies():
|
||||
yield self.get(freq)
|
||||
|
||||
def size_of(self, name: str) -> int:
|
||||
return len([v for v in self.data.values() if v[name] is not None])
|
||||
|
||||
def complete1port(self) -> bool:
|
||||
for val in self.data.values():
|
||||
for name in ("short", "open", "load"):
|
||||
if val[name] is None:
|
||||
return False
|
||||
return any(self.data)
|
||||
|
||||
def complete2port(self) -> bool:
|
||||
for val in self.data.values():
|
||||
for name in ("short", "open", "load", "through", "isolation"):
|
||||
if val[name] is None:
|
||||
return False
|
||||
return any(self.data)
|
||||
|
||||
|
||||
# TODO: make a real class of calibration
|
||||
class Calibration:
|
||||
CAL_NAMES = ("short", "open", "load", "through", "isolation",)
|
||||
IDEAL_SHORT = complex(-1, 0)
|
||||
IDEAL_OPEN = complex(1, 0)
|
||||
IDEAL_LOAD = complex(0, 0)
|
||||
|
||||
def __init__(self):
|
||||
|
||||
self.notes = []
|
||||
self.cals = {}
|
||||
self._reset_cals()
|
||||
self.frequencies = []
|
||||
# 1-port
|
||||
self.e00 = [] # Directivity
|
||||
self.e11 = [] # Port match
|
||||
self.deltaE = [] # Tracking
|
||||
self.dataset = CalDataSet()
|
||||
self.interp = {}
|
||||
|
||||
# 2-port
|
||||
self.e30 = [] # Port match
|
||||
self.e10e32 = [] # Transmission
|
||||
|
||||
self.shortIdeal = np.complex(-1, 0)
|
||||
self.useIdealShort = True
|
||||
self.shortL0 = 5.7 * 10E-12
|
||||
self.shortL1 = -8960 * 10E-24
|
||||
self.shortL2 = -1100 * 10E-33
|
||||
self.shortL3 = -41200 * 10E-42
|
||||
self.shortLength = -34.2 # Picoseconds
|
||||
self.shortLength = -34.2 # Picoseconfrequenciesds
|
||||
# These numbers look very large, considering what Keysight
|
||||
# suggests their numbers are.
|
||||
|
||||
self.useIdealOpen = True
|
||||
self.openIdeal = np.complex(1, 0)
|
||||
# Subtract 50fF for the nanoVNA calibration if nanoVNA is
|
||||
# calibrated?
|
||||
self.openC0 = 2.1 * 10E-14
|
||||
|
@ -82,7 +155,6 @@ class Calibration:
|
|||
self.loadL = 0
|
||||
self.loadC = 0
|
||||
self.loadLength = 0
|
||||
self.loadIdeal = np.complex(0, 0)
|
||||
|
||||
self.useIdealThrough = True
|
||||
self.throughLength = 0
|
||||
|
@ -91,18 +163,21 @@ class Calibration:
|
|||
|
||||
self.source = "Manual"
|
||||
|
||||
def _reset_cals(self):
|
||||
for name in Calibration.CAL_NAMES:
|
||||
self.cals[name] = []
|
||||
def insert(self, name: str, data: List[Datapoint]):
|
||||
for dp in data:
|
||||
self.dataset.insert(name, dp)
|
||||
|
||||
def isValid1Port(self):
|
||||
lengths = [len(self.cals[x])
|
||||
for x in Calibration.CAL_NAMES[:3]]
|
||||
return min(lengths) > 0 and min(lengths) == max(lengths)
|
||||
def size(self) -> int:
|
||||
return len(self.dataset.frequencies())
|
||||
|
||||
def isValid2Port(self):
|
||||
lengths = [len(self.cals[x]) for x in Calibration.CAL_NAMES]
|
||||
return min(lengths) > 0 and min(lengths) == max(lengths)
|
||||
def data_size(self, name) -> int:
|
||||
return self.dataset.size_of(name)
|
||||
|
||||
def isValid1Port(self) -> bool:
|
||||
return self.dataset.complete1port()
|
||||
|
||||
def isValid2Port(self) -> bool:
|
||||
return self.dataset.complete2port()
|
||||
|
||||
def calc_corrections(self):
|
||||
if not self.isValid1Port():
|
||||
|
@ -111,202 +186,170 @@ class Calibration:
|
|||
raise ValueError(
|
||||
"All of short, open and load calibration steps"
|
||||
"must be completed for calibration to be applied.")
|
||||
nr_points = len(self.cals["short"])
|
||||
logger.debug("Calculating calibration for %d points.", nr_points)
|
||||
self.frequencies = []
|
||||
self.e00 = [np.complex] * nr_points
|
||||
self.e11 = [np.complex] * nr_points
|
||||
self.deltaE = [np.complex] * nr_points
|
||||
self.e30 = [np.complex] * nr_points
|
||||
self.e10e32 = [np.complex] * nr_points
|
||||
if self.useIdealShort:
|
||||
logger.debug("Using ideal values.")
|
||||
else:
|
||||
logger.debug("Using calibration set values.")
|
||||
if self.isValid2Port():
|
||||
logger.debug("Calculating 2-port calibration.")
|
||||
else:
|
||||
logger.debug("Calculating 1-port calibration.")
|
||||
for i, cur_short in enumerate(self.cals["short"]):
|
||||
cur_open = self.cals["open"][i]
|
||||
cur_load = self.cals["load"][i]
|
||||
f = cur_short.freq
|
||||
self.frequencies.append(f)
|
||||
pi = math.pi
|
||||
logger.debug("Calculating calibration for %d points.", self.size())
|
||||
|
||||
if self.useIdealShort:
|
||||
g1 = self.shortIdeal
|
||||
else:
|
||||
Zsp = (
|
||||
np.complex(0, 1) * 2 * pi *
|
||||
f * (self.shortL0 +
|
||||
self.shortL1 * f +
|
||||
self.shortL2 * f**2 +
|
||||
self.shortL3 * f**3))
|
||||
gammaShort = ((Zsp/50) - 1) / ((Zsp/50) + 1)
|
||||
# (lower case) gamma = 2*pi*f
|
||||
# e^j*2*gamma*length
|
||||
# Referencing https://arxiv.org/pdf/1606.02446.pdf (18) - (21)
|
||||
g1 = gammaShort * np.exp(
|
||||
np.complex(0, 1) * 2 * math.pi *
|
||||
2 * f * self.shortLength * -1)
|
||||
for freq, caldata in self.dataset.items():
|
||||
g1 = self.gamma_short(freq)
|
||||
g2 = self.gamma_open(freq)
|
||||
g3 = self.gamma_load(freq)
|
||||
|
||||
if self.useIdealOpen:
|
||||
g2 = self.openIdeal
|
||||
else:
|
||||
divisor = (
|
||||
2 * pi * f * (
|
||||
self.openC0 + self.openC1 * f +
|
||||
self.openC2 * f**2 + self.openC3 * f**3)
|
||||
)
|
||||
if divisor != 0:
|
||||
Zop = np.complex(0, -1) / divisor
|
||||
gammaOpen = ((Zop/50) - 1) / ((Zop/50) + 1)
|
||||
g2 = gammaOpen * np.exp(
|
||||
np.complex(0, 1) * 2 * math.pi *
|
||||
2 * f * self.openLength * -1)
|
||||
else:
|
||||
g2 = self.openIdeal
|
||||
if self.useIdealLoad:
|
||||
g3 = self.loadIdeal
|
||||
else:
|
||||
Zl = self.loadR + (
|
||||
np.complex(0, 1) * 2 * math.pi * f * self.loadL)
|
||||
g3 = ((Zl/50)-1) / ((Zl/50)+1)
|
||||
g3 = g3 * np.exp(
|
||||
np.complex(0, 1) * 2 * math.pi *
|
||||
2 * f * self.loadLength * -1)
|
||||
|
||||
gm1 = np.complex(cur_short.re, cur_short.im)
|
||||
gm2 = np.complex(cur_open.re, cur_open.im)
|
||||
gm3 = np.complex(cur_load.re, cur_load.im)
|
||||
gm1 = caldata["short"].z
|
||||
gm2 = caldata["open"].z
|
||||
gm3 = caldata["load"].z
|
||||
|
||||
try:
|
||||
denominator = (
|
||||
g1 * (g2 - g3) * gm1 +
|
||||
g2 * g3 * gm2 -
|
||||
g2 * g3 * gm3 -
|
||||
(g2 * gm2 - g3 * gm3) * g1)
|
||||
self.e00[i] = - (
|
||||
(g2 * gm3 - g3 * gm3) * g1 * gm2 -
|
||||
(g2 * g3 * gm2 - g2 * g3 * gm3 -
|
||||
(g3 * gm2 - g2 * gm3) * g1) * gm1
|
||||
) / denominator
|
||||
self.e11[i] = (
|
||||
(g2 - g3) * gm1 - g1 * (gm2 - gm3) +
|
||||
g3 * gm2 - g2 * gm3
|
||||
) / denominator
|
||||
self.deltaE[i] = - (
|
||||
(g1 * (gm2 - gm3) - g2 * gm2 + g3 * gm3) * gm1 +
|
||||
(g2 * gm3 - g3 * gm3) * gm2
|
||||
) / denominator
|
||||
denominator = (g1 * (g2 - g3) * gm1 +
|
||||
g2 * g3 * gm2 - g2 * g3 * gm3 -
|
||||
(g2 * gm2 - g3 * gm3) * g1)
|
||||
caldata["e00"] = - ((g2 * gm3 - g3 * gm3) * g1 * gm2 -
|
||||
(g2 * g3 * gm2 - g2 * g3 * gm3 -
|
||||
(g3 * gm2 - g2 * gm3) * g1) * gm1
|
||||
) / denominator
|
||||
caldata["e11"] = ((g2 - g3) * gm1 - g1 * (gm2 - gm3) +
|
||||
g3 * gm2 - g2 * gm3) / denominator
|
||||
caldata["delta_e"] = - ((g1 * (gm2 - gm3) - g2 * gm2 + g3 *
|
||||
gm3) * gm1 + (g2 * gm3 - g3 * gm3) *
|
||||
gm2) / denominator
|
||||
except ZeroDivisionError:
|
||||
self.isCalculated = False
|
||||
logger.error(
|
||||
"Division error - did you use the same measurement"
|
||||
" for two of short, open and load?")
|
||||
logger.debug(
|
||||
"Division error at index %d"
|
||||
" Short == Load: %s Short == Open: %s"
|
||||
" Open == Load: %s", i,
|
||||
cur_short == cur_load, cur_short == cur_open,
|
||||
cur_open == cur_load)
|
||||
raise ValueError(
|
||||
f"Two of short, open and load returned the same"
|
||||
f" values at frequency {f}Hz.")
|
||||
f" values at frequency {freq}Hz.")
|
||||
|
||||
if self.isValid2Port():
|
||||
cur_through = self.cals["through"][i]
|
||||
cur_isolation = self.cals["isolation"][i]
|
||||
|
||||
self.e30[i] = np.complex(
|
||||
cur_isolation.re, cur_isolation.im)
|
||||
s21m = np.complex(cur_through.re, cur_through.im)
|
||||
if not self.useIdealThrough:
|
||||
gammaThrough = np.exp(
|
||||
np.complex(0, 1) * 2 * math.pi *
|
||||
self.throughLength * f * -1)
|
||||
s21m = s21m / gammaThrough
|
||||
self.e10e32[i] = (s21m - self.e30[i]) * (
|
||||
1 - (self.e11[i] * self.e11[i]))
|
||||
gt = self.gamma_through(freq)
|
||||
gm4 = caldata["through"].z
|
||||
caldata["e10e32"] = (gm4 / gt - caldata["e30"]
|
||||
) * (1 - caldata["e11"] ** 2)
|
||||
caldata["e30"] = caldata["isolation"].z
|
||||
|
||||
self.gen_interpolation()
|
||||
self.isCalculated = True
|
||||
logger.debug("Calibration correctly calculated.")
|
||||
|
||||
def correct11(self, re, im, freq):
|
||||
s11m = np.complex(re, im)
|
||||
distance = 10**10
|
||||
index = 0
|
||||
for i, cur_short in enumerate(self.cals["short"]):
|
||||
if abs(cur_short.freq - freq) < distance:
|
||||
index = i
|
||||
distance = abs(cur_short.freq - freq)
|
||||
# TODO: Interpolate with the adjacent data point
|
||||
# to get better corrections?
|
||||
def gamma_short(self, freq: int) -> complex:
|
||||
g = Calibration.IDEAL_SHORT
|
||||
if not self.useIdealShort:
|
||||
logger.debug("Using short calibration set values.")
|
||||
Zsp = complex(0, 1) * 2 * math.pi * freq * (
|
||||
self.shortL0 + self.shortL1 * freq +
|
||||
self.shortL2 * freq**2 + self.shortL3 * freq**3)
|
||||
# Referencing https://arxiv.org/pdf/1606.02446.pdf (18) - (21)
|
||||
g = ((Zsp / 50 - 1) / (Zsp / 50 + 1)) * cmath.exp(
|
||||
complex(0, 1) * 2 * math.pi * 2 * freq *
|
||||
self.shortLength * -1)
|
||||
return g
|
||||
|
||||
s11 = (s11m - self.e00[index]) / (
|
||||
(s11m * self.e11[index]) - self.deltaE[index])
|
||||
return s11.real, s11.imag
|
||||
def gamma_open(self, freq: int) -> complex:
|
||||
g = Calibration.IDEAL_OPEN
|
||||
if not self.useIdealOpen:
|
||||
logger.debug("Using open calibration set values.")
|
||||
divisor = (2 * math.pi * freq * (
|
||||
self.openC0 + self.openC1 * freq +
|
||||
self.openC2 * freq**2 + self.openC3 * freq**3))
|
||||
if divisor != 0:
|
||||
Zop = complex(0, -1) / divisor
|
||||
g = ((Zop / 50 - 1) / (Zop / 50+ 1)) * cmath.exp(
|
||||
complex(0, 1) * 2 * math.pi *
|
||||
2 * freq * self.openLength * -1)
|
||||
return g
|
||||
|
||||
def correct21(self, re, im, freq):
|
||||
s21m = np.complex(re, im)
|
||||
distance = 10**10
|
||||
index = 0
|
||||
for i, cur_through in enumerate(self.cals["through"]):
|
||||
if abs(cur_through.freq - freq) < distance:
|
||||
index = i
|
||||
distance = abs(cur_through.freq - freq)
|
||||
s21 = (s21m - self.e30[index]) / self.e10e32[index]
|
||||
return s21.real, s21.imag
|
||||
def gamma_load(self, freq: int) -> complex:
|
||||
g = Calibration.IDEAL_LOAD
|
||||
if not self.useIdealLoad:
|
||||
logger.debug("Using load calibration set values.")
|
||||
Zl = self.loadR + (complex(0, 1) * 2 *
|
||||
math.pi * freq * self.loadL)
|
||||
g = (((Zl / 50) - 1) / ((Zl / 50) + 1)) * cmath.exp(
|
||||
complex(0, 1) * 2 * math.pi *
|
||||
2 * freq * self.loadLength * -1)
|
||||
return g
|
||||
|
||||
@staticmethod
|
||||
def correctDelay11(d: Datapoint, delay):
|
||||
input_val = np.complex(d.re, d.im)
|
||||
output = input_val * np.exp(np.complex(0, 1) * 2 * 2 * math.pi * d.freq * delay * -1)
|
||||
return Datapoint(d.freq, output.real, output.imag)
|
||||
def gamma_through(self, freq: int) -> complex:
|
||||
g = complex(1, 0)
|
||||
if not self.useIdealThrough:
|
||||
logger.debug("Using through calibration set values.")
|
||||
g = cmath.exp(complex(0, 1) * 2 * math.pi *
|
||||
self.throughLength * freq * -1)
|
||||
return g
|
||||
|
||||
@staticmethod
|
||||
def correctDelay21(d: Datapoint, delay):
|
||||
input_val = np.complex(d.re, d.im)
|
||||
output = input_val * np.exp(np.complex(0, 1) * 2 * math.pi * d.freq * delay * -1)
|
||||
return Datapoint(d.freq, output.real, output.imag)
|
||||
def gen_interpolation(self):
|
||||
freq = []
|
||||
e00 = []
|
||||
e11 = []
|
||||
delta_e = []
|
||||
e30 = []
|
||||
e10e32 = []
|
||||
|
||||
for caldata in self.dataset.values():
|
||||
freq.append(caldata["freq"])
|
||||
e00.append(caldata["e00"])
|
||||
e11.append(caldata["e11"])
|
||||
delta_e.append(caldata["delta_e"])
|
||||
e30.append(caldata["e30"])
|
||||
e10e32.append(caldata["e10e32"])
|
||||
|
||||
self.interp["e00"] = interp1d(freq, e00, kind="slinear")
|
||||
self.interp["e11"] = interp1d(freq, e11, kind="slinear")
|
||||
self.interp["delta_e"] = interp1d(freq, delta_e, kind="slinear")
|
||||
self.interp["e30"] = interp1d(freq, e30, kind="slinear")
|
||||
self.interp["e10e32"] = interp1d(freq, e10e32, kind="slinear")
|
||||
|
||||
def correct11(self, dp: Datapoint):
|
||||
i = self.interp
|
||||
try:
|
||||
s11 = (dp.z - i["e00"](dp.freq)) / (
|
||||
(dp.z * i["e11"](dp.freq)) - i["delta_e"](dp.freq))
|
||||
return Datapoint(dp.freq, s11.real, s11.imag)
|
||||
except ValueError:
|
||||
# TODO: implement warn message in gui
|
||||
logger.info("Data outside calibration")
|
||||
|
||||
nearest = sorted(self.dataset.frequencies(),
|
||||
key=lambda k: abs(dp.freq - k))[0]
|
||||
ds = self.dataset.get(nearest)
|
||||
s11 = (dp.z - ds["e00"]) / (
|
||||
(dp.z * ds["e11"]) - ds["delta_e"])
|
||||
return Datapoint(dp.freq, s11.real, s11.imag)
|
||||
|
||||
def correct21(self, dp: Datapoint):
|
||||
i = self.interp
|
||||
try:
|
||||
s21 = (dp.z - i["e30"](dp.freq)) / i["e10e32"](dp.freq)
|
||||
return Datapoint(dp.freq, s21.real, s21.imag)
|
||||
except ValueError:
|
||||
# TODO: implement warn message in gui
|
||||
logger.info("Data outside calibration")
|
||||
|
||||
nearest = sorted(self.dataset.frequencies(),
|
||||
key=lambda k: abs(dp.freq - k))[0]
|
||||
ds = self.dataset.get(nearest)
|
||||
s21 = (dp.z - ds["e30"]) / ds["e10e32"]
|
||||
return Datapoint(dp.freq, s21.real, s21.imag)
|
||||
|
||||
# TODO: implement tests
|
||||
def save(self, filename: str):
|
||||
# Save the calibration data to file
|
||||
if not self.isValid1Port():
|
||||
raise ValueError("Not a valid 1-Port calibration")
|
||||
with open(filename, "w+") as calfile:
|
||||
with open(f"{filename}", "w") as calfile:
|
||||
calfile.write("# Calibration data for NanoVNA-Saver\n")
|
||||
for note in self.notes:
|
||||
calfile.write(f"! {note}\n")
|
||||
calfile.write(
|
||||
"# Hz ShortR ShortI OpenR OpenI LoadR LoadI"
|
||||
" ThroughR ThroughI IsolationR IsolationI\n")
|
||||
for i, cur_short in enumerate(self.cals["short"]):
|
||||
cur_open = self.cals["open"][i]
|
||||
cur_load = self.cals["load"][i]
|
||||
data = [
|
||||
cur_short.freq,
|
||||
cur_short.re, cur_short.im,
|
||||
cur_open.re, cur_open.im,
|
||||
cur_load.re, cur_load.im,
|
||||
]
|
||||
if self.isValid2Port():
|
||||
cur_through = self.cals["through"][i]
|
||||
cur_isolation = self.cals["isolation"][i]
|
||||
data.extend([
|
||||
cur_through.re, cur_through.im,
|
||||
cur_isolation.re, cur_isolation.im
|
||||
])
|
||||
calfile.write(" ".join([str(val) for val in data]))
|
||||
calfile.write("\n")
|
||||
for freq in self.dataset.frequencies():
|
||||
calfile.write(f"{self.dataset.get(freq)}\n")
|
||||
|
||||
# TODO: implement tests
|
||||
# TODO: Exception should be catched by caller
|
||||
def load(self, filename):
|
||||
self.source = os.path.basename(filename)
|
||||
self._reset_cals()
|
||||
self.dataset = CalDataSet()
|
||||
self.notes = []
|
||||
|
||||
parsed_header = False
|
||||
|
@ -342,7 +385,8 @@ class Calibration:
|
|||
nr_cals = 3
|
||||
|
||||
for name in Calibration.CAL_NAMES[:nr_cals]:
|
||||
self.cals[name].append(
|
||||
self.dataset.insert(
|
||||
name,
|
||||
Datapoint(int(cal["freq"]),
|
||||
float(cal[f"{name}r"]),
|
||||
float(cal[f"{name}i"])))
|
||||
|
|
|
@ -24,7 +24,7 @@ import numpy as np
|
|||
from PyQt5 import QtCore, QtWidgets
|
||||
from PyQt5.QtCore import pyqtSlot, pyqtSignal
|
||||
|
||||
from NanoVNASaver.Calibration import Calibration
|
||||
from NanoVNASaver.Calibration import correct_delay
|
||||
from NanoVNASaver.Formatting import parse_frequency
|
||||
from NanoVNASaver.RFTools import Datapoint
|
||||
|
||||
|
@ -123,7 +123,7 @@ class SweepWorker(QtCore.QRunnable):
|
|||
|
||||
# Setup complete
|
||||
|
||||
values = []
|
||||
values11 = []
|
||||
values21 = []
|
||||
frequencies = []
|
||||
|
||||
|
@ -140,14 +140,14 @@ class SweepWorker(QtCore.QRunnable):
|
|||
start + (self.vna.datapoints - 1) * stepsize,
|
||||
self.averages)
|
||||
|
||||
frequencies += freq
|
||||
values += val11
|
||||
values21 += val21
|
||||
frequencies.extend(freq)
|
||||
values11.extend(val11)
|
||||
values21.extend(val21)
|
||||
|
||||
self.percentage = (i + 1) * (self.vna.datapoints - 1) / \
|
||||
self.noSweeps
|
||||
logger.debug("Saving acquired data")
|
||||
self.saveData(frequencies, values, values21)
|
||||
self.saveData(frequencies, values11, values21)
|
||||
|
||||
else:
|
||||
for i in range(self.noSweeps):
|
||||
|
@ -160,13 +160,13 @@ class SweepWorker(QtCore.QRunnable):
|
|||
freq, val11, val21 = self.readSegment(
|
||||
start, start + (self.vna.datapoints - 1) * stepsize)
|
||||
|
||||
frequencies += freq
|
||||
values += val11
|
||||
values21 += val21
|
||||
frequencies.extend(freq)
|
||||
values11.extend(val11)
|
||||
values21.extend(val21)
|
||||
|
||||
self.percentage = (i + 1) * 100 / self.noSweeps
|
||||
logger.debug("Saving acquired data")
|
||||
self.saveData(frequencies, values, values21)
|
||||
self.saveData(frequencies, values11, values21)
|
||||
except NanoVNAValueException as e:
|
||||
self.error_message = str(e)
|
||||
self.stopped = True
|
||||
|
@ -187,10 +187,10 @@ class SweepWorker(QtCore.QRunnable):
|
|||
break
|
||||
start = sweep_from + i * self.vna.datapoints * stepsize
|
||||
try:
|
||||
_, values, values21 = self.readSegment(
|
||||
_, values11, values21 = self.readSegment(
|
||||
start, start + (self.vna.datapoints-1) * stepsize)
|
||||
logger.debug("Updating acquired data")
|
||||
self.updateData(values, values21, i, self.vna.datapoints)
|
||||
self.updateData(values11, values21, i, self.vna.datapoints)
|
||||
except NanoVNAValueException as e:
|
||||
self.error_message = str(e)
|
||||
self.stopped = True
|
||||
|
@ -242,18 +242,22 @@ class SweepWorker(QtCore.QRunnable):
|
|||
self.signals.updated.emit()
|
||||
|
||||
def saveData(self, frequencies, values11, values21):
|
||||
logger.debug("Freqs: %d, values11: %d, values21: %d",
|
||||
len(frequencies), len(values11), len(values21))
|
||||
v11 = values11[:]
|
||||
v21 = values21[:]
|
||||
raw_data11 = []
|
||||
raw_data21 = []
|
||||
logger.debug("Calculating data including corrections")
|
||||
for i, freq in enumerate(frequencies):
|
||||
re, im = values11[i]
|
||||
re21, im21 = values21[i]
|
||||
raw_data11 += [Datapoint(freq, re, im)]
|
||||
raw_data21 += [Datapoint(freq, re21, im21)]
|
||||
self.data11, self.data21 = self.applyCalibration(
|
||||
raw_data11, raw_data21)
|
||||
for freq in frequencies:
|
||||
real11, imag11 = v11.pop(0)
|
||||
real21, imag21 = v21.pop(0)
|
||||
raw_data11.append(Datapoint(freq, real11, imag11))
|
||||
raw_data21.append(Datapoint(freq, real21, imag21))
|
||||
self.rawData11 = raw_data11
|
||||
self.rawData21 = raw_data21
|
||||
self.data11, self.data21 = self.applyCalibration(
|
||||
raw_data11, raw_data21)
|
||||
logger.debug("Saving data to application (%d and %d points)",
|
||||
len(self.data11), len(self.data21))
|
||||
self.app.saveData(self.data11, self.data21)
|
||||
|
@ -266,12 +270,12 @@ class SweepWorker(QtCore.QRunnable):
|
|||
) -> (List[Datapoint], List[Datapoint]):
|
||||
if self.offsetDelay != 0:
|
||||
tmp = []
|
||||
for d in raw_data11:
|
||||
tmp.append(Calibration.correctDelay11(d, self.offsetDelay))
|
||||
for dp in raw_data11:
|
||||
tmp.append(correct_delay(dp, self.offsetDelay, reflect=True))
|
||||
raw_data11 = tmp
|
||||
tmp = []
|
||||
for d in raw_data21:
|
||||
tmp.append(Calibration.correctDelay21(d, self.offsetDelay))
|
||||
for dp in raw_data21:
|
||||
tmp.append(correct_delay(dp, self.offsetDelay))
|
||||
raw_data21 = tmp
|
||||
|
||||
if not self.app.calibration.isCalculated:
|
||||
|
@ -281,16 +285,14 @@ class SweepWorker(QtCore.QRunnable):
|
|||
data21: List[Datapoint] = []
|
||||
|
||||
if self.app.calibration.isValid1Port():
|
||||
for d in raw_data11:
|
||||
re, im = self.app.calibration.correct11(d.re, d.im, d.freq)
|
||||
data11.append(Datapoint(d.freq, re, im))
|
||||
for dp in raw_data11:
|
||||
data11.append(self.app.calibration.correct11(dp))
|
||||
else:
|
||||
data11 = raw_data11
|
||||
|
||||
if self.app.calibration.isValid2Port():
|
||||
for d in raw_data21:
|
||||
re, im = self.app.calibration.correct21(d.re, d.im, d.freq)
|
||||
data21.append(Datapoint(d.freq, re, im))
|
||||
for dp in raw_data21:
|
||||
data21.append(self.app.calibration.correct21(dp))
|
||||
else:
|
||||
data21 = raw_data21
|
||||
return data11, data21
|
||||
|
@ -333,6 +335,11 @@ class SweepWorker(QtCore.QRunnable):
|
|||
# S21
|
||||
values21 = self.readData("data 1")
|
||||
|
||||
if (len(frequencies) != len(values11) or
|
||||
len(frequencies) != len(values21)):
|
||||
logger.info("No valid data during this run")
|
||||
# TODO: display gui warning
|
||||
return [], [], []
|
||||
return frequencies, values11, values21
|
||||
|
||||
def readData(self, data):
|
||||
|
|
|
@ -27,8 +27,8 @@ from NanoVNASaver.Calibration import Calibration
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _format_cal_label(data: list, prefix: str = "Set") -> str:
|
||||
return f"{prefix} ({len(data)} points)"
|
||||
def _format_cal_label(size: int, prefix: str = "Set") -> str:
|
||||
return f"{prefix} ({size} points)"
|
||||
|
||||
|
||||
class CalibrationWindow(QtWidgets.QWidget):
|
||||
|
@ -230,11 +230,11 @@ class CalibrationWindow(QtWidgets.QWidget):
|
|||
|
||||
def cal_save(self, name: str):
|
||||
if name in ("through", "isolation"):
|
||||
self.app.calibration.cals[name] = self.app.data21[:]
|
||||
self.app.calibration.insert(name, self.app.data21)
|
||||
else:
|
||||
self.app.calibration.cals[name] = self.app.data[:]
|
||||
self.app.calibration.insert(name, self.app.data)
|
||||
self.cal_label[name].setText(
|
||||
_format_cal_label(self.app.data))
|
||||
_format_cal_label(len(self.app.data)))
|
||||
|
||||
def manual_save(self, name: str):
|
||||
if self.checkExpertUser():
|
||||
|
@ -529,7 +529,7 @@ class CalibrationWindow(QtWidgets.QWidget):
|
|||
try:
|
||||
self.app.calibration.calc_corrections()
|
||||
self.calibration_status_label.setText(
|
||||
_format_cal_label(self.app.calibration.cals["short"],
|
||||
_format_cal_label(self.app.calibration.size(),
|
||||
"Application calibration"))
|
||||
if self.use_ideal_values.isChecked():
|
||||
self.calibration_source_label.setText(self.app.calibration.source)
|
||||
|
@ -569,7 +569,7 @@ class CalibrationWindow(QtWidgets.QWidget):
|
|||
for i, name in enumerate(
|
||||
("short", "open", "load", "through", "isolation")):
|
||||
self.cal_label[name].setText(
|
||||
_format_cal_label(self.app.calibration.cals[name], "Loaded"))
|
||||
_format_cal_label(self.app.calibration.data_size(name), "Loaded"))
|
||||
if i == 2 and not self.app.calibration.isValid2Port():
|
||||
break
|
||||
self.calculate()
|
||||
|
|
Ładowanie…
Reference in New Issue