vesc object Display class

pull/14422/head
Kevin Donnelly 2024-04-11 23:50:23 +12:00
rodzic 6a19ec5c21
commit 03faafb51d
22 zmienionych plików z 1146 dodań i 60 usunięć

Wyświetl plik

@ -0,0 +1,87 @@
from observer import Subject, Observer
class Display(Observer):
_oled = None
def __init__(self, oled, np=0):
self._oled = oled
self._np = np
self.ao = self.analog_observer(self)
self.rpm_pixels = 69
self.cs = self.current_strip(45, 24, np)
def np_current(self, cur):
n = round(cur * 4)
if n > 0:
for i in range(n):
self.cs[i] = (0, 0, 100)
else:
n = n * -1
length = len(self.cs) - 1
for i in range(n):
self.cs[length - i] = (100, 0, 0)
def np_rpm(self, rpm):
n = int(rpm/20)
if n > 0:
for i in range(n):
self._np[i] = (0, 100, 0)
else:
n = n * -1
for i in range(n):
self._np[44 - i] = (100, 0, 0)
def clear(self):
# clear
for i in range(self.rpm_pixels):
self._np[i] = (0, 0, 0)
def __setitem__(self, key, item):
_item = str(item)
if key == "vin" and item:
self._oled.text("vin:" + _item, 0, 0, 1)
elif key == "rpm" and item:
self._oled.text("rpm:" + _item, 0, 12, 1)
self.np_rpm(item)
elif key == "avg_input_current" and item:
self._oled.text("ic:" + _item, 0, 24, 1)
elif key == "avg_motor_current" and item:
self._oled.text("mc:" + _item, 60, 24, 1)
self.np_current(item)
def update(self, subject: Subject) -> None:
self._oled.fill(0)
self.clear()
self["vin"] = subject.response.v_in
self["rpm"] = int(subject.response.rpm/11) # erpm/poles
self["avg_input_current"] = subject.response.avg_input_current
self["avg_motor_current"] = subject.response.avg_motor_current
self._np.write()
# self._oled.text("time: " + str(subject.response.time_ms), 0, 36, 1)
# self._oled.show()
class current_strip:
def __init__(self, offset, n, strip):
self._strip = strip
self._offset = offset
self.n = n
def __setitem__(self, index, value):
self._strip[self._offset + index] = value
def __len__(self):
return self.n
class analog_observer(Observer):
def __init__(self, parent):
self.parent = parent
def update(self, subject: Subject) -> None:
self.parent._oled.text("th:" + str(int(subject._values[0]/1000)) +
" br:" + str(int(subject._values[1]/1000)), 0, 48, 1)

Wyświetl plik

@ -50,6 +50,8 @@ class BLESimplePeripheral(Observer):
self._ble.config(mtu=500)
# self._ble.config(rxbuf=512)
self._ble.irq(self._irq)
self.ao = self.analog_observer(self)
self.vo = self.vesc_observer(self)
((self._handle_tx, self._handle_rx,), (self._handle_i, self._handle_o, self._handle_np, self._handle_iic), ) = self._ble.gatts_register_services((_UART_SERVICE, _VOLTAGE_IO_SERVICE, ))
self._connections = set()
@ -107,13 +109,36 @@ class BLESimplePeripheral(Observer):
def update(self, subject: Subject) -> None:
# (f"ble Observer: My value for pin {subject._pins[idx]} has just changed to: {value}")
data = struct.pack("<ll", subject._values[0], subject._values[1])
for conn_handle in self._connections:
data = struct.pack("<ll", subject._values[0], subject._values[1])
try:
self._ble.gatts_notify(conn_handle, self._handle_i, data)
except OSError:
Log(1, "failed to update with gatts_notify")
class analog_observer(Observer):
def __init__(self, parent):
self.parent = parent
def update(self, subject: Subject) -> None:
data = struct.pack("<ll", subject._values[0], subject._values[1])
for conn_handle in self.parent._connections:
try:
self.parent._ble.gatts_notify(conn_handle, self.parent._handle_i, data)
except OSError:
Log(1, "failed to update with gatts_notify")
class vesc_observer(Observer):
def __init__(self, parent):
self.parent = parent
def update(self, subject: Subject) -> None:
buffer = subject._buffer
self.parent.send_uart(buffer)
def ble_main(hwuart, sa=0, np=0, iic=0):
ble = bluetooth.BLE()

Wyświetl plik

@ -0,0 +1,72 @@
CRC16_XMODEM_TABLE = [
0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef,
0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6,
0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de,
0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485,
0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d,
0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4,
0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc,
0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823,
0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b,
0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12,
0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a,
0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41,
0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49,
0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70,
0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78,
0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f,
0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067,
0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e,
0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256,
0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d,
0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c,
0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634,
0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab,
0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3,
0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a,
0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92,
0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9,
0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1,
0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8,
0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0,
]
class CrcXmodem:
@staticmethod
def calc(data):
return __class__.crc16xmodem(data)
@staticmethod
def _crc16(data, crc, table):
"""Calculate CRC16 using the given table.
`data` - data for calculating CRC, must be bytes
`crc` - initial value
`table` - table for caclulating CRC (list of 256 integers)
Return calculated value of CRC
"""
for byte in data:
crc = ((crc<<8)&0xff00) ^ table[((crc>>8)&0xff)^byte]
return crc & 0xffff
@staticmethod
def crc16xmodem(data, crc=0):
"""Calculate CRC-CCITT (XModem) variant of CRC16.
`data` - data for calculating CRC, must be bytes
`crc` - initial value
Return calculated value of CRC
"""
return __class__._crc16(data, crc, CRC16_XMODEM_TABLE)
@staticmethod
def CrcXmodem(self,data, crc=0):
"""Calculate CRC-CCITT (XModem) variant of CRC16.
`data` - data for calculating CRC, must be bytes
`crc` - initial value
Return calculated value of CRC
"""
return __class__._crc16(data, crc, CRC16_XMODEM_TABLE)

