kiln_controller/kiln/stepper.py

244 wiersze
7.5 KiB
Python
Czysty Zwykły widok Historia

import time
import atexit
import threading
import warnings
import Queue
import logging
logger = logging.getLogger("kiln.Stepper")
try:
from RPi import GPIO
except ImportError:
pass
class Stepper(threading.Thread):
    """Drive a four-wire stepper motor from a background thread.

    Commands are ``(num, speed, block)`` tuples that :meth:`step` pushes onto
    ``self.queue`` and :meth:`run` consumes.  ``self.phase`` is the signed
    step count since the last homing; ``phase % len(pattern)`` selects the
    row of ``pattern`` that is written to the four pins.
    """

    # Output level for each of the four pins, one row per step.
    pattern = [
        [1, 1, 0, 0],
        [0, 1, 1, 0],
        [0, 0, 1, 1],
        [1, 0, 0, 1],
    ]

    def __init__(self, pin1=5, pin2=6, pin3=13, pin4=19, timeout=1, home_pin=None):
        """Configure the GPIO pins, home if possible, and start the worker.

        Parameters
        ----------
        pin1, pin2, pin3, pin4 : int
            BCM pin numbers of the four motor outputs
        timeout : number
            Seconds the worker waits for a new command before switching all
            four outputs off
        home_pin : int or None
            BCM pin of the homing switch (configured as input with pull-up).
            If None, no homing is done and the phase simply starts at 0.
        """
        super(Stepper, self).__init__()
        self.daemon = True
        self.queue = Queue.Queue()
        self.finished = threading.Event()
        self.pins = [pin1, pin2, pin3, pin4]

        GPIO.setmode(GPIO.BCM)
        for pin in self.pins:
            GPIO.setup(pin, GPIO.OUT)

        self.home_pin = home_pin
        self.timeout = timeout
        self.phase = 0
        # Only touch the homing switch when one was actually given; the
        # default home_pin=None must not crash the constructor.
        if home_pin is not None:
            GPIO.setup(home_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
            self.home()

        self.start()

    def stop(self):
        """Queue the stop sentinel so the worker thread exits."""
        self.queue.put((None, None, None))

    def step(self, num, speed=10, block=False):
        """Step the stepper motor

        The call returns once the worker sets ``finished``: after the whole
        move when ``block`` is true, or as soon as the command has been
        picked up when it is false.

        Parameters
        ----------
        num : int
            Number of steps
        speed : int
            Number of steps per second
        block : bool
            Block while stepping?
        """
        self.finished.clear()
        self.queue.put((num, speed, block))
        self.finished.wait()

    def home(self):
        """Cycle the pattern until the home switch reads low, then zero phase.

        Raises
        ------
        ValueError
            If no homing switch pin was configured
        """
        if self.home_pin is None:
            raise ValueError("No homing switch defined")

        # NOTE(review): rows are walked in increasing order, the same order a
        # positive step uses -- confirm this drives towards the switch.
        # The switch is only sampled once per full pass over the pattern.
        while GPIO.input(self.home_pin):
            for coils in self.pattern:
                self._energize(coils)
                time.sleep(1. / 150.)

        self.phase = 0

    def run(self):
        """Worker loop: execute queued commands until the stop sentinel."""
        try:
            step, speed, block = self.queue.get()
            while step is not None:
                # The outputs may have been switched off by the idle timeout
                # below, so restore the current row before moving.
                self._energize(self._coils(self.phase))

                if block:
                    self._step(step, speed)
                    self.finished.set()
                else:
                    self.finished.set()
                    if self._step_noblock(step, speed):
                        # stop() was requested in the middle of the move
                        break

                try:
                    step, speed, block = self.queue.get(True, self.timeout)
                except Queue.Empty:
                    # handle the timeout, turn off all pins
                    for pin in self.pins:
                        GPIO.output(pin, False)
                    step, speed, block = self.queue.get()
        except Exception:
            import traceback
            traceback.print_exc()
        finally:
            # Release a caller that may still be waiting inside step();
            # nothing else will ever set the event once this thread is gone.
            self.finished.set()
            for pin in self.pins:
                GPIO.output(pin, False)
            GPIO.cleanup()

    def _coils(self, phase):
        """Return the pattern row that corresponds to ``phase``."""
        return self.pattern[phase % len(self.pattern)]

    def _energize(self, coils):
        """Write one pattern row to the four output pins."""
        for pin, out in zip(self.pins, coils):
            GPIO.output(pin, out)

    @staticmethod
    def _offsets(step):
        """Return the phase offsets visited by a move of ``step`` steps.

        The row at offset 0 is already on the pins, so a move of +3 visits
        ``[1, 2, 3]`` and a move of -3 visits ``[-1, -2, -3]``.
        """
        if step < 0:
            return list(range(-1, step - 1, -1))
        return list(range(1, step + 1))

    def _step_noblock(self, step, speed):
        """Move ``step`` steps while still accepting new commands.

        After every single step the queue is polled.  A new command extends
        the target by its step count and replaces the speed; if it asks for
        blocking, the remaining distance is finished with :meth:`_step`.
        ``finished`` is set for every command picked up here.

        Returns True if the stop sentinel was dequeued (the move is
        abandoned), otherwise False.
        """
        ispeed = 1. / (2. * speed)
        target = self.phase + step
        while self.phase != target:
            now = time.time()
            self.phase += 1 if target > self.phase else -1
            self._energize(self._coils(self.phase))

            try:
                step, speed, block = self.queue.get_nowait()
            except Queue.Empty:
                pass
            else:
                if step is None:
                    # Stop sentinel: bail out instead of doing arithmetic
                    # on None, and let run() shut everything down.
                    return True
                ispeed = 1. / (2. * speed)
                target += step
                if block:
                    self._step(target - self.phase, speed)
                self.finished.set()

            diff = ispeed - (time.time() - now)
            if diff > 0:
                time.sleep(diff)
            else:
                warnings.warn("Step rate too high, stepping as fast as possible")
        return False

    def _step(self, step, speed):
        """Move ``step`` steps (sign gives direction) without polling the queue.

        Each row is held for ``1 / (2 * speed)`` seconds, minus the time the
        pin writes took.  ``phase`` is updated once the move is complete.
        """
        print("Stepping %d steps at %d steps / second" % (step, speed))
        for offset in self._offsets(step):
            now = time.time()
            self._energize(self._coils(self.phase + offset))
            diff = 1. / (2 * speed) - (time.time() - now)
            if diff > 0:
                time.sleep(diff)
        self.phase += step
class StepperSim(object):
    """Hardware-free stand-in for the stepper that only prints what it would do.

    Offers the methods the regulator calls on its stepper: ``step``,
    ``home`` and ``stop``.
    """

    def __init__(self):
        # Signed step count since the last (simulated) homing
        self.phase = 0

    def step(self, num, speed=10, block=False):
        """Pretend to move ``num`` steps at ``speed`` steps per second.

        When ``block`` is true the call sleeps for one second to imitate a
        blocking move.
        """
        print("Simulated stepping %d steps at %d steps / second" % (num, speed))
        if block:
            time.sleep(1)
        self.phase += num

    def home(self):
        """Pretend to run back to the home switch and reset the phase."""
        print("Simulated homing")
        self.phase = 0

    def stop(self):
        """Pretend to shut the stepper down."""
        print("stopping")
class Regulator(threading.Thread):
    """Gas regulator positioned by a stepper motor.

    ``self.current`` is the regulator position in steps; 0 means shut off.
    """

    def __init__(self, maxsteps=4500, minsteps=2480, speed=150, ignite_pin=None, flame_pin=None, simulate=False):
        """Set up a stepper-controlled regulator. Implement some safety measures
        to make sure everything gets shut off at the end

        TODO: integrate flame sensor by converting this into a thread, and checking
        flame state regularly. If flame sensor off, immediately increase gas and attempt
        reignition, or shut off after 5 seconds of failure.

        Parameters
        ----------
        maxsteps : int
            The max value for the regulator, in steps
        minsteps : int
            The minimum position to avoid extinguishing the flame
        speed : int
            Speed to turn the stepper, in steps per second
        ignite_pin : int or None
            If not None, turn on this pin during the ignite sequence
        flame_pin : int or None
            If not None, this pin is configured as a GPIO input; it is not
            read anywhere in this class yet
        simulate : bool
            Use the printing simulator instead of the real stepper
        """
        # Thread requires its own constructor to run before the object can
        # ever be started.
        super(Regulator, self).__init__()

        if simulate:
            self.stepper = StepperSim()
        else:
            # Stepper.__init__ already starts its worker thread; starting it
            # a second time would raise RuntimeError.
            self.stepper = Stepper()

        self.current = 0
        self.max = maxsteps
        self.min = minsteps
        self.speed = speed

        self.ignite_pin = ignite_pin
        if ignite_pin is not None:
            GPIO.setup(ignite_pin, GPIO.OUT)
        self.flame_pin = flame_pin
        if flame_pin is not None:
            GPIO.setup(flame_pin, GPIO.IN)

        def _shutdown():
            # Close the regulator if it is open, then stop the stepper.
            if self.current != 0:
                self.off()
            self.stepper.stop()
        atexit.register(_shutdown)

    def ignite(self, start=2800, delay=1):
        """Open to ``start`` steps, fire the igniter, then settle at the minimum.

        Parameters
        ----------
        start : int
            Position, in steps, to open to for ignition
        delay : number
            Seconds to keep the ignite pin on

        Raises
        ------
        ValueError
            If the regulator is not currently off
        """
        if self.current != 0:
            raise ValueError("Must be off to ignite")

        logger.info("Ignition start")
        # Record the position before moving (as set() does) so the exit
        # handler still shuts the regulator if ignition is interrupted.
        self.current = start
        self.stepper.step(start, self.speed, block=True)

        if self.ignite_pin is not None:
            GPIO.output(self.ignite_pin, True)
        try:
            time.sleep(delay)
        finally:
            # Never leave the igniter pin switched on
            if self.ignite_pin is not None:
                GPIO.output(self.ignite_pin, False)

        self.stepper.step(self.min - start, self.speed, block=True)
        self.current = self.min
        logger.info("Ignition complete")

    def off(self, block=True):
        """Shut the regulator by homing the stepper and mark it as off.

        ``block`` is accepted for compatibility but currently unused: the
        stepper is homed rather than stepped back by ``self.current``.
        """
        logger.info("Shutting off gas")
        self.stepper.home()
        self.current = 0

    def set(self, value, block=False):
        """Move to a fraction of the range between the min and max positions.

        Parameters
        ----------
        value : float
            Fraction between 0 (minimum position) and 1 (maximum position)
        block : bool
            Passed through to the stepper's ``step``

        Raises
        ------
        ValueError
            If the regulator is off, or ``value`` is outside [0, 1]
        """
        if self.current == 0:
            raise ValueError("System must be ignited to set value")
        if not 0 <= value <= 1:
            raise ValueError("Must give fraction between 0 and 1")

        target = int(value * (self.max - self.min) + self.min)
        nsteps = target - self.current
        print("Currently at %d, target %d, stepping %d" % (self.current, target, nsteps))
        self.current = target
        self.stepper.step(nsteps, self.speed, block=block)

    @property
    def output(self):
        """Current position as a fraction of the min..max range, or -1 if below min."""
        out = (self.current - self.min) / float(self.max - self.min)
        if out < 0:
            return -1
        return out

    def run(self):
        """Check the status of the flame sensor"""
        # since the flame sensor does not yet exist, we'll save this for later
        pass