initial version of dupe checking against n1mm db, usage of annotation API

update-annotate
Brian Moran 2024-02-21 11:26:14 -08:00
rodzic b26b9db97e
commit e3801e393d
4 zmienionych plików z 294 dodań i 3 usunięć

Wyświetl plik

@ -21,6 +21,9 @@ class QCOLOR:
return QCOLOR(QCOLOR.SPEC_RGB, 255, 255, 0, 0)
@classmethod
def Green(cls):
return QCOLOR(QCOLOR.SPEC_RGB, 255, 0, 255, 0)
@classmethod
def RGBA(cls, alpha, red, green, blue):
return QCOLOR(QCOLOR.SPEC_RGB, alpha, red, green, blue)

Wyświetl plik

@ -2,6 +2,15 @@
import struct
import datetime
class QCOLOR:
def __init__(self, alpha = 0xff, red = 0xff, green = 0xff, blue = 0xff):
self.valid = False
self.spec = 0;
self.terminator = 0;
self.alpha = alpha;
self.red = red;
self.blue = green;
self.green = blue;
class PacketUtil:
@classmethod
@ -116,6 +125,11 @@ class PacketReader(object):
self.ptr_pos += 4
return the_int32
def QUInt32(self):
self.check_ptr_bound('QUInt32', 4) # sure we could inspect that, but that is slow.
(the_uint32,) = struct.unpack('>L',self.packet[self.ptr_pos:self.ptr_pos+4])
self.ptr_pos += 4
return the_uint32
def QInt8(self):
self.check_ptr_bound('QInt8', 1)
@ -144,6 +158,38 @@ class PacketReader(object):
self.ptr_pos += str_len
return str.decode('utf-8')
def QColor(self):
try:
spec = self.QInt8()
alpha = self.QUInt8()
alpha2 = self.QUInt8()
if alpha != alpha2:
raise Exception('Invalid QColor alpha value')
red = self.QUInt8()
red2 = self.QUInt8()
if red != red2:
raise Exception('Invalid QColor red value')
green = self.QUInt8()
green2 = self.QUInt8()
if green != green2:
raise Exception('Invalid QColor green value')
blue = self.QUInt8()
blue2 = self.QUInt8()
if blue != blue2:
raise Exception('Invalid QColor blue value')
terminator = self.QInt16()
if terminator != 0:
raise Exception('Invalid QColor terminator value')
except struct.error:
raise Exception('No data for QCOLOR value')
return QCOLOR(spec, alpha, red, green, blue)
class GenericWSJTXPacket(object):
SCHEMA_VERSION = 3
MINIMUM_SCHEMA_SUPPORTED = 2
@ -202,6 +248,9 @@ class StatusPacket(GenericWSJTXPacket):
def __init__(self, addr_port, magic, schema, pkt_type, id, pkt):
GenericWSJTXPacket.__init__(self, addr_port, magic, schema, pkt_type, id, pkt)
ps = PacketReader(pkt)
self.highlighted_calls = 0xffffffff;
self.annotated_calls = 0xffffffff;
the_type = ps.QInt32()
self.wsjtx_id = ps.QString()
self.dial_frequency = ps.QInt64()
@ -228,15 +277,24 @@ class StatusPacket(GenericWSJTXPacket):
self.sub_mode = ps.QString()
self.fast_mode = ps.QInt8()
# new in wsjtx-2.0.0
self.special_op_mode = ps.QInt8()
self.frequency_tolerance = ps.QUInt32()
self.tr_period = ps.QUInt32()
self.configuration_name = ps.QString()
self.tx_message = ps.QString()
if ps.at_eof():
return
self.highlighted_calls = ps.QUInt32()
self.annotated_calls = ps.QUInt32()
def __repr__(self):
str = 'StatusPacket: from {}:{}\n\twsjtx id:{}\tde_call:{}\tde_grid:{}\n'.format(self.addr_port[0], self.addr_port[1],self.wsjtx_id,
self.de_call, self.de_grid)
str += "\tfrequency:{}\trx_df:{}\ttx_df:{}\tdx_call:{}\tdx_grid:{}\treport:{}\n".format(self.dial_frequency, self.rx_df, self.tx_df, self.dx_call, self.dx_grid, self.report)
str += "\ttransmitting:{}\t decoding:{}\ttx_enabled:{}\ttx_watchdog:{}\tsub_mode:{}\tfast_mode:{}\tspecial_op_mode:{}".format(self.transmitting, self.decoding, self.tx_enabled, self.tx_watchdog,
self.sub_mode, self.fast_mode, self.special_op_mode)
str += "\ttransmitting:{}\t decoding:{}\ttx_enabled:{}\ttx_watchdog:{}\tsub_mode:{}\tfast_mode:{}\n\tspecial_op_mode:{}\tfrequency_tolerance:{}\n".format(self.transmitting, self.decoding, self.tx_enabled, self.tx_watchdog, self.sub_mode, self.fast_mode, self.special_op_mode, self.frequency_tolerance)
str += "\ttr_period:{}\tconfiguration_name:{}\ttx_message:{}\n\thighlighted_calls:{}\tannotated_calls:{}\n".format(self.tr_period, self.configuration_name, self.tx_message, self.highlighted_calls, self.annotated_calls)
return str
@ -381,6 +439,26 @@ class HighlightCallsignPacket(GenericWSJTXPacket):
pkt.write_QBool(highlight_last_only)
return pkt.packet
class AnnotateCallsignPacket(GenericWSJTXPacket):
TYPE_VALUE = 16
def __init__(self, addr_port, magic, schema, pkt_type, id, pkt):
GenericWSJTXPacket.__init__(self, addr_port, magic, schema, pkt_type, id, pkt)
# handle packet-specific stuff.
# the callsign field can contain text, callsigns, entire lines.
@classmethod
def Builder(cls, to_wsjtx_id='WSJT-X', callsign="K1JT", sort_order_provided=True, sort_order=0xFFFFFFFF):
# build the packet to send
pkt = PacketWriter()
pkt.write_QInt32(AnnotateCallsignPacket.TYPE_VALUE)
pkt.write_QString(to_wsjtx_id)
pkt.write_QString(callsign)
pkt.write_QBool(True)
pkt.write_QInt32(sort_order)
return pkt.packet
class WSJTXPacketClassFactory(GenericWSJTXPacket):
PACKET_TYPE_TO_OBJ_MAP = {

Wyświetl plik

@ -0,0 +1,81 @@
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import pywsjtx.extra.simple_server
import re
import random
TEST_MULTICAST = True
MYCALL = 'TO4A'
if TEST_MULTICAST:
IP_ADDRESS = '224.1.1.1'
PORT = 5007
else:
IP_ADDRESS = '127.0.0.1'
PORT = 2237
MY_MAX_SCHEMA = 3
s = pywsjtx.extra.simple_server.SimpleServer(IP_ADDRESS, PORT, timeout=2.0)
while True:
(pkt, addr_port) = s.rx_packet()
print("Packet from {}".format(addr_port))
if (pkt != None):
the_packet = pywsjtx.WSJTXPacketClassFactory.from_udp_packet(addr_port, pkt)
if type(the_packet) == pywsjtx.HeartBeatPacket:
max_schema = max(the_packet.max_schema, MY_MAX_SCHEMA)
reply_beat_packet = pywsjtx.HeartBeatPacket.Builder(the_packet.wsjtx_id,max_schema)
s.send_packet(addr_port, reply_beat_packet)
if type(the_packet) == pywsjtx.DecodePacket:
m = re.match(r"^CQ\s+(\S+)\s+", the_packet.message)
if False and m:
print("Callsign {}".format(m.group(1)))
callsign = m.group(1)
color_pkt = pywsjtx.HighlightCallsignPacket.Builder(the_packet.wsjtx_id, callsign,
pywsjtx.QCOLOR.White(),
pywsjtx.QCOLOR.Red(),
True)
normal_pkt = pywsjtx.HighlightCallsignPacket.Builder(the_packet.wsjtx_id, callsign,
pywsjtx.QCOLOR.Uncolor(),
pywsjtx.QCOLOR.Uncolor(),
True)
s.send_packet(addr_port, color_pkt)
#print(pywsjtx.PacketUtil.hexdump(color_pkt))
m = re.match(re.compile("^({mycall})\s+(\S+)\s+".format(mycall = MYCALL)), the_packet.message)
if m:
print("******************************** CALLER {}".format(m.group(2)))
callsign = m.group(2)
color_pkt = pywsjtx.HighlightCallsignPacket.Builder(the_packet.wsjtx_id, callsign,
pywsjtx.QCOLOR.Red(),
pywsjtx.QCOLOR.White(), # RGBA(255, 50, 137, 48 ),
False)
normal_pkt = pywsjtx.HighlightCallsignPacket.Builder(the_packet.wsjtx_id, callsign,
pywsjtx.QCOLOR.Uncolor(),
pywsjtx.QCOLOR.Uncolor(),
False)
annotate_pkt = pywsjtx.AnnotateCallsignPacket.Builder(the_packet.wsjtx_id, "UA0LBF",
True,
12);
annotate_pkt2 = pywsjtx.AnnotateCallsignPacket.Builder(the_packet.wsjtx_id, "HL3AMO",
True,
67);
if True:
s.send_packet(addr_port, color_pkt)
s.send_packet(addr_port, annotate_pkt)
s.send_packet(addr_port, annotate_pkt2)
else:
s.send_packet(addr_port, normal_pkt)
print(the_packet)

Wyświetl plik

@ -0,0 +1,129 @@
import os
import sys
import sqlite3
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import pywsjtx.extra.simple_server
import re
TEST_MULTICAST = True
MATCH_MY_CALL = True # use False to get packets for any call, useful for debugging
# N1MM database to check against
DBFILE = '/Users/brianmoran/Documents/T88WA/T88WA/COMBINED/t88wa_superset.s3db'
MYCALL = 'N9ADG' # this will get updated from the status packets
dial_frequency = 14074000 # this will get updated from the status packets
# ContestNR column should NOT be equal to -1
if TEST_MULTICAST:
IP_ADDRESS = '224.1.1.1'
PORT = 5007
else:
IP_ADDRESS = '127.0.0.1'
PORT = 2237
MY_MAX_SCHEMA = 3
s = pywsjtx.extra.simple_server.SimpleServer(IP_ADDRESS, PORT, timeout=2.0)
def get_db():
db = sqlite3.connect(DBFILE)
return db
def lookup_dupe_status(callsign):
db = get_db()
c = db.cursor()
c.execute("SELECT mode, band, count(*) FROM DXLOG WHERE call = ? AND ContestNR <> -1 group by band, mode", (callsign,))
return c.fetchall()
def calculate_dupe_score(current_band, all_records):
score = 0
for rec in all_records:
mode, band, qcount = rec
if (mode == 'FT8' and current_band == band):
score += qcount * 2000
elif (mode != 'FT8' and current_band == band):
score += qcount * 300
else:
score += qcount * 20
return score
def band_for(dial_frequency):
band = 14.0
if dial_frequency >= 1800000 and dial_frequency < 2000000:
band = 1.8
elif dial_frequency >= 3500000 and dial_frequency < 4000000:
band = 3.5
elif dial_frequency >= 5000000 and dial_frequency < 6000000:
band = 5
elif dial_frequency >= 7000000 and dial_frequency < 7500000:
band = 7
elif dial_frequency >= 10000000 and dial_frequency < 10500000:
band = 10
elif dial_frequency >= 14000000 and dial_frequency < 14400000:
band = 14
elif dial_frequency >= 18000000 and dial_frequency < 19000000:
band = 18
elif dial_frequency >= 21000000 and dial_frequency < 21600000:
band = 21
elif dial_frequency >= 24000000 and dial_frequency < 25000000:
band = 24
elif dial_frequency >= 28000000 and dial_frequency < 30000000:
band = 28
elif dial_frequency >= 50000000 and dial_frequency < 55000000:
band = 50
elif dial_frequency >= 144000000 and dial_frequency < 148000000:
band = 144
return band
test_data = [('FT8', 7.0, 1), ('CW', 10.0, 1), ('FT8', 14.0, 1), ('FT8', 18.0, 1), ('FT8', 28.0, 1)]
test_band = 7.0
print("Dupe score for data {} {}".format(test_band, calculate_dupe_score(test_band, test_data)))
#exit(0);
while True:
(pkt, addr_port) = s.rx_packet()
if addr_port is None:
print(".")
continue
#print("Packet from {}".format(addr_port))
if (pkt != None):
the_packet = pywsjtx.WSJTXPacketClassFactory.from_udp_packet(addr_port, pkt)
print(the_packet)
if type(the_packet) == pywsjtx.StatusPacket:
dial_frequency = the_packet.dial_frequency
MYCALL = the_packet.de_call
if type(the_packet) == pywsjtx.HeartBeatPacket:
max_schema = max(the_packet.max_schema, MY_MAX_SCHEMA)
reply_beat_packet = pywsjtx.HeartBeatPacket.Builder(the_packet.wsjtx_id,max_schema)
s.send_packet(addr_port, reply_beat_packet)
if type(the_packet) == pywsjtx.DecodePacket:
if the_packet.message is None:
print("No messages in packet!!!!")
continue
# get the callsigns calling
if MATCH_MY_CALL:
m = re.match(re.compile("^({mycall})\s+(\S+)\s+".format(mycall = MYCALL)), the_packet.message)
else:
m = re.match(r"^(\S+)\s+(\S+)\s+", the_packet.message)
if m:
callsign = m.group(2)
#print(lookup_dupe_status(callsign))
dupe_tuples = lookup_dupe_status(callsign)
dupe_score = calculate_dupe_score(band_for(dial_frequency), dupe_tuples)
# I just like saying "dupe tuple" in my head
print("{} Dupe Score on {} is {} - {}\n".format(callsign, band_for(dial_frequency), dupe_score, dupe_tuples))
if dupe_score > 0:
annotate_pkt = pywsjtx.AnnotateCallsignPacket.Builder(the_packet.wsjtx_id
, callsign
, True
, dupe_score)
s.send_packet(addr_port, annotate_pkt)