Wyświetl plik

@ -9,7 +9,7 @@ def demo(np):
np[j] = (0, 0, 0)
np[i % n] = (255, 255, 255)
np.write()
time.sleep_ms(25)
time.sleep_ms(2)
# bounce
for i in range(4 * n):
@ -20,7 +20,7 @@ def demo(np):
else:
np[n - 1 - (i % n)] = (0, 0, 0)
np.write()
time.sleep_ms(60)
time.sleep_ms(6)
# fade in/out
for i in range(0, 4 * 256, 8):

Wyświetl plik

@ -1,6 +1,26 @@
from abc import ABC, abstractmethod
from random import randrange
class Observer(ABC):
"""
The Observer interface declares the update method, used by subjects.
"""
@abstractmethod
def update(self, subject: Subject) -> None:
"""
Receive update from subject.
"""
pass
"""
Concrete Observers react to the updates issued by the Subject they had been
attached to.
"""
class Subject(ABC):
"""
The Subject interface declares a set of methods for managing subscribers.
@ -81,25 +101,6 @@ class ConcreteSubject(Subject):
self.notify()
class Observer(ABC):
"""
The Observer interface declares the update method, used by subjects.
"""
@abstractmethod
def update(self, subject: Subject) -> None:
"""
Receive update from subject.
"""
pass
"""
Concrete Observers react to the updates issued by the Subject they had been
attached to.
"""
class ConcreteObserverA(Observer):
def update(self, subject: Subject) -> None:
if subject._state < 3:

Wyświetl plik

@ -1,9 +1,10 @@
from observer import Subject,Observer
from observer import Subject, Observer
from machine import Pin, ADC
from random import randrange
from log import Log
class AnalogPins(Subject):
"""
The Subject owns some important state and notifies observers when the state
@ -19,32 +20,28 @@ class AnalogPins(Subject):
"""
_observers: List[Observer] = []
_converters=[]
_pins=[]
_converters = []
_pins = []
"""
List of subscribers. In real life, the list of subscribers can be stored
more comprehensively (categorized by event type, etc.).
"""
def interruption_handler(self,timer):
def interruption_handler(self, timer):
self.update()
def __init__(self, pins):
self._pins=pins
for idx,pin in enumerate(pins):
self._pins = pins
for idx, pin in enumerate(pins):
Log(f"idx={idx} pin={pin}")
self._converters.append(ADC(Pin(pin)))
self._values.append(0)
for converter in self._converters:
converter.atten(converter.ATTN_11DB)
def attach(self, observer: Observer) -> None:
Log("Subject: Attached an observer.")
Log("Analog: Attached an observer.")
self._observers.append(observer)
def detach(self, observer: Observer) -> None:
@ -63,7 +60,7 @@ class AnalogPins(Subject):
for observer in self._observers:
observer.update(self)
def update(self,timer) -> None:
def update(self) -> None:
"""
Usually, the subscription logic is only a fraction of what a Subject can
really do. Subjects commonly hold some important business logic, that
@ -72,8 +69,8 @@ class AnalogPins(Subject):
"""
self._state = randrange(0, 10)
for idx,converter in enumerate(self._converters):
self._values[idx]=converter.read_uv()
for idx, converter in enumerate(self._converters):
self._values[idx] = converter.read_uv()
"""
Log(f"Subject: My state has just changed to: {self._state}")
@ -85,5 +82,5 @@ class AnalogPins(Subject):
class ConcreteAnalogObserver(Observer):
def update(self, subject: Subject) -> None:
for idx,value in enumerate(subject._values):
for idx, value in enumerate(subject._values):
Log(f"Observer: My value for pin {subject._pins[idx]} has just changed to: {value}")

Wyświetl plik

@ -0,0 +1,62 @@
from observer import Subject, Observer
from log import Log
import pyvesc
from pyvesc.VESC.messages import GetValues_mp
class VESC(Subject):
response = None
_uart = None
_get_values_msg = None
m_get_values_pkt = None
_count = 0
_buffer = None
_observers: List[Observer] = []
def __init__(self, uart):
self._uart = uart
self._get_values_msg = GetValues_mp()
self.m_get_values_pkt = pyvesc.encode_request_mp(self._get_values_msg)
Log(self.m_get_values_pkt)
def attach(self, observer: Observer) -> None:
Log("VESC: Attached an observer.")
self._observers.append(observer)
def detach(self, observer: Observer) -> None:
self._observers.remove(observer)
def notify(self) -> None:
"""
Trigger an update in each subscriber.
"""
Log(4,"Subject: Notifying observers...")
for observer in self._observers:
observer.update(self)
def update(self) -> int:
res = 1
if self._count < 1:
self._uart.write(self.m_get_values_pkt)
self._count = 10
# Log("Write usrt")
length = self._uart.any()
# Log(str(len))
if length > 61:
self._buffer = self._uart.read(length)
if len(self._buffer) > 61:
try:
(self.response, consumed) = pyvesc.decode_mp(self._get_values_msg, self._buffer)
except OSError:
Log("failed to decode_mp")
if self.response:
self.notify()
self._count = 0
else:
self._count -= 1
res = 0
return res

