kopia lustrzana https://github.com/peterhinch/micropython-samples
327 wiersze
12 KiB
Python
327 wiersze
12 KiB
Python
# mains.py Data collection and processing module for the power meter.
|
|
|
|
# The MIT License (MIT)
|
|
#
|
|
# Copyright (c) 2017 Peter Hinch
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
# of this software and associated documentation files (the "Software"), to deal
|
|
# in the Software without restriction, including without limitation the rights
|
|
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
# copies of the Software, and to permit persons to whom the Software is
|
|
# furnished to do so, subject to the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be included in
|
|
# all copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
|
# THE SOFTWARE.
|
|
|
|
from pyb import SPI, Pin, ADC
|
|
from array import array
|
|
from micropython import const
|
|
from math import sin, cos, asin, pi, radians, sqrt
|
|
import uasyncio as asyncio
|
|
import gc
|
|
gc.collect()
|
|
|
|
# PINOUT
|
|
# vadc X19
|
|
# iadc X20
|
|
# MOSI X8
|
|
# MISO X7
|
|
# SCK X6
|
|
# CSN0 X5 First PGA
|
|
# CSN1 X4 Second PGA
|
|
|
|
# SAMPLING
|
|
NSAMPLES = const(400)
|
|
def arr_gen(n):
|
|
for _ in range(n):
|
|
yield 0
|
|
|
|
isamples = array('h', arr_gen(NSAMPLES))
|
|
vsamples = array('h', arr_gen(NSAMPLES))
|
|
gc.collect()
|
|
|
|
# HARDWARE
|
|
vadc = ADC(Pin.board.X19)
|
|
iadc = ADC(Pin.board.X20)
|
|
|
|
spi = SPI(1)
|
|
spi.init(SPI.MASTER, polarity = 0, phase = 0)
|
|
|
|
# ************* Programmable Gain Amplifier *************
|
|
|
|
class MCP6S91():
|
|
CHANNEL_ADDR = 0x41
|
|
GAIN_ADDR = 0x40
|
|
GAINVALS = (1, 2, 4, 5, 8, 10, 16, 32)
|
|
PINS = ('X5', 'X4')
|
|
def __init__(self, devno):
|
|
try:
|
|
self.csn = Pin(MCP6S91.PINS[devno], mode = Pin.OUT_PP)
|
|
except IndexError:
|
|
raise ValueError('MCP6S91 device no. must be 0 or 1.')
|
|
self.csn.value(1)
|
|
self.csn.value(0)
|
|
self.csn.value(1) # Set state machine to known state
|
|
self.gain(1)
|
|
|
|
def gain(self, value):
|
|
try:
|
|
gainval = MCP6S91.GAINVALS.index(value)
|
|
except ValueError:
|
|
raise ValueError('MCP6S91 invalid gain {}'.format(value))
|
|
self.csn.value(0)
|
|
spi.send(MCP6S91.GAIN_ADDR)
|
|
spi.send(gainval)
|
|
self.csn.value(1)
|
|
|
|
# Two cascaded MCP6S91 devices provide gains 1, 2, 5, 10, 20, 50, 100
|
|
class PGA():
|
|
gvals = { 1 : (1, 1), 2 : (2, 1), 5 : (5, 1), 10 : (10, 1),
|
|
20 : (10, 2), 50 : (10, 5), 100 : (10, 10) }
|
|
|
|
def __init__(self):
|
|
self.amp0 = MCP6S91(0)
|
|
self.amp1 = MCP6S91(1)
|
|
|
|
def gain(self, value):
|
|
try:
|
|
g0, g1 = PGA.gvals[value]
|
|
except KeyError:
|
|
raise ValueError('PGA invalid gain {}'.format(value))
|
|
self.amp0.gain(g0)
|
|
self.amp1.gain(g1)
|
|
|
|
# ************* Data Acquisition *************
|
|
|
|
# Integer sample data put into vsamples and isamples global arrays.
|
|
# Results are in range +-2047
|
|
|
|
# SIMULATION PARAMETERS
|
|
VPEAK = 0.545 # Relative to ADC FSD
|
|
IPEAK = 0.6
|
|
VPHASE = 0
|
|
IPHASE = radians(45)
|
|
|
|
def sample(simulate=False):
|
|
# Acquire just over 2 full cycles of AC
|
|
if simulate:
|
|
for n in range(NSAMPLES):
|
|
isamples[n] = int(2047 + IPEAK * 2047 * sin(4.2 * pi * n / NSAMPLES + IPHASE))
|
|
vsamples[n] = int(2047 + VPEAK * 2047 * sin(4.2 * pi * n / NSAMPLES + VPHASE))
|
|
else:
|
|
for n in range(NSAMPLES):
|
|
isamples[n] = iadc.read()
|
|
vsamples[n] = vadc.read()
|
|
for n in range(NSAMPLES): # Normalise to -2047 to +2048
|
|
vsamples[n] -= 2047
|
|
if simulate:
|
|
isamples[n] -= 2047
|
|
else:
|
|
isamples[n] = 2047 - isamples[n] # Sod's law. That's the way I wired the CT.
|
|
|
|
# ************* Preprocessing *************
|
|
|
|
# Filter data. Produce a single cycle of floating point data in two datasets.
|
|
# Both are scaled -1.0 to +1.0.
|
|
# Plot data is scaled such that the data exactly fits the range.
|
|
# Output data is scaled such that DAC FS fits the range.
|
|
|
|
class Preprocessor():
|
|
arraysize = const(NSAMPLES // 2) # We acquire > 2 cycles
|
|
plotsize = const(50)
|
|
vplot = array('f', arr_gen(plotsize)) # Plot data
|
|
iplot = array('f', arr_gen(plotsize))
|
|
vscale = array('f', arr_gen(arraysize)) # Output data
|
|
iscale = array('f', arr_gen(arraysize))
|
|
def __init__(self, simulate, verbose):
|
|
self.avg_len = 4
|
|
self.avg_half = self.avg_len // 2
|
|
gc.collect()
|
|
self.simulate = simulate
|
|
self.verbose = verbose
|
|
self.overrange = False
|
|
self.threshold = 1997
|
|
|
|
def vprint(self, *args):
|
|
if self.verbose:
|
|
print(*args)
|
|
|
|
async def run(self):
|
|
self.overrange = False
|
|
sample(self.simulate)
|
|
return await self.analyse()
|
|
|
|
# Calculate average of avg_len + 1 numbers around a centre value. avg_len must be divisible by 2.
|
|
# This guarantees symmetry around the centre index.
|
|
def avg(self, arr, centre):
|
|
return sum(arr[centre - self.avg_half : centre + 1 + self.avg_half]) / (self.avg_len + 1)
|
|
|
|
# Filter a set of samples around a centre index in an array
|
|
def filt(self, arr, centre):
|
|
avg0 = self.avg(arr, centre - self.avg_half)
|
|
avg1 = self.avg(arr, centre + self.avg_half)
|
|
return avg0, avg1
|
|
|
|
async def analyse(self):
|
|
# Determine the first and second rising edge of voltage
|
|
self.overrange = False
|
|
nfirst = -1 # Index of 1st upward voltage transition
|
|
lastv = 0 # previous max
|
|
ovr = self.threshold # Overrange threshold
|
|
for n in range(self.avg_len, NSAMPLES - self.avg_len + 1):
|
|
vavg0, vavg1 = self.filt(vsamples, n)
|
|
iavg0, iavg1 = self.filt(isamples, n)
|
|
vmax = max(vavg0, vavg1)
|
|
vmin = min(vavg0, vavg1)
|
|
imax = max(iavg0, iavg1)
|
|
imin = min(iavg0, iavg1)
|
|
if vmax > ovr or vmin < -ovr or imax > ovr or imin < -ovr:
|
|
self.overrange = True
|
|
self.vprint('overrange', vmax, vmin, imax, imin)
|
|
if nfirst == -1:
|
|
if vavg0 < 0 and vavg1 > 0 and abs(vmin) < lastv:
|
|
nfirst = n if err > abs(abs(vmin) - lastv) else n - 1
|
|
irising = iavg0 < iavg1 # Save current rising state for phase calculation
|
|
elif n > nfirst + NSAMPLES // 6:
|
|
if vavg0 < 0 and vavg1 > 0 and abs(vmin) < lastv:
|
|
nsecond = n if err > abs(abs(vmin) - lastv) else n - 1
|
|
break
|
|
lastv = vmax
|
|
err = abs(abs(vmin) - lastv)
|
|
yield
|
|
else: # Should never occur because voltage should be present.
|
|
raise OSError('Failed to find a complete cycle.')
|
|
self.vprint(nfirst, nsecond, vsamples[nfirst], vsamples[nsecond], isamples[nfirst], isamples[nsecond])
|
|
|
|
# Produce datasets for a single cycle of V.
|
|
# Scale ADC FSD [-FSD 0 FSD] -> [-1.0 0 +1.0]
|
|
nelems = nsecond - nfirst + 1 # No. of samples in current cycle
|
|
p = 0
|
|
for n in range(nfirst, nsecond + 1):
|
|
self.vscale[p] = vsamples[n] / 2047
|
|
self.iscale[p] = isamples[n] / 2047
|
|
p += 1
|
|
|
|
# Remove DC offsets
|
|
sumv = 0.0
|
|
sumi = 0.0
|
|
for p in range(nelems):
|
|
sumv += self.vscale[p]
|
|
sumi += self.iscale[p]
|
|
meanv = sumv / nelems
|
|
meani = sumi / nelems
|
|
maxv = 0.0
|
|
maxi = 0.0
|
|
yield
|
|
for p in range(nelems):
|
|
self.vscale[p] -= meanv
|
|
self.iscale[p] -= meani
|
|
maxv = max(maxv, abs(self.vscale[p])) # Scaling for plot
|
|
maxi = max(maxi, abs(self.iscale[p]))
|
|
yield
|
|
# Produce plot datasets. vplot scaled to avoid exact overlay of iplot
|
|
maxv = max(maxv * 1.1, 0.01) # Cope with "no signal" conditions
|
|
maxi = max(maxi, 0.01)
|
|
offs = 0
|
|
delta = nelems / (self.plotsize -1)
|
|
for p in range(self.plotsize):
|
|
idx = min(round(offs), nelems -1)
|
|
self.vplot[p] = self.vscale[idx] / maxv
|
|
self.iplot[p] = self.iscale[idx] / maxi
|
|
offs += delta
|
|
|
|
if self.verbose:
|
|
for p in range(nelems):
|
|
print('{:7.3f} {:7.3f} {:7.3f} {:7.3f}'.format(
|
|
self.vscale[p], self.iscale[p],
|
|
self.vplot[round(p / delta)], self.iplot[round(p / delta)]))
|
|
|
|
phase = asin(self.iplot[0]) if irising else pi - asin(self.iplot[0])
|
|
yield
|
|
# calculate power, vrms, irms etc prior to scaling
|
|
us_vrms = 0
|
|
us_pwr = 0
|
|
us_irms = 0
|
|
for p in range(nelems):
|
|
us_vrms += self.vscale[p] * self.vscale[p] # More noise-immune than calcuating from vmax * sqrt(2)
|
|
us_irms += self.iscale[p] * self.iscale[p]
|
|
us_pwr += self.iscale[p] * self.vscale[p]
|
|
us_vrms = sqrt(us_vrms / nelems)
|
|
us_irms = sqrt(us_irms / nelems)
|
|
us_pwr /= nelems
|
|
return phase, us_vrms, us_irms, us_pwr, nelems
|
|
|
|
# Testing. Current provided to CT by 100ohm in series with secondary. Vsec = 7.8Vrms so i = 78mArms == 18W at 230Vrms.
|
|
# Calculated current scaling:
|
|
# FSD 3.3Vpp out = 1.167Vrms.
|
|
# Secondary current Isec = 1.167/180ohm = 6.5mArms.
|
|
# Ratio = 2000. Primary current = 12.96Arms.
|
|
# At FSD iscale is +-1 = 0.707rms
|
|
# Imeasured = us_irms * 12.96/(0.707 * self.pga_gain) = us_irms * 18.331 / self.pga_gain
|
|
|
|
# Voltage scaling. Measured ADC I/P = 1.8Vpp == 0.545fsd pp == 0.386fsd rms
|
|
# so vscale = 230/0.386 = 596
|
|
|
|
class Scaling():
|
|
# FS Watts to PGA gain
|
|
valid_gains = (3000, 1500, 600, 300, 150, 60, 30)
|
|
vscale = 679 # These were re-measured with the new build. Zero load.
|
|
iscale = 18.0 # Based on measurement with 2.5KW resistive load (kettle)
|
|
|
|
def __init__(self, simulate=False, verbose=False):
|
|
self.cb = None
|
|
self.preprocessor = Preprocessor(simulate, verbose)
|
|
self.pga = PGA()
|
|
self.set_range(3000)
|
|
loop = asyncio.get_event_loop()
|
|
loop.create_task(self._run())
|
|
|
|
async def _run(self):
|
|
while True:
|
|
if self.cb is not None:
|
|
phase, us_vrms, us_irms, us_pwr, nelems = await self.preprocessor.run() # Get unscaled values. Takes 360ms.
|
|
if self.cb is not None: # May have changed during await
|
|
vrms = us_vrms * self.vscale
|
|
irms = us_irms * self.iscale / self.pga_gain
|
|
pwr = us_pwr * self.iscale * self.vscale / self.pga_gain
|
|
self.cb(phase, vrms, irms, pwr, nelems, self.preprocessor.overrange)
|
|
yield
|
|
|
|
def set_callback(self, cb=None):
|
|
self.cb = cb # Set to None to pause acquisition
|
|
|
|
def set_range(self, val):
|
|
if val in self.valid_gains:
|
|
self.pga_gain = 3000 // val
|
|
self.pga.gain(self.pga_gain)
|
|
else:
|
|
raise ValueError('Invalid range. Valid ranges (W) {}'.format(self.valid_gains))
|
|
|
|
@property
|
|
def vplot(self):
|
|
return self.preprocessor.vplot
|
|
|
|
@property
|
|
def iplot(self):
|
|
return self.preprocessor.iplot
|
|
|
|
def test():
|
|
def cb(phase, vrms, irms, pwr, nelems, ovr):
|
|
print('Phase {:5.1f}rad {:5.0f}Vrms {:6.3f}Arms {:6.1f}W Nsamples = {:3d}'.format(phase, vrms, irms, pwr, nelems))
|
|
if ovr:
|
|
print('Overrange')
|
|
s = Scaling(True, True)
|
|
s.set_range(300)
|
|
s.set_callback(cb)
|
|
loop = asyncio.get_event_loop()
|
|
loop.run_forever()
|