Merge pull request #226 from zarath/feature/refactor_hardware

This patchset refactors many hardware related functions.
It can break things especially with AVNA/NanoVNA-F/-H/-H4. As I don't have access to that hardware devices I need some feedback, if general functionality is given.

I personally was impressed, how much faster sweeping is with the original NanoVNA
pull/232/head
Holger Müller 2020-07-05 14:08:24 +02:00 zatwierdzone przez GitHub
commit 496494900f
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
13 zmienionych plików z 386 dodań i 514 usunięć

Wyświetl plik

@ -20,8 +20,7 @@ import logging
from time import sleep
from typing import List
import serial
from NanoVNASaver.Hardware.Serial import Interface
from NanoVNASaver.Hardware.VNA import VNA, Version
logger = logging.getLogger(__name__)
@ -30,66 +29,18 @@ logger = logging.getLogger(__name__)
class AVNA(VNA):
name = "AVNA"
def __init__(self, app, serial_port):
super().__init__(app, serial_port)
def __init__(self, iface: Interface):
super().__init__(iface)
self.version = Version(self.readVersion())
self.features.add("Customizable data points")
def isValid(self):
return True
def getCalibration(self) -> str:
logger.debug("Reading calibration info.")
if not self.serial.is_open:
return "Not connected."
with self.app.serialLock:
try:
data = "a"
while data != "":
data = self.serial.readline().decode('ascii')
self.serial.write("cal\r".encode('ascii'))
result = ""
data = ""
sleep(0.1)
while "ch>" not in data:
data = self.serial.readline().decode('ascii')
result += data
values = result.splitlines()
return values[1]
except serial.SerialException as exc:
logger.exception("Exception while reading calibration info: %s", exc)
return "Unknown"
def readFrequencies(self) -> List[str]:
return self.readValues("frequencies")
def resetSweep(self, start: int, stop: int):
self.writeSerial("sweep " + str(start) + " " + str(stop) + " " + str(self.datapoints))
self.writeSerial(f"sweep {start} {stop} {self.datapoints}")
self.writeSerial("resume")
def readVersion(self):
logger.debug("Reading version info.")
if not self.serial.is_open:
return
with self.app.serialLock:
try:
data = "a"
while data != "":
data = self.serial.readline().decode('ascii')
self.serial.write("version\r".encode('ascii'))
result = ""
data = ""
sleep(0.1)
while "ch>" not in data:
data = self.serial.readline().decode('ascii')
result += data
values = result.splitlines()
logger.debug("Found version info: %s", values[1])
return values[1]
except serial.SerialException as exc:
logger.exception("Exception while reading firmware version: %s", exc)
return ""
def setSweep(self, start, stop):
self.writeSerial("sweep " + str(start) + " " + str(stop) + " " + str(self.datapoints))
self.writeSerial(f"sweep {start} {stop} {self.datapoints}")
sleep(1)

Wyświetl plik

