refactor
Mikhail Yudin 2023-06-10 23:49:58 +07:00
rodzic e8b2cbfc40
commit d2a03f3b80
6 zmienionych plików z 93 dodań i 89 usunięć

Wyświetl plik

@ -4,7 +4,7 @@ import os
from pathlib import Path
from sys import argv
from lib.encdec import eprint, encrypt, decrypt
from lib.uvk5 import eprint, encrypt, decrypt
def usage(info = None):

Wyświetl plik

@ -1,46 +0,0 @@
from binascii import crc_hqx as crc16
from itertools import cycle
from sys import stderr
from pathlib import Path
# Structure of pre-encoded payload
# 8196 | 16 | ... | 2 |
# data | version | data | crc |
LIB_DIR = Path(__file__).parent
DATA_DIR = LIB_DIR / '..' / 'data'
KEY = (DATA_DIR / 'key.bin').read_bytes()
COMM_KEY = (DATA_DIR / 'key.bin').read_bytes()
V_START = 8192
V_END = V_START + 16
CRC_LEN = 2
def eprint(*args, **kwargs):
print(*args, **kwargs, file=stderr)
def xor(var):
return bytes(a ^ b for a, b in zip(var, cycle(KEY)))
def xor_comm(var):
return bytes(a ^ b for a, b in zip(var, cycle(COMM_KEY)))
def make_16byte_version(version):
return bytes([ord(c) for c in version] + [0] * (16 - len(version)))
def decrypt(data):
decrypted = xor(data)
version = decrypted[V_START:V_END].decode().rstrip('\x00')
return (decrypted[:V_START] + decrypted[V_END:-CRC_LEN], version)
def encrypt(data, version='2.01.26'):
v = make_16byte_version(version)
encrypted = xor(data[:V_START] + v + data[V_START:])
checksum = crc16(encrypted, 0).to_bytes(2, 'little')
return encrypted + checksum

Wyświetl plik

@ -5,7 +5,7 @@ from pathlib import Path
from sys import argv
from configparser import ConfigParser
from lib.encdec import decrypt, encrypt, eprint
from uvk5 import decrypt, encrypt, eprint
ADDR_DIR = Path(__file__).parent / 'addresses'
MODS_DIR = Path(__file__).parent / 'mods'

132
uvk5.py
Wyświetl plik

