kopia lustrzana https://github.com/GuyCarver/MicroPython
Adding TTGO files. xv11 lidar driver as well
rodzic
742fe88d57
commit
58edcfdc05
|
@ -0,0 +1,18 @@
|
|||
# This file is executed on every boot (including wake-boot from deepsleep)
|
||||
import sys, network, utime, display
|
||||
sys.path[1] = '/flash/lib'
|
||||
|
||||
wifi = network.WLAN(network.STA_IF)
|
||||
wifi.active(True)
|
||||
wifi.connect('Carvers', 'gruntbuggly')
|
||||
utime.sleep_ms(4000)
|
||||
network.telnet.start(user='g', password='g')
|
||||
|
||||
#Initialize display in landscape mode at maximum resolution
|
||||
tft = display.TFT()
|
||||
tft.init(tft.ST7789, bgr=False, rot=tft.LANDSCAPE, miso=17, backl_pin=4, backl_on=1, mosi=19, clk=18, cs=5, dc=16)
|
||||
tft.setwin(40, 52, 320, 240)
|
||||
|
||||
# import lidar
|
||||
# lidar.run(tft)
|
||||
|
|
@ -0,0 +1,212 @@
|
|||
|
||||
from machine import UART, Pin, PWM
|
||||
from time import sleep
|
||||
|
||||
_FRAMESIZE = const(22) # See frame data below
|
||||
_FRAMESTART = const(0xFA) # Each frame starts with this tag
|
||||
_FRAMEINDEX = const(0xA0) # Indexes are 0xA0 + 0-90
|
||||
_ANGLESPERFRAME = const(4)
|
||||
_FRAMESPERROT = const(90)
|
||||
_ANGLESPERROT = const(_FRAMESPERROT * _ANGLESPERFRAME) # We get 90 frames with 4 angles per frame
|
||||
_FRAMESPERREAD = const(90) # Up this to read more data per attempt
|
||||
_INVALID = const(1 << 15) # Bit set in destance word of angle data indicating invalid data
|
||||
_BUFFERSIZE = const(_FRAMESIZE * _FRAMESPERREAD)
|
||||
_MOTORSPEED = const(30)
|
||||
|
||||
#angle data.
|
||||
# distance = 14 bits - Distance or error code (invalid flag set)
|
||||
# strength = 1 bit - Flag indicating signal strength was lower than expected.
|
||||
# invalid = 1 bit - Set when distance couldn't be calculated
|
||||
# uint16_t signal strength
|
||||
|
||||
#frame data
|
||||
# uint8_t start = 0xFA
|
||||
# uint8_t index = index - 0xA0 * 4 is the angle for the readings array
|
||||
# uint16_t speed = divide by 64 to get speed in rpm
|
||||
# 16 bytes - angledata[4] = angle data for 4 consecutive angles
|
||||
# uint16_t checksum
|
||||
# Total size = 22 bytes
|
||||
|
||||
#invalid data codes.
|
||||
# XV11LIDAR_CRC_FAILURE = 0x66 the frame had incorrect CRC, don't use the data
|
||||
# XV11LIDAR_ERROR1 = 0x02
|
||||
# XV11LIDAR_ERROR2 = 0x03
|
||||
# XV11LIDAR_ERROR3 = 0x21
|
||||
# XV11LIDAR_ERROR4 = 0x25
|
||||
# XV11LIDAR_ERROR5 = 0x35
|
||||
# XV11LIDAR_ERROR6 = 0x50
|
||||
|
||||
#--------------------------------------------------------
|
||||
# def printit( aBuffer ):
|
||||
# for v in aBuffer:
|
||||
# print(hex(v), end=',')
|
||||
#
|
||||
# print('')
|
||||
|
||||
#--------------------------------------------------------
|
||||
class lidar(object):
|
||||
''' xv11 lidar driver. Reads data into an angles buffer as well as keep track of rpm.
|
||||
Attempts to maintain ~245 rpm. '''
|
||||
|
||||
def __init__( self ):
|
||||
super(lidar, self).__init__()
|
||||
|
||||
self._uart = UART(1, tx = 2, rx = 15, baudrate = 115200, buffer_size = 8192)
|
||||
self._motor = PWM(Pin(17), freq = 100)
|
||||
self._speed = _MOTORSPEED
|
||||
self._motor.duty(self._speed)
|
||||
|
||||
self._rpm = 0
|
||||
self._buffer = bytearray(_BUFFERSIZE)
|
||||
self._inbuffer = bytearray(_BUFFERSIZE)
|
||||
self._angles = [0] * _ANGLESPERROT # Storage for angles
|
||||
self._insync = False
|
||||
|
||||
#--------------------------------------------------------
|
||||
@property
|
||||
def rpm( self ): return self._rpm
|
||||
|
||||
#--------------------------------------------------------
|
||||
@property
|
||||
def insync( self ): return self._insync
|
||||
|
||||
#--------------------------------------------------------
|
||||
def sync( self ):
|
||||
''' sync up the serial data read with the start of frame. '''
|
||||
# print('syncing')
|
||||
|
||||
#Function to return true if the start tag is found at the desired location
|
||||
def isstart( aOffset ):
|
||||
return self._buffer[aOffset] == _FRAMESTART
|
||||
|
||||
while not self._insync:
|
||||
self._fillbuffer()
|
||||
offset = -1
|
||||
#Loop for up to 4 frames worth of data. If we didn't find a tag by then we probably won't
|
||||
for x in range(_FRAMESIZE * 4):
|
||||
#If start tag is found in 3 consecutive frames then this is probably a real frame start
|
||||
if isstart(x) and isstart(x + _FRAMESIZE) and isstart(x + (_FRAMESIZE * 2)):
|
||||
offset = x
|
||||
break
|
||||
|
||||
#If we found an start tag then let's try and use it
|
||||
if offset != -1:
|
||||
#If the tag is at the beginning of the buffer, don't need to move data
|
||||
if offset > 0:
|
||||
tomove = len(self._buffer) - offset
|
||||
#Move data so frame start is at the beginning
|
||||
for x in range(tomove):
|
||||
self._buffer[x] = self._buffer[offset + x]
|
||||
# printit(self._buffer[:_FRAMESIZE])
|
||||
toread = offset
|
||||
#Now read in data to fill in the rest of the buffer
|
||||
while toread:
|
||||
cnt = self._uart.readinto(self._inbuffer, toread)
|
||||
for y in range(cnt):
|
||||
self._buffer[offset + y] = self._inbuffer[0]
|
||||
offset += cnt
|
||||
toread -= cnt
|
||||
|
||||
#Process the buffer data
|
||||
g = self.process()
|
||||
#If we got enough good values then we are synced
|
||||
if g > 100:
|
||||
print('sync good')
|
||||
self._insync = True
|
||||
|
||||
#--------------------------------------------------------
|
||||
def getrpm( self, aIndex ):
|
||||
''' Get the rpm value. '''
|
||||
return (self._buffer[aIndex + 2] | (self._buffer[aIndex + 3] << 8)) >> 6
|
||||
|
||||
#--------------------------------------------------------
|
||||
def checksum( self, aIndex = 0 ):
|
||||
''' Calculate the checksum and return true if matched. '''
|
||||
chk = 0
|
||||
crc = self._buffer[aIndex + 20] | (self._buffer[aIndex + 21] << 8)
|
||||
for i in range(aIndex, aIndex + 20, 2):
|
||||
w = self._buffer[i] + (self._buffer[i + 1] << 8)
|
||||
chk = (chk << 1) + w
|
||||
|
||||
chk = (chk & 0x7FFF) + (chk >> 15)
|
||||
return (chk & 0x7FFF) == crc
|
||||
|
||||
#--------------------------------------------------------
|
||||
def _fillbuffer( self ):
|
||||
''' Fill the buffer with serial data '''
|
||||
read = self._uart.readinto(self._buffer, _BUFFERSIZE)
|
||||
#Read until the buffer is full
|
||||
while read < _BUFFERSIZE:
|
||||
#Read into temp buffer then append to _buffer
|
||||
cnt = self._uart.readinto(self._inbuffer, _BUFFERSIZE - read)
|
||||
if cnt:
|
||||
for x in range(cnt):
|
||||
self._buffer[read + x] = self._inbuffer[x]
|
||||
read += cnt
|
||||
|
||||
#--------------------------------------------------------
|
||||
def process( self ):
|
||||
''' Process buffer data and return number of good values read. '''
|
||||
|
||||
good = 0
|
||||
#Loop for each frame in the buffer
|
||||
for x in range(0, _BUFFERSIZE, _FRAMESIZE):
|
||||
# if x == 0:
|
||||
# printit(self._buffer[x:x + _FRAMESIZE])
|
||||
#Make sure buffer starts with start tag
|
||||
if self._buffer[x] == _FRAMESTART:
|
||||
#Make sure checksum is good
|
||||
if self.checksum(x):
|
||||
#Average rpm value.
|
||||
self._rpm = (self._rpm + self.getrpm(x)) >> 1
|
||||
angle = (self._buffer[x + 1] - 0xA0) << 2
|
||||
a = x + 4 #Point to angle data array
|
||||
#Process 4 angle values
|
||||
for y in range(a, a + 16, 4): #4 angle entries of 4 bytes each
|
||||
dist = self._buffer[y] | (self._buffer[y + 1] << 8)
|
||||
#If invalid flag set then clear distance
|
||||
if dist & _INVALID:
|
||||
dist = -1
|
||||
else:
|
||||
good += 1
|
||||
self._angles[angle] = dist
|
||||
angle += 1
|
||||
#todo: May want to keep track of bad checksums and frames and report them
|
||||
# else:
|
||||
# print('checksum bad')
|
||||
# else:
|
||||
# self._insync = False
|
||||
# else:
|
||||
# print('bad frame')
|
||||
|
||||
return good
|
||||
|
||||
#--------------------------------------------------------
|
||||
def update( self ):
|
||||
''' Update lidar data '''
|
||||
if self._insync:
|
||||
self._fillbuffer() # Read new data
|
||||
good = self.process() # Process the data
|
||||
a = self._angles[0]
|
||||
b = self._angles[90]
|
||||
c = self._angles[180]
|
||||
d = self._angles[270]
|
||||
print('rpm:', self.rpm, a, b, c, d, good, ' ', end='\r')
|
||||
else:
|
||||
insync = self.sync()
|
||||
|
||||
#Adjust motor speed slightly to try and keep ~245 rpm
|
||||
self._speed = _MOTORSPEED if self._rpm < 245 else _MOTORSPEED - 1
|
||||
self._motor.duty(self._speed)
|
||||
|
||||
#--------------------------------------------------------
|
||||
def run( ):
|
||||
l = lidar()
|
||||
|
||||
while True:
|
||||
l.update()
|
||||
if l.insync:
|
||||
pass
|
||||
#todo: Output to display
|
||||
|
||||
run()
|
Ładowanie…
Reference in New Issue