kiln-controller-max31856/lib/oven.py

750 wiersze
25 KiB
Python

import threading
import time
import datetime
import logging
import json
import config
import os
import digitalio
import busio
log = logging.getLogger(__name__)
class DupFilter(object):
def __init__(self):
self.msgs = set()
def filter(self, record):
rv = record.msg not in self.msgs
self.msgs.add(record.msg)
return rv
class Duplogger():
def __init__(self):
self.log = logging.getLogger("%s.dupfree" % (__name__))
dup_filter = DupFilter()
self.log.addFilter(dup_filter)
def logref(self):
return self.log
duplog = Duplogger().logref()
class Output(object):
'''This represents a GPIO output that controls a solid
state relay to turn the kiln elements on and off.
inputs
config.gpio_heat
'''
def __init__(self):
self.active = False
self.heater = digitalio.DigitalInOut(config.gpio_heat)
self.heater.direction = digitalio.Direction.OUTPUT
def heat(self,sleepfor):
self.heater.value = True
time.sleep(sleepfor)
def cool(self,sleepfor):
'''no active cooling, so sleep'''
self.heater.value = False
time.sleep(sleepfor)
# wrapper for blinka board
class Board(object):
'''This represents a blinka board where this code
runs.
'''
def __init__(self):
log.info("board: %s" % (self.name))
self.temp_sensor.start()
class RealBoard(Board):
'''Each board has a thermocouple board attached to it.
Any blinka board that supports SPI can be used. The
board is automatically detected by blinka.
'''
def __init__(self):
self.name = None
self.load_libs()
self.temp_sensor = self.choose_tempsensor()
Board.__init__(self)
def load_libs(self):
import board
self.name = board.board_id
def choose_tempsensor(self):
if config.max31855:
return Max31855()
if config.max31856:
return Max31856()
class SimulatedBoard(Board):
'''Simulated board used during simulations.
See config.simulate
'''
def __init__(self):
self.name = "simulated"
self.temp_sensor = TempSensorSimulated()
Board.__init__(self)
class TempSensor(threading.Thread):
'''Used by the Board class. Each Board must have
a TempSensor.
'''
def __init__(self):
threading.Thread.__init__(self)
self.daemon = True
self.time_step = config.sensor_time_wait
self.status = ThermocoupleTracker()
class TempSensorSimulated(TempSensor):
'''Simulates a temperature sensor '''
def __init__(self):
TempSensor.__init__(self)
self.simulated_temperature = 0
def temperature(self):
return self.simulated_temperature
class TempSensorReal(TempSensor):
'''real temperature sensor that takes many measurements
during the time_step
inputs
config.temperature_average_samples
'''
def __init__(self):
TempSensor.__init__(self)
self.sleeptime = self.time_step / float(config.temperature_average_samples)
self.temptracker = TempTracker()
self.spi = busio.SPI(config.spi_sclk, config.spi_mosi, config.spi_miso)
self.cs = digitalio.DigitalInOut(config.spi_cs)
def get_temperature(self):
'''read temp from tc and convert if needed'''
try:
temp = self.raw_temp() # raw_temp provided by subclasses
if config.temp_scale.lower() == "f":
temp = (temp*9/5)+32
self.status.good()
return temp
except ThermocoupleError as tce:
if tce.ignore:
log.error("Problem reading temp (ignored) %s" % (tce.message))
self.status.good()
else:
log.error("Problem reading temp %s" % (tce.message))
self.status.bad()
return None
def temperature(self):
'''average temp over a duty cycle'''
return self.temptracker.get_avg_temp()
def run(self):
while True:
temp = self.get_temperature()
if temp:
self.temptracker.add(temp)
time.sleep(self.sleeptime)
class TempTracker(object):
'''creates a sliding window of N temperatures per
config.sensor_time_wait
'''
def __init__(self):
self.size = config.temperature_average_samples
self.temps = [0 for i in range(self.size)]
def add(self,temp):
self.temps.append(temp)
while(len(self.temps) > self.size):
del self.temps[0]
def get_avg_temp(self, chop=25):
'''
strip off chop percent from the beginning and end of the sorted temps
then return the average of what is left
'''
chop = chop / 100
temps = sorted(self.temps)
total = len(temps)
items = int(total*chop)
temps = temps[items:total-items]
return sum(temps) / len(temps)
class ThermocoupleTracker(object):
'''Keeps sliding window to track successful/failed calls to get temp
over the last two duty cycles.
'''
def __init__(self):
self.size = config.temperature_average_samples * 2
self.status = [True for i in range(self.size)]
self.limit = 30
def good(self):
'''True is good!'''
self.status.append(True)
del self.status[0]
def bad(self):
'''False is bad!'''
self.status.append(False)
del self.status[0]
def error_percent(self):
errors = sum(i == False for i in self.status)
return (errors/self.size)*100
def over_error_limit(self):
if self.error_percent() > self.limit:
return True
return False
class Max31855(TempSensorReal):
'''each subclass expected to handle errors and get temperature'''
def __init__(self):
TempSensorReal.__init__(self)
log.info("thermocouple MAX31855")
import adafruit_max31855
self.thermocouple = adafruit_max31855.MAX31855(self.spi, self.cs)
def raw_temp(self):
try:
return self.thermocouple.temperature_NIST
except RuntimeError as rte:
if rte.args and rte.args[0]:
raise Max31855_Error(rte.args[0])
raise Max31855_Error('unknown')
class ThermocoupleError(Exception):
'''
thermocouple exception parent class to handle mapping of error messages
and make them consistent across adafruit libraries. Also set whether
each exception should be ignored based on settings in config.py.
'''
def __init__(self, message):
self.ignore = False
self.message = message
self.map_message()
self.set_ignore()
super().__init__(self.message)
def set_ignore(self):
if self.message == "not connected" and config.ignore_tc_lost_connection == True:
self.ignore = True
if self.message == "short circuit" and config.ignore_tc_short_errors == True:
self.ignore = True
if self.message == "unknown" and config.ignore_tc_unknown_error == True:
self.ignore = True
if self.message == "cold junction range fault" and config.ignore_tc_cold_junction_range_error == True:
self.ignore = True
if self.message == "thermocouple range fault" and config.ignore_tc_range_error == True:
self.ignore = True
if self.message == "cold junction temp too high" and config.ignore_tc_cold_junction_temp_high == True:
self.ignore = True
if self.message == "cold junction temp too low" and config.ignore_tc_cold_junction_temp_low == True:
self.ignore = True
if self.message == "thermocouple temp too high" and config.ignore_tc_temp_high == True:
self.ignore = True
if self.message == "thermocouple temp too low" and config.ignore_tc_temp_low == True:
self.ignore = True
if self.message == "voltage too high or low" and config.ignore_tc_voltage_error == True:
self.ignore = True
def map_message(self):
try:
self.message = self.map[self.orig_message]
except KeyError:
self.message = "unknown"
class Max31855_Error(ThermocoupleError):
'''
All children must set self.orig_message and self.map
'''
def __init__(self, message):
self.orig_message = message
# this purposefully makes "fault reading" and
# "Total thermoelectric voltage out of range..." unknown errors
self.map = {
"thermocouple not connected" : "not connected",
"short circuit to ground" : "short circuit",
"short circuit to power" : "short circuit",
}
super().__init__(self.message)
class Max31856_Error(ThermocoupleError):
def __init__(self, message):
self.orig_message = message
self.map = {
"cj_range" : "cold junction range fault",
"tc_range" : "thermocouple range fault",
"cj_high" : "cold junction temp too high",
"cj_low" : "cold junction temp too low",
"tc_high" : "thermocouple temp too high",
"tc_low" : "thermocouple temp too low",
"voltage" : "voltage too high or low",
"open_tc" : "not connected"
}
super().__init__(self.message)
class Max31856(TempSensorReal):
'''each subclass expected to handle errors and get temperature'''
def __init__(self):
TempSensorReal.__init__(self)
log.info("thermocouple MAX31856")
import adafruit_max31856
adafruit_max31856.ThermocoupleType(config.thermocouple_type)
self.thermocouple = adafruit_max31856.MAX31856(spi, cs)
if (config.ac_freq_50hz == True):
self.thermocouple.noise_rejection(50)
else:
self.thermocouple.noise_rejection(60)
def raw_temp(self):
# The underlying adafruit library does not throw exceptions
# for thermocouple errors. Instead, they are stored in
# dict named self.thermocouple.fault. Here we check that
# dict for errors and raise an exception.
# and raise Max31856_Error(message)
temp = self.thermocouple.temperature
for k,v in self.thermocouple.fault:
if v:
raise Max31856_Error(k)
return temp
class Oven(threading.Thread):
'''parent oven class. this has all the common code
for either a real or simulated oven'''
def __init__(self):
threading.Thread.__init__(self)
self.daemon = True
self.temperature = 0
self.time_step = config.sensor_time_wait
self.reset()
def reset(self):
self.cost = 0
self.state = "IDLE"
self.profile = None
self.start_time = 0
self.runtime = 0
self.totaltime = 0
self.target = 0
self.heat = 0
self.pid = PID(ki=config.pid_ki, kd=config.pid_kd, kp=config.pid_kp)
def run_profile(self, profile, startat=0):
self.reset()
self.startat = startat * 60
self.runtime = self.startat
self.start_time = datetime.datetime.now() - datetime.timedelta(seconds=self.startat)
self.profile = profile
self.totaltime = profile.get_duration()
self.state = "RUNNING"
log.info("Running schedule %s starting at %d minutes" % (profile.name,startat))
log.info("Starting")
def abort_run(self):
self.reset()
self.save_automatic_restart_state()
def kiln_must_catch_up(self):
'''shift the whole schedule forward in time by one time_step
to wait for the kiln to catch up'''
if config.kiln_must_catch_up == True:
temp = self.board.temp_sensor.temperature() + \
config.thermocouple_offset
# kiln too cold, wait for it to heat up
if self.target - temp > config.pid_control_window:
log.info("kiln must catch up, too cold, shifting schedule")
self.start_time = datetime.datetime.now() - datetime.timedelta(milliseconds = self.runtime * 1000)
# kiln too hot, wait for it to cool down
if temp - self.target > config.pid_control_window:
log.info("kiln must catch up, too hot, shifting schedule")
self.start_time = datetime.datetime.now() - datetime.timedelta(milliseconds = self.runtime * 1000)
def update_runtime(self):
runtime_delta = datetime.datetime.now() - self.start_time
if runtime_delta.total_seconds() < 0:
runtime_delta = datetime.timedelta(0)
self.runtime = runtime_delta.total_seconds()
def update_target_temp(self):
self.target = self.profile.get_target_temperature(self.runtime)
def reset_if_emergency(self):
'''reset if the temperature is way TOO HOT, or other critical errors detected'''
if (self.board.temp_sensor.temperature() + config.thermocouple_offset >=
config.emergency_shutoff_temp):
log.info("emergency!!! temperature too high")
if config.ignore_temp_too_high == False:
self.abort_run()
if self.board.temp_sensor.status.over_error_limit():
log.info("emergency!!! too many errors in a short period")
if config.ignore_too_many_tc_errors == False:
self.abort_run()
def reset_if_schedule_ended(self):
if self.runtime > self.totaltime:
log.info("schedule ended, shutting down")
log.info("total cost = %s%.2f" % (config.currency_type,self.cost))
self.abort_run()
def update_cost(self):
if self.heat:
cost = (config.kwh_rate * config.kw_elements) * ((self.heat)/3600)
else:
cost = 0
self.cost = self.cost + cost
def get_state(self):
temp = 0
try:
temp = self.board.temp_sensor.temperature() + config.thermocouple_offset
except AttributeError as error:
# this happens at start-up with a simulated oven
temp = 0
pass
state = {
'cost': self.cost,
'runtime': self.runtime,
'temperature': temp,
'target': self.target,
'state': self.state,
'heat': self.heat,
'totaltime': self.totaltime,
'kwh_rate': config.kwh_rate,
'currency_type': config.currency_type,
'profile': self.profile.name if self.profile else None,
'pidstats': self.pid.pidstats,
}
return state
def save_state(self):
with open(config.automatic_restart_state_file, 'w', encoding='utf-8') as f:
json.dump(self.get_state(), f, ensure_ascii=False, indent=4)
def state_file_is_old(self):
'''returns True is state files is older than 15 mins default
False if younger
True if state file cannot be opened or does not exist
'''
if os.path.isfile(config.automatic_restart_state_file):
state_age = os.path.getmtime(config.automatic_restart_state_file)
now = time.time()
minutes = (now - state_age)/60
if(minutes <= config.automatic_restart_window):
return False
return True
def save_automatic_restart_state(self):
# only save state if the feature is enabled
if not config.automatic_restarts == True:
return False
self.save_state()
def should_i_automatic_restart(self):
# only automatic restart if the feature is enabled
if not config.automatic_restarts == True:
return False
if self.state_file_is_old():
duplog.info("automatic restart not possible. state file does not exist or is too old.")
return False
with open(config.automatic_restart_state_file) as infile:
d = json.load(infile)
if d["state"] != "RUNNING":
duplog.info("automatic restart not possible. state = %s" % (d["state"]))
return False
return True
def automatic_restart(self):
with open(config.automatic_restart_state_file) as infile: d = json.load(infile)
startat = d["runtime"]/60
filename = "%s.json" % (d["profile"])
profile_path = os.path.abspath(os.path.join(os.path.dirname( __file__ ), '..', 'storage','profiles',filename))
log.info("automatically restarting profile = %s at minute = %d" % (profile_path,startat))
with open(profile_path) as infile:
profile_json = json.dumps(json.load(infile))
profile = Profile(profile_json)
self.run_profile(profile,startat=startat)
self.cost = d["cost"]
time.sleep(1)
self.ovenwatcher.record(profile)
def set_ovenwatcher(self,watcher):
log.info("ovenwatcher set in oven class")
self.ovenwatcher = watcher
def run(self):
while True:
if self.state == "IDLE":
if self.should_i_automatic_restart() == True:
self.automatic_restart()
time.sleep(1)
continue
if self.state == "RUNNING":
self.update_cost()
self.save_automatic_restart_state()
self.kiln_must_catch_up()
self.update_runtime()
self.update_target_temp()
self.heat_then_cool()
self.reset_if_emergency()
self.reset_if_schedule_ended()
class SimulatedOven(Oven):
def __init__(self):
self.board = SimulatedBoard()
self.t_env = config.sim_t_env
self.c_heat = config.sim_c_heat
self.c_oven = config.sim_c_oven
self.p_heat = config.sim_p_heat
self.R_o_nocool = config.sim_R_o_nocool
self.R_ho_noair = config.sim_R_ho_noair
self.R_ho = self.R_ho_noair
# set temps to the temp of the surrounding environment
self.t = self.t_env # deg C temp of oven
self.t_h = self.t_env #deg C temp of heating element
super().__init__()
# start thread
self.start()
log.info("SimulatedOven started")
def heating_energy(self,pid):
# using pid here simulates the element being on for
# only part of the time_step
self.Q_h = self.p_heat * self.time_step * pid
def temp_changes(self):
#temperature change of heat element by heating
self.t_h += self.Q_h / self.c_heat
#energy flux heat_el -> oven
self.p_ho = (self.t_h - self.t) / self.R_ho
#temperature change of oven and heating element
self.t += self.p_ho * self.time_step / self.c_oven
self.t_h -= self.p_ho * self.time_step / self.c_heat
#temperature change of oven by cooling to environment
self.p_env = (self.t - self.t_env) / self.R_o_nocool
self.t -= self.p_env * self.time_step / self.c_oven
self.temperature = self.t
self.board.temp_sensor.simulated_temperature = self.t
def heat_then_cool(self):
pid = self.pid.compute(self.target,
self.board.temp_sensor.temperature() +
config.thermocouple_offset)
heat_on = float(self.time_step * pid)
heat_off = float(self.time_step * (1 - pid))
self.heating_energy(pid)
self.temp_changes()
# self.heat is for the front end to display if the heat is on
self.heat = 0.0
if heat_on > 0:
self.heat = heat_on
log.info("simulation: -> %dW heater: %.0f -> %dW oven: %.0f -> %dW env" % (int(self.p_heat * pid),
self.t_h,
int(self.p_ho),
self.t,
int(self.p_env)))
time_left = self.totaltime - self.runtime
try:
log.info("temp=%.2f, target=%.2f, error=%.2f, pid=%.2f, p=%.2f, i=%.2f, d=%.2f, heat_on=%.2f, heat_off=%.2f, run_time=%d, total_time=%d, time_left=%d" %
(self.pid.pidstats['ispoint'],
self.pid.pidstats['setpoint'],
self.pid.pidstats['err'],
self.pid.pidstats['pid'],
self.pid.pidstats['p'],
self.pid.pidstats['i'],
self.pid.pidstats['d'],
heat_on,
heat_off,
self.runtime,
self.totaltime,
time_left))
except KeyError:
pass
# we don't actually spend time heating & cooling during
# a simulation, so sleep.
time.sleep(self.time_step)
class RealOven(Oven):
def __init__(self):
self.board = RealBoard()
self.output = Output()
self.reset()
# call parent init
Oven.__init__(self)
# start thread
self.start()
def reset(self):
super().reset()
self.output.cool(0)
def heat_then_cool(self):
pid = self.pid.compute(self.target,
self.board.temp_sensor.temperature() +
config.thermocouple_offset)
heat_on = float(self.time_step * pid)
heat_off = float(self.time_step * (1 - pid))
# self.heat is for the front end to display if the heat is on
self.heat = 0.0
if heat_on > 0:
self.heat = 1.0
if heat_on:
self.output.heat(heat_on)
if heat_off:
self.output.cool(heat_off)
time_left = self.totaltime - self.runtime
try:
log.info("temp=%.2f, target=%.2f, error=%.2f, pid=%.2f, p=%.2f, i=%.2f, d=%.2f, heat_on=%.2f, heat_off=%.2f, run_time=%d, total_time=%d, time_left=%d" %
(self.pid.pidstats['ispoint'],
self.pid.pidstats['setpoint'],
self.pid.pidstats['err'],
self.pid.pidstats['pid'],
self.pid.pidstats['p'],
self.pid.pidstats['i'],
self.pid.pidstats['d'],
heat_on,
heat_off,
self.runtime,
self.totaltime,
time_left))
except KeyError:
pass
class Profile():
def __init__(self, json_data):
obj = json.loads(json_data)
self.name = obj["name"]
self.data = sorted(obj["data"])
def get_duration(self):
return max([t for (t, x) in self.data])
def get_surrounding_points(self, time):
if time > self.get_duration():
return (None, None)
prev_point = None
next_point = None
for i in range(len(self.data)):
if time < self.data[i][0]:
prev_point = self.data[i-1]
next_point = self.data[i]
break
return (prev_point, next_point)
def get_target_temperature(self, time):
if time > self.get_duration():
return 0
(prev_point, next_point) = self.get_surrounding_points(time)
incl = float(next_point[1] - prev_point[1]) / float(next_point[0] - prev_point[0])
temp = prev_point[1] + (time - prev_point[0]) * incl
return temp
class PID():
def __init__(self, ki=1, kp=1, kd=1):
self.ki = ki
self.kp = kp
self.kd = kd
self.lastNow = datetime.datetime.now()
self.iterm = 0
self.lastErr = 0
self.pidstats = {}
# FIX - this was using a really small window where the PID control
# takes effect from -1 to 1. I changed this to various numbers and
# settled on -50 to 50 and then divide by 50 at the end. This results
# in a larger PID control window and much more accurate control...
# instead of what used to be binary on/off control.
def compute(self, setpoint, ispoint):
now = datetime.datetime.now()
timeDelta = (now - self.lastNow).total_seconds()
window_size = 100
error = float(setpoint - ispoint)
# this removes the need for config.stop_integral_windup
# it turns the controller into a binary on/off switch
# any time it's outside the window defined by
# config.pid_control_window
icomp = 0
output = 0
out4logs = 0
dErr = 0
if error < (-1 * config.pid_control_window):
log.info("kiln outside pid control window, max cooling")
output = 0
# it is possible to set self.iterm=0 here and also below
# but I dont think its needed
elif error > (1 * config.pid_control_window):
log.info("kiln outside pid control window, max heating")
output = 1
else:
icomp = (error * timeDelta * (1/self.ki))
self.iterm += (error * timeDelta * (1/self.ki))
dErr = (error - self.lastErr) / timeDelta
output = self.kp * error + self.iterm + self.kd * dErr
output = sorted([-1 * window_size, output, window_size])[1]
out4logs = output
output = float(output / window_size)
self.lastErr = error
self.lastNow = now
# no active cooling
if output < 0:
output = 0
self.pidstats = {
'time': time.mktime(now.timetuple()),
'timeDelta': timeDelta,
'setpoint': setpoint,
'ispoint': ispoint,
'err': error,
'errDelta': dErr,
'p': self.kp * error,
'i': self.iterm,
'd': self.kd * dErr,
'kp': self.kp,
'ki': self.ki,
'kd': self.kd,
'pid': out4logs,
'out': output,
}
return output