From e24ad51cad96b9ec2406f062c7dbe12072ff81ff Mon Sep 17 00:00:00 2001 From: bmo Date: Thu, 3 Jan 2019 22:46:38 -0800 Subject: [PATCH] keep track of exchanges to colorize mults that way, too. Use multicast, handle unicast rebroadcast/relay to N1MM Logger --- samples/n1mm_arrl_ru.py | 178 +++++++++++++++++++++++++--------------- 1 file changed, 114 insertions(+), 64 deletions(-) diff --git a/samples/n1mm_arrl_ru.py b/samples/n1mm_arrl_ru.py index a6626e3..d0b51af 100644 --- a/samples/n1mm_arrl_ru.py +++ b/samples/n1mm_arrl_ru.py @@ -9,6 +9,8 @@ N1MM_DB_FILE = "C:\\Users\\brian\\Documents\\N1MM Logger+\\Databases\\n9adg_2018 # Download the CTY file from the same place that WL_CTY comes from. This is the country prefix lookup file. CTY_FILE = "C:\\Users\\brian\\Documents\\N1MM Logger+\\SupportFiles\\CTY.DAT" + +# Make sure CONTESTNR correstponds to the ARRL RU contest in your N1MM Database. CONTESTNR = 1 # TODO: Find via Contest_name and START_DATE -- if multiple, show the CONTESTNR for all and have to use that strategy @@ -16,17 +18,23 @@ CONTESTNR = 1 #START_DATE="2018-12-08 00:0:00" TEST_MULTICAST = True +RETRANSMIT_UDP = True if TEST_MULTICAST: IP_ADDRESS = '224.1.1.1' PORT = 5007 else: IP_ADDRESS = '127.0.0.1' - PORT = 2237 + PORT = 2238 -MY_MAX_SCHEMA = 3 +RETRANSMIT_IP_ADDRESS = '127.0.0.1' +RETRANSMIT_IP_BOUND_PORT = 0 # 0 means pick an unused port +RETRANSMIT_IP_PORT = 2237 + +MY_MAX_SCHEMA = 2 LOOKUP_THREADS = 4 +STP_MAX_SECONDS = 20 # how often do we hit N1MM to see what sections we've worked? import sqlite3 import os @@ -34,21 +42,24 @@ import sys import re import queue import threading -from datetime import datetime -import serial +#from datetime import datetime +#import serial import logging +import time sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) import pywsjtx import pywsjtx.extra.simple_server # -# TODO - listen for UDP packets from N1MM for logged QSOs to use as source of packets; process those and add to the queue, so colorization happens in somewhat realtime -# TODO - Interpret callsigns calling me to look up & colorize +# DONE! - listen for UDP packets from N1MM for logged QSOs to use as source of packets; process those and add to the queue, so colorization happens in somewhat realtime +# DONE! - Interpret callsigns calling me to look up & colorize # K1ABC W9XYZ EN42 -# TODO - Watch exchanges in general build table of callsigns and exchanges so we can find mults that way +# DONE! - Watch exchanges so we can color mults that way # K1ABC W9XYZ 579 IL # K1ABC W9XYZ R 559 IL + + # ------------------- File picker instead of constant? #import tkinter as tk #from tkinter import filedialog @@ -120,12 +131,17 @@ class N1MMLoggerPlus(): rows = c.fetchall() return rows[0]['C'] - def sections_for_prefix(self, country_prefix): + def sections_for_prefixes(self, country_prefix_list): + from itertools import chain if self.n1mm_sql_connection is None: raise Exception("N1MM database is not open") c = self.n1mm_sql_connection.cursor() - c.execute('select DISTINCT sect from dxlog where ContestNR = ? AND CountryPrefix=? AND IsMultiplier1<>0 ORDER BY sect ASC', (self.contestnr, country_prefix,)) + placeholder = '?' + placeholders = ', '.join(placeholder * len(country_prefix_list)) + query = 'select DISTINCT sect from dxlog where ContestNR = {} AND CountryPrefix in ({}) AND IsMultiplier1<>0 ORDER BY sect ASC' .format(self.contestnr, placeholders) + print(query) + c.execute(query, country_prefix_list) rows = c.fetchall() sections = [ item['sect'] for item in rows ] return sections @@ -216,14 +232,8 @@ class Cty: from threading import RLock class StateProvinceKeeper: - - - - - class __OnlyOne: - #__already_worked = [] def __init__(self): self.__already_worked = [] self.logger = logging.getLogger(__name__) @@ -253,46 +263,13 @@ class StateProvinceKeeper: StateProvinceKeeper.instance = StateProvinceKeeper.__OnlyOne() return StateProvinceKeeper.instance - - def __init__(self,st_pr_db_file_name,**kwargs): self.db_file_name = st_pr_db_file_name self.logger = kwargs.get("logger", logging.getLogger(__name__)) - self.create_db_if_needed() + #self.create_db_if_needed() self.sql_connection = None - def open_db(self): - if self.sql_connection is not None: - logging.error("Database is already open [{}]".format(self.db_file_name)) - raise Exception("Database is already open [{}]".format(self.db_file_name)) - cx = sqlite3.connect(self.db_file_name) - cx.row_factory = sqlite3.Row - self.sql_connection = cx - - def create_db_if_needed(self): - if os.path.isfile(self.db_file_name) == False: - self.logger.info("Creating DB in {}".format(self.db_file_name)) - cx = sqlite3.connect(self.db_file_name) - cx.row_factory = sqlite3.Row - db_connection = cx - cx.execute("CREATE TABLE IF NOT EXISTS SECTION_MULTS (CountryPrefix Text Not NULL, Sect Text NOT NULL, Call Not Null);") - cx.execute("CREATE UNIQUE INDEX idx_call ON SECTION_MULTS(Call);") - cx.execute("CREATE INDEX idx_sect ON SECTION_MULTS(Sect);") - cx.execute("CREATE INDEX idx_prefix ON SECTION_MULTS(CountryPrefix);") - cx.close() - - def section_for(self,call): - c = self.sql_connection.cursor() - c.execute("SELECT Sect from SECTION_MULTS where Call = ?;",(call,)) - rows = c.fetchall() - if len(rows)==0: - return None - return rows[0]['Sect'] - - def add(self, call, prefix, section): - c = self.sql_connection.cursor() - c.execute("REPLACE INTO SECTION_MULTS(call, CountryPrefix, sect) VALUES( ?, ?, ?);",(call, prefix, section,) ) class CallsignWorker: threads = [] @@ -330,19 +307,55 @@ class CallsignWorker: logging.warning(self.n1mm_args) n1mm.get_contest(**self.n1mm_args) + stp = StateProvinceKeeper() + + while True: input_pkt = self.input_queue.get() + is_section_mult = False if input_pkt is None: break prefix = self.cty.prefix_for(input_pkt['callsign']) dupe_status = n1mm.simple_dupe_status(input_pkt['callsign']) + if not dupe_status and input_pkt.get('exchange') and prefix in ['K', 'VE']: + is_section_mult = not stp.already_worked(input_pkt['exchange']) + logging.info("Section multiplier {} {}".format(input_pkt['callsign'], input_pkt['exchange'])) is_mult = not dupe_status and n1mm.prefix_worked_count(prefix)==0 input_pkt['dupe'] = dupe_status - input_pkt['is_mult'] = is_mult + input_pkt['is_mult'] = is_mult or is_section_mult + input_pkt['is_section_mult'] = is_section_mult input_pkt['prefix'] = prefix logging.debug("Thread: {} - Callsign status {} prefix:{} dupe:{} mult:{}".format(threading.current_thread().name, input_pkt['callsign'], prefix, dupe_status, is_mult, )) self.output_queue.put(input_pkt) +class Retransmitter: + def __init__(self, ip_address, ip_port, upstream_server, **kwargs): + self.server = pywsjtx.extra.simple_server.SimpleServer(ip_address, ip_port, **kwargs) + self.upstream_server = upstream_server + self.logger = kwargs.get("logger", logging.getLogger(__name__)) + + # wait for packets from the ip_address:ip_port. Resend them to upstream_server + # send packets to port 2237 + self._t = threading.Thread(target=self.packet_resender) + self._t.start() + + def send(self, message, from_addr_port, to_addr_port): + addr, port = from_addr_port + self.from_addr_port = (addr,port) #from_addr_port + self.server.send_packet(to_addr_port, message) + logging.debug("sent packet from {} to N1MM Logger port {} {} bytes".format(from_addr_port, to_addr_port, len(message))) + + + def stop_threads(self): + self._t.join() + + def packet_resender(self): + while True: + input_pkt, addr_port = self.server.rx_packet() + if (input_pkt != None and self.from_addr_port != None): + logging.debug("RX from N1MM Logger {}, Sending {} bytes to upstream {}".format(addr_port,len(input_pkt), self.from_addr_port)) + self.upstream_server.send_packet(self.from_addr_port, input_pkt) + def main(): logFormatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] %(message)s") consoleFormatter = logging.Formatter('%(asctime)s %(message)s') @@ -365,20 +378,20 @@ def main(): n1mm = N1MMLoggerPlus(N1MM_DB_FILE,logger=logging.getLogger()) n1mm.open_db() n1mm.get_contest(contestnr=CONTESTNR) - already_worked = n1mm.sections_for_prefix('K') + already_worked = n1mm.sections_for_prefixes(['K','VE']) + print(already_worked) + + stp = StateProvinceKeeper() + stp.update_already_worked(already_worked) - #print("Sections {}".format(n1mm.sections_for_prefix('K'))) - #print("VE Sections {}".format(n1mm.sections_for_prefix('VE'))) - #tp = StateProvinceKeeper("SectionMults.s3db") - #stp = StateProvinceKeeper() - #stp.update_already_worked(already_worked) #print("AZ already worked? {}".format(stp.already_worked("AZ"))) #stp.open_db() #stp.add('N9ADG','K','WA') #stp.add('K9CT','K','IL') #print("K9CT's section {}".format(stp.section_for('K9CT'))) - return(-1) + retransmitter = None + # take calls that are CQing, or replying, etc. and colorize them after the dupe check cw = CallsignWorker(LOOKUP_THREADS, cty, N1MM_DB_FILE,{'contestnr':CONTESTNR}) @@ -387,16 +400,26 @@ def main(): # put on queue s = pywsjtx.extra.simple_server.SimpleServer(IP_ADDRESS, PORT, timeout=2.0) + + if RETRANSMIT_UDP: + retransmitter = Retransmitter(RETRANSMIT_IP_ADDRESS,0,s,timeout=0.5) + mult_foreground_color = pywsjtx.QCOLOR.White() mult_background_color = pywsjtx.QCOLOR.Red() dupe_background_color = pywsjtx.QCOLOR.RGBA(255,211,211,211) # light grey dupe_foreground_color = pywsjtx.QCOLOR.RGBA(255,169,169,169) # dark grey + stp_age = time.time() + while True: (pkt, addr_port) = s.rx_packet() if (pkt != None): + if RETRANSMIT_UDP: + # retransmit to someone else (e.g. N1MM Logger) + retransmitter.send(pkt, addr_port, (RETRANSMIT_IP_ADDRESS, RETRANSMIT_IP_PORT)) + the_packet = pywsjtx.WSJTXPacketClassFactory.from_udp_packet(addr_port, pkt) if type(the_packet) == pywsjtx.HeartBeatPacket: @@ -404,13 +427,25 @@ def main(): reply_beat_packet = pywsjtx.HeartBeatPacket.Builder(the_packet.wsjtx_id, max_schema) s.send_packet(addr_port, reply_beat_packet) - if type(the_packet) == pywsjtx.DecodePacket: + while type(the_packet) == pywsjtx.DecodePacket: m = re.match(r"^CQ\s+(\S{3,}?)\s+", the_packet.message) or re.match(r"^CQ\s+\S{2}\s+(\S{3,}?)\s+", the_packet.message) if m: callsign = m.group(1) print("Callsign {}".format(callsign)) cw.input_queue.put({'callsign':callsign, 'input':the_packet, 'addr_port':addr_port}) + break + # K1ABC W9XYZ 579 IL + # K1ABC W9XYZ R 559 IL + m = re.match(r"(\S{3,}?)\s+(\S{3,}?)(\sR)?\s5\d\d\s([A-Z]{2,3})", the_packet.message) + if m: + callsign = m.group(2) + section = m.group(4) + #print("Callsign {} - section {}".format(callsign, section)) + + cw.input_queue.put({'callsign':callsign, 'input':the_packet, 'addr_port':addr_port, 'exchange':section}) + break + break print(the_packet) # service queue @@ -427,7 +462,15 @@ def main(): True) s.send_packet(resolved['addr_port'], color_pkt) - if resolved['is_mult']: + if resolved['is_section_mult']: # color the whole thing + color_pkt = pywsjtx.HighlightCallsignPacket.Builder(wsjtx_id, resolved['input'].message, + mult_background_color, + mult_foreground_color, + True) + s.send_packet(resolved['addr_port'], color_pkt) + + pass + elif resolved['is_mult']: color_pkt = pywsjtx.HighlightCallsignPacket.Builder(wsjtx_id, resolved['callsign'], mult_background_color, mult_foreground_color, @@ -435,11 +478,18 @@ def main(): s.send_packet(resolved['addr_port'], color_pkt) if not resolved['is_mult'] and not resolved['dupe']: - color_pkt = pywsjtx.HighlightCallsignPacket.Builder(wsjtx_id, resolved['callsign'], - pywsjtx.QCOLOR.Uncolor(), - pywsjtx.QCOLOR.Uncolor(), - True) - s.send_packet(resolved['addr_port'], color_pkt) + #color_pkt = pywsjtx.HighlightCallsignPacket.Builder(wsjtx_id, resolved['callsign'], + # pywsjtx.QCOLOR.Uncolor(), + # pywsjtx.QCOLOR.Uncolor(), + # True) + #s.send_packet(resolved['addr_port'], color_pkt) + pass + + if (time.time() - stp_age) > STP_MAX_SECONDS: + logging.debug("Updating sections from N1MM") + already_worked = n1mm.sections_for_prefixes(['K', 'VE']) + stp.update_already_worked(already_worked) + stp_age = time.time() if __name__ == "__main__": main()