From 8e7e6b81aca16d4eea7f93203309abd918c7dc8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Holger=20M=C3=BCller?= Date: Sat, 4 Jul 2020 13:10:29 +0200 Subject: [PATCH] Moved duplicate code to parent class --- NanoVNASaver/Hardware/AVNA.py | 46 ------------------ NanoVNASaver/Hardware/NanoVNA.py | 74 +++++++---------------------- NanoVNASaver/Hardware/NanoVNA_F.py | 18 +------ NanoVNASaver/Hardware/NanoVNA_V2.py | 3 ++ NanoVNASaver/Hardware/VNA.py | 40 ++++++++++++++++ 5 files changed, 62 insertions(+), 119 deletions(-) diff --git a/NanoVNASaver/Hardware/AVNA.py b/NanoVNASaver/Hardware/AVNA.py index 4a4279d..194fcd7 100644 --- a/NanoVNASaver/Hardware/AVNA.py +++ b/NanoVNASaver/Hardware/AVNA.py @@ -20,8 +20,6 @@ import logging from time import sleep from typing import List -import serial - from NanoVNASaver.Hardware.Serial import Interface from NanoVNASaver.Hardware.VNA import VNA, Version @@ -39,27 +37,6 @@ class AVNA(VNA): def isValid(self): return True - def getCalibration(self) -> str: - logger.debug("Reading calibration info.") - if not self.serial.is_open: - return "Not connected." - with self.serial.lock: - try: - data = "a" - while data != "": - data = self.serial.readline().decode('ascii') - self.serial.write("cal\r".encode('ascii')) - result = "" - data = "" - sleep(0.1) - while "ch>" not in data: - data = self.serial.readline().decode('ascii') - result += data - values = result.splitlines() - return values[1] - except serial.SerialException as exc: - logger.exception("Exception while reading calibration info: %s", exc) - return "Unknown" def readFrequencies(self) -> List[str]: return self.readValues("frequencies") @@ -68,29 +45,6 @@ class AVNA(VNA): self.writeSerial("sweep " + str(start) + " " + str(stop) + " " + str(self.datapoints)) self.writeSerial("resume") - def readVersion(self): - logger.debug("Reading version info.") - if not self.serial.is_open: - return - with self.serial.lock: - try: - data = "a" - while data != "": - data = self.serial.readline().decode('ascii') - self.serial.write("version\r".encode('ascii')) - result = "" - data = "" - sleep(0.1) - while "ch>" not in data: - data = self.serial.readline().decode('ascii') - result += data - values = result.splitlines() - logger.debug("Found version info: %s", values[1]) - return values[1] - except serial.SerialException as exc: - logger.exception("Exception while reading firmware version: %s", exc) - return "" - def setSweep(self, start, stop): self.writeSerial("sweep " + str(start) + " " + str(stop) + " " + str(self.datapoints)) sleep(1) diff --git a/NanoVNASaver/Hardware/NanoVNA.py b/NanoVNASaver/Hardware/NanoVNA.py index 8fe33af..a9850c6 100644 --- a/NanoVNASaver/Hardware/NanoVNA.py +++ b/NanoVNASaver/Hardware/NanoVNA.py @@ -58,48 +58,31 @@ class NanoVNA(VNA): def isValid(self): return True - def getCalibration(self) -> str: - logger.debug("Reading calibration info.") - if not self.serial.is_open: - return "Not connected." + def _capture_data(self) -> bytes: with self.serial.lock: - try: - drain_serial(self.serial) - self.serial.write("cal\r".encode('ascii')) - result = "" - data = "" - sleep(0.1) - while "ch>" not in data: - data = self.serial.readline().decode('ascii') - result += data - values = result.splitlines() - return values[1] - except serial.SerialException as exc: - logger.exception("Exception while reading calibration info: %s", exc) - return "Unknown" + drain_serial(self.serial) + timeout = self.serial.timeout + self.serial.write("capture\r".encode('ascii')) + self.serial.timeout = 4 + self.serial.readline() + image_data = self.serial.read( + self.screenwidth * self.screenheight * 2) + self.serial.timeout = timeout + rgb_data = struct.unpack( + f">{self.screenwidth * self.screenheight}H", + image_data) + rgb_array = np.array(rgb_data, dtype=np.uint32) + return (0xFF000000 + + ((rgb_array & 0xF800) << 8) + + ((rgb_array & 0x07E0) << 5) + + ((rgb_array & 0x001F) << 3)) def getScreenshot(self) -> QtGui.QPixmap: logger.debug("Capturing screenshot...") if not self.serial.is_open: return QtGui.QPixmap() try: - with self.serial.lock: - drain_serial(self.serial) - timeout = self.serial.timeout - self.serial.write("capture\r".encode('ascii')) - self.serial.timeout = 4 - self.serial.readline() - image_data = self.serial.read( - self.screenwidth * self.screenheight * 2) - self.serial.timeout = timeout - rgb_data = struct.unpack( - f">{self.screenwidth * self.screenheight}H", - image_data) - rgb_array = np.array(rgb_data, dtype=np.uint32) - rgba_array = (0xFF000000 + - ((rgb_array & 0xF800) << 8) + - ((rgb_array & 0x07E0) << 5) + - ((rgb_array & 0x001F) << 3)) + rgba_array = self._capture_data() image = QtGui.QImage( rgba_array, self.screenwidth, @@ -119,27 +102,6 @@ class NanoVNA(VNA): self.writeSerial("sweep {start} {stop} {self.datapoints}") self.writeSerial("resume") - def readVersion(self): - logger.debug("Reading version info.") - if not self.serial.is_open: - return "" - try: - with self.serial.lock: - drain_serial(self.serial) - self.serial.write("version\r".encode('ascii')) - result = "" - data = "" - sleep(0.1) - while "ch>" not in data: - data = self.serial.readline().decode('ascii') - result += data - values = result.splitlines() - logger.debug("Found version info: %s", values[1]) - return values[1] - except serial.SerialException as exc: - logger.exception("Exception while reading firmware version: %s", exc) - return "" - def setSweep(self, start, stop): if self.useScan: self.writeSerial(f"scan {start} {stop} {self.datapoints}") diff --git a/NanoVNASaver/Hardware/NanoVNA_F.py b/NanoVNASaver/Hardware/NanoVNA_F.py index eb22245..dba4b4c 100644 --- a/NanoVNASaver/Hardware/NanoVNA_F.py +++ b/NanoVNASaver/Hardware/NanoVNA_F.py @@ -39,23 +39,7 @@ class NanoVNA_F(NanoVNA): if not self.serial.is_open: return QtGui.QPixmap() try: - with self.serial.lock: - drain_serial(self.serial) - self.serial.write("capture\r".encode('ascii')) - timeout = self.serial.timeout - self.serial.timeout = 4 - self.serial.readline() - image_data = self.serial.read( - self.screenwidth * self.screenheight * 2) - self.serial.timeout = timeout - rgb_data = struct.unpack( - f"<{self.screenwidth * self.screenheight}H", image_data) - rgb_array = np.array(rgb_data, dtype=np.uint32) - rgba_array = (0xFF000000 + - ((rgb_array & 0xF800) << 8) + # G?! - ((rgb_array & 0x07E0) >> 3) + # B - ((rgb_array & 0x001F) << 11)) # G - + rgba_array = self._capture_data() unwrapped_array = np.empty( self.screenwidth*self.screenheight, dtype=np.uint32) diff --git a/NanoVNASaver/Hardware/NanoVNA_V2.py b/NanoVNASaver/Hardware/NanoVNA_V2.py index 9048d7e..046569d 100644 --- a/NanoVNASaver/Hardware/NanoVNA_V2.py +++ b/NanoVNASaver/Hardware/NanoVNA_V2.py @@ -100,6 +100,9 @@ class NanoVNAV2(VNA): if self.isDFU(): raise IOError('Device is in DFU mode') + def getCalibration(self) -> str: + return "Unknown" + def readFirmware(self) -> str: # read register 0xf3 and 0xf4 (firmware major and minor version) cmd = pack(" str: + logger.debug("Reading calibration info.") + if not self.serial.is_open: + return "Not connected." + with self.serial.lock: + try: + drain_serial(self.serial) + self.serial.write("cal\r".encode('ascii')) + result = "" + data = "" + sleep(0.1) + while "ch>" not in data: + data = self.serial.readline().decode('ascii') + result += data + values = result.splitlines() + return values[1] + except serial.SerialException as exc: + logger.exception("Exception while reading calibration info: %s", exc) return "Unknown" def getScreenshot(self) -> QtGui.QPixmap: @@ -139,6 +156,29 @@ class VNA: "Exception while reading %s: %s", value, exc) return [] + def readVersion(self) -> str: + logger.debug("Reading version info.") + if not self.serial.is_open: + return "" + try: + with self.serial.lock: + drain_serial(self.serial) + self.serial.write("version\r".encode('ascii')) + result = "" + data = "" + sleep(0.1) + while "ch>" not in data: + data = self.serial.readline().decode('ascii') + result += data + values = result.splitlines() + logger.debug("Found version info: %s", values[1]) + return values[1] + except serial.SerialException as exc: + logger.exception("Exception while reading firmware version: %s", exc) + return "" + + + def writeSerial(self, command): if not self.serial.is_open: logger.warning("Writing without serial port being opened (%s)",