Wyświetl plik

@ -0,0 +1 @@
from .VESC import VESC

Wyświetl plik

@ -0,0 +1,126 @@
from pyvesc.protocol.base_mp import VESCMessage
class VedderCmd_mp:
COMM_FW_VERSION = 0
COMM_JUMP_TO_BOOTLOADER = 1
COMM_ERASE_NEW_APP = 2
COMM_WRITE_NEW_APP_DATA = 3
COMM_GET_VALUES = 4
COMM_SET_DUTY = 5
COMM_SET_CURRENT = 6
COMM_SET_CURRENT_BRAKE = 7
COMM_SET_RPM = 8
COMM_SET_POS = 9
COMM_SET_HANDBRAKE = 10
COMM_SET_DETECT = 11
COMM_SET_SERVO_POS = 12
COMM_SET_MCCONF = 13
COMM_GET_MCCONF = 14
COMM_GET_MCCONF_DEFAULT = 15
COMM_SET_APPCONF = 16
COMM_GET_APPCONF = 17
COMM_GET_APPCONF_DEFAULT = 18
COMM_SAMPLE_PRINT = 19
COMM_TERMINAL_CMD = 20
COMM_PRINT = 21
COMM_ROTOR_POSITION = 22
COMM_EXPERIMENT_SAMPLE = 23
COMM_DETECT_MOTOR_PARAM = 24
COMM_DETECT_MOTOR_R_L = 25
COMM_DETECT_MOTOR_FLUX_LINKAGE = 26
COMM_DETECT_ENCODER = 27
COMM_DETECT_HALL_FOC = 28
COMM_REBOOT = 29
COMM_ALIVE = 30
COMM_GET_DECODED_PPM = 31
COMM_GET_DECODED_ADC = 32
COMM_GET_DECODED_CHUK = 33
COMM_FORWARD_CAN = 34
COMM_SET_CHUCK_DATA = 35
COMM_CUSTOM_APP_DATA = 36
COMM_NRF_START_PAIRING = 37
COMM_GPD_SET_FSW = 38
COMM_GPD_BUFFER_NOTIFY = 39
COMM_GPD_BUFFER_SIZE_LEFT = 40
COMM_GPD_FILL_BUFFER = 41
COMM_GPD_OUTPUT_SAMPLE = 42
COMM_GPD_SET_MODE = 43
COMM_GPD_FILL_BUFFER_INT8 = 44
COMM_GPD_FILL_BUFFER_INT16 = 45
COMM_GPD_SET_BUFFER_INT_SCALE = 46
COMM_GET_VALUES_SETUP = 47
COMM_SET_MCCONF_TEMP = 48
COMM_SET_MCCONF_TEMP_SETUP = 49
COMM_GET_VALUES_SELECTIVE = 50
COMM_GET_VALUES_SETUP_SELECTIVE = 51
COMM_EXT_NRF_PRESENT = 52
COMM_EXT_NRF_ESB_SET_CH_ADDR = 53
COMM_EXT_NRF_ESB_SEND_DATA = 54
COMM_EXT_NRF_ESB_RX_DATA = 55
COMM_EXT_NRF_SET_ENABLED = 56
COMM_DETECT_MOTOR_FLUX_LINKAGE_OPENLOOP = 57
COMM_DETECT_APPLY_ALL_FOC = 58
COMM_JUMP_TO_BOOTLOADER_ALL_CAN = 59
COMM_ERASE_NEW_APP_ALL_CAN = 60
COMM_WRITE_NEW_APP_DATA_ALL_CAN = 61
COMM_PING_CAN = 62
COMM_APP_DISABLE_OUTPUT = 63
COMM_TERMINAL_CMD_SYNC = 64
COMM_GET_IMU_DATA = 65
COMM_BM_CONNECT = 66
COMM_BM_ERASE_FLASH_ALL = 67
COMM_BM_WRITE_FLASH = 68
COMM_BM_REBOOT = 69
COMM_BM_DISCONNECT = 70
COMM_BM_MAP_PINS_DEFAULT = 71
COMM_BM_MAP_PINS_NRF5X = 72
COMM_ERASE_BOOTLOADER = 73
COMM_ERASE_BOOTLOADER_ALL_CAN = 74
COMM_PLOT_INIT = 75
COMM_PLOT_DATA = 76
COMM_PLOT_ADD_GRAPH = 77
COMM_PLOT_SET_GRAPH = 78
COMM_GET_DECODED_BALANCE = 79
COMM_BM_MEM_READ = 80
COMM_WRITE_NEW_APP_DATA_LZO = 81
COMM_WRITE_NEW_APP_DATA_ALL_CAN_LZO = 82
COMM_BM_WRITE_FLASH_LZO = 83
COMM_SET_CURRENT_REL = 84
COMM_CAN_FWD_FRAME = 85
COMM_SET_BATTERY_CUT = 86
COMM_SET_BLE_NAME = 87
COMM_SET_BLE_PIN = 88
COMM_SET_CAN_MODE = 89
def GetValues_mp():
""" Gets internal sensor data
"""
id = VedderCmd_mp.COMM_GET_VALUES
fields = [
('temp_fet', 'h', 10),
('temp_motor', 'h', 10),
('avg_motor_current', 'i', 100),
('avg_input_current', 'i', 100),
('avg_id', 'i', 100),
('avg_iq', 'i', 100),
('duty_cycle_now', 'h', 1000),
('rpm', 'i', 1),
('v_in', 'h', 10),
('amp_hours', 'i', 10000),
('amp_hours_charged', 'i', 10000),
('watt_hours', 'i', 10000),
('watt_hours_charged', 'i', 10000),
('tachometer', 'i', 1),
('tachometer_abs', 'i', 1),
('mc_fault_code', 'c', 0),
('pid_pos_now', 'i', 1000000),
('app_controller_id', 'c', 0),
('time_ms', 'i', 1),
]
my_msg=VESCMessage("GetValues",fields,id)
return my_msg