@ -18,8 +18,8 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
import platform
from typing import List, Tuple
from collections import namedtuple
from typing import List
import serial
from serial.tools import list_ports
@ -30,16 +30,16 @@ from NanoVNASaver.Hardware.NanoVNA_F import NanoVNA_F
from NanoVNASaver.Hardware.NanoVNA_H import NanoVNA_H, NanoVNA_H4
from NanoVNASaver.Hardware.NanoVNA import NanoVNA
from NanoVNASaver.Hardware.NanoVNA_V2 import NanoVNAV2
from NanoVNASaver.Hardware.Serial import drain_serial
from NanoVNASaver.Hardware.Serial import drain_serial, Interface
logger = logging.getLogger(__name__)
Device = namedtuple("Device", "vid pid name")
USBDevice = namedtuple("Device", "vid pid name")
DEVICETYPES = (
Device(0x0483, 0x5740, "NanoVNA"),
Device(0x16c0, 0x0483, "AVNA"),
Device(0x04b4, 0x0008, "NanaVNA-V2"),
USBDEVICETYPES = (
USBDevice(0x0483, 0x5740, "NanoVNA"),
USBDevice(0x16c0, 0x0483, "AVNA"),
USBDevice(0x04b4, 0x0008, "S-A-A-2"),
)
RETRIES = 3
TIMEOUT = 0.2
@ -56,60 +56,63 @@ def _fix_v2_hwinfo(dev):
# Get list of interfaces with VNAs connected
def get_interfaces() -> List[Tuple[str, str]]:
return_ports = []
def get_interfaces() -> List[Interface]:
interfaces = []
# serial like usb interfaces
for d in list_ports.comports():
if platform.system() == 'Windows' and d.vid is None:
d = _fix_v2_hwinfo(d)
for t in DEVICETYPES:
if d.vid == t.vid and d.pid == t.pid:
port = d.device
logger.info("Found %s (%04x %04x) on port %s",
t.name, d.vid, d.pid, d.device)
return_ports.append((port, f"{port} ({t.name})"))
return return_ports
for t in USBDEVICETYPES:
if d.vid != t.vid or d.pid != t.pid:
continue
logger.debug("Found %s USB:(%04x:%04x) on port %s",
t.name, d.vid, d.pid, d.device)
iface = Interface('serial', t.name)
iface.port = d.device
interfaces.append(iface)
return interfaces
def get_VNA(app, serial_port: serial.Serial) -> 'VNA':
serial_port.timeout = TIMEOUT
def get_VNA(iface: Interface) -> 'VNA':
# serial_port.timeout = TIMEOUT
logger.info("Finding correct VNA type...")
with app.serialLock:
vna_version = detect_version(serial_port)
with iface.lock:
vna_version = detect_version(iface)
if vna_version == 'v2':
logger.info("Type: NanoVNA-V2")
return NanoVNAV2(app, serial_port)
return NanoVNAV2(iface)
logger.info("Finding firmware variant...")
tmp_vna = VNA(app, serial_port)
tmp_vna = VNA(iface)
tmp_vna.flushSerialBuffers()
firmware = tmp_vna.readFirmware()
if firmware.find("AVNA + Teensy") > 0:
logger.info("Type: AVNA")
return AVNA(app, serial_port)
return AVNA(iface)
if firmware.find("NanoVNA-H 4") > 0:
logger.info("Type: NanoVNA-H4")
vna = NanoVNA_H4(app, serial_port)
vna = NanoVNA_H4(iface)
if vna.readFirmware().find("sweep_points 201") > 0:
logger.info("VNA has 201 datapoints capability")
vna._datapoints = (201, 101)
vna.datapoints = (201, 101)
return vna
if firmware.find("NanoVNA-H") > 0:
logger.info("Type: NanoVNA-H")
vna = NanoVNA_H(app, serial_port)
vna = NanoVNA_H(iface)
if vna.readFirmware().find("sweep_points 201") > 0:
logger.info("VNA has 201 datapoints capability")
vna._datapoints = (201, 101)
vna.datapoints = (201, 101)
return vna
if firmware.find("NanoVNA-F") > 0:
logger.info("Type: NanoVNA-F")
return NanoVNA_F(app, serial_port)
return NanoVNA_F(iface)
if firmware.find("NanoVNA") > 0:
logger.info("Type: Generic NanoVNA")
return NanoVNA(app, serial_port)
return NanoVNA(iface)
logger.warning("Did not recognize NanoVNA type from firmware.")
return NanoVNA(app, serial_port)
return NanoVNA(iface)
def detect_version(serial_port: serial.Serial) -> str:

Wyświetl plik

@ -25,7 +25,7 @@ import serial
import numpy as np
from PyQt5 import QtGui
from NanoVNASaver.Hardware.Serial import drain_serial
from NanoVNASaver.Hardware.Serial import drain_serial, Interface
from NanoVNASaver.Hardware.VNA import VNA, Version
logger = logging.getLogger(__name__)
@ -36,70 +36,53 @@ class NanoVNA(VNA):
screenwidth = 320
screenheight = 240
def __init__(self, app, serial_port):
super().__init__(app, serial_port)
self.version = Version(self.readVersion())
def __init__(self, iface: Interface):
super().__init__(iface)
version_string = self.readVersion()
self.version = Version(version_string)
self.sweep_method = "sweep"
self.start = 27000000
self.stop = 30000000
self._sweepdata = []
logger.debug("Testing against 0.2.0")
if self.version.version_string.find("extended with scan") > 0:
logger.debug("Incompatible scan command detected.")
self.features.add("Incompatible scan command")
self.useScan = False
if self.version >= Version("0.7.1"):
self.features.add("Scan mask command")
self.sweep_method = "scan_mask"
elif self.version >= Version("0.2.0"):
logger.debug("Newer than 0.2.0, using new scan command.")
self.features.add("New scan command")
self.useScan = True
else:
logger.debug("Older than 0.2.0, using old sweep command.")
self.features.add("Original sweep method")
self.useScan = False
self.features.add("Scan command")
self.sweep_method = "scan"
self.readFeatures()
def isValid(self):
return True
def getCalibration(self) -> str:
logger.debug("Reading calibration info.")
if not self.serial.is_open:
return "Not connected."
with self.app.serialLock:
try:
drain_serial(self.serial)
self.serial.write("cal\r".encode('ascii'))
result = ""
data = ""
sleep(0.1)
while "ch>" not in data:
data = self.serial.readline().decode('ascii')
result += data
values = result.splitlines()
return values[1]
except serial.SerialException as exc:
logger.exception("Exception while reading calibration info: %s", exc)
return "Unknown"
def _capture_data(self) -> bytes:
with self.serial.lock:
drain_serial(self.serial)
timeout = self.serial.timeout
self.serial.write("capture\r".encode('ascii'))
self.serial.timeout = 4
self.serial.readline()
image_data = self.serial.read(
self.screenwidth * self.screenheight * 2)
self.serial.timeout = timeout
rgb_data = struct.unpack(
f">{self.screenwidth * self.screenheight}H",
image_data)
rgb_array = np.array(rgb_data, dtype=np.uint32)
return (0xFF000000 +
((rgb_array & 0xF800) << 8) +
((rgb_array & 0x07E0) << 5) +
((rgb_array & 0x001F) << 3))
def getScreenshot(self) -> QtGui.QPixmap:
logger.debug("Capturing screenshot...")
if not self.serial.is_open:
if not self.connected():
return QtGui.QPixmap()
try:
with self.app.serialLock:
drain_serial(self.serial)
timeout = self.serial.timeout
self.serial.write("capture\r".encode('ascii'))
self.serial.timeout = 4
self.serial.readline()
image_data = self.serial.read(
self.screenwidth * self.screenheight * 2)
self.serial.timeout = timeout
rgb_data = struct.unpack(
f">{self.screenwidth * self.screenheight}H",
image_data)
rgb_array = np.array(rgb_data, dtype=np.uint32)
rgba_array = (0xFF000000 +
((rgb_array & 0xF800) << 8) +
((rgb_array & 0x07E0) << 5) +
((rgb_array & 0x001F) << 3))
rgba_array = self._capture_data()
image = QtGui.QImage(
rgba_array,
self.screenwidth,
@ -112,37 +95,61 @@ class NanoVNA(VNA):
"Exception while capturing screenshot: %s", exc)
return QtGui.QPixmap()
def readFrequencies(self) -> List[str]:
return self.readValues("frequencies")
def resetSweep(self, start: int, stop: int):
self.writeSerial("sweep {start} {stop} {self.datapoints}")
self.writeSerial(f"sweep {start} {stop} {self.datapoints}")
self.writeSerial("resume")
def readVersion(self):
logger.debug("Reading version info.")
if not self.serial.is_open:
return ""
try:
with self.app.serialLock:
drain_serial(self.serial)
self.serial.write("version\r".encode('ascii'))
result = ""
data = ""
sleep(0.1)
while "ch>" not in data:
data = self.serial.readline().decode('ascii')
result += data
values = result.splitlines()
logger.debug("Found version info: %s", values[1])
return values[1]
except serial.SerialException as exc:
logger.exception("Exception while reading firmware version: %s", exc)
return ""
def setSweep(self, start, stop):
if self.useScan:
self.writeSerial(f"scan {start} {stop} {self.datapoints}")
else:
self.start = start
self.stop = stop
if self.sweep_method == "sweep":
self.writeSerial(f"sweep {start} {stop} {self.datapoints}")
sleep(1)
elif self.sweep_method == "scan":
self.writeSerial(f"scan {start} {stop} {self.datapoints}")
def readFrequencies(self) -> List[int]:
if self.sweep_method != "scan_mask":
return super().readFrequencies()
step = (self.stop - self.start) / (self.datapoints - 1.0)
return [round(self.start + i * step) for i in range(self.datapoints)]
def readValues(self, value) -> List[str]:
if self.sweep_method != "scan_mask":
return super().readValues(value)
logger.debug("readValue with scan mask (%s)", value)
# Actually grab the data only when requesting channel 0.
# The hardware will return all channels which we will store.
if value == "data 0":
self._sweepdata = []
try:
with self.serial.lock:
drain_serial(self.serial)
self.serial.write(
(f"scan {self.start} {self.stop}"
f' {self.datapoints} 0b110\r'
).encode("ascii"))
self.serial.readline()
logger.info("reading values")
retries = 0
while True:
line = self.serial.readline()
line = line.decode("ascii").strip()
if not line:
retries += 1
logger.info("Retry nr: %s", retries)
if retries > 10:
raise IOError("too many retries")
sleep(0.2)
continue
if line.startswith("ch>"):
break
data = line.split()
self._sweepdata.append((
f"{data[0]} {data[1]}",
f"{data[2]} {data[3]}"))
except IOError as exc:
logger.error("Error readValues: %s", exc)
if value == "data 0":
return [x[0] for x in self._sweepdata]
if value == "data 1":
return [x[1] for x in self._sweepdata]

Wyświetl plik

@ -17,14 +17,12 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
import struct
import serial
import numpy as np
from PyQt5 import QtGui
from NanoVNASaver.Hardware.NanoVNA import NanoVNA
from NanoVNASaver.Hardware.Serial import drain_serial
logger = logging.getLogger(__name__)
@ -36,26 +34,10 @@ class NanoVNA_F(NanoVNA):
def getScreenshot(self) -> QtGui.QPixmap:
logger.debug("Capturing screenshot...")
if not self.serial.is_open:
if not self.connected():
return QtGui.QPixmap()
try:
with self.app.serialLock:
drain_serial(self.serial)
self.serial.write("capture\r".encode('ascii'))
timeout = self.serial.timeout
self.serial.timeout = 4
self.serial.readline()
image_data = self.serial.read(
self.screenwidth * self.screenheight * 2)
self.serial.timeout = timeout
rgb_data = struct.unpack(
f"<{self.screenwidth * self.screenheight}H", image_data)
rgb_array = np.array(rgb_data, dtype=np.uint32)
rgba_array = (0xFF000000 +
((rgb_array & 0xF800) << 8) + # G?!
((rgb_array & 0x07E0) >> 3) + # B
((rgb_array & 0x001F) << 11)) # G
rgba_array = self._capture_data()
unwrapped_array = np.empty(
self.screenwidth*self.screenheight,
dtype=np.uint32)

Wyświetl plik

@ -21,6 +21,7 @@ import platform
from struct import pack, unpack_from
from typing import List
from NanoVNASaver.Hardware.Serial import Interface
from NanoVNASaver.Hardware.VNA import VNA, Version
if platform.system() != 'Windows':
@ -59,14 +60,14 @@ class NanoVNAV2(VNA):
screenwidth = 320
screenheight = 240
def __init__(self, app, serialPort):
super().__init__(app, serialPort)
def __init__(self, iface: Interface):
super().__init__(iface)
if platform.system() != 'Windows':
tty.setraw(self.serial.fd)
# reset protocol to known state
with self.app.serialLock:
with self.serial.lock:
self.serial.write(pack("<Q", 0))
self.version = self.readVersion()
@ -76,35 +77,24 @@ class NanoVNAV2(VNA):
self.features.add("Multi data points")
# firmware major version of 0xff indicates dfu mode
if self.firmware.major == 0xff:
self._isDFU = True
return
if self.firmware.data["major"] == 0xff:
raise IOError('Device is in DFU mode')
self._isDFU = False
self.sweepStartHz = 200e6
self.sweepStepHz = 1e6
self._sweepdata = []
self._updateSweep()
# self.setSweep(200e6, 300e6)
def isValid(self):
if self.isDFU():
return False
return True
def isDFU(self):
return self._isDFU
def checkValid(self):
if self.isDFU():
raise IOError('Device is in DFU mode')
def getCalibration(self) -> str:
return "Unknown"
def readFirmware(self) -> str:
# read register 0xf3 and 0xf4 (firmware major and minor version)
cmd = pack("<BBBB",
_CMD_READ, _ADDR_FW_MAJOR,
_CMD_READ, _ADDR_FW_MINOR)
with self.app.serialLock:
with self.serial.lock:
self.serial.write(cmd)
resp = self.serial.read(2)
if len(resp) != 2:
@ -112,26 +102,23 @@ class NanoVNAV2(VNA):
return None
return Version(f"{resp[0]}.{resp[1]}.0")
def readFrequencies(self) -> List[str]:
self.checkValid()
def readFrequencies(self) -> List[int]:
return [
str(int(self.sweepStartHz + i * self.sweepStepHz))
int(self.sweepStartHz + i * self.sweepStepHz)
for i in range(self.datapoints)]
def readValues(self, value) -> List[str]:
self.checkValid()
# Actually grab the data only when requesting channel 0.
# The hardware will return all channels which we will store.
if value == "data 0":
# reset protocol to known state
with self.app.serialLock:
with self.serial.lock:
self.serial.timeout = 8 # should be enough
self.serial.write(pack("<Q", 0))
# cmd: write register 0x30 to clear FIFO
self.serial.write(pack("<BBB",
_CMD_WRITE, _ADDR_VALUES_FIFO, 0))
_CMD_WRITE, _ADDR_VALUES_FIFO, 0))
# clear sweepdata
self._sweepdata = [(complex(), complex())] * self.datapoints
pointstodo = self.datapoints
@ -141,8 +128,8 @@ class NanoVNAV2(VNA):
# cmd: read FIFO, addr 0x30
self.serial.write(
pack("<BBB",
_CMD_READFIFO, _ADDR_VALUES_FIFO,
pointstoread))
_CMD_READFIFO, _ADDR_VALUES_FIFO,
pointstoread))
# each value is 32 bytes
nBytes = pointstoread * 32
@ -151,13 +138,13 @@ class NanoVNAV2(VNA):
arr = self.serial.read(nBytes)
if nBytes != len(arr):
logger.error("expected %d bytes, got %d",
nBytes, len(arr))
nBytes, len(arr))
return []
for i in range(pointstoread):
(fwd_real, fwd_imag, rev0_real, rev0_imag, rev1_real,
rev1_imag, freq_index) = unpack_from(
"<iiiiiihxxxxxx", arr, i * 32)
rev1_imag, freq_index) = unpack_from(
"<iiiiiihxxxxxx", arr, i * 32)
fwd = complex(fwd_real, fwd_imag)
refl = complex(rev0_real, rev0_imag)
thru = complex(rev1_real, rev1_imag)
@ -177,13 +164,12 @@ class NanoVNAV2(VNA):
def resetSweep(self, start: int, stop: int):
self.setSweep(start, stop)
return
# returns device variant
def readVersion(self):
# read register 0xf0 (device type), 0xf2 (board revision)
cmd = b"\x10\xf0\x10\xf2"
with self.app.serialLock:
with self.serial.lock:
self.serial.write(cmd)
resp = self.serial.read(2)
if len(resp) != 2:
@ -203,7 +189,6 @@ class NanoVNAV2(VNA):
return
def _updateSweep(self):
self.checkValid()
cmd = pack("<BBQ", _CMD_WRITE8,
_ADDR_SWEEP_START, int(self.sweepStartHz))
cmd += pack("<BBQ", _CMD_WRITE8,
@ -212,5 +197,5 @@ class NanoVNAV2(VNA):
_ADDR_SWEEP_POINTS, self.datapoints)
cmd += pack("<BBH", _CMD_WRITE2,
_ADDR_SWEEP_VALS_PER_FREQ, 1)
with self.app.serialLock:
with self.serial.lock:
self.serial.write(cmd)

