Changes content in separated file 'AVNA.py'.

pull/181/head
Holger Mueller 2020-05-18 20:31:27 +02:00
rodzic 904d8fa287
commit 61cc7d1373
1 zmienionych plików z 5 dodań i 682 usunięć

Wyświetl plik

@ -1,4 +1,5 @@
# NanoVNASaver - a python program to view and export Touchstone data from a NanoVNA
# NanoVNASaver
# A python program to view and export Touchstone data from a NanoVNA
# Copyright (C) 2019. Rune B. Broberg
#
# This program is free software: you can redistribute it and/or modify
@ -14,295 +15,15 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
import re
import struct
import platform
from time import sleep
from typing import List, Tuple
from collections import namedtuple
from typing import List
import serial
import numpy as np
from PyQt5 import QtWidgets, QtGui
from serial.tools import list_ports
from NanoVNASaver.Hardware.VNA import VNA, Version
logger = logging.getLogger(__name__)
Device = namedtuple("Device", "vid pid name")
DEVICETYPES = (
Device(0x0483, 0x5740, "NanoVNA"),
Device(0x16c0, 0x0483, "AVNA"),
Device(0x04b4, 0x0008, "NanaVNA-V2"),
)
# The USB Driver for NanoVNA V2 seems to deliver an
# incompatible hardware info like:
# 'PORTS\\VID_04B4&PID_0008\\DEMO'
# This function will fix it.
def _fix_v2_hwinfo(dev):
if dev.hwid == r'PORTS\VID_04B4&PID_0008\DEMO':
dev.vid, dev.pid = 0x04b4, 0x0008
return dev
# Get list of interfaces with VNAs connected
def get_interfaces() -> List[Tuple[str, str]]:
return_ports = []
for d in list_ports.comports():
if platform.system() == 'Windows' and d.vid is None:
d = _fix_v2_hwinfo(d)
for t in DEVICETYPES:
if d.vid == t.vid and d.pid == t.pid:
port = d.device
logger.info("Found %s (%04x %04x) on port %s", t.name, d.vid, d.pid, d.device)
return_ports.append((port, port + " (" + t.name + ")"))
return return_ports
def detect_version(serialPort: serial.Serial) -> str:
serialPort.timeout = 0.1
# drain any outstanding data in the serial incoming buffer
data = "a"
while len(data) != 0:
data = serialPort.read(128)
# send a \r and see what we get
serialPort.write(b"\r")
# will wait up to 0.1 seconds
data = serialPort.readline().decode('ascii')
if data == 'ch> ':
# this is an original nanovna
return 'nanovna'
if data == '2':
# this is a nanovna v2
return 'nanovnav2'
logger.error('Unknown VNA type: hardware responded to \r with: %s', data)
return 'unknown'
class VNA:
name = "VNA"
validateInput = True
features = []
datapoints = 101
def __init__(self, app: QtWidgets.QWidget, serial_port: serial.Serial):
self.app = app
self.serial = serial_port
self.version: Version = Version("0.0.0")
@staticmethod
def getVNA(app, serial_port: serial.Serial) -> 'VNA':
logger.info("Finding correct VNA type...")
for _ in range(3):
vnaType = detect_version(serial_port)
if vnaType != "unkown":
break
if vnaType == 'nanovnav2':
logger.info("Type: NanoVNA-V2")
return NanoVNAV2(app, serial_port)
logger.info("Finding firmware variant...")
serial_port.timeout = 0.05
tmp_vna = VNA(app, serial_port)
tmp_vna.flushSerialBuffers()
firmware = tmp_vna.readFirmware()
if firmware.find("AVNA + Teensy") > 0:
logger.info("Type: AVNA")
return AVNA(app, serial_port)
if firmware.find("NanoVNA-H") > 0:
logger.info("Type: NanoVNA-H")
return NanoVNA_H(app, serial_port)
if firmware.find("NanoVNA-F") > 0:
logger.info("Type: NanoVNA-F")
return NanoVNA_F(app, serial_port)
elif firmware.find("NanoVNA") > 0:
logger.info("Type: Generic NanoVNA")
return NanoVNA(app, serial_port)
else:
logger.warning("Did not recognize NanoVNA type from firmware.")
return NanoVNA(app, serial_port)
def readFeatures(self) -> List[str]:
features = []
raw_help = self.readFromCommand("help")
logger.debug("Help command output:")
logger.debug(raw_help)
# Detect features from the help command
if "capture" in raw_help:
features.append("Screenshots")
return features
def readFrequencies(self) -> List[str]:
pass
def readValues11(self) -> List[str]:
pass
def readValues21(self) -> List[str]:
pass
def resetSweep(self, start: int, stop: int):
pass
def isValid(self):
return False
def isDFU(self):
return False
def getFeatures(self) -> List[str]:
return self.features
def getCalibration(self) -> str:
return "Unknown"
def getScreenshot(self) -> QtGui.QPixmap:
return QtGui.QPixmap()
def flushSerialBuffers(self):
if self.app.serialLock.acquire():
self.serial.write(b"\r\n\r\n")
sleep(0.1)
self.serial.reset_input_buffer()
self.serial.reset_output_buffer()
sleep(0.1)
self.app.serialLock.release()
def readFirmware(self) -> str:
if self.app.serialLock.acquire():
result = ""
try:
data = "a"
while data != "":
data = self.serial.readline().decode('ascii')
self.serial.write("info\r".encode('ascii'))
result = ""
data = ""
sleep(0.01)
while data != "ch> ":
data = self.serial.readline().decode('ascii')
result += data
except serial.SerialException as exc:
logger.exception("Exception while reading firmware data: %s", exc)
finally:
self.app.serialLock.release()
return result
else:
logger.error("Unable to acquire serial lock to read firmware.")
return ""
def readFromCommand(self, command) -> str:
if self.app.serialLock.acquire():
result = ""
try:
data = "a"
while data != "":
data = self.serial.readline().decode('ascii')
self.serial.write((command + "\r").encode('ascii'))
result = ""
data = ""
sleep(0.01)
while data != "ch> ":
data = self.serial.readline().decode('ascii')
result += data
except serial.SerialException as exc:
logger.exception("Exception while reading %s: %s", command, exc)
finally:
self.app.serialLock.release()
return result
else:
logger.error("Unable to acquire serial lock to read %s", command)
return ""
def readValues(self, value) -> List[str]:
logger.debug("VNA reading %s", value)
if self.app.serialLock.acquire():
try:
data = "a"
while data != "":
data = self.serial.readline().decode('ascii')
# Then send the command to read data
self.serial.write(str(value + "\r").encode('ascii'))
result = ""
data = ""
sleep(0.05)
while data != "ch> ":
data = self.serial.readline().decode('ascii')
result += data
values = result.split("\r\n")
except serial.SerialException as exc:
logger.exception("Exception while reading %s: %s", value, exc)
return []
finally:
self.app.serialLock.release()
logger.debug("VNA done reading %s (%d values)", value, len(values)-2)
return values[1:-1]
else:
logger.error("Unable to acquire serial lock to read %s", value)
return []
def writeSerial(self, command):
if not self.serial.is_open:
logger.warning("Writing without serial port being opened (%s)", command)
return
if self.app.serialLock.acquire():
try:
self.serial.write(str(command + "\r").encode('ascii'))
self.serial.readline()
except serial.SerialException as exc:
logger.exception("Exception while writing to serial port (%s): %s", command, exc)
finally:
self.app.serialLock.release()
return
def setSweep(self, start, stop):
self.writeSerial("sweep " + str(start) + " " + str(stop) + " " + str(self.datapoints))
class InvalidVNA(VNA):
name = "Invalid"
datapoints = 0
def setSweep(self, start, stop):
return
def resetSweep(self, start, stop):
return
def writeSerial(self, command):
return
def readFirmware(self):
return
def readFrequencies(self) -> List[int]:
return []
def readValues11(self) -> List[str]:
return []
def readValues21(self) -> List[str]:
return []
def readValues(self, value):
return
def flushSerialBuffers(self):
return
class AVNA(VNA):
name = "AVNA"
datapoints = 101
@ -382,401 +103,3 @@ class AVNA(VNA):
def setSweep(self, start, stop):
self.writeSerial("sweep " + str(start) + " " + str(stop) + " " + str(self.datapoints))
sleep(1)
class NanoVNA(VNA):
name = "NanoVNA"
datapoints = 101
screenwidth = 320
screenheight = 240
def __init__(self, app, serial_port):
super().__init__(app, serial_port)
self.version = Version(self.readVersion())
self.features = []
logger.debug("Testing against 0.2.0")
if self.version.version_string.find("extended with scan") > 0:
logger.debug("Incompatible scan command detected.")
self.features.append("Incompatible scan command")
self.useScan = False
elif self.version >= Version("0.2.0"):
logger.debug("Newer than 0.2.0, using new scan command.")
self.features.append("New scan command")
self.useScan = True
else:
logger.debug("Older than 0.2.0, using old sweep command.")
self.features.append("Original sweep method")
self.useScan = False
self.features.extend(self.readFeatures())
def isValid(self):
return True
def getCalibration(self) -> str:
logger.debug("Reading calibration info.")
if not self.serial.is_open:
return "Not connected."
if self.app.serialLock.acquire():
try:
data = "a"
while data != "":
data = self.serial.readline().decode('ascii')
self.serial.write("cal\r".encode('ascii'))
result = ""
data = ""
sleep(0.1)
while "ch>" not in data:
data = self.serial.readline().decode('ascii')
result += data
values = result.splitlines()
return values[1]
except serial.SerialException as exc:
logger.exception("Exception while reading calibration info: %s", exc)
finally:
self.app.serialLock.release()
return "Unknown"
def getScreenshot(self) -> QtGui.QPixmap:
logger.debug("Capturing screenshot...")
if not self.serial.is_open:
return QtGui.QPixmap()
if self.app.serialLock.acquire():
try:
data = "a"
while data != "":
data = self.serial.readline().decode('ascii')
self.serial.write("capture\r".encode('ascii'))
timeout = self.serial.timeout
self.serial.timeout = 4
self.serial.readline()
image_data = self.serial.read(self.screenwidth * self.screenheight * 2)
self.serial.timeout = timeout
rgb_data = struct.unpack(">" + str(self.screenwidth * self.screenheight) + "H", image_data)
rgb_array = np.array(rgb_data, dtype=np.uint32)
rgba_array = (0xFF000000 +
((rgb_array & 0xF800) << 8) +
((rgb_array & 0x07E0) << 5) +
((rgb_array & 0x001F) << 3))
image = QtGui.QImage(rgba_array, self.screenwidth, self.screenheight, QtGui.QImage.Format_ARGB32)
logger.debug("Captured screenshot")
return QtGui.QPixmap(image)
except serial.SerialException as exc:
logger.exception("Exception while capturing screenshot: %s", exc)
finally:
self.app.serialLock.release()
return QtGui.QPixmap()
def readFrequencies(self) -> List[str]:
return self.readValues("frequencies")
def readValues11(self) -> List[str]:
return self.readValues("data 0")
def readValues21(self) -> List[str]:
return self.readValues("data 1")
def resetSweep(self, start: int, stop: int):
self.writeSerial("sweep " + str(start) + " " + str(stop) + " " + str(self.datapoints))
self.writeSerial("resume")
def readVersion(self):
logger.debug("Reading version info.")
if not self.serial.is_open:
return
if self.app.serialLock.acquire():
try:
data = "a"
while data != "":
data = self.serial.readline().decode('ascii')
self.serial.write("version\r".encode('ascii'))
result = ""
data = ""
sleep(0.1)
while "ch>" not in data:
data = self.serial.readline().decode('ascii')
result += data
values = result.splitlines()
logger.debug("Found version info: %s", values[1])
return values[1]
except serial.SerialException as exc:
logger.exception("Exception while reading firmware version: %s", exc)
finally:
self.app.serialLock.release()
return
def setSweep(self, start, stop):
if self.useScan:
self.writeSerial("scan " + str(start) + " " + str(stop) + " " + str(self.datapoints))
else:
self.writeSerial("sweep " + str(start) + " " + str(stop) + " " + str(self.datapoints))
sleep(1)
class NanoVNA_H(NanoVNA):
name = "NanoVNA-H"
class NanoVNA_H4(NanoVNA):
name = "NanoVNA-H4"
screenwidth = 640
screenheight = 240
class NanoVNA_F(NanoVNA):
name = "NanoVNA-F"
screenwidth = 800
screenheight = 480
def getScreenshot(self) -> QtGui.QPixmap:
logger.debug("Capturing screenshot...")
if not self.serial.is_open:
return QtGui.QPixmap()
if self.app.serialLock.acquire():
try:
data = "a"
while data != "":
data = self.serial.readline().decode('ascii')
self.serial.write("capture\r".encode('ascii'))
timeout = self.serial.timeout
self.serial.timeout = 4
self.serial.readline()
image_data = self.serial.read(self.screenwidth * self.screenheight * 2)
self.serial.timeout = timeout
rgb_data = struct.unpack("<" + str(self.screenwidth * self.screenheight) + "H", image_data)
rgb_array = np.array(rgb_data, dtype=np.uint32)
rgba_array = (0xFF000000 +
((rgb_array & 0xF800) << 8) + # G?!
((rgb_array & 0x07E0) >> 3) + # B
((rgb_array & 0x001F) << 11)) # G
# logger.debug("Value yellow: %s", hex(rgb_array[10*400+36])) # This ought to be yellow
# logger.debug("Value white: %s", hex(rgb_array[50*400+261])) # This ought to be white
# logger.debug("Value cyan: %s", hex(rgb_array[10*400+252])) # This ought to be cyan
#
# rgba_array = (0xFF000000 + ((rgb_array & 0x001F) << 11)) # Exclusively green?
# rgba_array[10*400+36] = 0xFFFF0000
# rgba_array[50*400+261] = 0xFFFF0000
# rgba_array[10*400+252] = 0xFFFF0000
# At this point, the RGBA array is structured as 4 small images:
# 13
# 24
# each of which represents the pixels in a differently structured larger image:
# 12
# 34
# Let us unwrap.
unwrapped_array = np.empty(self.screenwidth*self.screenheight, dtype=np.uint32)
for y in range(self.screenheight//2):
for x in range(self.screenwidth//2):
unwrapped_array[2 * x + 2 * y * self.screenwidth] = rgba_array[x + y * self.screenwidth]
unwrapped_array[(2 * x) + 1 + 2 * y * self.screenwidth] = \
rgba_array[x + (self.screenheight//2 + y) * self.screenwidth]
unwrapped_array[2 * x + (2 * y + 1) * self.screenwidth] = \
rgba_array[x + self.screenwidth//2 + y * self.screenwidth]
unwrapped_array[(2 * x) + 1 + (2 * y + 1) * self.screenwidth] = \
rgba_array[x + self.screenwidth//2 + (self.screenheight//2 + y) * self.screenwidth]
image = QtGui.QImage(unwrapped_array, self.screenwidth, self.screenheight, QtGui.QImage.Format_ARGB32)
logger.debug("Captured screenshot")
return QtGui.QPixmap(image)
except serial.SerialException as exc:
logger.exception("Exception while capturing screenshot: %s", exc)
finally:
self.app.serialLock.release()
return QtGui.QPixmap()
def _unpackSigned32(b):
return int.from_bytes(b[0:4], 'little', signed=True)
def _unpackUnsigned16(b):
return int.from_bytes(b[0:2], 'little', signed=False)
class NanoVNAV2(VNA):
name = "NanoVNA-V2"
datapoints = 101
screenwidth = 320
screenheight = 240
def __init__(self, app, serialPort):
super().__init__(app, serialPort)
if platform.system() != 'Windows':
import tty
tty.setraw(self.serial.fd)
self.serial.timeout = 3
# reset protocol to known state
self.serial.write([0, 0, 0, 0, 0, 0, 0, 0])
self.version = self.readVersion()
self.firmware = self.readFirmware()
# firmware major version of 0xff indicates dfu mode
if self.firmware.major == 0xff:
self._isDFU = True
return
self._isDFU = False
self.sweepStartHz = 200e6
self.sweepStepHz = 1e6
self.sweepData = [(0, 0)] * self.datapoints
self._updateSweep()
def isValid(self):
if self.isDFU():
return False
return True
def isDFU(self):
return self._isDFU
def checkValid(self):
if self.isDFU():
raise IOError('Device is in DFU mode')
def readFirmware(self) -> str:
# read register 0xf3 and 0xf4 (firmware major and minor version)
cmd = b"\x10\xf3\x10\xf4"
self.serial.write(cmd)
resp = self.serial.read(2)
if len(resp) != 2:
logger.error("Timeout reading version registers")
return None
return Version.getVersion(major=resp[0], minor=resp[1], revision=0)
def readFrequencies(self) -> List[str]:
self.checkValid()
freqs = [self.sweepStartHz + i*self.sweepStepHz for i in range(self.datapoints)]
return [str(int(f)) for f in freqs]
def readValues(self, value) -> List[str]:
self.checkValid()
# Actually grab the data only when requesting channel 0.
# The hardware will return all channels which we will store.
if value == "data 0":
# reset protocol to known state
self.serial.write([0, 0, 0, 0, 0, 0, 0, 0])
# cmd: write register 0x30 to clear FIFO
self.serial.write([0x20, 0x30, 0x00])
# cmd: read FIFO, addr 0x30
self.serial.write([0x18, 0x30, self.datapoints])
# each value is 32 bytes
nBytes = self.datapoints * 32
# serial .read() will wait for exactly nBytes bytes
arr = self.serial.read(nBytes)
if nBytes != len(arr):
logger.error("expected %d bytes, got %d", nBytes, len(arr))
return []
for i in range(self.datapoints):
b = arr[i*32:]
fwd = complex(_unpackSigned32(b[0:]), _unpackSigned32(b[4:]))
refl = complex(_unpackSigned32(b[8:]), _unpackSigned32(b[12:]))
thru = complex(_unpackSigned32(b[16:]), _unpackSigned32(b[20:]))
freqIndex = _unpackUnsigned16(b[24:])
#print('freqIndex', freqIndex)
self.sweepData[freqIndex] = (refl / fwd, thru / fwd)
ret = [x[0] for x in self.sweepData]
ret = [str(x.real) + ' ' + str(x.imag) for x in ret]
return ret
if value == "data 1":
ret = [x[1] for x in self.sweepData]
ret = [str(x.real) + ' ' + str(x.imag) for x in ret]
return ret
def readValues11(self) -> List[str]:
return self.readValues("data 0")
def readValues21(self) -> List[str]:
return self.readValues("data 1")
def resetSweep(self, start: int, stop: int):
self.setSweep(start, stop)
return
# returns device variant
def readVersion(self):
# read register 0xf0 (device type), 0xf2 (board revision)
cmd = b"\x10\xf0\x10\xf2"
self.serial.write(cmd)
resp = self.serial.read(2)
if len(resp) != 2:
logger.error("Timeout reading version registers")
return None
return Version.getVersion(major=resp[0], minor=0, revision=resp[1])
def setSweep(self, start, stop):
step = (stop - start) / (self.datapoints - 1)
if start == self.sweepStartHz and step == self.sweepStepHz:
return
self.sweepStartHz = start
self.sweepStepHz = step
logger.info('NanoVNAV2: set sweep start %d step %d', self.sweepStartHz, self.sweepStepHz)
self._updateSweep()
return
def _updateSweep(self):
self.checkValid()
cmd = b"\x23\x00" + int.to_bytes(int(self.sweepStartHz), 8, 'little')
cmd += b"\x23\x10" + int.to_bytes(int(self.sweepStepHz), 8, 'little')
cmd += b"\x21\x20" + int.to_bytes(int(self.datapoints), 2, 'little')
self.serial.write(cmd)
class Version:
major = 0
minor = 0
revision = 0
note = ""
version_string = ""
def __init__(self, version_string):
self.version_string = version_string
results = re.match(r"(.*\s+)?(\d+)\.(\d+)\.(\d+)(.*)", version_string)
if results:
self.major = int(results.group(2))
self.minor = int(results.group(3))
self.revision = int(results.group(4))
self.note = results.group(5)
logger.debug("Parsed version as \"%d.%d.%d%s\"", self.major, self.minor, self.revision, self.note)
@staticmethod
def getVersion(major: int, minor: int, revision: int, note=""):
return Version(str(major) + "." + str(minor) + "." + str(revision) + note)
def __gt__(self, other: "Version"):
return self.major > other.major or self.major == other.major and self.minor > other.minor or \
self.major == other.major and self.minor == other.minor and self.revision > other.revision
def __lt__(self, other: "Version"):
return other > self
def __ge__(self, other: "Version"):
return self > other or self == other
def __le__(self, other: "Version"):
return self < other or self == other
def __eq__(self, other: "Version"):
return self.major == other.major and self.minor == other.minor and self.revision == other.revision and \
self.note == other.note
def __str__(self):
return str(self.major) + "." + str(self.minor) + "." + str(self.revision) + str(self.note)