Wyświetl plik

@ -0,0 +1,2 @@
from .Vedder_BLDC_Commands_mp import VedderCmd_mp
from .getters_mp import *

Wyświetl plik

@ -0,0 +1,49 @@
from pyvesc.protocol.base_mp import VESCMessage
from pyvesc.VESC.messages import VedderCmd_mp
def GetVersion_mp():
""" Gets version fields
"""
id = VedderCmd_mp.COMM_FW_VERSION
fields = [
('comm_fw_version', 'b', 0),
('fw_version_major', 'b', 0),
('fw_version_minor', 'b', 0)
]
my_msg = VESCMessage("GetVersion", fields, id)
return my_msg
def GetValues_mp():
""" Gets internal sensor data
"""
id = VedderCmd_mp.COMM_GET_VALUES
fields = [
('temp_fet', 'h', 10),
('temp_motor', 'h', 10),
('avg_motor_current', 'i', 100),
('avg_input_current', 'i', 100),
('avg_id', 'i', 100),
('avg_iq', 'i', 100),
('duty_cycle_now', 'h', 1000),
('rpm', 'i', 1),
('v_in', 'h', 10),
('amp_hours', 'i', 10000),
('amp_hours_charged', 'i', 10000),
('watt_hours', 'i', 10000),
('watt_hours_charged', 'i', 10000),
('tachometer', 'i', 1),
('tachometer_abs', 'i', 1),
('mc_fault_code', 'b', 0),
('pid_pos_now', 'i', 1000000),
('app_controller_id', 'b', 0),
('time_ms', 'i', 1),
]
my_msg = VESCMessage("GetValues", fields, id)
return my_msg

Wyświetl plik

@ -0,0 +1,103 @@
from pyvesc.protocol.base import VESCMessage
from pyvesc.protocol.interface import encode
from pyvesc.VESC.messages import VedderCmd
class SetDutyCycle(metaclass=VESCMessage):
""" Set the duty cycle.
:ivar duty_cycle: Value of duty cycle to be set (range [-1e5, 1e5]).
"""
id = VedderCmd.COMM_SET_DUTY
fields = [
('duty_cycle', 'i', 100000)
]
class SetRPM(metaclass=VESCMessage):
""" Set the RPM.
:ivar rpm: Value to set the RPM to.
"""
id = VedderCmd.COMM_SET_RPM
fields = [
('rpm', 'i')
]
class SetCurrent(metaclass=VESCMessage):
""" Set the current (in milliamps) to the motor.
:ivar current: Value to set the current to (in milliamps).
"""
id = VedderCmd.COMM_SET_CURRENT
fields = [
('current', 'i', 1000)
]
class SetCurrentBrake(metaclass=VESCMessage):
""" Set the current brake (in milliamps).
:ivar current_brake: Value to set the current brake to (in milliamps).
"""
id = VedderCmd.COMM_SET_CURRENT_BRAKE
fields = [
('current_brake', 'i', 1000)
]
class SetPosition(metaclass=VESCMessage):
"""Set the rotor angle based off of an encoder or sensor
:ivar pos: Value to set the current position or angle to.
"""
id = VedderCmd.COMM_SET_POS
fields = [
('pos', 'i', 1000000)
]
class SetRotorPositionMode(metaclass=VESCMessage):
"""Sets the rotor position feedback mode.
It is reccomended to use the defined modes as below:
* DISP_POS_OFF
* DISP_POS_MODE_ENCODER
* DISP_POS_MODE_PID_POS
* DISP_POS_MODE_PID_POS_ERROR
:ivar pos_mode: Value of the mode
"""
DISP_POS_OFF = 0
DISP_POS_MODE_ENCODER = 3
DISP_POS_MODE_PID_POS = 4
DISP_POS_MODE_PID_POS_ERROR = 5
id = VedderCmd.COMM_SET_DETECT
fields = [
('pos_mode', 'b')
]
class SetServoPosition(metaclass=VESCMessage):
"""Sets the position of s servo connected to the VESC.
:ivar servo_pos: Value of position (range [0, 1])
"""
id = VedderCmd.COMM_SET_SERVO_POS
fields = [
('servo_pos', 'h', 1000)
]
class Alive(metaclass=VESCMessage):
"""Heartbeat signal to keep VESC alive"""
id = VedderCmd.COMM_ALIVE
fields = []
# statically save this message because it does not need to be recalculated
alive_msg = encode(Alive())

