import struct import datetime class PacketUtil: @classmethod # this hexdump brought to you by Stack Overflow def hexdump(cls, src, length=16): FILTER = ''.join([(len(repr(chr(x))) == 3) and chr(x) or '.' for x in range(256)]) lines = [] for c in range(0, len(src), length): chars = src[c:c + length] hex = ' '.join(["%02x" % x for x in chars]) printable = ''.join(["%s" % ((x <= 127 and FILTER[x]) or '.') for x in chars]) lines.append("%04x %-*s %s\n" % (c, length * 3, hex, printable)) return ''.join(lines) # timezone tomfoolery @classmethod def midnight_utc(cls): utcnow = datetime.datetime.utcnow() utcmidnight = datetime.datetime(utcnow.year, utcnow.month, utcnow.day, 0, 0) return utcmidnight class PacketWriter(object): def __init__(self ): self.ptr_pos = 0 self.packet = bytearray() # self.max_ptr_pos self.write_header() def write_header(self): self.write_QUInt32(GenericWSJTXPacket.MAGIC_NUMBER) self.write_QInt32(GenericWSJTXPacket.SCHEMA_VERSION) def write_QInt8(self, val): self.packet.extend(struct.pack('>b', val)) def write_QInt32(self, val): self.packet.extend(struct.pack('>l',val)) def write_QUInt32(self, val): self.packet.extend(struct.pack('>L', val)) def write_QInt64(self, val): self.packet.extend(struct.pack('>q',val)) def write_QFloat(self, val): self.packet.extent(struct.pack('>d', val)) def write_QString(self, str_val): length = len(str_val) self.write_QInt32(length) b_values = str_val if type(str_val) != bytes: b_values = str_val.encode() self.packet.extend(b_values) class PacketReader(object): def __init__(self, packet): self.ptr_pos = 0 self.packet = packet self.max_ptr_pos = len(packet)-1 self.skip_header() def at_eof(self): return self.ptr_pos > self.max_ptr_pos def skip_header(self): if self.max_ptr_pos < 8: raise Exception('Not enough data to skip header') self.ptr_pos = 8 def check_ptr_bound(self,field_type, length): if self.ptr_pos + length > self.max_ptr_pos+1: raise Exception('Not enough data to extract {}'.format(field_type)) ## grab data from the packet, incrementing the ptr_pos on the basis of the data we've gleaned def QInt32(self): self.check_ptr_bound('QInt32', 4) # sure we could inspect that, but that is slow. (the_int32,) = struct.unpack('>l',self.packet[self.ptr_pos:self.ptr_pos+4]) self.ptr_pos += 4 return the_int32 def QInt8(self): self.check_ptr_bound('QInt8', 1) (the_int8,) = struct.unpack('>b', self.packet[self.ptr_pos:self.ptr_pos+1]) self.ptr_pos += 1 return the_int8 def QInt64(self): self.check_ptr_bound('QInt64', 8) (the_int64,) = struct.unpack('>q', self.packet[self.ptr_pos:self.ptr_pos+8]) self.ptr_pos += 8 return the_int64 def QFloat(self): self.check_ptr_bound('QFloat', 8) (the_double,) = struct.unpack('>d', self.packet[self.ptr_pos:self.ptr_pos+8]) self.ptr_pos += 8 return the_double def QString(self): str_len = self.QInt32() if str_len == -1: return None self.check_ptr_bound('QString[{}]'.format(str_len),str_len) (str,) = struct.unpack('{}s'.format(str_len), self.packet[self.ptr_pos:self.ptr_pos + str_len]) self.ptr_pos += str_len return str.decode('utf-8') class GenericWSJTXPacket(object): SCHEMA_VERSION = 2 MINIMUM_SCHEMA_SUPPORTED = 2 MAXIMUM_SCHEMA_SUPPORTED = 2 MINIMUM_NETWORK_MESSAGE_SIZE = 8 MAXIMUM_NETWORK_MESSAGE_SIZE = 2048 MAGIC_NUMBER = 0xadbccbda SCHEMA_VERSION = 2 def __init__(self, addr_port, magic, schema, pkt_type, id, pkt): self.addr_port = addr_port self.magic = magic self.schema = schema self.pkt_type = pkt_type self.id = id self.pkt = pkt class InvalidPacket(GenericWSJTXPacket): TYPE_VALUE = -1 def __init__(self, addr_port, packet, message): self.packet = packet self.message = message self.addr_port = addr_port def __repr__(self): return 'Invalid Packet: %s from %s:%s\n%s' % (self.message, self.addr_port[0], self.addr_port[1], PacketUtil.hexdump(self.packet)) class HeartBeatPacket(GenericWSJTXPacket): TYPE_VALUE = 0 def __init__(self, addr_port: object, magic: object, schema: object, pkt_type: object, id: object, pkt: object) -> object: GenericWSJTXPacket.__init__(self, addr_port, magic, schema, pkt_type, id, pkt) ps = PacketReader(pkt) the_type = ps.QInt32() self.wsjtx_id = ps.QString() self.max_schema = ps.QInt32() self.version = ps.QInt8() self.revision = ps.QInt8() def __repr__(self): return 'HeartBeatPacket: from {}:{}\n\twsjtx id:{}\tmax_schema:{}\tversion:{}\trevision:{}' .format(self.addr_port[0], self.addr_port[1], self.wsjtx_id, self.max_schema, self.version, self.revision) @classmethod # make a heartbeat packet (a byte array) we can send to a 'client'. This should be it's own class. def Builder(cls,wsjtx_id='pywsjtx', max_schema=2, version=1, revision=1): # build the packet to send pkt = PacketWriter() pkt.write_QInt32(HeartBeatPacket.TYPE_VALUE) pkt.write_QString(wsjtx_id) pkt.write_QInt32(max_schema) pkt.write_QInt32(version) pkt.write_QInt32(revision) return pkt.packet class StatusPacket(GenericWSJTXPacket): TYPE_VALUE = 1 def __init__(self, addr_port, magic, schema, pkt_type, id, pkt): GenericWSJTXPacket.__init__(self, addr_port, magic, schema, pkt_type, id, pkt) ps = PacketReader(pkt) the_type = ps.QInt32() self.wsjtx_id = ps.QString() self.dial_frequency = ps.QInt64() self.mode = ps.QString() self.dx_call = ps.QString() self.report = ps.QString() self.tx_mode = ps.QString() self.tx_enabled = ps.QInt8() self.transmitting = ps.QInt8() self.decoding = ps.QInt8() self.rx_df = ps.QInt32() self.tx_df = ps.QInt32() self.de_call = ps.QString() self.de_grid = ps.QString() self.dx_grid = ps.QString() self.tx_watchdog = ps.QInt8() self.sub_mode = ps.QString() self.fast_mode = ps.QInt8() def __repr__(self): str = 'StatusPacket: from {}:{}\n\twsjtx id:{}\tde_call:{}\tde_grid:{}\n'.format(self.addr_port[0], self.addr_port[1],self.wsjtx_id, self.de_call, self.de_grid) str += "\tfrequency:{}\trx_df:{}\ttx_df:{}\tdx_call:{}\tdx_grid:{}\treport:{}\n".format(self.dial_frequency, self.rx_df, self.tx_df, self.dx_call, self.dx_grid, self.report) str += "\ttransmitting:{}\t decoding:{}\ttx_enabled:{}\ttx_watchdog:{}\tsub_mode:{}\tfast_mode:{}".format(self.transmitting, self.decoding, self.tx_enabled, self.tx_watchdog, self.sub_mode, self.fast_mode) return str class DecodePacket(GenericWSJTXPacket): TYPE_VALUE = 2 def __init__(self, addr_port, magic, schema, pkt_type, id, pkt): GenericWSJTXPacket.__init__(self, addr_port, magic, schema, pkt_type, id, pkt) # handle packet-specific stuff. ps = PacketReader(pkt) the_type = ps.QInt32() self.wsjtx_id = ps.QString() self.new_decode = ps.QInt8() self.millis_since_midnight = ps.QInt32() self.time = PacketUtil.midnight_utc() + datetime.timedelta(milliseconds=self.millis_since_midnight) self.snr = ps.QInt32() self.delta_t = ps.QFloat() self.delta_f = ps.QInt32() self.mode = ps.QString() self.message = ps.QString() self.low_confidence = ps.QInt8() self.off_air = ps.QInt8() def __repr__(self): str = 'DecodePacket: from {}:{}\n\twsjtx id:{}\tmessage:{}\n'.format(self.addr_port[0], self.addr_port[1], self.wsjtx_id, self.message) str += "\tdelta_f:{}\tnew:{}\ttime:{}\tsnr:{}\tdelta_f:{}\tmode:{}".format(self.delta_f, self.new_decode, self.time, self.snr, self.delta_f, self.mode) return str class ClearPacket(GenericWSJTXPacket): TYPE_VALUE = 3 def __init__(self, addr_port, magic, schema, pkt_type, id, pkt): GenericWSJTXPacket.__init__(self, addr_port, magic, schema, pkt_type, id, pkt) # handle packet-specific stuff. class ReplyPacket(GenericWSJTXPacket): TYPE_VALUE = 4 def __init__(self, addr_port, magic, schema, pkt_type, id, pkt): GenericWSJTXPacket.__init__(self, addr_port, magic, schema, pkt_type, id, pkt) # handle packet-specific stuff. class QSOLoggedPacket(GenericWSJTXPacket): TYPE_VALUE = 5 def __init__(self, addr_port, magic, schema, pkt_type, id, pkt): GenericWSJTXPacket.__init__(self, addr_port, magic, schema, pkt_type, id, pkt) # handle packet-specific stuff. class ClosePacket(GenericWSJTXPacket): TYPE_VALUE = 6 def __init__(self, addr_port, magic, schema, pkt_type, id, pkt): GenericWSJTXPacket.__init__(self, addr_port, magic, schema, pkt_type, id, pkt) # handle packet-specific stuff. class ReplayPacket(GenericWSJTXPacket): TYPE_VALUE = 7 def __init__(self, addr_port, magic, schema, pkt_type, id, pkt): GenericWSJTXPacket.__init__(self, addr_port, magic, schema, pkt_type, id, pkt) # handle packet-specific stuff. class HaltTxPacket(GenericWSJTXPacket): TYPE_VALUE = 8 def __init__(self, addr_port, magic, schema, pkt_type, id, pkt): GenericWSJTXPacket.__init__(self, addr_port, magic, schema, pkt_type, id, pkt) # handle packet-specific stuff. class FreeTextPacket(GenericWSJTXPacket): TYPE_VALUE = 9 def __init__(self, addr_port, magic, schema, pkt_type, id, pkt): GenericWSJTXPacket.__init__(self, addr_port, magic, schema, pkt_type, id, pkt) # handle packet-specific stuff. @classmethod def Builder(cls,to_wsjtx_id='WSJT-X', text="", send=False): # build the packet to send pkt = PacketWriter() print('To_wsjtx_id ',to_wsjtx_id,' text ',text, 'send ',send) pkt.write_QInt32(FreeTextPacket.TYPE_VALUE) pkt.write_QString(to_wsjtx_id) pkt.write_QString(text) pkt.write_QInt8(send) return pkt.packet class WSPRDecodePacket(GenericWSJTXPacket): TYPE_VALUE = 10 def __init__(self, addr_port, magic, schema, pkt_type, id, pkt): GenericWSJTXPacket.__init__(self, addr_port, magic, schema, pkt_type, id, pkt) # handle packet-specific stuff. class LocationChangePacket(GenericWSJTXPacket): TYPE_VALUE = 11 def __init__(self, addr_port, magic, schema, pkt_type, id, pkt): GenericWSJTXPacket.__init__(self, addr_port, magic, schema, pkt_type, id, pkt) # handle packet-specific stuff. @classmethod def Builder(cls, to_wsjtx_id='WSJT-X', new_grid=""): # build the packet to send pkt = PacketWriter() pkt.write_QInt32(LocationChangePacket.TYPE_VALUE) pkt.write_QString(to_wsjtx_id) pkt.write_QString(new_grid) return pkt.packet class WSJTXPacketClassFactory(GenericWSJTXPacket): PACKET_TYPE_TO_OBJ_MAP = { HeartBeatPacket.TYPE_VALUE: HeartBeatPacket, StatusPacket.TYPE_VALUE: StatusPacket, DecodePacket.TYPE_VALUE: DecodePacket, ClearPacket.TYPE_VALUE: ClearPacket, ReplyPacket.TYPE_VALUE: ReplyPacket, QSOLoggedPacket.TYPE_VALUE: QSOLoggedPacket, ClosePacket.TYPE_VALUE: ClosePacket, ReplayPacket.TYPE_VALUE: ReplayPacket, HaltTxPacket.TYPE_VALUE: HaltTxPacket, FreeTextPacket.TYPE_VALUE: FreeTextPacket, WSPRDecodePacket.TYPE_VALUE: WSPRDecodePacket } def __init__(self, addr_port, magic, schema, pkt_type, id, pkt): self.addr_port = addr_port self.magic = magic self.schema = schema self.pkt_type = pkt_type self.pkt_id = id self.pkt = pkt def __repr__(self): return 'WSJTXPacketFactory: from {}:{}\n{}' .format(self.addr_port[0], self.addr_port[1], PacketUtil.hexdump(self.pkt)) # Factory-like method @classmethod def from_udp_packet(cls, addr_port, udp_packet): if len(udp_packet) < GenericWSJTXPacket.MINIMUM_NETWORK_MESSAGE_SIZE: return InvalidPacket( addr_port, udp_packet, "Packet too small") if len(udp_packet) > GenericWSJTXPacket.MAXIMUM_NETWORK_MESSAGE_SIZE: return InvalidPacket( addr_port, udp_packet, "Packet too large") (magic, schema, pkt_type, id_len) = struct.unpack('>LLLL', udp_packet[0:16]) if magic != GenericWSJTXPacket.MAGIC_NUMBER: return InvalidPacket( addr_port, udp_packet, "Invalid Magic Value") if schema < GenericWSJTXPacket.MINIMUM_SCHEMA_SUPPORTED or schema > GenericWSJTXPacket.MAXIMUM_SCHEMA_SUPPORTED: return InvalidPacket( addr_port, udp_packet, "Unsupported schema value {}".format(schema)) klass = WSJTXPacketClassFactory.PACKET_TYPE_TO_OBJ_MAP.get(pkt_type) if klass is None: return InvalidPacket( addr_port, udp_packet, "Unknown packet type {}".format(pkt_type)) return klass(addr_port, magic, schema, pkt_type, id, udp_packet)