nanovna-saver/src/NanoVNASaver/Calibration.py

438 wiersze
16 KiB
Python

# NanoVNASaver
#
# A python program to view and export Touchstone data from a NanoVNA
# Copyright (C) 2019, 2020 Rune B. Broberg
# Copyright (C) 2020,2021 NanoVNA-Saver Authors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
import cmath
import math
import os
import re
from collections import defaultdict, UserDict
from dataclasses import dataclass
from typing import List
from scipy.interpolate import interp1d
from NanoVNASaver.RFTools import Datapoint
IDEAL_SHORT = complex(-1, 0)
IDEAL_OPEN = complex(1, 0)
IDEAL_LOAD = complex(0, 0)
IDEAL_THROUGH = complex(1, 0)
RXP_CAL_HEADER = re.compile(r"""
^ \# \s+ Hz \s+
ShortR \s+ ShortI \s+ OpenR \s+ OpenI \s+
LoadR \s+ LoadI
(?P<through> \s+ ThroughR \s+ ThroughI)?
(?P<thrurefl> \s+ ThrureflR \s+ ThrureflI)?
(?P<isolation> \s+ IsolationR \s+ IsolationI)?
\s* $
""", re.VERBOSE | re.IGNORECASE)
RXP_CAL_LINE = re.compile(r"""
^ \s*
(?P<freq>\d+) \s+
(?P<shortr>[-0-9Ee.]+) \s+ (?P<shorti>[-0-9Ee.]+) \s+
(?P<openr>[-0-9Ee.]+) \s+ (?P<openi>[-0-9Ee.]+) \s+
(?P<loadr>[-0-9Ee.]+) \s+ (?P<loadi>[-0-9Ee.]+)
( \s+ (?P<throughr>[-0-9Ee.]+) \s+ (?P<throughi>[-0-9Ee.]+))?
( \s+ (?P<thrureflr>[-0-9Ee.]+) \s+ (?P<thrurefli>[-0-9Ee.]+))?
( \s+ (?P<isolationr>[-0-9Ee.]+) \s+ (?P<isolationi>[-0-9Ee.]+))?
\s* $
""", re.VERBOSE)
logger = logging.getLogger(__name__)
def correct_delay(d: Datapoint, delay: float, reflect: bool = False):
mult = 2 if reflect else 1
corr_data = d.z * cmath.exp(
complex(0, 1) * 2 * math.pi * d.freq * delay * -1 * mult)
return Datapoint(d.freq, corr_data.real, corr_data.imag)
@dataclass
class CalData:
# pylint: disable=too-many-instance-attributes
short: complex = complex(0.0, 0.0)
open: complex = complex(0.0, 0.0)
load: complex = complex(0.0, 0.0)
through: complex = complex(0.0, 0.0)
thrurefl: complex = complex(0.0, 0.0)
isolation: complex = complex(0.0, 0.0)
freq: int = 0
e00: float = 0.0 # Directivity
e11: float = 0.0 # Port1 match
delta_e: float = 0.0 # Tracking
e10e01: float = 0.0 # Forward Reflection Tracking
# 2 port
e30: float = 0.0 # Forward isolation
e22: float = 0.0 # Port2 match
e10e32: float = 0.0 # Forward transmission
def __str__(self):
return (
f'{self.freq}'
f' {self.short.real} {self.short.imag}'
f' {self.open.real} {self.open.imag}'
f' {self.load.real} {self.load.imag}' + (
f' {self.through.real} {self.through.imag}'
f' {self.thrurefl.real} {self.thrurefl.imag}'
f' {self.isolation.real} {self.isolation.imag}'
if self.through else ''
)
)
@dataclass
class CalElement:
# pylint: disable=too-many-instance-attributes
short_is_ideal: bool = True
short_l0: float = 5.7e-12
short_l1: float = -8.96e-20
short_l2: float = -1.1e-29
short_l3: float = -4.12e-37
short_length: float = -34.2 # ps
open_is_ideal: bool = True
open_c0: float = 2.1e-14
open_c1: float = 5.67e-23
open_c2: float = -2.39e-31
open_c3: float = 2.0e-40
open_length: float = 0.0
load_is_ideal: bool = True
load_r: float = 50.0
load_l: float = 0.0
load_c: float = 0.0
load_length: float = 0.0
through_is_ideal: bool = True
through_length: float = 0.0
class CalDataSet(UserDict):
def __init__(self):
super().__init__()
self.notes = ""
self.data: defaultdict[int, CalData] = defaultdict(CalData)
def __str__(self):
return (
(
"# Calibration data for NanoVNA-Saver\n"
+ "\n".join([f"! {note}" for note in self.notes.splitlines()])
+ "\n" + "# Hz ShortR ShortI OpenR OpenI LoadR LoadI"
+ (" ThroughR ThroughI ThrureflR"
" ThrureflI IsolationR IsolationI\n"
if self.complete2port() else "\n")
+ "\n".join([
f"{self.data.get(freq)}" for freq in self.frequencies()
]) + "\n"
)
if self.complete1port() else ""
)
def _append_match(self, m: re.Match, header: str,
line_nr: int, line: str) -> None:
cal = m.groupdict()
columns = {
col[:-1] for col in cal.keys() if cal[col] and col != "freq"
}
if "through" in columns and header == "sol":
logger.warning("Through data with sol header. %i: %s",
line_nr, line)
# fix short data (without thrurefl)
if "thrurefl" in columns and "isolation" not in columns:
cal["isolationr"] = cal["thrureflr"]
cal["isolationi"] = cal["thrurefli"]
cal["thrureflr"], cal["thrurefli"] = None, None
for name in columns:
self.insert(
name,
Datapoint(int(cal["freq"]),
float(cal[f"{name}r"]),
float(cal[f"{name}i"])))
def from_str(self, text: str) -> 'CalDataSet':
# reset data
self.notes = ""
self.data = defaultdict(CalData)
header = ""
# parse text
for i, line in enumerate(text.splitlines(), 1):
line = line.strip()
if line.startswith("!"):
self.notes += f"{line[2:]}\n"
continue
if m := RXP_CAL_HEADER.search(line):
if header:
logger.warning(
"Duplicate header in cal data. %i: %s", i, line)
header = "through" if m.group("through") else "sol"
continue
if not line or line.startswith("#"):
continue
m = RXP_CAL_LINE.search(line)
if not m:
logger.warning("Illegal caldata. Line %i: %s", i, line)
continue
if not header:
logger.warning(
"Caldata without having read header: %i: %s", i, line)
self._append_match(m, header, line, i)
return self
def insert(self, name: str, dp: Datapoint):
if name not in {'short', 'open', 'load',
'through', 'thrurefl', 'isolation'}:
raise KeyError(name)
freq = dp.freq
setattr(self.data[freq], name, (dp.z))
self.data[freq].freq = freq
def frequencies(self) -> List[int]:
return sorted(self.data.keys())
def get(self, key: int, default: CalData = None) -> CalData:
return self.data.get(key, default)
def items(self):
yield from self.data.items()
def values(self):
for freq in self.frequencies():
yield self.get(freq)
def size_of(self, name: str) -> int:
return len(
[True for val in self.data.values() if getattr(val, name)]
)
def complete1port(self) -> bool:
for val in self.data.values():
if not all((val.short, val.open, val.load)):
return False
return any(self.data)
def complete2port(self) -> bool:
if not self.complete1port():
return False
for val in self.data.values():
if not all((val.through, val.thrurefl, val.isolation)):
return False
return any(self.data)
class Calibration:
def __init__(self):
self.notes = []
self.dataset = CalDataSet()
self.cal_element = CalElement()
self.interp = {}
self.isCalculated = False
self.source = "Manual"
def insert(self, name: str, data: List[Datapoint]):
for dp in data:
self.dataset.insert(name, dp)
def size(self) -> int:
return len(self.dataset.frequencies())
def data_size(self, name) -> int:
return self.dataset.size_of(name)
def isValid1Port(self) -> bool:
return self.dataset.complete1port()
def isValid2Port(self) -> bool:
return self.dataset.complete2port()
def _calc_port_1(self, freq: int, cal: CalData):
g1 = self.gamma_short(freq)
g2 = self.gamma_open(freq)
g3 = self.gamma_load(freq)
gm1 = cal.short
gm2 = cal.open
gm3 = cal.load
denominator = (g1 * (g2 - g3) * gm1 +
g2 * g3 * gm2 - g2 * g3 * gm3 -
(g2 * gm2 - g3 * gm3) * g1)
cal.e00 = - ((g2 * gm3 - g3 * gm3) * g1 * gm2 -
(g2 * g3 * gm2 - g2 * g3 * gm3 -
(g3 * gm2 - g2 * gm3) * g1) * gm1
) / denominator
cal.e11 = ((g2 - g3) * gm1 - g1 * (gm2 - gm3) +
g3 * gm2 - g2 * gm3) / denominator
cal.delta_e = - ((g1 * (gm2 - gm3) - g2 * gm2 + g3 *
gm3) * gm1 + (g2 * gm3 - g3 * gm3) *
gm2) / denominator
def _calc_port_2(self, freq: int, cal: CalData):
gt = self.gamma_through(freq)
gm4 = cal.through
gm5 = cal.thrurefl
gm6 = cal.isolation
gm7 = gm5 - cal.e00
cal.e30 = cal.isolation
cal.e10e01 = cal.e00 * cal.e11 - cal.delta_e
cal.e22 = gm7 / (
gm7 * cal.e11 * gt ** 2 + cal.e10e01 * gt ** 2)
cal.e10e32 = (gm4 - gm6) * (
1 - cal.e11 * cal.e22 * gt ** 2) / gt
def calc_corrections(self):
if not self.isValid1Port():
logger.warning(
"Tried to calibrate from insufficient data.")
raise ValueError(
"All of short, open and load calibration steps"
"must be completed for calibration to be applied.")
logger.debug("Calculating calibration for %d points.", self.size())
for freq, caldata in self.dataset.items():
try:
self._calc_port_1(freq, caldata)
if self.isValid2Port():
self._calc_port_2(freq, caldata)
except ZeroDivisionError as exc:
self.isCalculated = False
logger.error(
"Division error - did you use the same measurement"
" for two of short, open and load?")
raise ValueError(
f"Two of short, open and load returned the same"
f" values at frequency {freq}Hz.") from exc
self.gen_interpolation()
self.isCalculated = True
logger.debug("Calibration correctly calculated.")
def gamma_short(self, freq: int) -> complex:
if self.cal_element.short_is_ideal:
return IDEAL_SHORT
logger.debug("Using short calibration set values.")
cal_element = self.cal_element
Zsp = complex(0.0, 2.0 * math.pi * freq * (
cal_element.short_l0 + cal_element.short_l1 * freq +
cal_element.short_l2 * freq**2 + cal_element.short_l3 * freq**3))
# Referencing https://arxiv.org/pdf/1606.02446.pdf (18) - (21)
return (Zsp / 50.0 - 1.0) / (Zsp / 50.0 + 1.0) * cmath.exp(
complex(0.0,
-4.0 * math.pi * freq * cal_element.short_length))
def gamma_open(self, freq: int) -> complex:
if self.cal_element.open_is_ideal:
return IDEAL_OPEN
logger.debug("Using open calibration set values.")
cal_element = self.cal_element
Zop = complex(0.0, 2.0 * math.pi * freq * (
cal_element.open_c0 + cal_element.open_c1 * freq +
cal_element.open_c2 * freq**2 + cal_element.open_c3 * freq**3))
return ((1.0 - 50.0 * Zop) / (1.0 + 50.0 * Zop)) * cmath.exp(
complex(0.0,
-4.0 * math.pi * freq * cal_element.open_length))
def gamma_load(self, freq: int) -> complex:
if self.cal_element.load_is_ideal:
return IDEAL_LOAD
logger.debug("Using load calibration set values.")
cal_element = self.cal_element
Zl = complex(cal_element.load_r, 0.0)
if cal_element.load_c > 0.0:
Zl = cal_element.load_r / complex(
1.0,
2.0 * cal_element.load_r * math.pi * freq * cal_element.load_c)
if cal_element.load_l > 0.0:
Zl = Zl + complex(0.0, 2 * math.pi * freq * cal_element.load_l)
return (Zl / 50.0 - 1.0) / (Zl / 50.0 + 1.0) * cmath.exp(
complex(0.0, -4 * math.pi * freq * cal_element.load_length))
def gamma_through(self, freq: int) -> complex:
if self.cal_element.through_is_ideal:
return IDEAL_THROUGH
logger.debug("Using through calibration set values.")
cal_element = self.cal_element
return cmath.exp(
complex(0.0, -2.0 * math.pi * cal_element.through_length * freq))
def gen_interpolation(self):
(freq, e00, e11, delta_e, e10e01, e30, e22, e10e32) = zip(*[
(c.freq, c.e00, c.e11, c.delta_e, c.e10e01, c.e30, c.e22, c.e10e32)
for c in self.dataset.values()])
self.interp = {
"e00": interp1d(freq, e00,
kind="slinear", bounds_error=False,
fill_value=(e00[0], e00[-1])),
"e11": interp1d(freq, e11,
kind="slinear", bounds_error=False,
fill_value=(e11[0], e11[-1])),
"delta_e": interp1d(freq, delta_e,
kind="slinear", bounds_error=False,
fill_value=(delta_e[0], delta_e[-1])),
"e10e01": interp1d(freq, e10e01,
kind="slinear", bounds_error=False,
fill_value=(e10e01[0], e10e01[-1])),
"e30": interp1d(freq, e30,
kind="slinear", bounds_error=False,
fill_value=(e30[0], e30[-1])),
"e22": interp1d(freq, e22,
kind="slinear", bounds_error=False,
fill_value=(e22[0], e22[-1])),
"e10e32": interp1d(freq, e10e32,
kind="slinear", bounds_error=False,
fill_value=(e10e32[0], e10e32[-1])),
}
def correct11(self, dp: Datapoint):
i = self.interp
s11 = (dp.z - i["e00"](dp.freq)) / (
(dp.z * i["e11"](dp.freq)) - i["delta_e"](dp.freq))
return Datapoint(dp.freq, s11.real, s11.imag)
def correct21(self, dp: Datapoint, dp11: Datapoint):
i = self.interp
s21 = (dp.z - i["e30"](dp.freq)) / i["e10e32"](dp.freq)
s21 = s21 * (i["e10e01"](dp.freq) / (i["e11"](dp.freq)
* dp11.z - i["delta_e"](dp.freq)))
return Datapoint(dp.freq, s21.real, s21.imag)
def save(self, filename: str):
self.dataset.notes = "\n".join(self.notes)
if not self.isValid1Port():
raise ValueError("Not a valid calibration")
with open(filename, mode="w", encoding='utf-8') as calfile:
calfile.write(str(self.dataset))
def load(self, filename):
self.source = os.path.basename(filename)
with open(filename, encoding='utf-8') as calfile:
self.dataset = CalDataSet().from_str(calfile.read())
self.notes = self.dataset.notes.splitlines()