Wyświetl plik

@ -0,0 +1,2 @@
from pyvesc.protocol import *
from pyvesc.VESC import *

Wyświetl plik

@ -0,0 +1,3 @@
from .interface_mp import *
from .packet import *
from .base_mp import *

Wyświetl plik

@ -0,0 +1,142 @@
import struct
class VESCMessage:
""" Metaclass for VESC messages.
This is the metaclass for any VESC message classes. A VESC message class must then declare 2 static attributes:
id: unsigned integer which is the identification number for messages of this class
fields: list of tuples. tuples are of size 2, first element is the field name, second element is the fields type
the third optional element is a scalar that will be applied to the data upon unpack
format character. For more info on struct format characters see: https://docs.python.org/2/library/struct.html
"""
_msg_registry = {}
_endian_fmt = '!'
_id_fmt = 'B'
_can_id_fmt = 'BB'
_comm_forward_can = 33
_entry_msg_registry = None
def __init__(cls, name,fields,id):
cls.can_id = None
cls.fields=fields
cls.id=id
cls.msg_id = id
msg_id = id
#print("_msg_registry="+str(VESCMessage._msg_registry))
# make sure that message classes are final
# check for duplicate id
# check for duplicate id
if msg_id in VESCMessage._msg_registry:
raise TypeError("ID conflict with %s" % str(VESCMessage._msg_registry[msg_id]))
else:
VESCMessage._msg_registry[msg_id] = cls
# initialize cls static variables
cls._string_field = None
cls._fmt_fields = ''
cls._field_names = []
cls._field_scalars = []
for field, idx in zip(cls.fields, range(0, len(cls.fields))):
cls._field_names.append(field[0])
if len(field) >= 3:
cls._field_scalars.append(field[2])
if field[1] is 's':
# string field, add % so we can vary the length
cls._fmt_fields += '%u'
cls._string_field = idx
cls._fmt_fields += field[1]
print("_fmt_fields="+cls._fmt_fields)
cls._full_msg_size = struct.calcsize(cls._fmt_fields)
# check that at most 1 field is a string
string_field_count = cls._fmt_fields.count('s')
if string_field_count > 1:
raise TypeError("Max number of string fields is 1.")
if 'p' in cls._fmt_fields:
raise TypeError("Field with format character 'p' detected. For string field use 's'.")
#super(VESCMessage, cls).__init__(name, bases, clsdict)
def __call__(cls, *args, **kwargs):
#instance = super(VESCMessage, cls)
if args:
if len(args) != len(cls.fields):
raise AttributeError("Expected %u arguments, received %u" % (len(cls.fields), len(args)))
for name, value in zip(cls._field_names, args):
setattr(cls, name, value)
return cls
@staticmethod
def msg_type(id):
#print("VESCMessage._msg_registry[id]="+str(VESCMessage._msg_registry))
return VESCMessage._msg_registry[id]
@staticmethod
def unpack(msg_bytes):
msg_id = struct.unpack_from(VESCMessage._endian_fmt + VESCMessage._id_fmt, msg_bytes, 0)
msg_type = VESCMessage.msg_type(*msg_id)
data = None
if not (msg_type._string_field is None):
# string field
fmt_wo_string = msg_type._fmt_fields.replace('%u', '')
fmt_wo_string = fmt_wo_string.replace('s', '')
len_string = len(msg_bytes) - struct.calcsize(VESCMessage._endian_fmt + fmt_wo_string) - 1
fmt_w_string = msg_type._fmt_fields % (len_string)
data = struct.unpack_from(VESCMessage._endian_fmt + fmt_w_string, msg_bytes, 1)
else:
data = list(struct.unpack_from(VESCMessage._endian_fmt + msg_type._fmt_fields, msg_bytes, 1))
for k, field in enumerate(data):
try:
if msg_type._field_scalars[k] != 0:
data[k] = data[k]/msg_type._field_scalars[k]
except (TypeError, IndexError) as e:
print("Error ecountered on field " + msg_type.fields[k][0])
print(e)
msg = msg_type(*data)
if not (msg_type._string_field is None):
string_field_name = msg_type._field_names[msg_type._string_field]
setattr(msg,
string_field_name,
getattr(msg, string_field_name).decode('ascii'))
return msg
@staticmethod
def pack(instance, header_only=None):
if header_only:
if instance.can_id is not None:
fmt = VESCMessage._endian_fmt + VESCMessage._can_id_fmt + VESCMessage._id_fmt
values = (VESCMessage._comm_forward_can, instance.can_id, instance.id)
else:
fmt = VESCMessage._endian_fmt + VESCMessage._id_fmt
values = (instance.id,)
return struct.pack(fmt, *values)
field_values = []
if not instance._field_scalars:
for field_name in instance._field_names:
field_values.append(getattr(instance, field_name))
else:
for field_name, field_scalar in zip(instance._field_names, instance._field_scalars):
field_values.append(int(getattr(instance, field_name) * field_scalar))
if not (instance._string_field is None):
# string field
string_field_name = instance._field_names[instance._string_field]
string_length = len(getattr(instance, string_field_name))
field_values[instance._string_field] = field_values[instance._string_field].encode('ascii')
values = ((instance.id,) + tuple(field_values))
if instance.can_id is not None:
fmt = VESCMessage._endian_fmt + VESCMessage._can_id_fmt + VESCMessage._id_fmt\
+ (instance._fmt_fields % (string_length))
values = (VESCMessage._comm_forward_can, instance.can_id) + values
else:
fmt = VESCMessage._endian_fmt + VESCMessage._id_fmt + (instance._fmt_fields % (string_length))
return struct.pack(fmt, *values)
else:
values = ((instance.id,) + tuple(field_values))
if instance.can_id is not None:
fmt = VESCMessage._endian_fmt + VESCMessage._can_id_fmt + VESCMessage._id_fmt + instance._fmt_fields
values = (VESCMessage._comm_forward_can, instance.can_id) + values
else:
fmt = VESCMessage._endian_fmt + VESCMessage._id_fmt + instance._fmt_fields
return struct.pack(fmt, *values)