Wyświetl plik

@ -16,10 +16,28 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
from threading import Lock
import serial
logger = logging.getLogger(__name__)
def drain_serial(serial_port: serial.Serial):
"""drain up to 10k outstanding data in the serial incoming buffer"""
for _ in range(80):
if len(serial_port.read(128)) == 0:
break
class Interface(serial.Serial):
def __init__(self, interface_type: str, comment, *args, **kwargs):
super().__init__(*args, **kwargs)
assert interface_type in ('serial', 'usb', 'bt', 'network')
self.type = interface_type
self.comment = comment
self.port = None
self.baudrate = 115200
self.lock = Lock()
def __str__(self):
return f"{self.port} ({self.comment})"

Wyświetl plik

@ -21,10 +21,10 @@ from time import sleep
from typing import List
import serial
from PyQt5 import QtWidgets, QtGui
from PyQt5 import QtGui
from NanoVNASaver.Settings import Version
from NanoVNASaver.Hardware.Serial import drain_serial
from NanoVNASaver.Hardware.Serial import Interface, drain_serial
logger = logging.getLogger(__name__)
@ -33,10 +33,9 @@ class VNA:
name = "VNA"
_datapoints = (101, )
def __init__(self, app: QtWidgets.QWidget, serial_port: serial.Serial):
self.app = app
self.serial = serial_port
self.version: Version = Version("0.0.0")
def __init__(self, iface: Interface):
self.serial = iface
self.version = Version("0.0.0")
self.features = set()
self.validateInput = True
self.datapoints = VNA._datapoints[0]
@ -54,9 +53,8 @@ class VNA:
return self.features
# TODO: check return types
def readFrequencies(self) -> List[int]:
return []
return [int(f) for f in self.readValues("frequencies")]
def resetSweep(self, start: int, stop: int):
pass
@ -64,20 +62,37 @@ class VNA:
def isValid(self):
return False
def isDFU(self):
return False
def connected(self) -> bool:
return self.serial.is_open
def getFeatures(self) -> List[str]:
return self.features
def getCalibration(self) -> str:
logger.debug("Reading calibration info.")
if not self.connected():
return "Not connected."
with self.serial.lock:
try:
drain_serial(self.serial)
self.serial.write("cal\r".encode('ascii'))
result = ""
data = ""
sleep(0.1)
while "ch>" not in data:
data = self.serial.readline().decode('ascii')
result += data
values = result.splitlines()
return values[1]
except serial.SerialException as exc:
logger.exception("Exception while reading calibration info: %s", exc)
return "Unknown"
def getScreenshot(self) -> QtGui.QPixmap:
return QtGui.QPixmap()
def flushSerialBuffers(self):
with self.app.serialLock:
with self.serial.lock:
self.serial.write("\r\n\r\n".encode("ascii"))
sleep(0.1)
self.serial.reset_input_buffer()
@ -86,7 +101,7 @@ class VNA:
def readFirmware(self) -> str:
try:
with self.app.serialLock:
with self.serial.lock:
drain_serial(self.serial)
self.serial.write("info\r".encode('ascii'))
result = ""
@ -103,7 +118,7 @@ class VNA:
def readFromCommand(self, command) -> str:
try:
with self.app.serialLock:
with self.serial.lock:
drain_serial(self.serial)
self.serial.write(f"{command}\r".encode('ascii'))
result = ""
@ -121,7 +136,7 @@ class VNA:
def readValues(self, value) -> List[str]:
logger.debug("VNA reading %s", value)
try:
with self.app.serialLock:
with self.serial.lock:
drain_serial(self.serial)
self.serial.write(f"{value}\r".encode('ascii'))
result = ""
@ -140,12 +155,33 @@ class VNA:
"Exception while reading %s: %s", value, exc)
return []
def readVersion(self) -> str:
logger.debug("Reading version info.")
if not self.connected():
return ""
try:
with self.serial.lock:
drain_serial(self.serial)
self.serial.write("version\r".encode('ascii'))
result = ""
data = ""
sleep(0.1)
while "ch>" not in data:
data = self.serial.readline().decode('ascii')
result += data
values = result.splitlines()
logger.debug("Found version info: %s", values[1])
return values[1]
except serial.SerialException as exc:
logger.exception("Exception while reading firmware version: %s", exc)
return ""
def writeSerial(self, command):
if not self.serial.is_open:
if not self.connected():
logger.warning("Writing without serial port being opened (%s)",
command)
return
with self.app.serialLock:
with self.serial.lock:
try:
self.serial.write(f"{command}\r".encode('ascii'))
self.serial.readline()
@ -156,31 +192,3 @@ class VNA:
def setSweep(self, start, stop):
self.writeSerial(f"sweep {start} {stop} {self.datapoints}")
# TODO: should be dropped and the serial part should be a connection class
# which handles unconnected devices
class InvalidVNA(VNA):
name = "Invalid"
_datapoints = (0, )
def setSweep(self, start, stop):
return
def resetSweep(self, start, stop):
return
def writeSerial(self, command):
return
def readFirmware(self):
return
def readFrequencies(self) -> List[int]:
return []
def readValues(self, value):
return
def flushSerialBuffers(self):
return

