Hamlib/rigs/prm80/pysimulprm80.py

250 wiersze
8.4 KiB
Python
Executable File

#!/usr/bin/env python3
# This file is part of Hamlib
# (C) 2021 Stephane Fillod
#
# SPDX-License-Identifier: GPL-3
"""
Protocol simulator of PRM80xx running firmware V5.
This script needs a POSIX system to run.
On one terminal, start this script:
$ ./pysimulprm80.py -l /tmp/prm80simul
Slave name /dev/pts/5
Name symlinked /tmp/prm80simul
...
On another terminal, test the prm80 backend:
$ rigctl -vvvvvv -r /tmp/prm80simul -m 28001
"""
# Implementation from
# - https://github.com/f4fez/prm80/blob/master/doc/Computer_commands_V4.md
# - https://github.com/f4fez/prm80/blob/master/doc/Computer_control.md
# - https://sourceforge.net/p/hamlib/discussion/25919/thread/93afa09f52/
# - https://github.com/f4fez/prm80/blob/master/src/inc_ser.a51
import argparse
import time
import os, pty, tty, termios
class Prm80Simul:
def __init__(self, pty_r, pty_w):
self.pty_r = pty_r
self.pty_w = pty_w
self.SetupPseudoSerial(self.pty_r)
self.SetupPseudoSerial(self.pty_w)
self.msg_version = "PRM8060 V5.0 430"
self.mdict = {
'V' : self.tch_Version,
'N' : self.tch_Nmem,
'K' : self.tch_Klock,
'F' : self.tch_Fsquelch,
'O' : self.tch_Ovolume,
'D' : self.tch_Dmode,
'T' : self.tch_Tstate,
'U' : self.tch_UprintRAM,
'P' : self.tch_Peditchannel,
'Q' : self.tch_Qmaxchan,
'C' : self.tch_Chanlist,
'R' : self.tch_Rfreq,
'E' : self.tch_Estate,
'#' : self.tch_diese,
'*' : self.tch_autre,
}
# Some live data simulated inside the rig
self.ChanNum = 0
self.LockByte = 0
self.Squelch = 0
self.Volume = 0x10
self.ModeByte = 0x16
self.ChanState = 0x0c
self.MaxChan = 80
self.RxPLL = 0x7970 # 410 MHz (w/ IF offset)
self.TxPLL = 0x8020 # 410 MHz
def SetupPseudoSerial(self, fd):
""" make it raw, at 4800 bauds (defaults to 8N1) """
tty.setraw(fd)
term_settings = termios.tcgetattr(fd)
term_settings[4] = termios.B4800 # ispeed
term_settings[5] = termios.B4800 # ospeed
termios.tcsetattr(fd, termios.TCSANOW, term_settings)
def pty_write(self, a):
""" helper write+flush """
self.pty_w.write(a)
self.flush_w()
def tch_autre(self):
""" Unknown command """
self.pty_write(b'* ?')
def tch_Version(self):
""" Print firmware version """
self.pty_write(self.msg_version.encode())
def tch_Nmem(self):
""" Set current channel """
self.pty_write(b'Channel : ')
self.ChanNum = int(pty_r.read(2).decode('ascii'))
def tch_Klock(self):
""" Set lock byte """
self.pty_write(b'Lock : ')
self.LockByte = int(pty_r.read(2).decode('ascii'), 16)
def tch_Fsquelch(self):
""" Set squelch. """
self.pty_write(b'Squelch : ')
self.Squelch = int(pty_r.read(2).decode('ascii'))
def tch_Ovolume(self):
""" Set volume. """
self.pty_write(b'Volume : ')
self.Volume = int(pty_r.read(2).decode('ascii'))
def tch_Dmode(self):
""" Set "Mode" byte. """
self.pty_write(b'Mode : ')
self.ModeByte = int(pty_r.read(2).decode('ascii'), 16)
def tch_Tstate(self):
""" Set Channel State. """
self.pty_write(b'Channel state : $')
self.ChanState = int(pty_r.read(2).decode('ascii'))
def tch_UprintRAM(self):
""" Print 80c552 internal RAM. """
self.pty_w.write(b'Display the 256 bytes from internal RAM : \r\n')
# 16 lines of 16 bytes each. The command needs about 3 sec.
# In V4, rssi_hold is at 0x53
self.pty_w.write(b'$00 : 00 00 00 D5 09 00 00 0F 7F 99 D2 7D B2 93 8F C5\r\n')
self.pty_w.write(b'$10 : 60 8B 43 CA 31 43 BB BA F9 27 87 47 06 52 BE 55\r\n')
self.pty_w.write(b'$20 : 81 2F FD 17 01 12 A3 00 13 02 34 96 0B 04 54 7A\r\n')
self.pty_w.write(b'$30 : 13 03 00 30 B4 00 03 00 FC 58 71 00 0F 08 08 82\r\n')
self.pty_w.write(b'$40 : 9E 86 EE 60 1F 1F 00 FB FF FF 44 55 FF 73 5A F5\r\n')
self.pty_w.write(b'$50 : 7E 02 06 72 B7 FF 8D B1 92 79 93 8D 3E 4E 8E 06\r\n')
self.pty_w.write(b'$60 : DB C7 3D 11 AE 5A 4F 3D 80 82 D1 8A 88 6C 7B E5\r\n')
self.pty_w.write(b'$70 : 2F 98 4C 72 5B A4 78 5C 7D 45 46 8C 25 23 BB 82\r\n')
self.pty_w.write(b'$80 : 60 1E 2F 42 28 38 E3 7D 94 BC CA B4 B1 43 AE 84\r\n')
self.pty_w.write(b'$90 : 81 88 75 1A 2D 93 7E E8 CF 79 B6 E0 0F 31 AD AA\r\n')
self.pty_w.write(b'$A0 : 4F 1C 0F 12 55 00 24 13 08 D2 0E C0 00 20 AF 0E\r\n')
self.pty_w.write(b'$B0 : 40 FF 40 00 B8 00 0F 0F B8 0F 10 B8 0F 14 10 FC\r\n')
self.pty_w.write(b'$C0 : 08 E8 D8 3F 33 20 D9 50 8F 1A A2 FF CC D4 2E 3A\r\n')
self.pty_w.write(b'$D0 : 12 22 B7 23 2F 8F 64 47 99 21 6C D8 B4 B0 C3 51\r\n')
self.pty_w.write(b'$E0 : 90 E8 67 5C 18 5E AB 2D BD 71 52 0F 96 6F DA 56\r\n')
self.pty_w.write(b'$F0 : 1D E1 08 42 C2 2D 3B C7 87 23 88 E3 1A 95 F1 EE\r\n')
self.flush_w()
def tch_Qmaxchan(self):
""" Set channels number. """
self.pty_write(b'Channels number (00 to 99) : ')
self.MaxChan = int(pty_r.read(2).decode('ascii'))
# This command gets an extra CRLF ??!
self.pty_write(b'\r\n')
def tch_Peditchannel(self):
""" Edit/Add channel """
# TODO keep edited channel content in the class
self.pty_write(b'Channel to set : ')
chan_num = int(pty_r.read(2).decode('ascii'))
self.pty_write(b'\r\n')
# NB: here prompts end with '$'
self.pty_write(b'PLL value to load : $')
pll_freq_value = int(pty_r.read(4).decode('ascii'), 16)
self.pty_write(b'\r\n')
self.pty_write(b'Channel state : $')
chan_state = int(pty_r.read(2).decode('ascii'), 16)
self.pty_write(b'\r\n')
# for higher channels, allow to test this scenario
if chan_num > self.MaxChan and chan_num != 99:
self.pty_write(b'This channel number doesn\'t exist. Add new channel (Y/N) ? ')
# response "Y/N"
yesno = pty_r.read(1).decode('ascii')
self.pty_write(b'\r\n')
def tch_Estate(self):
""" Show system state (Mode-Chan-Chanstate-Sql-Vol-Lock-RX freq-TX freq-RSSI) (FW V5)"""
#self.pty_write(b'16000C0010007970802018')
rssi=24
self.pty_write('{:02X}{:02X}{:02X}{:02X}{:02X}{:02X}{:04X}{:04X}{:02X}'.format(
self.ModeByte,
self.ChanNum,
self.ChanState,
self.Squelch, # low nibble
self.Volume,
self.LockByte,
self.RxPLL,
self.TxPLL,
rssi
).encode())
def tch_Chanlist(self):
""" Print channels list """
self.pty_write(b'Channels list :\r\n00 : 8464 84\r\n01 : 81B0 00\r\n02 : 8464 0C')
def tch_Rfreq(self):
""" Set synthetiser frequencies """
self.pty_write(b'RX frequency : ')
self.RxPLL = int(pty_r.read(4).decode('ascii'), 16)
self.pty_write(b'\r\nTX frequency : ')
self.TxPLL = int(pty_r.read(4).decode('ascii'), 16)
def tch_diese(self):
""" Ping """
self.pty_write(b'!')
def exec_cmd(self,cmd):
default='*'
self.mdict.get(cmd.upper(),default)() # get() method returns the function matching the argument
self.pty_write(b'\r\n>')
def flush_w(self):
self.pty_w.flush()
if __name__ == '__main__':
master, slave = pty.openpty()
s_name = os.ttyname(slave)
m_name = os.ttyname(master)
pty_r = os.fdopen(master, "rb")
pty_w = os.fdopen(master, "wb")
parser = argparse.ArgumentParser()
parser.add_argument("-l", "--symlink",
help="make a symlink with the opened pty",
type=str,
dest='symlink')
options = parser.parse_args()
print ("Slave name ", s_name)
# Create a symlink
if options.symlink:
if os.access(options.symlink, os.F_OK, follow_symlinks=False):
os.remove(options.symlink)
os.symlink(s_name, options.symlink)
print ("Name symlinked ", options.symlink)
prm80 = Prm80Simul(pty_r, pty_w)
while True:
cmd = pty_r.read(1).decode('ascii')
print ("Received command ["+cmd+"]")
prm80.exec_cmd(cmd)
print ("Restart mainloop")