Wyświetl plik

@ -0,0 +1,55 @@
import pyvesc.protocol.base_mp
import pyvesc.protocol.packet.codec
def decode_mp(my_msg, buffer):
"""
Decodes the next valid VESC message in a buffer.
:param buffer: The buffer to attempt to parse from.
:type buffer: bytes
:return: PyVESC message, number of bytes consumed in the buffer. If nothing
was parsed returns (None, 0).
:rtype: `tuple`: (PyVESC message, int)
"""
msg_payload, consumed = pyvesc.protocol.packet.codec.unframe(buffer)
if msg_payload:
#return pyvesc.protocol.base_mp.VESCMessage.unpack(msg_payload), consumed
return my_msg.unpack(msg_payload), consumed
else:
return None, consumed
def encode_mp(msg):
"""
Encodes a PyVESC message to a packet. This packet is a valid VESC packet and
can be sent to a VESC via your serial port.
:param msg: Message to be encoded. All fields must be initialized.
:type msg: PyVESC message
:return: The packet.
:rtype: bytes
"""
msg_payload = pyvesc.protocol.base_mp.VESCMessage.pack(msg)
packet = pyvesc.protocol.packet.codec.frame(msg_payload)
return packet
def encode_request_mp(msg_cls):
"""
Encodes a PyVESC message for requesting a getter message. This function
should be called when you want to request a VESC to return a getter
message.
:param msg_cls: The message type which you are requesting.
:type msg_cls: pyvesc.messages.getters.[requested getter]
:return: The encoded PyVESC message which can be sent.
:rtype: bytes
"""
msg_payload = pyvesc.protocol.base_mp.VESCMessage.pack(msg_cls, header_only=True)
packet = pyvesc.protocol.packet.codec.frame(msg_payload)
return packet

Wyświetl plik

@ -0,0 +1,3 @@
from .structure_mp import *
from .codec import *
from .exceptions import *

Wyświetl plik