Wyświetl plik

@ -24,7 +24,6 @@ from collections import OrderedDict
from time import sleep, strftime, localtime
from typing import List
import serial
from PyQt5 import QtWidgets, QtCore, QtGui
from .Windows import (
@ -36,8 +35,8 @@ from .Formatting import (
format_frequency, format_frequency_short, format_frequency_sweep,
parse_frequency,
)
from .Hardware.Hardware import get_interfaces, get_VNA
from .Hardware.VNA import InvalidVNA
from .Hardware.Hardware import Interface, get_interfaces, get_VNA
from .Hardware.VNA import VNA
from .RFTools import Datapoint, corr_att_data
from .Charts.Chart import Chart
from .Charts import (
@ -92,9 +91,8 @@ class NanoVNASaver(QtWidgets.QWidget):
self.noSweeps = 1 # Number of sweeps to run
self.serialLock = threading.Lock()
self.serial = serial.Serial()
self.vna = InvalidVNA(self, serial)
self.interface = Interface("serial", "None")
self.vna = VNA(self.interface)
self.dataLock = threading.Lock()
# TODO: use Touchstone class as data container
@ -110,8 +108,6 @@ class NanoVNASaver(QtWidgets.QWidget):
self.markers = []
self.serialPort = ""
logger.debug("Building user interface")
self.baseTitle = f"NanoVNA Saver {NanoVNASaver.version}"
@ -456,7 +452,7 @@ class NanoVNASaver(QtWidgets.QWidget):
self.rescanSerialPort()
self.serialPortInput.setEditable(True)
btn_rescan_serial_port = QtWidgets.QPushButton("Rescan")
btn_rescan_serial_port.setFixedWidth(60)
btn_rescan_serial_port.setFixedWidth(65)
btn_rescan_serial_port.clicked.connect(self.rescanSerialPort)
serial_port_input_layout = QtWidgets.QHBoxLayout()
serial_port_input_layout.addWidget(self.serialPortInput)
@ -471,6 +467,7 @@ class NanoVNASaver(QtWidgets.QWidget):
serial_button_layout.addWidget(self.btnSerialToggle, stretch=1)
self.btnDeviceSettings = QtWidgets.QPushButton("Manage")
self.btnDeviceSettings.setFixedWidth(65)
self.btnDeviceSettings.clicked.connect(
lambda: self.display_window("device_settings"))
serial_button_layout.addWidget(self.btnDeviceSettings, stretch=0)
@ -555,8 +552,8 @@ class NanoVNASaver(QtWidgets.QWidget):
def rescanSerialPort(self):
self.serialPortInput.clear()
for port in get_interfaces():
self.serialPortInput.insertItem(1, port[1], port[0])
for iface in get_interfaces():
self.serialPortInput.insertItem(1, f"{iface}", iface)
def exportFile(self, nr_params: int = 1):
if len(self.data) == 0:
@ -600,70 +597,63 @@ class NanoVNASaver(QtWidgets.QWidget):
return
def serialButtonClick(self):
if self.serial.is_open:
self.stopSerial()
if not self.vna.connected():
self.connect_device()
else:
self.startSerial()
self.disconnect_device()
return
def startSerial(self):
with self.serialLock:
self.serialPort = self.serialPortInput.currentData()
if self.serialPort == "":
self.serialPort = self.serialPortInput.currentText()
logger.info("Opening serial port %s", self.serialPort)
def connect_device(self):
with self.interface.lock:
self.interface = self.serialPortInput.currentData()
logger.info("Connection %s", self.interface)
try:
self.serial = serial.Serial(port=self.serialPort, baudrate=115200)
self.serial.timeout = 0.05
except serial.SerialException as exc:
logger.error("Tried to open %s and failed: %s", self.serialPort, exc)
self.interface.open()
self.interface.timeout = 0.05
except (IOError, AttributeError) as exc:
logger.error("Tried to open %s and failed: %s",
self.interface, exc)
return
if not self.serial.isOpen() :
logger.error("Unable to open port %s", self.serialPort)
if not self.interface.isOpen():
logger.error("Unable to open port %s", self.interface)
return
self.btnSerialToggle.setText("Disconnect")
sleep(0.1)
try:
self.vna = get_VNA(self.interface)
except IOError as exc:
logger.error("Unable to connect to VNA: %s", exc)
sleep(0.05)
self.vna = get_VNA(self, self.serial)
self.vna.validateInput = self.settings.value("SerialInputValidation", True, bool)
self.worker.setVNA(self.vna)
logger.info(self.vna.readFirmware())
# connected
self.btnSerialToggle.setText("Disconnect")
frequencies = self.vna.readFrequencies()
if frequencies:
logger.info("Read starting frequency %s and end frequency %s",
frequencies[0], frequencies[100])
if int(frequencies[0]) == int(frequencies[100]) and (
self.sweepStartInput.text() == "" or
self.sweepEndInput.text() == ""):
self.sweepStartInput.setText(
format_frequency_sweep(int(frequencies[0])))
self.sweepEndInput.setText(
format_frequency_sweep(int(frequencies[100]) + 100000))
elif (self.sweepStartInput.text() == "" or
self.sweepEndInput.text() == ""):
self.sweepStartInput.setText(
format_frequency_sweep(int(frequencies[0])))
self.sweepEndInput.setText(
format_frequency_sweep(int(frequencies[100])))
self.sweepStartInput.textEdited.emit(
self.sweepStartInput.text())
self.sweepStartInput.textChanged.emit(
self.sweepStartInput.text())
else:
if not frequencies:
logger.warning("No frequencies read")
return
logger.info("Read starting frequency %s and end frequency %s",
frequencies[0], frequencies[-1])
self.sweepStartInput.setText(
format_frequency_sweep(frequencies[0]))
if frequencies[0] < frequencies[-1]:
self.sweepEndInput.setText(
format_frequency_sweep(frequencies[-1]))
else:
self.sweepEndInput.setText(
format_frequency_sweep(frequencies[-1] + 100000))
self.sweepStartInput.textEdited.emit(self.sweepStartInput.text())
self.sweepStartInput.textChanged.emit(self.sweepStartInput.text())
logger.debug("Starting initial sweep")
self.sweep()
return
def stopSerial(self):
with self.serialLock:
logger.info("Closing connection to NanoVNA")
self.serial.close()
self.btnSerialToggle.setText("Connect to NanoVNA")
def disconnect_device(self):
with self.interface.lock:
logger.info("Closing connection to %s", self.interface)
self.interface.close()
self.btnSerialToggle.setText("Connect to device")
def toggleSweepSettings(self, disabled):
self.sweepStartInput.setDisabled(disabled)
@ -673,8 +663,8 @@ class NanoVNASaver(QtWidgets.QWidget):
self.sweepCountInput.setDisabled(disabled)
def sweep(self):
# Run the serial port update
if not self.serial.is_open:
# Run the device data update
if not self.vna.connected():
return
self.worker.stopped = False

Wyświetl plik

@ -155,36 +155,35 @@ class BandsModel(QtCore.QAbstractTableModel):
class Version:
RXP = re.compile(r"(.*\s+)?(\d+)\.(\d+)\.(\d+)(.*)")
RXP = re.compile(r"""^
\D*
(?P<major>\d+)\.
(?P<minor>\d+)\.
(?P<revision>\d+)
(?P<note>.*)
$""", re.VERBOSE)
def __init__(self, version_string: str):
self.major = 0
self.minor = 0
self.revision = 0
self.note = ""
self.version_string = version_string
results = Version.RXP.match(version_string)
if results:
self.major = int(results.group(2))
self.minor = int(results.group(3))
self.revision = int(results.group(4))
self.note = results.group(5)
logger.debug(
"Parsed version as \"%d.%d.%d%s\"",
self.major, self.minor, self.revision, self.note)
def __init__(self, vstring: str = "0.0.0"):
self.data = {
"major": 0,
"minor": 0,
"revision": 0,
"note": "",
}
try:
self.data = Version.RXP.search(vstring).groupdict()
for name in ("major", "minor", "revision"):
self.data[name] = int(self.data[name])
except AttributeError:
logger.error("Unable to parse version: %s", vstring)
def __gt__(self, other: "Version") -> bool:
if self.major > other.major:
return True
if self.major < other.major:
return False
if self.minor > other.minor:
return True
if self.minor < other.minor:
return False
if self.revision > other.revision:
return True
l, r = self.data, other.data
for name in ("major", "minor", "revision"):
if l[name] > r[name]:
return True
if l[name] < r[name]:
return False
return False
def __lt__(self, other: "Version") -> bool:
@ -197,11 +196,8 @@ class Version:
return self < other or self == other
def __eq__(self, other: "Version") -> bool:
return (
self.major == other.major and
self.minor == other.minor and
self.revision == other.revision and
self.note == other.note)
return self.data == other.data
def __str__(self) -> str:
return f"{self.major}.{self.minor}.{self.revision}{self.note}"
return (f'{self.data["major"]}.{self.data["minor"]}'
f'.{self.data["revision"]}{self.data["note"]}')

Wyświetl plik

@ -32,6 +32,7 @@ logger = logging.getLogger(__name__)
def truncate(values: List[List[Tuple]], count: int) -> List[List[Tuple]]:
"""truncate drops extrema from data list if averaging is active"""
keep = len(values) - count
logger.debug("Truncating from %d values to %d", len(values), keep)
if count < 1 or keep < 1:
@ -82,7 +83,7 @@ class SweepWorker(QtCore.QRunnable):
logger.info("Initializing SweepWorker")
self.running = True
self.percentage = 0
if not self.app.serial.is_open:
if not self.app.vna.connected():
logger.debug(
"Attempted to run without being connected to the NanoVNA")
self.running = False
@ -127,82 +128,52 @@ class SweepWorker(QtCore.QRunnable):
values21 = []
frequencies = []
if self.averaging:
for i in range(self.noSweeps):
logger.debug("Sweep segment no %d averaged over %d readings",
i, self.averages)
if self.stopped:
logger.debug("Stopping sweeping as signalled")
break
start = sweep_from + i * self.vna.datapoints * stepsize
freq, val11, val21 = self.readAveragedSegment(
start,
start + (self.vna.datapoints - 1) * stepsize,
self.averages)
frequencies.extend(freq)
values11.extend(val11)
values21.extend(val21)
self.percentage = (i + 1) * (self.vna.datapoints - 1) / \
self.noSweeps
logger.debug("Saving acquired data")
self.saveData(frequencies, values11, values21)
else:
first_sweep = True
finished = False
while not finished:
for i in range(self.noSweeps):
logger.debug("Sweep segment no %d", i)
if self.stopped:
logger.debug("Stopping sweeping as signalled")
finished = True
break
start = sweep_from + i * self.vna.datapoints * stepsize
try:
freq, val11, val21 = self.readSegment(
start, start + (self.vna.datapoints - 1) * stepsize)
if self.continuousSweep and not first_sweep:
_, values11, values21 = self.readSegment(
start, start + (self.vna.datapoints-1) * stepsize)
self.percentage = (i + 1) * 100 / self.noSweeps
self.updateData(values11, values21, i,
self.vna.datapoints)
continue
if self.averaging:
freq, val11, val21 = self.readAveragedSegment(
start,
start + (self.vna.datapoints - 1) * stepsize,
self.averages)
else:
freq, val11, val21 = self.readSegment(
start, start + (self.vna.datapoints - 1) * stepsize)
frequencies.extend(freq)
values11.extend(val11)
values21.extend(val21)
self.percentage = (i + 1) * 100 / self.noSweeps
logger.debug("Saving acquired data")
self.saveData(frequencies, values11, values21)
except NanoVNAValueException as e:
except ValueError as e:
self.error_message = str(e)
self.stopped = True
self.running = False
self.signals.sweepError.emit()
except NanoVNASerialException as e:
self.error_message = str(e)
self.stopped = True
self.running = False
self.signals.sweepFatalError.emit()
while self.continuousSweep and not self.stopped:
logger.debug("Continuous sweeping")
for i in range(self.noSweeps):
logger.debug("Sweep segment no %d", i)
if self.stopped:
logger.debug("Stopping sweeping as signalled")
break
start = sweep_from + i * self.vna.datapoints * stepsize
try:
_, values11, values21 = self.readSegment(
start, start + (self.vna.datapoints-1) * stepsize)
logger.debug("Updating acquired data")
self.updateData(values11, values21, i, self.vna.datapoints)
except NanoVNAValueException as e:
self.error_message = str(e)
self.stopped = True
self.running = False
self.signals.sweepError.emit()
except NanoVNASerialException as e:
self.error_message = str(e)
self.stopped = True
self.running = False
self.signals.sweepFatalError.emit()
if not self.continuousSweep:
finished = True
first_sweep = False
# Reset the device to show the full range if we were multisegment
if self.noSweeps > 1:
logger.debug("Resetting NanoVNA sweep to full range: %d to %d",
parse_frequency(
@ -267,7 +238,7 @@ class SweepWorker(QtCore.QRunnable):
def applyCalibration(self,
raw_data11: List[Datapoint],
raw_data21: List[Datapoint]
) -> (List[Datapoint], List[Datapoint]):
) -> Tuple[List[Datapoint], List[Datapoint]]:
if self.offsetDelay != 0:
tmp = []
for dp in raw_data11:
@ -329,7 +300,7 @@ class SweepWorker(QtCore.QRunnable):
self.vna.setSweep(start, stop)
# Let's check the frequencies first:
frequencies = self.readFreq()
frequencies = self.vna.readFrequencies()
# S11
values11 = self.readData("data 0")
# S21
@ -356,25 +327,19 @@ class SweepWorker(QtCore.QRunnable):
a, b = d.split(" ")
try:
if self.vna.validateInput and (
float(a) < -9.5 or float(a) > 9.5):
abs(float(a)) > 9.5 or
abs(float(b)) > 9.5):
logger.warning(
"Got a non-float data value: %s (%s)", d, a)
logger.debug("Re-reading %s", data)
"Got a non plausible data value: (%s)", d)
done = False
elif self.vna.validateInput and (
float(b) < -9.5 or float(b) > 9.5):
logger.warning(
"Got a non-float data value: %s (%s)", d, b)
logger.debug("Re-reading %s", data)
done = False
else:
returndata.append((float(a), float(b)))
except Exception as e:
break
returndata.append((float(a), float(b)))
except ValueError as exc:
logger.exception("An exception occurred reading %s: %s",
data, e)
logger.debug("Re-reading %s", data)
data, exc)
done = False
if not done:
logger.debug("Re-reading %s", data)
sleep(0.2)
count += 1
if count == 10:
@ -384,7 +349,7 @@ class SweepWorker(QtCore.QRunnable):
logger.critical(
"Tried and failed to read %s %d times. Giving up.",
data, count)
raise NanoVNAValueException(
raise IOError(
f"Failed reading {data} {count} times.\n"
f"Data outside expected valid ranges,"
f" or in an unexpected format.\n\n"
@ -392,40 +357,6 @@ class SweepWorker(QtCore.QRunnable):
f"device settings screen.")
return returndata
def readFreq(self):
# TODO: Figure out why frequencies sometimes arrive as non-integers
logger.debug("Reading frequencies")
returnfreq = []
done = False
count = 0
while not done:
done = True
returnfreq = []
tmpfreq = self.vna.readFrequencies()
if not tmpfreq:
logger.warning("Read no frequencies")
raise NanoVNASerialException(
"Failed reading frequencies: Returned no values.")
for f in tmpfreq:
if not f.isdigit():
logger.warning("Got a non-digit frequency: %s", f)
logger.debug("Re-reading frequencies")
done = False
count += 1
if count == 10:
logger.error(
"Tried and failed %d times to read frequencies.",
count)
if count >= 20:
logger.critical(
"Tried and failed to read frequencies from the"
" NanoVNA %d times.", count)
raise NanoVNAValueException(
f"Failed reading frequencies {count} times.")
else:
returnfreq.append(int(f))
return returnfreq
def setContinuousSweep(self, continuous_sweep: bool):
self.continuousSweep = continuous_sweep
@ -439,11 +370,3 @@ class SweepWorker(QtCore.QRunnable):
def setVNA(self, vna):
self.vna = vna
class NanoVNAValueException(Exception):
pass
class NanoVNASerialException(Exception):
pass

Wyświetl plik

@ -53,7 +53,7 @@ class AboutWindow(QtWidgets.QWidget):
f"NanoVNASaver version {self.app.version}"))
layout.addWidget(QtWidgets.QLabel(""))
layout.addWidget(QtWidgets.QLabel(
"\N{COPYRIGHT SIGN} Copyright 2019, 2020 Rune B. Broberg"
"\N{COPYRIGHT SIGN} Copyright 2019, 2020 Rune B. Broberg\n"
"\N{COPYRIGHT SIGN} Copyright 2020 NanoVNA-Saver Authors"
))
layout.addWidget(QtWidgets.QLabel(
@ -115,12 +115,12 @@ class AboutWindow(QtWidgets.QWidget):
self.updateLabels()
def updateLabels(self):
if self.app.vna.isValid():
logger.debug("Valid VNA")
v: Version = self.app.vna.version
try:
self.versionLabel.setText(
f"NanoVNA Firmware Version: {self.app.vna.name}"
f"{v.version_string}")
f"NanoVNA Firmware Version: {self.app.vna.name} "
f"v{self.app.vna.version}")
except (IOError, AttributeError):
pass
def updateSettings(self):
if self.updateCheckBox.isChecked():
@ -150,7 +150,7 @@ class AboutWindow(QtWidgets.QWidget):
self.app.settings.setValue("CheckForUpdates", "Ask")
def findUpdates(self, automatic=False):
latest_version = Version("")
latest_version = Version()
latest_url = ""
try:
req = request.Request(VERSION_URL)
@ -174,7 +174,7 @@ class AboutWindow(QtWidgets.QWidget):
self.updateLabel.setText("Connection error.")
return
logger.info("Latest version is %s", latest_version.version_string)
logger.info("Latest version is %s", latest_version)
this_version = Version(self.app.version)
logger.info("This is %s", this_version)
if latest_version > this_version:
@ -183,9 +183,9 @@ class AboutWindow(QtWidgets.QWidget):
QtWidgets.QMessageBox.information(
self,
"Updates available",
"There is a new update for NanoVNA-Saver available!\n" +
"Version " + latest_version.version_string + "\n\n" +
"Press \"About\" to find the update.")
f"There is a new update for NanoVNA-Saver available!\n"
f"Version {latest_version}\n\n"
f'Press "About" to find the update.')
else:
QtWidgets.QMessageBox.information(
self, "Updates available",

Wyświetl plik

@ -633,7 +633,7 @@ class CalibrationWindow(QtWidgets.QWidget):
self.btn_automatic.setDisabled(False)
return
logger.info("Starting automatic calibration assistant.")
if not self.app.serial.is_open:
if not self.app.vna.connected():
QtWidgets.QMessageBox(
QtWidgets.QMessageBox.Information,
"NanoVNA not connected",

Wyświetl plik

@ -35,6 +35,12 @@ class DeviceSettingsWindow(QtWidgets.QWidget):
QtWidgets.QShortcut(QtCore.Qt.Key_Escape, self, self.hide)
self.label = {
"status": QtWidgets.QLabel("Not connected."),
"firmware": QtWidgets.QLabel("Not connected."),
"calibration": QtWidgets.QLabel("Not connected."),
}
top_layout = QtWidgets.QHBoxLayout()
left_layout = QtWidgets.QVBoxLayout()
right_layout = QtWidgets.QVBoxLayout()
@ -44,13 +50,13 @@ class DeviceSettingsWindow(QtWidgets.QWidget):
status_box = QtWidgets.QGroupBox("Status")
status_layout = QtWidgets.QFormLayout(status_box)
self.statusLabel = QtWidgets.QLabel("Not connected.")
status_layout.addRow("Status:", self.statusLabel)
self.calibrationStatusLabel = QtWidgets.QLabel("Not connected.")
status_layout.addRow("Calibration:", self.calibrationStatusLabel)
status_layout.addRow("Status:", self.label["status"])
status_layout.addRow("Firmware:", self.label["firmware"])
status_layout.addRow("Calibration:", self.label["calibration"])
status_layout.addRow(QtWidgets.QLabel("Features:"))
self.featureList = QtWidgets.QListWidget()
status_layout.addRow(self.featureList)
@ -93,33 +99,36 @@ class DeviceSettingsWindow(QtWidgets.QWidget):
self.updateFields()
def updateFields(self):
if self.app.vna.isValid():
self.statusLabel.setText("Connected to " + self.app.vna.name + ".")
if self.app.worker.running:
self.calibrationStatusLabel.setText("(Sweep running)")
else:
self.calibrationStatusLabel.setText(self.app.vna.getCalibration())
if not self.app.vna.connected():
self.label["status"].setText("Not connected.")
self.label["firmware"].setText("Not connected.")
self.label["calibration"].setText("Not connected.")
self.featureList.clear()
self.featureList.addItem(self.app.vna.name + " v" + str(self.app.vna.version))
features = self.app.vna.getFeatures()
for item in features:
self.featureList.addItem(item)
self.btnCaptureScreenshot.setDisabled("Screenshots" not in features)
if "Customizable data points" in features:
self.datapoints.clear()
cur_dps = self.app.vna.datapoints
dplist = self.app.vna._datapoints[:]
for d in sorted(dplist):
self.datapoints.addItem(str(d))
self._set_datapoint_index(cur_dps)
else:
self.statusLabel.setText("Not connected.")
self.calibrationStatusLabel.setText("Not connected.")
self.featureList.clear()
self.featureList.addItem("Not connected.")
self.btnCaptureScreenshot.setDisabled(True)
return
self.label["status"].setText(
f"Connected to {self.app.vna.name}.")
self.label["firmware"].setText(
f"{self.app.vna.name} v{self.app.vna.version}")
if self.app.worker.running:
self.label["calibration"].setText("(Sweep running)")
else:
self.label["calibration"].setText(self.app.vna.getCalibration())
self.featureList.clear()
features = self.app.vna.getFeatures()
for item in features:
self.featureList.addItem(item)
self.btnCaptureScreenshot.setDisabled("Screenshots" not in features)
if "Customizable data points" in features:
self.datapoints.clear()
cur_dps = self.app.vna.datapoints
dplist = self.app.vna._datapoints[:]
for d in sorted(dplist):
self.datapoints.addItem(str(d))
self._set_datapoint_index(cur_dps)
def updateValidation(self, validate_data: bool):
self.app.vna.validateInput = validate_data