Experimental encoder driver.

encoder_driver
Peter Hinch 2023-11-02 15:15:41 +00:00
rodzic 4117ddf230
commit e6d82c7dcb
1 zmienionych plików z 35 dodań i 9 usunięć

Wyświetl plik

@ -11,21 +11,35 @@
import uasyncio as asyncio
from machine import Pin
from . import Delay_ms
class Encoder:
def __init__(self, pin_x, pin_y, v=0, div=1, vmin=None, vmax=None,
mod=None, callback=lambda a, b : None, args=(), delay=100):
def __init__(
self,
pin_x,
pin_y,
v=0,
div=1,
vmin=None,
vmax=None,
mod=None,
callback=lambda a, b: None,
args=(),
delay=100,
):
self._pin_x = pin_x
self._pin_y = pin_y
self._x = pin_x()
self._y = pin_y()
self._v = v * div # Initialise hardware value
self._cv = v # Current (divided) value
self.delay = delay # Pause (ms) for motion to stop/limit callback frequency
self._delay = Delay_ms(duration=delay)
self._timeout = 2 * delay # Continuous rotation timeout
# Pause (ms) for motion to stop/limit callback frequency
if ((vmin is not None) and v < vmin) or ((vmax is not None) and v > vmax):
raise ValueError('Incompatible args: must have vmin <= v <= vmax')
raise ValueError("Incompatible args: must have vmin <= v <= vmax")
self._tsf = asyncio.ThreadSafeFlag()
trig = Pin.IRQ_RISING | Pin.IRQ_FALLING
try:
@ -34,7 +48,7 @@ class Encoder:
except TypeError: # hard arg is unsupported on some hosts
xirq = pin_x.irq(trigger=trig, handler=self._x_cb)
yirq = pin_y.irq(trigger=trig, handler=self._y_cb)
asyncio.create_task(self._run(vmin, vmax, div, mod, callback, args))
self._runt = asyncio.create_task(self._run(vmin, vmax, div, mod, callback, args))
# Hardware IRQ's. Duration 36μs on Pyboard 1 ~50μs on ESP32.
# IRQ latency: 2nd edge may have occured by the time ISR runs, in
@ -51,15 +65,23 @@ class Encoder:
self._v -= 1 if y ^ self._pin_x() else -1
self._tsf.set()
async def respond(self): # Retrigger the delay until motion/bounce stops
while True:
await self._tsf.wait()
self._delay.trigger()
async def _run(self, vmin, vmax, div, mod, cb, args):
pv = self._v # Prior hardware value
pcv = self._cv # Prior divided value passed to callback
lcv = pcv # Current value after limits applied
plcv = pcv # Previous value after limits applied
delay = self.delay
self._rspt = asyncio.create_task(self.respond())
while True:
await self._tsf.wait()
await asyncio.sleep_ms(delay) # Wait for motion to stop.
try:
await asyncio.wait_for_ms(self._delay.wait(), self._timeout)
except asyncio.TimeoutError: # Continuous rotation
pass
self._delay.clear()
hv = self._v # Sample hardware (atomic read).
if hv == pv: # A change happened but was negated before
continue # this got scheduled. Nothing to do.
@ -79,3 +101,7 @@ class Encoder:
def value(self):
return self._cv
def deinit(self):
self._rspt.cancel()
self._runt.cancel()