@ -0,0 +1,234 @@
from .exceptions import *
from .structure_mp import *
from crccheck.crc import CrcXmodem
crc_checker = CrcXmodem()
class UnpackerBase(object):
"""
Helper methods for both stateless and stated unpacking.
"""
@staticmethod
def _unpack_header(buffer):
"""
Attempt to unpack a header from the buffer.
:param buffer: buffer object.
:return: Header object if successful, None otherwise.
"""
if len(buffer) == 0:
return None
fmt = Header.fmt(buffer[0])
if len(buffer) >= struct.calcsize(fmt):
try:
header = Header.parse(buffer)
return header
except struct.error:
raise CorruptPacket("Unable to parse header: %s" % buffer)
else:
return None
@staticmethod
def _unpack_footer(buffer, header):
"""
Unpack the footer. Parse must be valid.
:param buffer: buffer object.
:param header: Header object for current packet.
:return: Footer object.
"""
try:
footer = Footer.parse(buffer, header)
return footer
except struct.error:
raise CorruptPacket("Unable to parse footer: %s" % buffer)
@staticmethod
def _next_possible_packet_index(buffer):
"""
Tries to find the next possible start byte of a packet in a buffer. Typically called after a corruption has been
detected.
:param buffer: buffer object.
:return: Index of next valid start byte. Returns -1 if no valid start bytes are found.
"""
if len(buffer) < 2: # too short to find next
return -1
next_short_sb = buffer[1:].find(b'\x02')
next_long_sb= buffer[1:].find(b'\x03')
possible_index = []
if next_short_sb >= 0: # exclude index zero's as we know the current first packet is corrupt
possible_index.append(next_short_sb + 1) # +1 because we want found from second byte
if next_long_sb >= 0:
possible_index.append(next_long_sb + 1)
if possible_index == []:
return -1
else:
return min(possible_index)
@staticmethod
def _consume_after_corruption_detected(buffer):
"""
Gives the number of bytes in the buffer to consume after a corrupt packet was detected.
:param buffer: buffer object
:return: Number of bytes to consume in the buffer.
"""
next_index = UnpackerBase._next_possible_packet_index(buffer)
if next_index == -1: # no valid start byte was found
return len(buffer) # consume entire buffer
else:
return next_index # consume up to next index
@staticmethod
def _packet_size(header):
return struct.calcsize(Header.fmt(header.payload_index)) + header.payload_length + struct.calcsize(Footer.fmt())
@staticmethod
def _packet_parsable(buffer, header):
"""
Checks if an entire packet is parsable.
:param buffer: buffer object
:param header: Header object
:return: True if the current packet is parsable, False otherwise.
"""
frame_size = UnpackerBase._packet_size(header)
return len(buffer) >= frame_size
@staticmethod
def _unpack_payload(buffer, header):
"""
Unpacks the payload of the packet.
:param buffer: buffer object
:param header: Header object
:return: byte string of the payload
"""
footer_index = header.payload_index + header.payload_length
return bytes(buffer[header.payload_index:footer_index])
@staticmethod
def _validate_payload(payload, footer):
"""
Validates the payload using the footer. CorruptPacket is raised if the payload is corrupt or the terminator is
not correct.
:param payload: byte string
:param footer: Footer object
:return: void
"""
crc_checker.calc(payload)
if crc_checker.calc(payload) != footer.crc:
raise CorruptPacket("Invalid checksum value.")
if footer.terminator is not Footer.TERMINATOR:
raise CorruptPacket("Invalid terminator: %u" % footer.terminator)
return
@staticmethod
def _unpack(buffer, header, errors, recovery_mode=False):
"""
Attempt to parse a packet from the buffer.
:param buffer: buffer object
:param errors: specifies error handling scheme. see codec error handling schemes
:return: (1) Packet if parse was successful, None otherwise, (2) Length consumed of buffer
"""
while True:
try:
# if we were not given a header then try to parse one
if header is None:
header = UnpackerBase._unpack_header(buffer)
if header is None:
# buffer is too short to parse a header
if recovery_mode:
return Stateless._recovery_recurse(buffer, header, errors, False)
else:
return None, 0
# check if a packet is parsable
if UnpackerBase._packet_parsable(buffer, header) is False:
# buffer is too short to parse the rest of the packet
if recovery_mode:
return Stateless._recovery_recurse(buffer, header, errors, False)
else:
return None, 0
# parse the packet
payload = UnpackerBase._unpack_payload(buffer, header)
footer = UnpackerBase._unpack_footer(buffer, header)
# validate the payload
UnpackerBase._validate_payload(payload, footer)
# clean header as we wont need it again
consumed = UnpackerBase._packet_size(header)
header = None
return payload, consumed
except CorruptPacket as corrupt_packet:
if errors is 'ignore':
# find the next possible start byte in the buffer
return Stateless._recovery_recurse(buffer, header, errors, True)
elif errors is 'strict':
raise corrupt_packet
@staticmethod
def _recovery_recurse(buffer, header, errors, consume_on_not_recovered):
header = None # clean header
next_sb = UnpackerBase._next_possible_packet_index(buffer)
if next_sb == -1: # no valid start byte in buffer. consume entire buffer
if consume_on_not_recovered:
return None, len(buffer)
else:
return None, 0
else:
payload, consumed = UnpackerBase._unpack(buffer[next_sb:], header, errors, True)
if payload is None:
# failed to recover
if consume_on_not_recovered:
return payload, consumed + next_sb
else:
return payload, consumed
else:
# recovery was successful
return payload, consumed + next_sb
class PackerBase(object):
"""
Packing is the same for stated and stateless. Therefore its implemented in this base class.
"""
@staticmethod
def _pack(payload):
"""
Packs a payload.
:param payload: byte string of payload
:return: byte string of packed packet
"""
if payload == b'':
raise InvalidPayload("Empty payload")
# get header/footer tuples
header = Header.generate(payload)
footer = Footer.generate(payload)
# serialize tuples
header = struct.pack(Header.fmt(header.payload_index), *header)
footer = struct.pack(Footer.fmt(), *footer)
return header + payload + footer
class Stateless(UnpackerBase, PackerBase):
"""
Statelessly pack and unpack VESC packets.
"""
@staticmethod
def unpack(buffer, errors='ignore'):
"""
Attempt to parse a packet from the buffer.
:param buffer: buffer object
:param errors: specifies error handling scheme. see codec error handling schemes
:return: (1) Packet if parse was successful, None otherwise, (2) Length consumed of buffer
"""
return Stateless._unpack(buffer, None, errors)
@staticmethod
def pack(payload):
"""
See PackerBase.pack
"""
return Stateless._pack(payload)
def frame(bytestring):
return Stateless.pack(bytestring)
def unframe(buffer, errors='ignore'):
return Stateless.unpack(buffer, errors)

Wyświetl plik

@ -0,0 +1,6 @@
class CorruptPacket(ValueError):
pass
class InvalidPayload(ValueError):
pass

Wyświetl plik