@ -1,31 +1,51 @@
#!/usr/bin/env python3
from binascii import crc_hqx
from sys import argv
from itertools import cycle
from sys import stderr, argv
from pathlib import Path
from time import time
from io import StringIO
from serial import Serial
from lib.encdec import eprint, xor_comm
DATA_DIR = Path(__file__).parent / 'data'
BLOCK_SIZE = 0x80
KEY_FW = (DATA_DIR / 'key-fw.bin').read_bytes()
KEY_COMM = (DATA_DIR / 'key-comm.bin').read_bytes()
PREAMBLE = b'\xab\xcd'
POSTAMBLE = b'\xdc\xba'
V_START = 8192
V_END = V_START + 16
CRC_LEN = 2
CMD_VERSION_REQ = 0x0514
CMD_VERSION_RES = 0x0515
CMD_SETTINGS_REQ = 0x051B
CMD_SETTINGS_RES = 0x051C
def chunk(data, n):
for i in range(0,len(data), n):
yield data[i:i+n]
CMD_SETTINGS_WRITE_REQ = 0x051D # then addr (0x0E70) then size (0x0160) then data
TIMESTAMP = int(time()).to_bytes(4, 'little')
def eprint(*args, **kwargs):
print(*args, **kwargs, file=stderr)
def xor(var):
return bytes(a ^ b for a, b in zip(var, cycle(KEY_FW)))
def xor_comm(var):
return bytes(a ^ b for a, b in zip(var, cycle(KEY_COMM)))
def make_16byte_version(version):
return bytes([ord(c) for c in version] + [0] * (16 - len(version)))
def i2b16(cmd_id):
return cmd_id.to_bytes(2,'little')
def i2b16(val):
return int(val).to_bytes(2,'little')
def i2b32(val):
return int(val).to_bytes(4,'little')
def b2i(data):
@ -39,34 +59,48 @@ def len16(data):
def crc16(data):
return i2b16(crc_hqx(data, 0))
def chunk(data, n):
for i in range(0,len(data), n):
yield data[i:i+n]
def decrypt(data):
decrypted = xor(data)
version = decrypted[V_START:V_END].decode().rstrip('\x00')
return (decrypted[:V_START] + decrypted[V_END:-CRC_LEN], version)
def cmd_make_req(cmd_id, body=b''):
data = body + TIMESTAMP
payload = i2b16(cmd_id) + len16(data) + data
encoded_payload = xor_comm(payload + crc16(payload))
return PREAMBLE + len16(payload) + encoded_payload + POSTAMBLE
def encrypt(data, version='2.01.26'):
v = make_16byte_version(version)
encrypted = xor(data[:V_START] + v + data[V_START:])
checksum = crc16(encrypted)
return encrypted + checksum
class UVK5(Serial):
BLOCK_SIZE = 0x80
PREAMBLE = b'\xab\xcd'
POSTAMBLE = b'\xdc\xba'
CMD_VERSION_REQ = 0x0514
CMD_VERSION_RES = 0x0515
CMD_SETTINGS_REQ = 0x051B
CMD_SETTINGS_RES = 0x051C
CMD_SETTINGS_WRITE_REQ = 0x051D # then addr (0x0E70) then size (0x0160) then data
def __init__(self, port: str | None = None) -> None:
self.timestamp = i2b32(time())
super().__init__(port, 38400, timeout=5)
def get_version(self):
return self.cmd(CMD_VERSION_REQ)[1][:10].decode().rstrip('\x00')
return self.cmd(UVK5.CMD_VERSION_REQ)[1][:10].decode().rstrip('\x00')
def read_mem(self, offset, size):
return self.cmd(CMD_SETTINGS_REQ, i2b16(offset) + i2b16(size))
return self.cmd(UVK5.CMD_SETTINGS_REQ, i2b16(offset) + i2b16(size))
def cmd(self, id, body = b''):
self.write(cmd_make_req(id, body))
self.write(self._cmd_make_req(id, body))
preamble = self.read(2)
if preamble != PREAMBLE:
if preamble != UVK5.PREAMBLE:
raise ValueError('Bad response (PRE)', preamble)
payload_len = b2i(self.read(2)) + 2 # CRC len
@ -75,7 +109,7 @@ class UVK5(Serial):
# crc = payload[-2:]
postamble = self.read(2)
if postamble != POSTAMBLE:
if postamble != UVK5.POSTAMBLE:
raise ValueError('Bad response (POST)')
# print(data.hex())
@ -85,6 +119,8 @@ class UVK5(Serial):
return (cmd_id, data)
def version(self):
self.get_version()
def channels(self):
names = []
@ -94,33 +130,47 @@ class UVK5(Serial):
names_offset = 0x0F50
settings_offset = 0x0000
passes = data_size//BLOCK_SIZE
passes = data_size//UVK5.BLOCK_SIZE
out = StringIO()
for block in range(passes):
offset = names_offset + block*BLOCK_SIZE
names_set = self.read_mem(offset, BLOCK_SIZE)[1][4:]
offset = names_offset + block*UVK5.BLOCK_SIZE
names_set = self.read_mem(offset, UVK5.BLOCK_SIZE)[1][4:]
names += [name.decode(errors='ignore').rstrip('\x00') for name in chunk(names_set, 16)]
for block in range(passes):
offset = settings_offset + block*BLOCK_SIZE
settings_set = self.read_mem(offset, BLOCK_SIZE)[1][4:]
offset = settings_offset + block*UVK5.BLOCK_SIZE
settings_set = self.read_mem(offset, UVK5.BLOCK_SIZE)[1][4:]
settings += [(b2i(setting[:4])/100000.0, ) for setting in chunk(settings_set, 16)]
for i, name in enumerate(names):
if name:
print(f'{i+1:0>3}. {name: <16} {settings[i][0]:0<8} M')
print(f'{i+1:0>3}. {name: <16} {settings[i][0]:0<8} M', file=out)
else:
print(f'{i+1:0>3}. -')
print(f'{i+1:0>3}. -', file=out)
def main(port, cmd, args):
with UVK5(port) as s:
print('FW version:', s.get_version())
getattr(s, cmd)(*args)
return out.getvalue()
def _cmd_make_req(self, cmd_id, body=b''):
data = body + self.timestamp
payload = i2b16(cmd_id) + len16(data) + data
encoded_payload = xor_comm(payload + crc16(payload))
return UVK5.PREAMBLE + len16(payload) + encoded_payload + UVK5.POSTAMBLE
if __name__ == '__main__':
if len(argv) < 3:
eprint(f'Usage: {argv[0]} <port> <command> [args]')
eprint(f'Usage: {argv[0]} <port> <command:(channels|version)> [args]')
exit(255)
main(argv[1], argv[2], argv[3:])
port = argv[1]
cmd = argv[2]
args = argv[3:]
with UVK5(port) as s:
s.get_version()
print(getattr(s, cmd)(*args))