diff --git a/NanoVNASaver/Analysis/EFHWAnalysis.py b/NanoVNASaver/Analysis/EFHWAnalysis.py index 674ddee..e0b9126 100644 --- a/NanoVNASaver/Analysis/EFHWAnalysis.py +++ b/NanoVNASaver/Analysis/EFHWAnalysis.py @@ -16,154 +16,38 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import os import csv import logging from PyQt5 import QtWidgets import NanoVNASaver.AnalyticTools as at -from NanoVNASaver.Analysis.Base import Analysis, QHLine +from NanoVNASaver.Analysis.ResonanceAnalysis import ( + ResonanceAnalysis, format_resistence_neg +) from NanoVNASaver.Formatting import ( - format_frequency, format_complex_imp, - format_frequency_short, format_resistance) -from NanoVNASaver.RFTools import reflection_coefficient + format_frequency, format_complex_imp, format_frequency_short) logger = logging.getLogger(__name__) -def format_resistence_neg(x): - return format_resistance(x, allow_negative=True) - - -def vswr_transformed(z, ratio=49) -> float: - refl = reflection_coefficient(z / ratio) - mag = abs(refl) - return 1 if mag == 1 else (1 + mag) / (1 - mag) - - -class ResonanceAnalysis(Analysis): - - def __init__(self, app): - super().__init__(app) - - self._widget = QtWidgets.QWidget() - self.layout = QtWidgets.QFormLayout() - self._widget.setLayout(self.layout) - self.input_description = QtWidgets.QLineEdit("") - self.checkbox_move_marker = QtWidgets.QCheckBox() - self.layout.addRow(QtWidgets.QLabel("Settings")) - self.layout.addRow("Description", self.input_description) - self.layout.addRow(QHLine()) - - self.layout.addRow(QHLine()) - - self.results_label = QtWidgets.QLabel("Results") - self.layout.addRow(self.results_label) - - def _get_data(self, index): - s11 = self.app.data.s11 - my_data = { - "freq": s11[index].freq, - "s11": s11[index].z, - "lambda": s11[index].wavelength, - "impedance": s11[index].impedance(), - "vswr": s11[index].vswr, - } - my_data["vswr_49"] = vswr_transformed( - my_data["impedance"], 49) - my_data["vswr_4"] = vswr_transformed( - my_data["impedance"], 4) - my_data["r"] = my_data["impedance"].real - my_data["x"] = my_data["impedance"].imag - - return my_data - - def _get_crossing(self): - data = [d.phase for d in self.app.data.s11] - return at.zero_crossings(data) - - def runAnalysis(self): - self.reset() - filename = ( - os.path.join("/tmp/", f"{self.input_description.text()}.csv") - if self.input_description.text() - else None) - - crossing = self._get_crossing() - - logger.debug("Found %d sections ", - len(crossing)) - - results_header = self.layout.indexOf(self.results_label) - logger.debug("Results start at %d, out of %d", - results_header, self.layout.rowCount()) - for _ in range(results_header, self.layout.rowCount()): - self.layout.removeRow(self.layout.rowCount() - 1) - - if not crossing: - self.layout.addRow(QtWidgets.QLabel( - "No resonance found")) - - extended_data = [] - for m in crossing: - start, lowest, end = m - my_data = self._get_data(lowest) - s11_low = self.app.data.s11[lowest] - extended_data.append(my_data) - if start != end: - logger.debug( - "Section from %d to %d, lowest at %d", - start, end, lowest) - self.layout.addRow( - "Resonance", - QtWidgets.QLabel( - f"{format_frequency(s11_low.freq)}" - f" ({format_complex_imp(s11_low.impedance())})")) - else: - self.layout.addRow("Resonance", QtWidgets.QLabel( - format_frequency(self.app.data.s11[lowest].freq))) - self.layout.addWidget(QHLine()) - # Remove the final separator line - self.layout.removeRow(self.layout.rowCount() - 1) - if filename and extended_data: - with open(filename, 'w', encoding='utf-8', newline='') as csvfile: - fieldnames = extended_data[0].keys() - writer = csv.DictWriter(csvfile, fieldnames=fieldnames) - - writer.writeheader() - for row in extended_data: - writer.writerow(row) - - class EFHWAnalysis(ResonanceAnalysis): """ find only resonance when HI impedance """ - old_data = [] - def reset(self): - logger.debug("reset") + def __init__(self, app): + super().__init__(app) + self.old_data = [] - def runAnalysis(self): - self.reset() - if description := self.input_description.text(): - filename = os.path.join("/tmp/", f"{description}.csv") - else: - filename = None - crossing = self._get_crossing() - data = [d.impedance().real for d in self.app.data.s11] - maximums = sorted(at.maxima(data, threshold=500)) - results_header = self.layout.indexOf(self.results_label) - logger.debug("Results start at %d, out of %d", - results_header, self.layout.rowCount()) - - for _ in range(results_header, self.layout.rowCount()): - self.layout.removeRow(self.layout.rowCount() - 1) + def do_resonance_analysis(self): + s11 = self.app.data.s11 + maximums = sorted( + at.maxima([d.impedance().real for d in s11], + threshold=500)) extended_data = {} - logger.info("TO DO: find near data") - for lowest in crossing: + for lowest in self.crossing: my_data = self._get_data(lowest) if lowest in extended_data: extended_data[lowest].update(my_data) @@ -186,24 +70,24 @@ class EFHWAnalysis(ResonanceAnalysis): else: diff = self.compare({}, extended_data, fields=fields) self.old_data.append(extended_data) - for i, index in enumerate(sorted(extended_data.keys())): - s11_idx = self.app.data.s11[index] + for i, idx in enumerate(sorted(extended_data.keys())): self.layout.addRow( - f"{format_frequency_short(s11_idx.freq)}", + f"{format_frequency_short(s11[idx].freq)}", QtWidgets.QLabel( f" ({diff[i]['freq']})" - f" {format_complex_imp(s11_idx.impedance())}" + f" {format_complex_imp(s11[idx].impedance())}" f" ({diff[i]['r']}) {diff[i]['lambda']} m")) - if filename and extended_data: - with open(filename, 'w', newline='', encoding='utf-8') as csvfile: + if self.filename and extended_data: + with open( + self.filename, 'w', newline='', encoding='utf-8' + ) as csvfile: fieldnames = extended_data[sorted( extended_data.keys())[0]].keys() writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() - for index in sorted(extended_data.keys()): - row = extended_data[index] - writer.writerow(row) + for idx in sorted(extended_data.keys()): + writer.writerow(extended_data[idx]) def compare(self, old, new, fields=None): """ @@ -228,65 +112,51 @@ class EFHWAnalysis(ResonanceAnalysis): i_max = min(len(old_idx), len(new_idx)) i_tot = max(len(old_idx), len(new_idx)) - if i_max == i_tot: - logger.debug("may be the same antenna ... analyzing") - - else: + if i_max != i_tot: logger.warning("resonances changed from %s to %s", len(old_idx), len(new_idx)) - logger.debug("Trying to compare only first %s resonances", i_max) - split = 0 - max_delta_f = 1000000 # 1M + max_delta_f = 1_000_000 for i, k in enumerate(new_idx): - my_diff = {} - - logger.info("Risonance %s at %s", i, - format_frequency(new[k]["freq"])) - if len(old_idx) <= i + split: diff[i] = no_compare() continue + logger.info("Resonance %s at %s", i, new[k]["freq"]) + delta_f = new[k]["freq"] - old[old_idx[i + split]]["freq"] if abs(delta_f) < max_delta_f: logger.debug("can compare") + diff[i] = { + desc: fnc(new[k][desc] - old[old_idx[i + split]][desc]) + for desc, fnc in fields + } + logger.debug("Deltas %s", diff[i]) + continue - else: - logger.debug("can't compare, %s is too much ", - format_frequency(delta_f)) - if delta_f > 0: + logger.debug("can't compare, %s is too much ", + format_frequency(delta_f)) - logger.debug("possible missing band, ") - if len(old_idx) > (i + split + 1): - if (abs(new[k]["freq"] - - old[old_idx[i + split + 1]]["freq"]) < - max_delta_f): - logger.debug("new is missing band, compare next ") - split += 1 + if delta_f > 0: + logger.debug("possible missing band, ") + if len(old_idx) > (i + split + 1): + if (abs(new[k]["freq"] - + old[old_idx[i + split + 1]]["freq"]) < + max_delta_f): + logger.debug("new is missing band, compare next ") + split += 1 # FIXME: manage 2 or more band missing ?!? - else: - logger.debug("new band, non compare ") - diff[i] = no_compare() - continue - else: + continue logger.debug("new band, non compare ") diff[i] = no_compare() + continue - split -= 1 - continue - - for d, fn in fields: - my_diff[d] = fn(new[k][d] - old[old_idx[i + split]][d]) - logger.info("Delta %s = %s", d, - my_diff[d]) - - diff[i] = my_diff + logger.debug("new band, non compare ") + diff[i] = no_compare() + split -= 1 for i in range(i_max, i_tot): # add missing in old ... if any - diff[i] = no_compare() - return diff diff --git a/NanoVNASaver/Analysis/ResonanceAnalysis.py b/NanoVNASaver/Analysis/ResonanceAnalysis.py index 674ddee..f75613f 100644 --- a/NanoVNASaver/Analysis/ResonanceAnalysis.py +++ b/NanoVNASaver/Analysis/ResonanceAnalysis.py @@ -19,6 +19,7 @@ import os import csv import logging +from typing import List from PyQt5 import QtWidgets @@ -26,7 +27,7 @@ import NanoVNASaver.AnalyticTools as at from NanoVNASaver.Analysis.Base import Analysis, QHLine from NanoVNASaver.Formatting import ( format_frequency, format_complex_imp, - format_frequency_short, format_resistance) + format_resistance) from NanoVNASaver.RFTools import reflection_coefficient logger = logging.getLogger(__name__) @@ -46,7 +47,8 @@ class ResonanceAnalysis(Analysis): def __init__(self, app): super().__init__(app) - + self.crossing: List[int] = [] + self.filename = "" self._widget = QtWidgets.QWidget() self.layout = QtWidgets.QFormLayout() self._widget.setLayout(self.layout) @@ -79,34 +81,32 @@ class ResonanceAnalysis(Analysis): return my_data - def _get_crossing(self): - data = [d.phase for d in self.app.data.s11] - return at.zero_crossings(data) - def runAnalysis(self): self.reset() - filename = ( - os.path.join("/tmp/", f"{self.input_description.text()}.csv") - if self.input_description.text() - else None) - - crossing = self._get_crossing() - - logger.debug("Found %d sections ", - len(crossing)) + self.filename = os.path.join( + "/tmp/", f"{self.input_description.text()}.csv" + ) if self.input_description.text() else "" results_header = self.layout.indexOf(self.results_label) logger.debug("Results start at %d, out of %d", results_header, self.layout.rowCount()) + for _ in range(results_header, self.layout.rowCount()): self.layout.removeRow(self.layout.rowCount() - 1) - if not crossing: + self.crossing = at.zero_crossings([d.phase for d in self.app.data.s11]) + logger.debug("Found %d sections ", + len(self.crossing)) + if not self.crossing: self.layout.addRow(QtWidgets.QLabel( "No resonance found")) + return + self.do_resonance_analysis() + + def do_resonance_analysis(self): extended_data = [] - for m in crossing: + for m in self.crossing: start, lowest, end = m my_data = self._get_data(lowest) s11_low = self.app.data.s11[lowest] @@ -126,167 +126,12 @@ class ResonanceAnalysis(Analysis): self.layout.addWidget(QHLine()) # Remove the final separator line self.layout.removeRow(self.layout.rowCount() - 1) - if filename and extended_data: - with open(filename, 'w', encoding='utf-8', newline='') as csvfile: + if self.filename and extended_data: + with open( + self.filename, 'w', encoding='utf-8', newline='' + ) as csvfile: fieldnames = extended_data[0].keys() writer = csv.DictWriter(csvfile, fieldnames=fieldnames) - writer.writeheader() for row in extended_data: writer.writerow(row) - - -class EFHWAnalysis(ResonanceAnalysis): - """ - find only resonance when HI impedance - """ - old_data = [] - - def reset(self): - logger.debug("reset") - - def runAnalysis(self): - self.reset() - if description := self.input_description.text(): - filename = os.path.join("/tmp/", f"{description}.csv") - else: - filename = None - crossing = self._get_crossing() - data = [d.impedance().real for d in self.app.data.s11] - maximums = sorted(at.maxima(data, threshold=500)) - results_header = self.layout.indexOf(self.results_label) - logger.debug("Results start at %d, out of %d", - results_header, self.layout.rowCount()) - - for _ in range(results_header, self.layout.rowCount()): - self.layout.removeRow(self.layout.rowCount() - 1) - extended_data = {} - - logger.info("TO DO: find near data") - for lowest in crossing: - my_data = self._get_data(lowest) - if lowest in extended_data: - extended_data[lowest].update(my_data) - else: - extended_data[lowest] = my_data - logger.debug("maximumx %s of type %s", maximums, type(maximums)) - for m in maximums: - logger.debug("m %s of type %s", m, type(m)) - my_data = self._get_data(m) - if m in extended_data: - extended_data[m].update(my_data) - else: - extended_data[m] = my_data - fields = [("freq", format_frequency_short), - ("r", format_resistence_neg), ("lambda", lambda x: round(x, 2))] - - if self.old_data: - diff = self.compare( - self.old_data[-1], extended_data, fields=fields) - else: - diff = self.compare({}, extended_data, fields=fields) - self.old_data.append(extended_data) - for i, index in enumerate(sorted(extended_data.keys())): - s11_idx = self.app.data.s11[index] - self.layout.addRow( - f"{format_frequency_short(s11_idx.freq)}", - QtWidgets.QLabel( - f" ({diff[i]['freq']})" - f" {format_complex_imp(s11_idx.impedance())}" - f" ({diff[i]['r']}) {diff[i]['lambda']} m")) - - if filename and extended_data: - with open(filename, 'w', newline='', encoding='utf-8') as csvfile: - fieldnames = extended_data[sorted( - extended_data.keys())[0]].keys() - writer = csv.DictWriter(csvfile, fieldnames=fieldnames) - writer.writeheader() - for index in sorted(extended_data.keys()): - row = extended_data[index] - writer.writerow(row) - - def compare(self, old, new, fields=None): - """ - Compare data to help changes - - NB - must be same sweep - ( same index must be same frequence ) - :param old: - :param new: - """ - fields = fields or [("freq", str), ] - - def no_compare(): - - return {k: "-" for k, _ in fields} - - old_idx = sorted(old.keys()) - # 'odict_keys' object is not subscriptable - new_idx = sorted(new.keys()) - diff = {} - i_max = min(len(old_idx), len(new_idx)) - i_tot = max(len(old_idx), len(new_idx)) - - if i_max == i_tot: - logger.debug("may be the same antenna ... analyzing") - - else: - logger.warning("resonances changed from %s to %s", - len(old_idx), len(new_idx)) - - logger.debug("Trying to compare only first %s resonances", i_max) - - split = 0 - max_delta_f = 1000000 # 1M - for i, k in enumerate(new_idx): - my_diff = {} - - logger.info("Risonance %s at %s", i, - format_frequency(new[k]["freq"])) - - if len(old_idx) <= i + split: - diff[i] = no_compare() - continue - - delta_f = new[k]["freq"] - old[old_idx[i + split]]["freq"] - if abs(delta_f) < max_delta_f: - logger.debug("can compare") - - else: - logger.debug("can't compare, %s is too much ", - format_frequency(delta_f)) - if delta_f > 0: - - logger.debug("possible missing band, ") - if len(old_idx) > (i + split + 1): - if (abs(new[k]["freq"] - - old[old_idx[i + split + 1]]["freq"]) < - max_delta_f): - logger.debug("new is missing band, compare next ") - split += 1 - # FIXME: manage 2 or more band missing ?!? - else: - logger.debug("new band, non compare ") - diff[i] = no_compare() - continue - else: - logger.debug("new band, non compare ") - diff[i] = no_compare() - - split -= 1 - continue - - for d, fn in fields: - my_diff[d] = fn(new[k][d] - old[old_idx[i + split]][d]) - logger.info("Delta %s = %s", d, - my_diff[d]) - - diff[i] = my_diff - - for i in range(i_max, i_tot): - # add missing in old ... if any - - diff[i] = no_compare() - - return diff