@ -0,0 +1,85 @@
import collections
import struct
from pyvesc.protocol.packet.exceptions import *
from crccheck.crc import CrcXmodem
crc_checker = CrcXmodem()
class Header(collections.namedtuple('Header', ['payload_index', 'payload_length'])):
"""
Tuple to help with packing and unpacking the header of a VESC packet.
"""
@staticmethod
def generate(payload):
"""
Creates a Header for the given payload.
:param payload: byte string representation of payload.
:return: Header object.
"""
payload_length = len(payload)
if payload_length < 256:
payload_index = 0x2
elif payload_length < 65536:
payload_index = 0x3
else:
raise InvalidPayload("Invalid payload size. Payload must be less than 65536 bytes.")
return Header(payload_index, payload_length)
@staticmethod
def parse(buffer):
"""
Creates a Header by parsing the given buffer.
:param buffer: buffer object.
:return: Header object.
"""
A = struct.unpack_from(Header.fmt(buffer[0]), buffer, 0)
return Header(*A)
#return Header._makeA(struct.unpack_from(Header.fmt(buffer[0]), buffer, 0))
@staticmethod
def fmt(start_byte):
"""
Format characters of the header packet.
:param start_byte: The first byte in the buffer.
:return: The character format of the packet header.
"""
if start_byte is 0x2:
return '>BB'
elif start_byte is 0x3:
return '>BH'
else:
raise CorruptPacket("Invalid start byte: %u" % start_byte)
class Footer(collections.namedtuple('Footer', ['crc', 'terminator'])):
"""
Footer of a VESC packet.
"""
TERMINATOR = 0x3 # Terminator character
@staticmethod
def parse(buffer, header):
A=struct.unpack_from(Footer.fmt(), buffer, header.payload_index + header.payload_length)
return Footer(*A)
#return Footer._makeA(struct.unpack_from(Footer.fmt(), buffer, header.payload_index + header.payload_length))
@staticmethod
def generate(payload):
crc = crc_checker.calc(payload)
terminator = Footer.TERMINATOR
return Footer(crc, terminator)
@staticmethod
def fmt():
"""
Format of the footer.
:return: Character format of the footer.
"""
return '>HB'

Wyświetl plik

@ -4,6 +4,7 @@ from machine import UART,WDT,Pin,I2C,PWM
import neopixel
import time
from observer_analog import AnalogPins
from observer_vesc import VESC
import ble_simple_peripheral
import neopixel_demo
from log import Log
@ -11,6 +12,10 @@ import micropython_dotstar as dotstar
from machine import Pin, SPI
from lcd_i2c import LCD
from ssd1306 import SSD1306_I2C
from pyvesc.VESC.messages import GetValues_mp
import pyvesc
from observer import Subject, Observer
from Display import Display
I2C_ADDR = 0x27
NUM_ROWS = 2
@ -23,29 +28,31 @@ devices = i2c.scan()
if len(devices) == 0:
Log("No i2c device !")
else:
Log('i2c devices found:',len(devices))
Log('i2c devices found:', len(devices))
for device in devices:
Log("At address: ",hex(device))
Log("At address: ", hex(device))
if device == 0x27:
lcd = LCD(addr=I2C_ADDR, cols=NUM_COLS, rows=NUM_ROWS, i2c=i2c)
lcd.begin()
lcd.print("started lcd")
elif device == 0x3c:
oled = SSD1306_I2C(128,64,i2c)
oled = SSD1306_I2C(128, 64, i2c)
#oled.text("oled started", 0, 0, 1)
#oled.show()
_wdt = None
#_wdt = WDT(timeout=2000) # enable it with a timeout of 2s
_uart = UART(1, 115200, tx=21, rx=20) # init with given baudrate
_sa =AnalogPins([3, 2])
_sa = AnalogPins([3, 2])
_np = neopixel.NeoPixel(Pin(10), 8)
_np = neopixel.NeoPixel(Pin(10), 69)
'''''''''''
p4 = Pin(4, Pin.IN, Pin.PULL_UP)
def handle_pin_interrupt(pin):
print("brake change " + str(pin.value()))
if pin.value():
@ -55,33 +62,58 @@ def handle_pin_interrupt(pin):
# pwm5.duty(512)
p5.value(1)
p4.irq(trigger=3, handler=handle_pin_interrupt)
p5 = Pin(5, mode= Pin.OPEN_DRAIN, pull= None)
p5 = Pin(5, mode=Pin.OPEN_DRAIN, pull=None)
'''
# pwm5 = PWM(p5,freq=50000,duty=10)
neopixel_demo.demo(_np)
_vs = VESC(_uart)
if oled:
oled.text("oled started", 0, 0, 1)
oled.show()
d = Display(oled, _np)
_vs.attach(d)
_sa.attach(d.ao)
_wdt = WDT(timeout=4000) # enable it with a timeout of 2s
count = 0
while True:
if count > 100:
oled.fill(0)
_sa.update()
oled.show()
if _wdt:
_wdt.feed()
if _vs.update() > 0:
_sa.update()
oled.show()
count = 0
else:
count += 1
time.sleep(.1)
p = ble_simple_peripheral.ble_main(_uart, _sa, _np, i2c)
if p:
while True:
if p.is_connected():
# Short burst of queued notifications.
len = _uart.any()
if len > 0:
ar = _uart.read(len)
p.send_uart(ar)
else:
time.sleep_ms(100)
if _sa:
_sa.update(p)
else:
time.sleep(1)
while True:
if p.is_connected():
# Short burst of queued notifications.
len = _uart.any()
if len > 0:
ar = _uart.read(len)
p.send_uart(ar)
else:
time.sleep_ms(100)
if _sa:
_sa.update()
else:
time.sleep(1)

Wyświetl plik

@ -1,4 +1,3 @@
from .interface import *
from .interface_mp import *
from .packet import *
from .base import *