kopia lustrzana https://github.com/peterhinch/micropython_eeprom
Run black over all Python files.
rodzic
79ef80fa25
commit
9d77347007
27
bdevice.py
27
bdevice.py
|
@ -10,12 +10,11 @@ from micropython import const
|
||||||
|
|
||||||
|
|
||||||
class BlockDevice:
|
class BlockDevice:
|
||||||
|
|
||||||
def __init__(self, nbits, nchips, chip_size):
|
def __init__(self, nbits, nchips, chip_size):
|
||||||
self._c_bytes = chip_size # Size of chip in bytes
|
self._c_bytes = chip_size # Size of chip in bytes
|
||||||
self._a_bytes = chip_size * nchips # Size of array
|
self._a_bytes = chip_size * nchips # Size of array
|
||||||
self._nbits = nbits # Block size in bits
|
self._nbits = nbits # Block size in bits
|
||||||
self._block_size = 2**nbits
|
self._block_size = 2 ** nbits
|
||||||
self._rwbuf = bytearray(1)
|
self._rwbuf = bytearray(1)
|
||||||
|
|
||||||
def __len__(self):
|
def __len__(self):
|
||||||
|
@ -35,7 +34,9 @@ class BlockDevice:
|
||||||
# Handle special cases of a slice. Always return a pair of positive indices.
|
# Handle special cases of a slice. Always return a pair of positive indices.
|
||||||
def _do_slice(self, addr):
|
def _do_slice(self, addr):
|
||||||
if not (addr.step is None or addr.step == 1):
|
if not (addr.step is None or addr.step == 1):
|
||||||
raise NotImplementedError('only slices with step=1 (aka None) are supported')
|
raise NotImplementedError(
|
||||||
|
"only slices with step=1 (aka None) are supported"
|
||||||
|
)
|
||||||
start = addr.start if addr.start is not None else 0
|
start = addr.start if addr.start is not None else 0
|
||||||
stop = addr.stop if addr.stop is not None else self._a_bytes
|
stop = addr.stop if addr.stop is not None else self._a_bytes
|
||||||
start = start if start >= 0 else self._a_bytes + start
|
start = start if start >= 0 else self._a_bytes + start
|
||||||
|
@ -48,9 +49,9 @@ class BlockDevice:
|
||||||
if len(value) == (stop - start):
|
if len(value) == (stop - start):
|
||||||
res = self.readwrite(start, value, False)
|
res = self.readwrite(start, value, False)
|
||||||
else:
|
else:
|
||||||
raise RuntimeError('Slice must have same length as data')
|
raise RuntimeError("Slice must have same length as data")
|
||||||
except TypeError:
|
except TypeError:
|
||||||
raise RuntimeError('Can only assign bytes/bytearray to a slice')
|
raise RuntimeError("Can only assign bytes/bytearray to a slice")
|
||||||
return res
|
return res
|
||||||
|
|
||||||
def _rslice(self, addr):
|
def _rslice(self, addr):
|
||||||
|
@ -80,18 +81,18 @@ class BlockDevice:
|
||||||
if op == 6: # Ignore ERASE because handled by driver.
|
if op == 6: # Ignore ERASE because handled by driver.
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|
||||||
# Hardware agnostic base class for flash memory.
|
# Hardware agnostic base class for flash memory.
|
||||||
|
|
||||||
_RDBUFSIZE = const(32) # Size of read buffer for erasure test
|
_RDBUFSIZE = const(32) # Size of read buffer for erasure test
|
||||||
|
|
||||||
|
|
||||||
class FlashDevice(BlockDevice):
|
class FlashDevice(BlockDevice):
|
||||||
|
|
||||||
def __init__(self, nbits, nchips, chip_size, sec_size):
|
def __init__(self, nbits, nchips, chip_size, sec_size):
|
||||||
super().__init__(nbits, nchips, chip_size)
|
super().__init__(nbits, nchips, chip_size)
|
||||||
self.sec_size = sec_size
|
self.sec_size = sec_size
|
||||||
self._cache_mask = sec_size - 1 # For 4K sector size: 0xfff
|
self._cache_mask = sec_size - 1 # For 4K sector size: 0xfff
|
||||||
self._fmask = self._cache_mask ^ 0x3fffffff # 4K -> 0x3ffff000
|
self._fmask = self._cache_mask ^ 0x3FFFFFFF # 4K -> 0x3ffff000
|
||||||
self._buf = bytearray(_RDBUFSIZE)
|
self._buf = bytearray(_RDBUFSIZE)
|
||||||
self._mvbuf = memoryview(self._buf)
|
self._mvbuf = memoryview(self._buf)
|
||||||
self._cache = bytearray(sec_size) # Cache always contains one sector
|
self._cache = bytearray(sec_size) # Cache always contains one sector
|
||||||
|
@ -117,10 +118,12 @@ class FlashDevice(BlockDevice):
|
||||||
boff += nr
|
boff += nr
|
||||||
# addr now >= self._acache: read from cache.
|
# addr now >= self._acache: read from cache.
|
||||||
sa = addr - self._acache # Offset into cache
|
sa = addr - self._acache # Offset into cache
|
||||||
nr = min(nbytes, self._acache + self.sec_size - addr) # No of bytes to read from cache
|
nr = min(
|
||||||
|
nbytes, self._acache + self.sec_size - addr
|
||||||
|
) # No of bytes to read from cache
|
||||||
mvb[boff : boff + nr] = self._mvd[sa : sa + nr]
|
mvb[boff : boff + nr] = self._mvd[sa : sa + nr]
|
||||||
if nbytes - nr: # Get any remaining data from chip
|
if nbytes - nr: # Get any remaining data from chip
|
||||||
self.rdchip(addr + nr, mvb[boff + nr : ])
|
self.rdchip(addr + nr, mvb[boff + nr :])
|
||||||
return mvb
|
return mvb
|
||||||
|
|
||||||
def sync(self):
|
def sync(self):
|
||||||
|
@ -129,8 +132,8 @@ class FlashDevice(BlockDevice):
|
||||||
self._dirty = False
|
self._dirty = False
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
# Performance enhancement: if cache intersects address range, update it first.
|
# Performance enhancement: if cache intersects address range, update it first.
|
||||||
# Currently in this case it would be written twice. This may be rare.
|
# Currently in this case it would be written twice. This may be rare.
|
||||||
def write(self, addr, mvb):
|
def write(self, addr, mvb):
|
||||||
nbytes = len(mvb)
|
nbytes = len(mvb)
|
||||||
acache = self._acache
|
acache = self._acache
|
||||||
|
@ -160,7 +163,7 @@ class FlashDevice(BlockDevice):
|
||||||
self._fill_cache(0)
|
self._fill_cache(0)
|
||||||
|
|
||||||
# Return True if a sector is erased.
|
# Return True if a sector is erased.
|
||||||
def is_empty(self, addr, ev=0xff):
|
def is_empty(self, addr, ev=0xFF):
|
||||||
mvb = self._mvbuf
|
mvb = self._mvbuf
|
||||||
erased = True
|
erased = True
|
||||||
nbufs = self.sec_size // _RDBUFSIZE # Read buffers per sector
|
nbufs = self.sec_size // _RDBUFSIZE # Read buffers per sector
|
||||||
|
|
|
@ -11,51 +11,54 @@ from eeprom_i2c import EEPROM, T24C512
|
||||||
# Return an EEPROM array. Adapt for platforms other than Pyboard or chips
|
# Return an EEPROM array. Adapt for platforms other than Pyboard or chips
|
||||||
# smaller than 64KiB.
|
# smaller than 64KiB.
|
||||||
def get_eep():
|
def get_eep():
|
||||||
if uos.uname().machine.split(' ')[0][:4] == 'PYBD':
|
if uos.uname().machine.split(" ")[0][:4] == "PYBD":
|
||||||
Pin.board.EN_3V3.value(1)
|
Pin.board.EN_3V3.value(1)
|
||||||
time.sleep(0.1) # Allow decouplers to charge
|
time.sleep(0.1) # Allow decouplers to charge
|
||||||
eep = EEPROM(I2C(2), T24C512)
|
eep = EEPROM(I2C(2), T24C512)
|
||||||
print('Instantiated EEPROM')
|
print("Instantiated EEPROM")
|
||||||
return eep
|
return eep
|
||||||
|
|
||||||
|
|
||||||
# Dumb file copy utility to help with managing EEPROM contents at the REPL.
|
# Dumb file copy utility to help with managing EEPROM contents at the REPL.
|
||||||
def cp(source, dest):
|
def cp(source, dest):
|
||||||
if dest.endswith('/'): # minimal way to allow
|
if dest.endswith("/"): # minimal way to allow
|
||||||
dest = ''.join((dest, source.split('/')[-1])) # cp /sd/file /eeprom/
|
dest = "".join((dest, source.split("/")[-1])) # cp /sd/file /eeprom/
|
||||||
with open(source, 'rb') as infile: # Caller should handle any OSError
|
with open(source, "rb") as infile: # Caller should handle any OSError
|
||||||
with open(dest,'wb') as outfile: # e.g file not found
|
with open(dest, "wb") as outfile: # e.g file not found
|
||||||
while True:
|
while True:
|
||||||
buf = infile.read(100)
|
buf = infile.read(100)
|
||||||
outfile.write(buf)
|
outfile.write(buf)
|
||||||
if len(buf) < 100:
|
if len(buf) < 100:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
||||||
# ***** TEST OF DRIVER *****
|
# ***** TEST OF DRIVER *****
|
||||||
def _testblock(eep, bs):
|
def _testblock(eep, bs):
|
||||||
d0 = b'this >'
|
d0 = b"this >"
|
||||||
d1 = b'<is the boundary'
|
d1 = b"<is the boundary"
|
||||||
d2 = d0 + d1
|
d2 = d0 + d1
|
||||||
garbage = b'xxxxxxxxxxxxxxxxxxx'
|
garbage = b"xxxxxxxxxxxxxxxxxxx"
|
||||||
start = bs - len(d0)
|
start = bs - len(d0)
|
||||||
end = start + len(garbage)
|
end = start + len(garbage)
|
||||||
eep[start : end] = garbage
|
eep[start:end] = garbage
|
||||||
res = eep[start : end]
|
res = eep[start:end]
|
||||||
if res != garbage:
|
if res != garbage:
|
||||||
return 'Block test fail 1:' + str(list(res))
|
return "Block test fail 1:" + str(list(res))
|
||||||
end = start + len(d0)
|
end = start + len(d0)
|
||||||
eep[start : end] = d0
|
eep[start:end] = d0
|
||||||
end = start + len(garbage)
|
end = start + len(garbage)
|
||||||
res = eep[start : end]
|
res = eep[start:end]
|
||||||
if res != b'this >xxxxxxxxxxxxx':
|
if res != b"this >xxxxxxxxxxxxx":
|
||||||
return 'Block test fail 2:' + str(list(res))
|
return "Block test fail 2:" + str(list(res))
|
||||||
start = bs
|
start = bs
|
||||||
end = bs + len(d1)
|
end = bs + len(d1)
|
||||||
eep[start : end] = d1
|
eep[start:end] = d1
|
||||||
start = bs - len(d0)
|
start = bs - len(d0)
|
||||||
end = start + len(d2)
|
end = start + len(d2)
|
||||||
res = eep[start : end]
|
res = eep[start:end]
|
||||||
if res != d2:
|
if res != d2:
|
||||||
return 'Block test fail 3:' + str(list(res))
|
return "Block test fail 3:" + str(list(res))
|
||||||
|
|
||||||
|
|
||||||
def test():
|
def test():
|
||||||
eep = get_eep()
|
eep = get_eep()
|
||||||
|
@ -64,71 +67,76 @@ def test():
|
||||||
eep[sa + v] = v
|
eep[sa + v] = v
|
||||||
for v in range(256):
|
for v in range(256):
|
||||||
if eep[sa + v] != v:
|
if eep[sa + v] != v:
|
||||||
print('Fail at address {} data {} should be {}'.format(sa + v, eep[sa + v], v))
|
print(
|
||||||
|
"Fail at address {} data {} should be {}".format(sa + v, eep[sa + v], v)
|
||||||
|
)
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
print('Test of byte addressing passed')
|
print("Test of byte addressing passed")
|
||||||
data = uos.urandom(30)
|
data = uos.urandom(30)
|
||||||
sa = 2000
|
sa = 2000
|
||||||
eep[sa:sa + 30] = data
|
eep[sa : sa + 30] = data
|
||||||
if eep[sa:sa + 30] == data:
|
if eep[sa : sa + 30] == data:
|
||||||
print('Test of slice readback passed')
|
print("Test of slice readback passed")
|
||||||
|
|
||||||
block = 256
|
block = 256
|
||||||
res = _testblock(eep, block)
|
res = _testblock(eep, block)
|
||||||
if res is None:
|
if res is None:
|
||||||
print('Test block boundary {} passed'.format(block))
|
print("Test block boundary {} passed".format(block))
|
||||||
else:
|
else:
|
||||||
print('Test block boundary {} fail'.format(block))
|
print("Test block boundary {} fail".format(block))
|
||||||
print(res)
|
print(res)
|
||||||
block = eep._c_bytes
|
block = eep._c_bytes
|
||||||
if eep._a_bytes > block:
|
if eep._a_bytes > block:
|
||||||
res = _testblock(eep, block)
|
res = _testblock(eep, block)
|
||||||
if res is None:
|
if res is None:
|
||||||
print('Test chip boundary {} passed'.format(block))
|
print("Test chip boundary {} passed".format(block))
|
||||||
else:
|
else:
|
||||||
print('Test chip boundary {} fail'.format(block))
|
print("Test chip boundary {} fail".format(block))
|
||||||
print(res)
|
print(res)
|
||||||
else:
|
else:
|
||||||
print('Test chip boundary skipped: only one chip!')
|
print("Test chip boundary skipped: only one chip!")
|
||||||
|
|
||||||
|
|
||||||
# ***** TEST OF FILESYSTEM MOUNT *****
|
# ***** TEST OF FILESYSTEM MOUNT *****
|
||||||
def fstest(format=False):
|
def fstest(format=False):
|
||||||
eep = get_eep()
|
eep = get_eep()
|
||||||
try:
|
try:
|
||||||
uos.umount('/eeprom')
|
uos.umount("/eeprom")
|
||||||
except OSError:
|
except OSError:
|
||||||
pass
|
pass
|
||||||
# ***** CODE FOR FATFS *****
|
# ***** CODE FOR FATFS *****
|
||||||
#if format:
|
# if format:
|
||||||
#os.VfsFat.mkfs(eep)
|
# os.VfsFat.mkfs(eep)
|
||||||
# ***** CODE FOR LITTLEFS *****
|
# ***** CODE FOR LITTLEFS *****
|
||||||
if format:
|
if format:
|
||||||
uos.VfsLfs2.mkfs(eep)
|
uos.VfsLfs2.mkfs(eep)
|
||||||
# General
|
# General
|
||||||
try:
|
try:
|
||||||
uos.mount(eep,'/eeprom')
|
uos.mount(eep, "/eeprom")
|
||||||
except OSError:
|
except OSError:
|
||||||
raise OSError("Can't mount device: have you formatted it?")
|
raise OSError("Can't mount device: have you formatted it?")
|
||||||
print('Contents of "/": {}'.format(uos.listdir('/')))
|
print('Contents of "/": {}'.format(uos.listdir("/")))
|
||||||
print('Contents of "/eeprom": {}'.format(uos.listdir('/eeprom')))
|
print('Contents of "/eeprom": {}'.format(uos.listdir("/eeprom")))
|
||||||
print(uos.statvfs('/eeprom'))
|
print(uos.statvfs("/eeprom"))
|
||||||
|
|
||||||
|
|
||||||
def cptest(): # Assumes pre-existing filesystem of either type
|
def cptest(): # Assumes pre-existing filesystem of either type
|
||||||
eep = get_eep()
|
eep = get_eep()
|
||||||
if 'eeprom' in uos.listdir('/'):
|
if "eeprom" in uos.listdir("/"):
|
||||||
print('Device already mounted.')
|
print("Device already mounted.")
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
uos.mount(eep,'/eeprom')
|
uos.mount(eep, "/eeprom")
|
||||||
except OSError:
|
except OSError:
|
||||||
print('Fail mounting device. Have you formatted it?')
|
print("Fail mounting device. Have you formatted it?")
|
||||||
return
|
return
|
||||||
print('Mounted device.')
|
print("Mounted device.")
|
||||||
cp('eep_i2c.py', '/eeprom/')
|
cp("eep_i2c.py", "/eeprom/")
|
||||||
cp('eeprom_i2c.py', '/eeprom/')
|
cp("eeprom_i2c.py", "/eeprom/")
|
||||||
print('Contents of "/eeprom": {}'.format(uos.listdir('/eeprom')))
|
print('Contents of "/eeprom": {}'.format(uos.listdir("/eeprom")))
|
||||||
print(uos.statvfs('/eeprom'))
|
print(uos.statvfs("/eeprom"))
|
||||||
|
|
||||||
|
|
||||||
# ***** TEST OF HARDWARE *****
|
# ***** TEST OF HARDWARE *****
|
||||||
def full_test():
|
def full_test():
|
||||||
|
@ -136,9 +144,9 @@ def full_test():
|
||||||
page = 0
|
page = 0
|
||||||
for sa in range(0, len(eep), 128):
|
for sa in range(0, len(eep), 128):
|
||||||
data = uos.urandom(128)
|
data = uos.urandom(128)
|
||||||
eep[sa:sa + 128] = data
|
eep[sa : sa + 128] = data
|
||||||
if eep[sa:sa + 128] == data:
|
if eep[sa : sa + 128] == data:
|
||||||
print('Page {} passed'.format(page))
|
print("Page {} passed".format(page))
|
||||||
else:
|
else:
|
||||||
print('Page {} readback failed.'.format(page))
|
print("Page {} readback failed.".format(page))
|
||||||
page += 1
|
page += 1
|
||||||
|
|
|
@ -17,11 +17,10 @@ T24C64 = const(8192) # 8KiB 64Kbits
|
||||||
# Logical EEPROM device consists of 1-8 physical chips. Chips must all be the
|
# Logical EEPROM device consists of 1-8 physical chips. Chips must all be the
|
||||||
# same size, and must have contiguous addresses starting from 0x50.
|
# same size, and must have contiguous addresses starting from 0x50.
|
||||||
class EEPROM(BlockDevice):
|
class EEPROM(BlockDevice):
|
||||||
|
|
||||||
def __init__(self, i2c, chip_size=T24C512, verbose=True, block_size=9):
|
def __init__(self, i2c, chip_size=T24C512, verbose=True, block_size=9):
|
||||||
self._i2c = i2c
|
self._i2c = i2c
|
||||||
if chip_size not in (T24C64, T24C128, T24C256, T24C512):
|
if chip_size not in (T24C64, T24C128, T24C256, T24C512):
|
||||||
print('Warning: possible unsupported chip. Size:', chip_size)
|
print("Warning: possible unsupported chip. Size:", chip_size)
|
||||||
nchips = self.scan(verbose, chip_size) # No. of EEPROM chips
|
nchips = self.scan(verbose, chip_size) # No. of EEPROM chips
|
||||||
super().__init__(block_size, nchips, chip_size)
|
super().__init__(block_size, nchips, chip_size)
|
||||||
self._i2c_addr = 0 # I2C address of current chip
|
self._i2c_addr = 0 # I2C address of current chip
|
||||||
|
@ -34,11 +33,11 @@ class EEPROM(BlockDevice):
|
||||||
eeproms = [d for d in devices if _ADDR <= d < _ADDR + 8] # EEPROM chips
|
eeproms = [d for d in devices if _ADDR <= d < _ADDR + 8] # EEPROM chips
|
||||||
nchips = len(eeproms)
|
nchips = len(eeproms)
|
||||||
if nchips == 0:
|
if nchips == 0:
|
||||||
raise RuntimeError('EEPROM not found.')
|
raise RuntimeError("EEPROM not found.")
|
||||||
if min(eeproms) != _ADDR or (max(eeproms) - _ADDR) >= nchips:
|
if min(eeproms) != _ADDR or (max(eeproms) - _ADDR) >= nchips:
|
||||||
raise RuntimeError('Non-contiguous chip addresses', eeproms)
|
raise RuntimeError("Non-contiguous chip addresses", eeproms)
|
||||||
if verbose:
|
if verbose:
|
||||||
s = '{} chips detected. Total EEPROM size {}bytes.'
|
s = "{} chips detected. Total EEPROM size {}bytes."
|
||||||
print(s.format(nchips, chip_size * nchips))
|
print(s.format(nchips, chip_size * nchips))
|
||||||
return nchips
|
return nchips
|
||||||
|
|
||||||
|
@ -59,10 +58,10 @@ class EEPROM(BlockDevice):
|
||||||
if addr >= self._a_bytes:
|
if addr >= self._a_bytes:
|
||||||
raise RuntimeError("EEPROM Address is out of range")
|
raise RuntimeError("EEPROM Address is out of range")
|
||||||
ca, la = divmod(addr, self._c_bytes) # ca == chip no, la == offset into chip
|
ca, la = divmod(addr, self._c_bytes) # ca == chip no, la == offset into chip
|
||||||
self._addrbuf[0] = (la >> 8) & 0xff
|
self._addrbuf[0] = (la >> 8) & 0xFF
|
||||||
self._addrbuf[1] = la & 0xff
|
self._addrbuf[1] = la & 0xFF
|
||||||
self._i2c_addr = _ADDR + ca
|
self._i2c_addr = _ADDR + ca
|
||||||
pe = (addr & ~0x7f) + 0x80 # byte 0 of next page
|
pe = (addr & ~0x7F) + 0x80 # byte 0 of next page
|
||||||
return min(nbytes, pe - la)
|
return min(nbytes, pe - la)
|
||||||
|
|
||||||
# Read or write multiple bytes at an arbitrary address
|
# Read or write multiple bytes at an arbitrary address
|
||||||
|
@ -77,7 +76,9 @@ class EEPROM(BlockDevice):
|
||||||
self._i2c.writeto(self._i2c_addr, self._addrbuf)
|
self._i2c.writeto(self._i2c_addr, self._addrbuf)
|
||||||
self._i2c.readfrom_into(self._i2c_addr, mvb[start : start + npage])
|
self._i2c.readfrom_into(self._i2c_addr, mvb[start : start + npage])
|
||||||
else:
|
else:
|
||||||
self._i2c.writevto(self._i2c_addr, (self._addrbuf, buf[start: start + npage]))
|
self._i2c.writevto(
|
||||||
|
self._i2c_addr, (self._addrbuf, buf[start : start + npage])
|
||||||
|
)
|
||||||
self._wait_rdy()
|
self._wait_rdy()
|
||||||
nbytes -= npage
|
nbytes -= npage
|
||||||
start += npage
|
start += npage
|
||||||
|
|
|
@ -7,53 +7,56 @@ import uos
|
||||||
from machine import I2C, Pin
|
from machine import I2C, Pin
|
||||||
from eeprom_i2c import EEPROM, T24C512
|
from eeprom_i2c import EEPROM, T24C512
|
||||||
|
|
||||||
i2c=I2C(-1, scl=Pin(13, Pin.OPEN_DRAIN), sda=Pin(12, Pin.OPEN_DRAIN))
|
i2c = I2C(-1, scl=Pin(13, Pin.OPEN_DRAIN), sda=Pin(12, Pin.OPEN_DRAIN))
|
||||||
|
|
||||||
# Return an EEPROM array. Adapt for platforms other than Pyboard or chips
|
# Return an EEPROM array. Adapt for platforms other than Pyboard or chips
|
||||||
# smaller than 64KiB.
|
# smaller than 64KiB.
|
||||||
def get_eep():
|
def get_eep():
|
||||||
eep = EEPROM(i2c, T24C512)
|
eep = EEPROM(i2c, T24C512)
|
||||||
print('Instantiated EEPROM')
|
print("Instantiated EEPROM")
|
||||||
return eep
|
return eep
|
||||||
|
|
||||||
|
|
||||||
# Dumb file copy utility to help with managing EEPROM contents at the REPL.
|
# Dumb file copy utility to help with managing EEPROM contents at the REPL.
|
||||||
def cp(source, dest):
|
def cp(source, dest):
|
||||||
if dest.endswith('/'): # minimal way to allow
|
if dest.endswith("/"): # minimal way to allow
|
||||||
dest = ''.join((dest, source.split('/')[-1])) # cp /sd/file /eeprom/
|
dest = "".join((dest, source.split("/")[-1])) # cp /sd/file /eeprom/
|
||||||
with open(source, 'rb') as infile: # Caller should handle any OSError
|
with open(source, "rb") as infile: # Caller should handle any OSError
|
||||||
with open(dest,'wb') as outfile: # e.g file not found
|
with open(dest, "wb") as outfile: # e.g file not found
|
||||||
while True:
|
while True:
|
||||||
buf = infile.read(100)
|
buf = infile.read(100)
|
||||||
outfile.write(buf)
|
outfile.write(buf)
|
||||||
if len(buf) < 100:
|
if len(buf) < 100:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
||||||
# ***** TEST OF DRIVER *****
|
# ***** TEST OF DRIVER *****
|
||||||
def _testblock(eep, bs):
|
def _testblock(eep, bs):
|
||||||
d0 = b'this >'
|
d0 = b"this >"
|
||||||
d1 = b'<is the boundary'
|
d1 = b"<is the boundary"
|
||||||
d2 = d0 + d1
|
d2 = d0 + d1
|
||||||
garbage = b'xxxxxxxxxxxxxxxxxxx'
|
garbage = b"xxxxxxxxxxxxxxxxxxx"
|
||||||
start = bs - len(d0)
|
start = bs - len(d0)
|
||||||
end = start + len(garbage)
|
end = start + len(garbage)
|
||||||
eep[start : end] = garbage
|
eep[start:end] = garbage
|
||||||
res = eep[start : end]
|
res = eep[start:end]
|
||||||
if res != garbage:
|
if res != garbage:
|
||||||
return 'Block test fail 1:' + str(list(res))
|
return "Block test fail 1:" + str(list(res))
|
||||||
end = start + len(d0)
|
end = start + len(d0)
|
||||||
eep[start : end] = d0
|
eep[start:end] = d0
|
||||||
end = start + len(garbage)
|
end = start + len(garbage)
|
||||||
res = eep[start : end]
|
res = eep[start:end]
|
||||||
if res != b'this >xxxxxxxxxxxxx':
|
if res != b"this >xxxxxxxxxxxxx":
|
||||||
return 'Block test fail 2:' + str(list(res))
|
return "Block test fail 2:" + str(list(res))
|
||||||
start = bs
|
start = bs
|
||||||
end = bs + len(d1)
|
end = bs + len(d1)
|
||||||
eep[start : end] = d1
|
eep[start:end] = d1
|
||||||
start = bs - len(d0)
|
start = bs - len(d0)
|
||||||
end = start + len(d2)
|
end = start + len(d2)
|
||||||
res = eep[start : end]
|
res = eep[start:end]
|
||||||
if res != d2:
|
if res != d2:
|
||||||
return 'Block test fail 3:' + str(list(res))
|
return "Block test fail 3:" + str(list(res))
|
||||||
|
|
||||||
|
|
||||||
def test():
|
def test():
|
||||||
eep = get_eep()
|
eep = get_eep()
|
||||||
|
@ -62,33 +65,36 @@ def test():
|
||||||
eep[sa + v] = v
|
eep[sa + v] = v
|
||||||
for v in range(256):
|
for v in range(256):
|
||||||
if eep[sa + v] != v:
|
if eep[sa + v] != v:
|
||||||
print('Fail at address {} data {} should be {}'.format(sa + v, eep[sa + v], v))
|
print(
|
||||||
|
"Fail at address {} data {} should be {}".format(sa + v, eep[sa + v], v)
|
||||||
|
)
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
print('Test of byte addressing passed')
|
print("Test of byte addressing passed")
|
||||||
data = uos.urandom(30)
|
data = uos.urandom(30)
|
||||||
sa = 2000
|
sa = 2000
|
||||||
eep[sa:sa + 30] = data
|
eep[sa : sa + 30] = data
|
||||||
if eep[sa:sa + 30] == data:
|
if eep[sa : sa + 30] == data:
|
||||||
print('Test of slice readback passed')
|
print("Test of slice readback passed")
|
||||||
|
|
||||||
block = 256
|
block = 256
|
||||||
res = _testblock(eep, block)
|
res = _testblock(eep, block)
|
||||||
if res is None:
|
if res is None:
|
||||||
print('Test block boundary {} passed'.format(block))
|
print("Test block boundary {} passed".format(block))
|
||||||
else:
|
else:
|
||||||
print('Test block boundary {} fail'.format(block))
|
print("Test block boundary {} fail".format(block))
|
||||||
print(res)
|
print(res)
|
||||||
block = eep._c_bytes
|
block = eep._c_bytes
|
||||||
if eep._a_bytes > block:
|
if eep._a_bytes > block:
|
||||||
res = _testblock(eep, block)
|
res = _testblock(eep, block)
|
||||||
if res is None:
|
if res is None:
|
||||||
print('Test chip boundary {} passed'.format(block))
|
print("Test chip boundary {} passed".format(block))
|
||||||
else:
|
else:
|
||||||
print('Test chip boundary {} fail'.format(block))
|
print("Test chip boundary {} fail".format(block))
|
||||||
print(res)
|
print(res)
|
||||||
else:
|
else:
|
||||||
print('Test chip boundary skipped: only one chip!')
|
print("Test chip boundary skipped: only one chip!")
|
||||||
|
|
||||||
|
|
||||||
# ***** TEST OF FILESYSTEM MOUNT *****
|
# ***** TEST OF FILESYSTEM MOUNT *****
|
||||||
def fstest(format=False):
|
def fstest(format=False):
|
||||||
|
@ -97,28 +103,30 @@ def fstest(format=False):
|
||||||
if format:
|
if format:
|
||||||
uos.VfsLfs2.mkfs(eep)
|
uos.VfsLfs2.mkfs(eep)
|
||||||
try:
|
try:
|
||||||
uos.mount(eep,'/eeprom')
|
uos.mount(eep, "/eeprom")
|
||||||
except OSError: # Already mounted
|
except OSError: # Already mounted
|
||||||
pass
|
pass
|
||||||
print('Contents of "/": {}'.format(uos.listdir('/')))
|
print('Contents of "/": {}'.format(uos.listdir("/")))
|
||||||
print('Contents of "/eeprom": {}'.format(uos.listdir('/eeprom')))
|
print('Contents of "/eeprom": {}'.format(uos.listdir("/eeprom")))
|
||||||
print(uos.statvfs('/eeprom'))
|
print(uos.statvfs("/eeprom"))
|
||||||
|
|
||||||
|
|
||||||
def cptest():
|
def cptest():
|
||||||
eep = get_eep()
|
eep = get_eep()
|
||||||
if 'eeprom' in uos.listdir('/'):
|
if "eeprom" in uos.listdir("/"):
|
||||||
print('Device already mounted.')
|
print("Device already mounted.")
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
uos.mount(eep,'/eeprom')
|
uos.mount(eep, "/eeprom")
|
||||||
except OSError:
|
except OSError:
|
||||||
print('Fail mounting device. Have you formatted it?')
|
print("Fail mounting device. Have you formatted it?")
|
||||||
return
|
return
|
||||||
print('Mounted device.')
|
print("Mounted device.")
|
||||||
cp('eep_i2c.py', '/eeprom/')
|
cp("eep_i2c.py", "/eeprom/")
|
||||||
cp('eeprom_i2c.py', '/eeprom/')
|
cp("eeprom_i2c.py", "/eeprom/")
|
||||||
print('Contents of "/eeprom": {}'.format(uos.listdir('/eeprom')))
|
print('Contents of "/eeprom": {}'.format(uos.listdir("/eeprom")))
|
||||||
print(uos.statvfs('/eeprom'))
|
print(uos.statvfs("/eeprom"))
|
||||||
|
|
||||||
|
|
||||||
# ***** TEST OF HARDWARE *****
|
# ***** TEST OF HARDWARE *****
|
||||||
def full_test():
|
def full_test():
|
||||||
|
@ -126,9 +134,9 @@ def full_test():
|
||||||
page = 0
|
page = 0
|
||||||
for sa in range(0, len(eep), 128):
|
for sa in range(0, len(eep), 128):
|
||||||
data = uos.urandom(128)
|
data = uos.urandom(128)
|
||||||
eep[sa:sa + 128] = data
|
eep[sa : sa + 128] = data
|
||||||
if eep[sa:sa + 128] == data:
|
if eep[sa : sa + 128] == data:
|
||||||
print('Page {} passed'.format(page))
|
print("Page {} passed".format(page))
|
||||||
else:
|
else:
|
||||||
print('Page {} readback failed.'.format(page))
|
print("Page {} readback failed.".format(page))
|
||||||
page += 1
|
page += 1
|
||||||
|
|
|
@ -7,59 +7,63 @@ import uos
|
||||||
import time
|
import time
|
||||||
from machine import SPI, Pin
|
from machine import SPI, Pin
|
||||||
from eeprom_spi import EEPROM
|
from eeprom_spi import EEPROM
|
||||||
|
|
||||||
# Add extra pins if using multiple chips
|
# Add extra pins if using multiple chips
|
||||||
cspins = (Pin(Pin.board.Y5, Pin.OUT, value=1), Pin(Pin.board.Y4, Pin.OUT, value=1))
|
cspins = (Pin(Pin.board.Y5, Pin.OUT, value=1), Pin(Pin.board.Y4, Pin.OUT, value=1))
|
||||||
|
|
||||||
# Return an EEPROM array. Adapt for platforms other than Pyboard.
|
# Return an EEPROM array. Adapt for platforms other than Pyboard.
|
||||||
def get_eep(stm):
|
def get_eep(stm):
|
||||||
if uos.uname().machine.split(' ')[0][:4] == 'PYBD':
|
if uos.uname().machine.split(" ")[0][:4] == "PYBD":
|
||||||
Pin.board.EN_3V3.value(1)
|
Pin.board.EN_3V3.value(1)
|
||||||
time.sleep(0.1) # Allow decouplers to charge
|
time.sleep(0.1) # Allow decouplers to charge
|
||||||
if stm:
|
if stm:
|
||||||
eep = EEPROM(SPI(2, baudrate=5_000_000), cspins, 256)
|
eep = EEPROM(SPI(2, baudrate=5_000_000), cspins, 256)
|
||||||
else:
|
else:
|
||||||
eep = EEPROM(SPI(2, baudrate=20_000_000), cspins)
|
eep = EEPROM(SPI(2, baudrate=20_000_000), cspins)
|
||||||
print('Instantiated EEPROM')
|
print("Instantiated EEPROM")
|
||||||
return eep
|
return eep
|
||||||
|
|
||||||
|
|
||||||
# Dumb file copy utility to help with managing EEPROM contents at the REPL.
|
# Dumb file copy utility to help with managing EEPROM contents at the REPL.
|
||||||
def cp(source, dest):
|
def cp(source, dest):
|
||||||
if dest.endswith('/'): # minimal way to allow
|
if dest.endswith("/"): # minimal way to allow
|
||||||
dest = ''.join((dest, source.split('/')[-1])) # cp /sd/file /eeprom/
|
dest = "".join((dest, source.split("/")[-1])) # cp /sd/file /eeprom/
|
||||||
with open(source, 'rb') as infile: # Caller should handle any OSError
|
with open(source, "rb") as infile: # Caller should handle any OSError
|
||||||
with open(dest,'wb') as outfile: # e.g file not found
|
with open(dest, "wb") as outfile: # e.g file not found
|
||||||
while True:
|
while True:
|
||||||
buf = infile.read(100)
|
buf = infile.read(100)
|
||||||
outfile.write(buf)
|
outfile.write(buf)
|
||||||
if len(buf) < 100:
|
if len(buf) < 100:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
||||||
# ***** TEST OF DRIVER *****
|
# ***** TEST OF DRIVER *****
|
||||||
def _testblock(eep, bs):
|
def _testblock(eep, bs):
|
||||||
d0 = b'this >'
|
d0 = b"this >"
|
||||||
d1 = b'<is the boundary'
|
d1 = b"<is the boundary"
|
||||||
d2 = d0 + d1
|
d2 = d0 + d1
|
||||||
garbage = b'xxxxxxxxxxxxxxxxxxx'
|
garbage = b"xxxxxxxxxxxxxxxxxxx"
|
||||||
start = bs - len(d0)
|
start = bs - len(d0)
|
||||||
end = start + len(garbage)
|
end = start + len(garbage)
|
||||||
eep[start : end] = garbage
|
eep[start:end] = garbage
|
||||||
res = eep[start : end]
|
res = eep[start:end]
|
||||||
if res != garbage:
|
if res != garbage:
|
||||||
return 'Block test fail 1:' + str(list(res))
|
return "Block test fail 1:" + str(list(res))
|
||||||
end = start + len(d0)
|
end = start + len(d0)
|
||||||
eep[start : end] = d0
|
eep[start:end] = d0
|
||||||
end = start + len(garbage)
|
end = start + len(garbage)
|
||||||
res = eep[start : end]
|
res = eep[start:end]
|
||||||
if res != b'this >xxxxxxxxxxxxx':
|
if res != b"this >xxxxxxxxxxxxx":
|
||||||
return 'Block test fail 2:' + str(list(res))
|
return "Block test fail 2:" + str(list(res))
|
||||||
start = bs
|
start = bs
|
||||||
end = bs + len(d1)
|
end = bs + len(d1)
|
||||||
eep[start : end] = d1
|
eep[start:end] = d1
|
||||||
start = bs - len(d0)
|
start = bs - len(d0)
|
||||||
end = start + len(d2)
|
end = start + len(d2)
|
||||||
res = eep[start : end]
|
res = eep[start:end]
|
||||||
if res != d2:
|
if res != d2:
|
||||||
return 'Block test fail 3:' + str(list(res))
|
return "Block test fail 3:" + str(list(res))
|
||||||
|
|
||||||
|
|
||||||
def test(stm=False):
|
def test(stm=False):
|
||||||
eep = get_eep(stm)
|
eep = get_eep(stm)
|
||||||
|
@ -68,71 +72,75 @@ def test(stm=False):
|
||||||
eep[sa + v] = v
|
eep[sa + v] = v
|
||||||
for v in range(256):
|
for v in range(256):
|
||||||
if eep[sa + v] != v:
|
if eep[sa + v] != v:
|
||||||
print('Fail at address {} data {} should be {}'.format(sa + v, eep[sa + v], v))
|
print(
|
||||||
|
"Fail at address {} data {} should be {}".format(sa + v, eep[sa + v], v)
|
||||||
|
)
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
print('Test of byte addressing passed')
|
print("Test of byte addressing passed")
|
||||||
data = uos.urandom(30)
|
data = uos.urandom(30)
|
||||||
sa = 2000
|
sa = 2000
|
||||||
eep[sa:sa + 30] = data
|
eep[sa : sa + 30] = data
|
||||||
if eep[sa:sa + 30] == data:
|
if eep[sa : sa + 30] == data:
|
||||||
print('Test of slice readback passed')
|
print("Test of slice readback passed")
|
||||||
|
|
||||||
block = 256
|
block = 256
|
||||||
res = _testblock(eep, block)
|
res = _testblock(eep, block)
|
||||||
if res is None:
|
if res is None:
|
||||||
print('Test block boundary {} passed'.format(block))
|
print("Test block boundary {} passed".format(block))
|
||||||
else:
|
else:
|
||||||
print('Test block boundary {} fail'.format(block))
|
print("Test block boundary {} fail".format(block))
|
||||||
print(res)
|
print(res)
|
||||||
block = eep._c_bytes
|
block = eep._c_bytes
|
||||||
if eep._a_bytes > block:
|
if eep._a_bytes > block:
|
||||||
res = _testblock(eep, block)
|
res = _testblock(eep, block)
|
||||||
if res is None:
|
if res is None:
|
||||||
print('Test chip boundary {} passed'.format(block))
|
print("Test chip boundary {} passed".format(block))
|
||||||
else:
|
else:
|
||||||
print('Test chip boundary {} fail'.format(block))
|
print("Test chip boundary {} fail".format(block))
|
||||||
print(res)
|
print(res)
|
||||||
else:
|
else:
|
||||||
print('Test chip boundary skipped: only one chip!')
|
print("Test chip boundary skipped: only one chip!")
|
||||||
|
|
||||||
|
|
||||||
# ***** TEST OF FILESYSTEM MOUNT *****
|
# ***** TEST OF FILESYSTEM MOUNT *****
|
||||||
def fstest(format=False, stm=False):
|
def fstest(format=False, stm=False):
|
||||||
eep = get_eep(stm)
|
eep = get_eep(stm)
|
||||||
try:
|
try:
|
||||||
uos.umount('/eeprom')
|
uos.umount("/eeprom")
|
||||||
except OSError:
|
except OSError:
|
||||||
pass
|
pass
|
||||||
# ***** CODE FOR FATFS *****
|
# ***** CODE FOR FATFS *****
|
||||||
#if format:
|
# if format:
|
||||||
#os.VfsFat.mkfs(eep)
|
# os.VfsFat.mkfs(eep)
|
||||||
# ***** CODE FOR LITTLEFS *****
|
# ***** CODE FOR LITTLEFS *****
|
||||||
if format:
|
if format:
|
||||||
uos.VfsLfs2.mkfs(eep)
|
uos.VfsLfs2.mkfs(eep)
|
||||||
# General
|
# General
|
||||||
try:
|
try:
|
||||||
uos.mount(eep,'/eeprom')
|
uos.mount(eep, "/eeprom")
|
||||||
except OSError:
|
except OSError:
|
||||||
raise OSError("Can't mount device: have you formatted it?")
|
raise OSError("Can't mount device: have you formatted it?")
|
||||||
print('Contents of "/": {}'.format(uos.listdir('/')))
|
print('Contents of "/": {}'.format(uos.listdir("/")))
|
||||||
print('Contents of "/eeprom": {}'.format(uos.listdir('/eeprom')))
|
print('Contents of "/eeprom": {}'.format(uos.listdir("/eeprom")))
|
||||||
print(uos.statvfs('/eeprom'))
|
print(uos.statvfs("/eeprom"))
|
||||||
|
|
||||||
|
|
||||||
def cptest(stm=False): # Assumes pre-existing filesystem of either type
|
def cptest(stm=False): # Assumes pre-existing filesystem of either type
|
||||||
eep = get_eep(stm)
|
eep = get_eep(stm)
|
||||||
if 'eeprom' in uos.listdir('/'):
|
if "eeprom" in uos.listdir("/"):
|
||||||
print('Device already mounted.')
|
print("Device already mounted.")
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
uos.mount(eep,'/eeprom')
|
uos.mount(eep, "/eeprom")
|
||||||
except OSError:
|
except OSError:
|
||||||
print('Fail mounting device. Have you formatted it?')
|
print("Fail mounting device. Have you formatted it?")
|
||||||
return
|
return
|
||||||
print('Mounted device.')
|
print("Mounted device.")
|
||||||
cp('eep_spi.py', '/eeprom/')
|
cp("eep_spi.py", "/eeprom/")
|
||||||
cp('eeprom_spi.py', '/eeprom/')
|
cp("eeprom_spi.py", "/eeprom/")
|
||||||
print('Contents of "/eeprom": {}'.format(uos.listdir('/eeprom')))
|
print('Contents of "/eeprom": {}'.format(uos.listdir("/eeprom")))
|
||||||
print(uos.statvfs('/eeprom'))
|
print(uos.statvfs("/eeprom"))
|
||||||
|
|
||||||
|
|
||||||
# ***** TEST OF HARDWARE *****
|
# ***** TEST OF HARDWARE *****
|
||||||
|
@ -141,15 +149,16 @@ def full_test(stm=False):
|
||||||
page = 0
|
page = 0
|
||||||
for sa in range(0, len(eep), 256):
|
for sa in range(0, len(eep), 256):
|
||||||
data = uos.urandom(256)
|
data = uos.urandom(256)
|
||||||
eep[sa:sa + 256] = data
|
eep[sa : sa + 256] = data
|
||||||
got = eep[sa:sa + 256]
|
got = eep[sa : sa + 256]
|
||||||
if got == data:
|
if got == data:
|
||||||
print('Page {} passed'.format(page))
|
print("Page {} passed".format(page))
|
||||||
else:
|
else:
|
||||||
print('Page {} readback failed.'.format(page))
|
print("Page {} readback failed.".format(page))
|
||||||
break
|
break
|
||||||
page += 1
|
page += 1
|
||||||
|
|
||||||
|
|
||||||
test_str = """Available tests (see SPI.md):
|
test_str = """Available tests (see SPI.md):
|
||||||
test(stm=False) Basic hardware test.
|
test(stm=False) Basic hardware test.
|
||||||
full_test(stm=False) Thorough hardware test.
|
full_test(stm=False) Thorough hardware test.
|
||||||
|
|
|
@ -14,8 +14,8 @@ _WRITE = const(2)
|
||||||
_WREN = const(6) # Write enable
|
_WREN = const(6) # Write enable
|
||||||
_RDSR = const(5) # Read status register
|
_RDSR = const(5) # Read status register
|
||||||
# Microchip only:
|
# Microchip only:
|
||||||
_RDID = const(0xab) # Read chip ID
|
_RDID = const(0xAB) # Read chip ID
|
||||||
_CE = const(0xc7) # Chip erase
|
_CE = const(0xC7) # Chip erase
|
||||||
# STM only:
|
# STM only:
|
||||||
_RDID_STM = const(0x83) # Read ID page
|
_RDID_STM = const(0x83) # Read ID page
|
||||||
_WRID_STM = const(0x82)
|
_WRID_STM = const(0x82)
|
||||||
|
@ -26,11 +26,10 @@ _STM_ID = const(0x30) # Arbitrary ID for STM chip
|
||||||
|
|
||||||
# Logical EEPROM device comprising one or more physical chips sharing an SPI bus.
|
# Logical EEPROM device comprising one or more physical chips sharing an SPI bus.
|
||||||
class EEPROM(BlockDevice):
|
class EEPROM(BlockDevice):
|
||||||
|
|
||||||
def __init__(self, spi, cspins, size=128, verbose=True, block_size=9):
|
def __init__(self, spi, cspins, size=128, verbose=True, block_size=9):
|
||||||
# args: virtual block size in bits, no. of chips, bytes in each chip
|
# args: virtual block size in bits, no. of chips, bytes in each chip
|
||||||
if size not in (64, 128, 256):
|
if size not in (64, 128, 256):
|
||||||
print('Warning: possible unsupported chip. Size:', size)
|
print("Warning: possible unsupported chip. Size:", size)
|
||||||
super().__init__(block_size, len(cspins), size * 1024)
|
super().__init__(block_size, len(cspins), size * 1024)
|
||||||
self._stm = size == 256
|
self._stm = size == 256
|
||||||
self._spi = spi
|
self._spi = spi
|
||||||
|
@ -44,7 +43,7 @@ class EEPROM(BlockDevice):
|
||||||
def _stm_rdid(self, n):
|
def _stm_rdid(self, n):
|
||||||
cs = self._cspins[n]
|
cs = self._cspins[n]
|
||||||
mvp = self._mvp
|
mvp = self._mvp
|
||||||
mvp[:] = b'\0\0\0\0\0'
|
mvp[:] = b"\0\0\0\0\0"
|
||||||
mvp[0] = _RDID_STM
|
mvp[0] = _RDID_STM
|
||||||
cs(0)
|
cs(0)
|
||||||
self._spi.write_readinto(mvp, mvp)
|
self._spi.write_readinto(mvp, mvp)
|
||||||
|
@ -59,7 +58,7 @@ class EEPROM(BlockDevice):
|
||||||
cs(0)
|
cs(0)
|
||||||
self._spi.write(mvp[:1]) # Enable write
|
self._spi.write(mvp[:1]) # Enable write
|
||||||
cs(1)
|
cs(1)
|
||||||
mvp[:] = b'\0\0\0\0\0'
|
mvp[:] = b"\0\0\0\0\0"
|
||||||
mvp[0] = _WRID_STM
|
mvp[0] = _WRID_STM
|
||||||
mvp[4] = _STM_ID
|
mvp[4] = _STM_ID
|
||||||
cs(0)
|
cs(0)
|
||||||
|
@ -74,32 +73,32 @@ class EEPROM(BlockDevice):
|
||||||
if self._stm_rdid(n) != _STM_ID:
|
if self._stm_rdid(n) != _STM_ID:
|
||||||
self._stm_wrid(n)
|
self._stm_wrid(n)
|
||||||
if self._stm_rdid(n) != _STM_ID:
|
if self._stm_rdid(n) != _STM_ID:
|
||||||
raise RuntimeError('M95M02 chip not found at cs[{}].'.format(n))
|
raise RuntimeError("M95M02 chip not found at cs[{}].".format(n))
|
||||||
return n
|
return n
|
||||||
|
|
||||||
# Scan for Microchip devices: read manf ID
|
# Scan for Microchip devices: read manf ID
|
||||||
def _mc_scan(self):
|
def _mc_scan(self):
|
||||||
mvp = self._mvp
|
mvp = self._mvp
|
||||||
for n, cs in enumerate(self._cspins):
|
for n, cs in enumerate(self._cspins):
|
||||||
mvp[:] = b'\0\0\0\0\0'
|
mvp[:] = b"\0\0\0\0\0"
|
||||||
mvp[0] = _RDID
|
mvp[0] = _RDID
|
||||||
cs(0)
|
cs(0)
|
||||||
self._spi.write_readinto(mvp, mvp)
|
self._spi.write_readinto(mvp, mvp)
|
||||||
cs(1)
|
cs(1)
|
||||||
if mvp[4] != 0x29:
|
if mvp[4] != 0x29:
|
||||||
raise RuntimeError('25xx1024 chip not found at cs[{}].'.format(n))
|
raise RuntimeError("25xx1024 chip not found at cs[{}].".format(n))
|
||||||
return n
|
return n
|
||||||
|
|
||||||
# Check for a valid hardware configuration
|
# Check for a valid hardware configuration
|
||||||
def scan(self, verbose):
|
def scan(self, verbose):
|
||||||
n = self._stm_scan() if self._stm else self._mc_scan()
|
n = self._stm_scan() if self._stm else self._mc_scan()
|
||||||
if verbose:
|
if verbose:
|
||||||
s = '{} chips detected. Total EEPROM size {}bytes.'
|
s = "{} chips detected. Total EEPROM size {}bytes."
|
||||||
print(s.format(n + 1, self._a_bytes))
|
print(s.format(n + 1, self._a_bytes))
|
||||||
|
|
||||||
def erase(self):
|
def erase(self):
|
||||||
if self._stm:
|
if self._stm:
|
||||||
raise RuntimeError('Erase not available on STM chip')
|
raise RuntimeError("Erase not available on STM chip")
|
||||||
mvp = self._mvp
|
mvp = self._mvp
|
||||||
for cs in self._cspins: # For each chip
|
for cs in self._cspins: # For each chip
|
||||||
mvp[0] = _WREN
|
mvp[0] = _WREN
|
||||||
|
@ -133,9 +132,9 @@ class EEPROM(BlockDevice):
|
||||||
self._ccs = self._cspins[ca] # Current chip select
|
self._ccs = self._cspins[ca] # Current chip select
|
||||||
mvp = self._mvp
|
mvp = self._mvp
|
||||||
mvp[1] = la >> 16
|
mvp[1] = la >> 16
|
||||||
mvp[2] = (la >> 8) & 0xff
|
mvp[2] = (la >> 8) & 0xFF
|
||||||
mvp[3] = la & 0xff
|
mvp[3] = la & 0xFF
|
||||||
pe = (addr & ~0xff) + 0x100 # byte 0 of next page
|
pe = (addr & ~0xFF) + 0x100 # byte 0 of next page
|
||||||
return min(nbytes, pe - la)
|
return min(nbytes, pe - la)
|
||||||
|
|
||||||
# Read or write multiple bytes at an arbitrary address
|
# Read or write multiple bytes at an arbitrary address
|
||||||
|
@ -162,7 +161,7 @@ class EEPROM(BlockDevice):
|
||||||
mvp[0] = _WRITE
|
mvp[0] = _WRITE
|
||||||
cs(0)
|
cs(0)
|
||||||
self._spi.write(mvp[:4])
|
self._spi.write(mvp[:4])
|
||||||
self._spi.write(mvb[start: start + npage])
|
self._spi.write(mvb[start : start + npage])
|
||||||
cs(1) # Trigger write start
|
cs(1) # Trigger write start
|
||||||
self._wait_rdy() # Wait until done (6ms max)
|
self._wait_rdy() # Wait until done (6ms max)
|
||||||
nbytes -= npage
|
nbytes -= npage
|
||||||
|
|
|
@ -12,21 +12,28 @@ from bdevice import FlashDevice
|
||||||
_READ = const(0)
|
_READ = const(0)
|
||||||
_PP = const(1)
|
_PP = const(1)
|
||||||
_SE = const(2)
|
_SE = const(2)
|
||||||
_CMDS3BA = b'\x03\x02\x20'
|
_CMDS3BA = b"\x03\x02\x20"
|
||||||
_CMDS4BA = b'\x13\x12\x21'
|
_CMDS4BA = b"\x13\x12\x21"
|
||||||
# No address
|
# No address
|
||||||
_WREN = const(6) # Write enable
|
_WREN = const(6) # Write enable
|
||||||
_RDSR1 = const(5) # Read status register 1
|
_RDSR1 = const(5) # Read status register 1
|
||||||
_RDID = const(0x9f) # Read manufacturer ID
|
_RDID = const(0x9F) # Read manufacturer ID
|
||||||
_CE = const(0xc7) # Chip erase (takes minutes)
|
_CE = const(0xC7) # Chip erase (takes minutes)
|
||||||
|
|
||||||
_SEC_SIZE = const(4096) # Flash sector size 0x1000
|
_SEC_SIZE = const(4096) # Flash sector size 0x1000
|
||||||
|
|
||||||
# Logical Flash device comprising one or more physical chips sharing an SPI bus.
|
# Logical Flash device comprising one or more physical chips sharing an SPI bus.
|
||||||
class FLASH(FlashDevice):
|
class FLASH(FlashDevice):
|
||||||
|
def __init__(
|
||||||
def __init__(self, spi, cspins, size=None, verbose=True,
|
self,
|
||||||
sec_size=_SEC_SIZE, block_size=9, cmd5=None):
|
spi,
|
||||||
|
cspins,
|
||||||
|
size=None,
|
||||||
|
verbose=True,
|
||||||
|
sec_size=_SEC_SIZE,
|
||||||
|
block_size=9,
|
||||||
|
cmd5=None,
|
||||||
|
):
|
||||||
self._spi = spi
|
self._spi = spi
|
||||||
self._cspins = cspins
|
self._cspins = cspins
|
||||||
self._ccs = None # Chip select Pin object for current chip
|
self._ccs = None # Chip select Pin object for current chip
|
||||||
|
@ -57,7 +64,7 @@ class FLASH(FlashDevice):
|
||||||
def scan(self, verbose, size):
|
def scan(self, verbose, size):
|
||||||
mvp = self._mvp
|
mvp = self._mvp
|
||||||
for n, cs in enumerate(self._cspins):
|
for n, cs in enumerate(self._cspins):
|
||||||
mvp[:] = b'\0\0\0\0\0\0'
|
mvp[:] = b"\0\0\0\0\0\0"
|
||||||
mvp[0] = _RDID
|
mvp[0] = _RDID
|
||||||
cs(0)
|
cs(0)
|
||||||
self._spi.write_readinto(mvp[:4], mvp[:4])
|
self._spi.write_readinto(mvp[:4], mvp[:4])
|
||||||
|
@ -66,9 +73,13 @@ class FLASH(FlashDevice):
|
||||||
if size is None:
|
if size is None:
|
||||||
size = scansize # Save size of 1st chip
|
size = scansize # Save size of 1st chip
|
||||||
if size != scansize: # Mismatch passed size or 1st chip.
|
if size != scansize: # Mismatch passed size or 1st chip.
|
||||||
raise ValueError('Flash size mismatch: expected {}KiB, found {}KiB'.format(size, scansize))
|
raise ValueError(
|
||||||
|
"Flash size mismatch: expected {}KiB, found {}KiB".format(
|
||||||
|
size, scansize
|
||||||
|
)
|
||||||
|
)
|
||||||
if verbose:
|
if verbose:
|
||||||
s = '{} chips detected. Total flash size {}MiB.'
|
s = "{} chips detected. Total flash size {}MiB."
|
||||||
n += 1
|
n += 1
|
||||||
print(s.format(n, (n * size) // 1024))
|
print(s.format(n, (n * size) // 1024))
|
||||||
return size
|
return size
|
||||||
|
@ -105,7 +116,7 @@ class FLASH(FlashDevice):
|
||||||
cs(1)
|
cs(1)
|
||||||
mvp[0] = self._cmds[_PP]
|
mvp[0] = self._cmds[_PP]
|
||||||
cs(0)
|
cs(0)
|
||||||
self._spi.write(mvp[:self._cmdlen]) # Start write
|
self._spi.write(mvp[: self._cmdlen]) # Start write
|
||||||
self._spi.write(cache[start : start + ps])
|
self._spi.write(cache[start : start + ps])
|
||||||
cs(1)
|
cs(1)
|
||||||
self._wait_rdy() # Wait for write to complete
|
self._wait_rdy() # Wait for write to complete
|
||||||
|
@ -123,7 +134,7 @@ class FLASH(FlashDevice):
|
||||||
cs = self._ccs
|
cs = self._ccs
|
||||||
mvp[0] = self._cmds[_READ]
|
mvp[0] = self._cmds[_READ]
|
||||||
cs(0)
|
cs(0)
|
||||||
self._spi.write(mvp[:self._cmdlen])
|
self._spi.write(mvp[: self._cmdlen])
|
||||||
self._spi.readinto(mvb[start : start + npage])
|
self._spi.readinto(mvb[start : start + npage])
|
||||||
cs(1)
|
cs(1)
|
||||||
nbytes -= npage
|
nbytes -= npage
|
||||||
|
@ -161,9 +172,9 @@ class FLASH(FlashDevice):
|
||||||
mvp = self._mvp[:cmdlen]
|
mvp = self._mvp[:cmdlen]
|
||||||
if cmdlen > 3:
|
if cmdlen > 3:
|
||||||
mvp[-4] = la >> 24
|
mvp[-4] = la >> 24
|
||||||
mvp[-3] = la >> 16 & 0xff
|
mvp[-3] = la >> 16 & 0xFF
|
||||||
mvp[-2] = (la >> 8) & 0xff
|
mvp[-2] = (la >> 8) & 0xFF
|
||||||
mvp[-1] = la & 0xff
|
mvp[-1] = la & 0xFF
|
||||||
pe = (addr & -self._c_bytes) + self._c_bytes # Byte 0 of next chip
|
pe = (addr & -self._c_bytes) + self._c_bytes # Byte 0 of next chip
|
||||||
return min(nbytes, pe - la)
|
return min(nbytes, pe - la)
|
||||||
|
|
||||||
|
@ -180,6 +191,6 @@ class FLASH(FlashDevice):
|
||||||
cs(1)
|
cs(1)
|
||||||
mvp[0] = self._cmds[_SE]
|
mvp[0] = self._cmds[_SE]
|
||||||
cs(0)
|
cs(0)
|
||||||
self._spi.write(mvp[:self._cmdlen]) # Start erase
|
self._spi.write(mvp[: self._cmdlen]) # Start erase
|
||||||
cs(1)
|
cs(1)
|
||||||
self._wait_rdy() # Wait for erase to complete
|
self._wait_rdy() # Wait for erase to complete
|
||||||
|
|
|
@ -13,55 +13,58 @@ from flash_spi import FLASH
|
||||||
# Return an EEPROM array. Adapt for platforms other than Pyboard.
|
# Return an EEPROM array. Adapt for platforms other than Pyboard.
|
||||||
# May want to set chip size and baudrate.
|
# May want to set chip size and baudrate.
|
||||||
def get_device():
|
def get_device():
|
||||||
if uos.uname().machine.split(' ')[0][:4] == 'PYBD':
|
if uos.uname().machine.split(" ")[0][:4] == "PYBD":
|
||||||
Pin.board.EN_3V3.value(1)
|
Pin.board.EN_3V3.value(1)
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
# Adjust to suit number of chips and their wiring.
|
# Adjust to suit number of chips and their wiring.
|
||||||
cspins = (Pin(Pin.board.Y5, Pin.OUT, value=1), Pin(Pin.board.Y4, Pin.OUT, value=1))
|
cspins = (Pin(Pin.board.Y5, Pin.OUT, value=1), Pin(Pin.board.Y4, Pin.OUT, value=1))
|
||||||
flash = FLASH(SPI(2, baudrate=20_000_000), cspins)
|
flash = FLASH(SPI(2, baudrate=20_000_000), cspins)
|
||||||
print('Instantiated Flash')
|
print("Instantiated Flash")
|
||||||
return flash
|
return flash
|
||||||
|
|
||||||
|
|
||||||
# **** END OF USER-ADAPTED CODE ****
|
# **** END OF USER-ADAPTED CODE ****
|
||||||
|
|
||||||
# Dumb file copy utility to help with managing EEPROM contents at the REPL.
|
# Dumb file copy utility to help with managing EEPROM contents at the REPL.
|
||||||
def cp(source, dest):
|
def cp(source, dest):
|
||||||
if dest.endswith('/'): # minimal way to allow
|
if dest.endswith("/"): # minimal way to allow
|
||||||
dest = ''.join((dest, source.split('/')[-1])) # cp /sd/file /fl_ext/
|
dest = "".join((dest, source.split("/")[-1])) # cp /sd/file /fl_ext/
|
||||||
with open(source, 'rb') as infile: # Caller should handle any OSError
|
with open(source, "rb") as infile: # Caller should handle any OSError
|
||||||
with open(dest,'wb') as outfile: # e.g file not found
|
with open(dest, "wb") as outfile: # e.g file not found
|
||||||
while True:
|
while True:
|
||||||
buf = infile.read(100)
|
buf = infile.read(100)
|
||||||
outfile.write(buf)
|
outfile.write(buf)
|
||||||
if len(buf) < 100:
|
if len(buf) < 100:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
||||||
# ***** TEST OF DRIVER *****
|
# ***** TEST OF DRIVER *****
|
||||||
def _testblock(eep, bs):
|
def _testblock(eep, bs):
|
||||||
d0 = b'this >'
|
d0 = b"this >"
|
||||||
d1 = b'<is the boundary'
|
d1 = b"<is the boundary"
|
||||||
d2 = d0 + d1
|
d2 = d0 + d1
|
||||||
garbage = b'xxxxxxxxxxxxxxxxxxx'
|
garbage = b"xxxxxxxxxxxxxxxxxxx"
|
||||||
start = bs - len(d0)
|
start = bs - len(d0)
|
||||||
end = start + len(garbage)
|
end = start + len(garbage)
|
||||||
eep[start : end] = garbage
|
eep[start:end] = garbage
|
||||||
res = eep[start : end]
|
res = eep[start:end]
|
||||||
if res != garbage:
|
if res != garbage:
|
||||||
return 'Block test fail 1:' + str(list(res))
|
return "Block test fail 1:" + str(list(res))
|
||||||
end = start + len(d0)
|
end = start + len(d0)
|
||||||
eep[start : end] = d0
|
eep[start:end] = d0
|
||||||
end = start + len(garbage)
|
end = start + len(garbage)
|
||||||
res = eep[start : end]
|
res = eep[start:end]
|
||||||
if res != b'this >xxxxxxxxxxxxx':
|
if res != b"this >xxxxxxxxxxxxx":
|
||||||
return 'Block test fail 2:' + str(list(res))
|
return "Block test fail 2:" + str(list(res))
|
||||||
start = bs
|
start = bs
|
||||||
end = bs + len(d1)
|
end = bs + len(d1)
|
||||||
eep[start : end] = d1
|
eep[start:end] = d1
|
||||||
start = bs - len(d0)
|
start = bs - len(d0)
|
||||||
end = start + len(d2)
|
end = start + len(d2)
|
||||||
res = eep[start : end]
|
res = eep[start:end]
|
||||||
if res != d2:
|
if res != d2:
|
||||||
return 'Block test fail 3:' + str(list(res))
|
return "Block test fail 3:" + str(list(res))
|
||||||
|
|
||||||
|
|
||||||
def test():
|
def test():
|
||||||
eep = get_device()
|
eep = get_device()
|
||||||
|
@ -70,33 +73,36 @@ def test():
|
||||||
eep[sa + v] = v
|
eep[sa + v] = v
|
||||||
for v in range(256):
|
for v in range(256):
|
||||||
if eep[sa + v] != v:
|
if eep[sa + v] != v:
|
||||||
print('Fail at address {} data {} should be {}'.format(sa + v, eep[sa + v], v))
|
print(
|
||||||
|
"Fail at address {} data {} should be {}".format(sa + v, eep[sa + v], v)
|
||||||
|
)
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
print('Test of byte addressing passed')
|
print("Test of byte addressing passed")
|
||||||
data = uos.urandom(30)
|
data = uos.urandom(30)
|
||||||
sa = 2000
|
sa = 2000
|
||||||
eep[sa:sa + 30] = data
|
eep[sa : sa + 30] = data
|
||||||
if eep[sa:sa + 30] == data:
|
if eep[sa : sa + 30] == data:
|
||||||
print('Test of slice readback passed')
|
print("Test of slice readback passed")
|
||||||
|
|
||||||
block = 256
|
block = 256
|
||||||
res = _testblock(eep, block)
|
res = _testblock(eep, block)
|
||||||
if res is None:
|
if res is None:
|
||||||
print('Test block boundary {} passed'.format(block))
|
print("Test block boundary {} passed".format(block))
|
||||||
else:
|
else:
|
||||||
print('Test block boundary {} fail'.format(block))
|
print("Test block boundary {} fail".format(block))
|
||||||
print(res)
|
print(res)
|
||||||
block = eep._c_bytes
|
block = eep._c_bytes
|
||||||
if eep._a_bytes > block:
|
if eep._a_bytes > block:
|
||||||
res = _testblock(eep, block)
|
res = _testblock(eep, block)
|
||||||
if res is None:
|
if res is None:
|
||||||
print('Test chip boundary {} passed'.format(block))
|
print("Test chip boundary {} passed".format(block))
|
||||||
else:
|
else:
|
||||||
print('Test chip boundary {} fail'.format(block))
|
print("Test chip boundary {} fail".format(block))
|
||||||
print(res)
|
print(res)
|
||||||
else:
|
else:
|
||||||
print('Test chip boundary skipped: only one chip!')
|
print("Test chip boundary skipped: only one chip!")
|
||||||
|
|
||||||
|
|
||||||
# ***** TEST OF FILESYSTEM MOUNT *****
|
# ***** TEST OF FILESYSTEM MOUNT *****
|
||||||
def fstest(format=False):
|
def fstest(format=False):
|
||||||
|
@ -105,28 +111,29 @@ def fstest(format=False):
|
||||||
if format:
|
if format:
|
||||||
uos.VfsLfs2.mkfs(eep)
|
uos.VfsLfs2.mkfs(eep)
|
||||||
try:
|
try:
|
||||||
uos.mount(eep,'/fl_ext')
|
uos.mount(eep, "/fl_ext")
|
||||||
except OSError: # Already mounted
|
except OSError: # Already mounted
|
||||||
pass
|
pass
|
||||||
print('Contents of "/": {}'.format(uos.listdir('/')))
|
print('Contents of "/": {}'.format(uos.listdir("/")))
|
||||||
print('Contents of "/fl_ext": {}'.format(uos.listdir('/fl_ext')))
|
print('Contents of "/fl_ext": {}'.format(uos.listdir("/fl_ext")))
|
||||||
print(uos.statvfs('/fl_ext'))
|
print(uos.statvfs("/fl_ext"))
|
||||||
|
|
||||||
|
|
||||||
def cptest():
|
def cptest():
|
||||||
eep = get_device()
|
eep = get_device()
|
||||||
if 'fl_ext' in uos.listdir('/'):
|
if "fl_ext" in uos.listdir("/"):
|
||||||
print('Device already mounted.')
|
print("Device already mounted.")
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
uos.mount(eep,'/fl_ext')
|
uos.mount(eep, "/fl_ext")
|
||||||
except OSError:
|
except OSError:
|
||||||
print('Fail mounting device. Have you formatted it?')
|
print("Fail mounting device. Have you formatted it?")
|
||||||
return
|
return
|
||||||
print('Mounted device.')
|
print("Mounted device.")
|
||||||
cp('flash_test.py', '/fl_ext/')
|
cp("flash_test.py", "/fl_ext/")
|
||||||
cp('flash_spi.py', '/fl_ext/')
|
cp("flash_spi.py", "/fl_ext/")
|
||||||
print('Contents of "/fl_ext": {}'.format(uos.listdir('/fl_ext')))
|
print('Contents of "/fl_ext": {}'.format(uos.listdir("/fl_ext")))
|
||||||
print(uos.statvfs('/fl_ext'))
|
print(uos.statvfs("/fl_ext"))
|
||||||
|
|
||||||
|
|
||||||
# ***** TEST OF HARDWARE *****
|
# ***** TEST OF HARDWARE *****
|
||||||
|
@ -135,36 +142,37 @@ def full_test(count=10):
|
||||||
for n in range(count):
|
for n in range(count):
|
||||||
data = uos.urandom(256)
|
data = uos.urandom(256)
|
||||||
while True:
|
while True:
|
||||||
sa = int.from_bytes(uos.urandom(4), 'little') & 0x3fffffff
|
sa = int.from_bytes(uos.urandom(4), "little") & 0x3FFFFFFF
|
||||||
if sa < (flash._a_bytes - 256):
|
if sa < (flash._a_bytes - 256):
|
||||||
break
|
break
|
||||||
flash[sa:sa + 256] = data
|
flash[sa : sa + 256] = data
|
||||||
flash.sync()
|
flash.sync()
|
||||||
got = flash[sa:sa + 256]
|
got = flash[sa : sa + 256]
|
||||||
if got == data:
|
if got == data:
|
||||||
print('Pass {} address {:08x} passed'.format(n, sa))
|
print("Pass {} address {:08x} passed".format(n, sa))
|
||||||
if sa & 0xfff > (4096 -253):
|
if sa & 0xFFF > (4096 - 253):
|
||||||
print('cross boundary')
|
print("cross boundary")
|
||||||
else:
|
else:
|
||||||
print('Pass {} address {:08x} readback failed.'.format(n, sa))
|
print("Pass {} address {:08x} readback failed.".format(n, sa))
|
||||||
sa1 = sa & 0xfff
|
sa1 = sa & 0xFFF
|
||||||
print('Bounds {} to {}'.format(sa1, sa1+256))
|
print("Bounds {} to {}".format(sa1, sa1 + 256))
|
||||||
# flash.sync()
|
# flash.sync()
|
||||||
got1 = flash[sa:sa + 256]
|
got1 = flash[sa : sa + 256]
|
||||||
if got1 == data:
|
if got1 == data:
|
||||||
print('second attempt OK')
|
print("second attempt OK")
|
||||||
else:
|
else:
|
||||||
print('second attempt fail', got == got1)
|
print("second attempt fail", got == got1)
|
||||||
for n, g in enumerate(got):
|
for n, g in enumerate(got):
|
||||||
if g != data[n]:
|
if g != data[n]:
|
||||||
print('{} {:2x} {:2x} {:2x}'.format(n, data[n], g, got1[n]))
|
print("{} {:2x} {:2x} {:2x}".format(n, data[n], g, got1[n]))
|
||||||
break
|
break
|
||||||
|
|
||||||
helpstr = '''Available tests:
|
|
||||||
|
helpstr = """Available tests:
|
||||||
test() Basic hardware test.
|
test() Basic hardware test.
|
||||||
full_test(count=10) Full hardware test, count is no. of passes.
|
full_test(count=10) Full hardware test, count is no. of passes.
|
||||||
fstest(format=False) Check or create littlefs filesystem.
|
fstest(format=False) Check or create littlefs filesystem.
|
||||||
cptest() Copy 2 files to filesystem.
|
cptest() Copy 2 files to filesystem.
|
||||||
cp() Primitive copy routine. See docs.
|
cp() Primitive copy routine. See docs.
|
||||||
'''
|
"""
|
||||||
print(helpstr)
|
print(helpstr)
|
||||||
|
|
|
@ -8,30 +8,33 @@ from machine import SPI, Pin
|
||||||
from flash_spi import FLASH
|
from flash_spi import FLASH
|
||||||
from flash_test import get_device
|
from flash_test import get_device
|
||||||
|
|
||||||
directory = '/fl_ext'
|
directory = "/fl_ext"
|
||||||
a = bytearray(range(256))
|
a = bytearray(range(256))
|
||||||
b = bytearray(256)
|
b = bytearray(256)
|
||||||
files = {} # n:length
|
files = {} # n:length
|
||||||
errors = 0
|
errors = 0
|
||||||
|
|
||||||
|
|
||||||
def fname(n):
|
def fname(n):
|
||||||
return '{}/{:05d}'.format(directory, n + 1) # Names start 00001
|
return "{}/{:05d}".format(directory, n + 1) # Names start 00001
|
||||||
|
|
||||||
|
|
||||||
def fcreate(n): # Create a binary file of random length
|
def fcreate(n): # Create a binary file of random length
|
||||||
length = int.from_bytes(uos.urandom(2), 'little') + 1 # 1-65536 bytes
|
length = int.from_bytes(uos.urandom(2), "little") + 1 # 1-65536 bytes
|
||||||
linit = length
|
linit = length
|
||||||
with open(fname(n), 'wb') as f:
|
with open(fname(n), "wb") as f:
|
||||||
while(length):
|
while length:
|
||||||
nw = min(length, 256)
|
nw = min(length, 256)
|
||||||
f.write(a[:nw])
|
f.write(a[:nw])
|
||||||
length -= nw
|
length -= nw
|
||||||
files[n] = length
|
files[n] = length
|
||||||
return linit
|
return linit
|
||||||
|
|
||||||
|
|
||||||
def fcheck(n):
|
def fcheck(n):
|
||||||
length = files[n]
|
length = files[n]
|
||||||
with open(fname(n), 'rb') as f:
|
with open(fname(n), "rb") as f:
|
||||||
while(length):
|
while length:
|
||||||
nr = f.readinto(b)
|
nr = f.readinto(b)
|
||||||
if not nr:
|
if not nr:
|
||||||
return False
|
return False
|
||||||
|
@ -40,21 +43,23 @@ def fcheck(n):
|
||||||
length -= nr
|
length -= nr
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def check_all():
|
def check_all():
|
||||||
global errors
|
global errors
|
||||||
for n in files:
|
for n in files:
|
||||||
if fcheck(n):
|
if fcheck(n):
|
||||||
print('File {:d} OK'.format(n))
|
print("File {:d} OK".format(n))
|
||||||
else:
|
else:
|
||||||
print('Error in file', n)
|
print("Error in file", n)
|
||||||
errors += 1
|
errors += 1
|
||||||
print('Total errors:', errors)
|
print("Total errors:", errors)
|
||||||
|
|
||||||
|
|
||||||
def remove_all():
|
def remove_all():
|
||||||
for n in files:
|
for n in files:
|
||||||
uos.remove(fname(n))
|
uos.remove(fname(n))
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
eep = get_device()
|
eep = get_device()
|
||||||
try:
|
try:
|
||||||
|
@ -63,15 +68,16 @@ def main():
|
||||||
pass
|
pass
|
||||||
for n in range(128):
|
for n in range(128):
|
||||||
length = fcreate(n)
|
length = fcreate(n)
|
||||||
print('Created', n, length)
|
print("Created", n, length)
|
||||||
print('Created files', files)
|
print("Created files", files)
|
||||||
check_all()
|
check_all()
|
||||||
for _ in range(100):
|
for _ in range(100):
|
||||||
for x in range(5): # Rewrite 5 files with new lengths
|
for x in range(5): # Rewrite 5 files with new lengths
|
||||||
n = int.from_bytes(uos.urandom(1), 'little') & 0x7f
|
n = int.from_bytes(uos.urandom(1), "little") & 0x7F
|
||||||
length = fcreate(n)
|
length = fcreate(n)
|
||||||
print('Rewrote', n, length)
|
print("Rewrote", n, length)
|
||||||
check_all()
|
check_all()
|
||||||
remove_all()
|
remove_all()
|
||||||
|
|
||||||
print('main() to run littlefs test. Filesystem must exist.')
|
|
||||||
|
print("main() to run littlefs test. Filesystem must exist.")
|
||||||
|
|
|
@ -9,37 +9,42 @@ from flash_spi import FLASH
|
||||||
|
|
||||||
cspins = (Pin(5, Pin.OUT, value=1), Pin(14, Pin.OUT, value=1))
|
cspins = (Pin(5, Pin.OUT, value=1), Pin(14, Pin.OUT, value=1))
|
||||||
|
|
||||||
spi=SPI(-1, baudrate=20_000_000, sck=Pin(4), miso=Pin(0), mosi=Pin(2))
|
spi = SPI(-1, baudrate=20_000_000, sck=Pin(4), miso=Pin(0), mosi=Pin(2))
|
||||||
|
|
||||||
|
|
||||||
def get_flash():
|
def get_flash():
|
||||||
flash = FLASH(spi, cspins)
|
flash = FLASH(spi, cspins)
|
||||||
print('Instantiated Flash')
|
print("Instantiated Flash")
|
||||||
return flash
|
return flash
|
||||||
|
|
||||||
directory = '/fl_ext'
|
|
||||||
|
directory = "/fl_ext"
|
||||||
a = bytearray(range(256)) # Data to write
|
a = bytearray(range(256)) # Data to write
|
||||||
b = bytearray(256) # Data to read back
|
b = bytearray(256) # Data to read back
|
||||||
files = {} # n:length
|
files = {} # n:length
|
||||||
errors = 0
|
errors = 0
|
||||||
|
|
||||||
|
|
||||||
def fname(n):
|
def fname(n):
|
||||||
return '{}/{:05d}'.format(directory, n + 1) # Names start 00001
|
return "{}/{:05d}".format(directory, n + 1) # Names start 00001
|
||||||
|
|
||||||
|
|
||||||
def fcreate(n): # Create a binary file of random length
|
def fcreate(n): # Create a binary file of random length
|
||||||
length = int.from_bytes(uos.urandom(2), 'little') + 1 # 1-65536 bytes
|
length = int.from_bytes(uos.urandom(2), "little") + 1 # 1-65536 bytes
|
||||||
linit = length
|
linit = length
|
||||||
with open(fname(n), 'wb') as f:
|
with open(fname(n), "wb") as f:
|
||||||
while(length):
|
while length:
|
||||||
nw = min(length, 256)
|
nw = min(length, 256)
|
||||||
f.write(a[:nw])
|
f.write(a[:nw])
|
||||||
length -= nw
|
length -= nw
|
||||||
files[n] = length
|
files[n] = length
|
||||||
return linit
|
return linit
|
||||||
|
|
||||||
|
|
||||||
def fcheck(n):
|
def fcheck(n):
|
||||||
length = files[n]
|
length = files[n]
|
||||||
with open(fname(n), 'rb') as f:
|
with open(fname(n), "rb") as f:
|
||||||
while(length):
|
while length:
|
||||||
nr = f.readinto(b)
|
nr = f.readinto(b)
|
||||||
if not nr:
|
if not nr:
|
||||||
return False
|
return False
|
||||||
|
@ -48,43 +53,46 @@ def fcheck(n):
|
||||||
length -= nr
|
length -= nr
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def check_all():
|
def check_all():
|
||||||
global errors
|
global errors
|
||||||
for n in files:
|
for n in files:
|
||||||
if fcheck(n):
|
if fcheck(n):
|
||||||
print('File {:d} OK'.format(n))
|
print("File {:d} OK".format(n))
|
||||||
else:
|
else:
|
||||||
print('Error in file', n)
|
print("Error in file", n)
|
||||||
errors += 1
|
errors += 1
|
||||||
print('Total errors:', errors)
|
print("Total errors:", errors)
|
||||||
|
|
||||||
|
|
||||||
def remove_all():
|
def remove_all():
|
||||||
for n in files:
|
for n in files:
|
||||||
uos.remove(fname(n))
|
uos.remove(fname(n))
|
||||||
|
|
||||||
|
|
||||||
def flash_test(format=False):
|
def flash_test(format=False):
|
||||||
eep = get_flash()
|
eep = get_flash()
|
||||||
if format:
|
if format:
|
||||||
uos.VfsLfs2.mkfs(eep)
|
uos.VfsLfs2.mkfs(eep)
|
||||||
try:
|
try:
|
||||||
uos.mount(eep,'/fl_ext')
|
uos.mount(eep, "/fl_ext")
|
||||||
except OSError: # Already mounted
|
except OSError: # Already mounted
|
||||||
pass
|
pass
|
||||||
for n in range(128):
|
for n in range(128):
|
||||||
length = fcreate(n)
|
length = fcreate(n)
|
||||||
print('Created', n, length)
|
print("Created", n, length)
|
||||||
print('Created files', files)
|
print("Created files", files)
|
||||||
check_all()
|
check_all()
|
||||||
for _ in range(100):
|
for _ in range(100):
|
||||||
for x in range(5): # Rewrite 5 files with new lengths
|
for x in range(5): # Rewrite 5 files with new lengths
|
||||||
n = int.from_bytes(uos.urandom(1), 'little') & 0x7f
|
n = int.from_bytes(uos.urandom(1), "little") & 0x7F
|
||||||
length = fcreate(n)
|
length = fcreate(n)
|
||||||
print('Rewrote', n, length)
|
print("Rewrote", n, length)
|
||||||
check_all()
|
check_all()
|
||||||
remove_all()
|
remove_all()
|
||||||
|
|
||||||
msg='''Run wemos_flash.flash_test(True) to format new array, otherwise
|
|
||||||
|
msg = """Run wemos_flash.flash_test(True) to format new array, otherwise
|
||||||
wemos_flash.flash_test()
|
wemos_flash.flash_test()
|
||||||
Runs prolonged test of filesystem.'''
|
Runs prolonged test of filesystem."""
|
||||||
print(msg)
|
print(msg)
|
||||||
|
|
|
@ -7,31 +7,34 @@ import uos
|
||||||
from machine import SPI, Pin
|
from machine import SPI, Pin
|
||||||
from fram_spi_test import get_fram
|
from fram_spi_test import get_fram
|
||||||
|
|
||||||
directory = '/fram'
|
directory = "/fram"
|
||||||
a = bytearray(range(256))
|
a = bytearray(range(256))
|
||||||
b = bytearray(256)
|
b = bytearray(256)
|
||||||
files = {} # n:length
|
files = {} # n:length
|
||||||
errors = 0
|
errors = 0
|
||||||
|
|
||||||
|
|
||||||
def fname(n):
|
def fname(n):
|
||||||
return '{}/{:05d}'.format(directory, n + 1) # Names start 00001
|
return "{}/{:05d}".format(directory, n + 1) # Names start 00001
|
||||||
|
|
||||||
|
|
||||||
def fcreate(n): # Create a binary file of random length
|
def fcreate(n): # Create a binary file of random length
|
||||||
length = int.from_bytes(uos.urandom(2), 'little') + 1 # 1-65536 bytes
|
length = int.from_bytes(uos.urandom(2), "little") + 1 # 1-65536 bytes
|
||||||
length &= 0x3ff # 1-1023 for FRAM
|
length &= 0x3FF # 1-1023 for FRAM
|
||||||
linit = length
|
linit = length
|
||||||
with open(fname(n), 'wb') as f:
|
with open(fname(n), "wb") as f:
|
||||||
while(length):
|
while length:
|
||||||
nw = min(length, 256)
|
nw = min(length, 256)
|
||||||
f.write(a[:nw])
|
f.write(a[:nw])
|
||||||
length -= nw
|
length -= nw
|
||||||
files[n] = length
|
files[n] = length
|
||||||
return linit
|
return linit
|
||||||
|
|
||||||
|
|
||||||
def fcheck(n):
|
def fcheck(n):
|
||||||
length = files[n]
|
length = files[n]
|
||||||
with open(fname(n), 'rb') as f:
|
with open(fname(n), "rb") as f:
|
||||||
while(length):
|
while length:
|
||||||
nr = f.readinto(b)
|
nr = f.readinto(b)
|
||||||
if not nr:
|
if not nr:
|
||||||
return False
|
return False
|
||||||
|
@ -40,21 +43,23 @@ def fcheck(n):
|
||||||
length -= nr
|
length -= nr
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def check_all():
|
def check_all():
|
||||||
global errors
|
global errors
|
||||||
for n in files:
|
for n in files:
|
||||||
if fcheck(n):
|
if fcheck(n):
|
||||||
print('File {:d} OK'.format(n))
|
print("File {:d} OK".format(n))
|
||||||
else:
|
else:
|
||||||
print('Error in file', n)
|
print("Error in file", n)
|
||||||
errors += 1
|
errors += 1
|
||||||
print('Total errors:', errors)
|
print("Total errors:", errors)
|
||||||
|
|
||||||
|
|
||||||
def remove_all():
|
def remove_all():
|
||||||
for n in files:
|
for n in files:
|
||||||
uos.remove(fname(n))
|
uos.remove(fname(n))
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
fram = get_fram()
|
fram = get_fram()
|
||||||
try:
|
try:
|
||||||
|
@ -63,15 +68,16 @@ def main():
|
||||||
pass
|
pass
|
||||||
for n in range(128):
|
for n in range(128):
|
||||||
length = fcreate(n)
|
length = fcreate(n)
|
||||||
print('Created', n, length)
|
print("Created", n, length)
|
||||||
print('Created files', files)
|
print("Created files", files)
|
||||||
check_all()
|
check_all()
|
||||||
for _ in range(100):
|
for _ in range(100):
|
||||||
for x in range(5): # Rewrite 5 files with new lengths
|
for x in range(5): # Rewrite 5 files with new lengths
|
||||||
n = int.from_bytes(uos.urandom(1), 'little') & 0x7f
|
n = int.from_bytes(uos.urandom(1), "little") & 0x7F
|
||||||
length = fcreate(n)
|
length = fcreate(n)
|
||||||
print('Rewrote', n, length)
|
print("Rewrote", n, length)
|
||||||
check_all()
|
check_all()
|
||||||
remove_all()
|
remove_all()
|
||||||
|
|
||||||
print('main() to run littlefs test. Filesystem must exist.')
|
|
||||||
|
print("main() to run littlefs test. Filesystem must exist.")
|
||||||
|
|
|
@ -8,8 +8,8 @@ from bdevice import BlockDevice
|
||||||
|
|
||||||
_SIZE = const(32768) # Chip size 32KiB
|
_SIZE = const(32768) # Chip size 32KiB
|
||||||
_ADDR = const(0x50) # FRAM I2C address 0x50 to 0x57
|
_ADDR = const(0x50) # FRAM I2C address 0x50 to 0x57
|
||||||
_FRAM_SLAVE_ID = const(0xf8) # FRAM device ID location
|
_FRAM_SLAVE_ID = const(0xF8) # FRAM device ID location
|
||||||
_MANF_ID = const(0x0a)
|
_MANF_ID = const(0x0A)
|
||||||
_PRODUCT_ID = const(0x510)
|
_PRODUCT_ID = const(0x510)
|
||||||
|
|
||||||
|
|
||||||
|
@ -29,21 +29,23 @@ class FRAM(BlockDevice):
|
||||||
chips = [d for d in devices if d in range(_ADDR, _ADDR + 8)]
|
chips = [d for d in devices if d in range(_ADDR, _ADDR + 8)]
|
||||||
nchips = len(chips)
|
nchips = len(chips)
|
||||||
if nchips == 0:
|
if nchips == 0:
|
||||||
raise RuntimeError('FRAM not found.')
|
raise RuntimeError("FRAM not found.")
|
||||||
if min(chips) != _ADDR or (max(chips) - _ADDR) >= nchips:
|
if min(chips) != _ADDR or (max(chips) - _ADDR) >= nchips:
|
||||||
raise RuntimeError('Non-contiguous chip addresses', chips)
|
raise RuntimeError("Non-contiguous chip addresses", chips)
|
||||||
for chip in chips:
|
for chip in chips:
|
||||||
if not self._available(chip):
|
if not self._available(chip):
|
||||||
raise RuntimeError('FRAM at address 0x{:02x} reports an error'.format(chip))
|
raise RuntimeError(
|
||||||
|
"FRAM at address 0x{:02x} reports an error".format(chip)
|
||||||
|
)
|
||||||
if verbose:
|
if verbose:
|
||||||
s = '{} chips detected. Total FRAM size {}bytes.'
|
s = "{} chips detected. Total FRAM size {}bytes."
|
||||||
print(s.format(nchips, chip_size * nchips))
|
print(s.format(nchips, chip_size * nchips))
|
||||||
return nchips
|
return nchips
|
||||||
|
|
||||||
def _available(self, device_addr):
|
def _available(self, device_addr):
|
||||||
res = self._buf3
|
res = self._buf3
|
||||||
self._i2c.readfrom_mem_into(_FRAM_SLAVE_ID >> 1, device_addr << 1, res)
|
self._i2c.readfrom_mem_into(_FRAM_SLAVE_ID >> 1, device_addr << 1, res)
|
||||||
manufacturerID = (res[0] << 4) + (res[1] >> 4)
|
manufacturerID = (res[0] << 4) + (res[1] >> 4)
|
||||||
productID = ((res[1] & 0x0F) << 8) + res[2]
|
productID = ((res[1] & 0x0F) << 8) + res[2]
|
||||||
return manufacturerID == _MANF_ID and productID == _PRODUCT_ID
|
return manufacturerID == _MANF_ID and productID == _PRODUCT_ID
|
||||||
|
|
||||||
|
@ -52,10 +54,10 @@ class FRAM(BlockDevice):
|
||||||
# Return the no. of bytes available to access on that chip.
|
# Return the no. of bytes available to access on that chip.
|
||||||
def _getaddr(self, addr, nbytes): # Set up _addrbuf and i2c_addr
|
def _getaddr(self, addr, nbytes): # Set up _addrbuf and i2c_addr
|
||||||
if addr >= self._a_bytes:
|
if addr >= self._a_bytes:
|
||||||
raise RuntimeError('FRAM Address is out of range')
|
raise RuntimeError("FRAM Address is out of range")
|
||||||
ca, la = divmod(addr, self._c_bytes) # ca == chip no, la == offset into chip
|
ca, la = divmod(addr, self._c_bytes) # ca == chip no, la == offset into chip
|
||||||
self._addrbuf[0] = (la >> 8) & 0xff
|
self._addrbuf[0] = (la >> 8) & 0xFF
|
||||||
self._addrbuf[1] = la & 0xff
|
self._addrbuf[1] = la & 0xFF
|
||||||
self._i2c_addr = _ADDR + ca
|
self._i2c_addr = _ADDR + ca
|
||||||
return min(nbytes, self._c_bytes - la)
|
return min(nbytes, self._c_bytes - la)
|
||||||
|
|
||||||
|
@ -67,9 +69,13 @@ class FRAM(BlockDevice):
|
||||||
npage = self._getaddr(addr, nbytes) # No of bytes that fit on current chip
|
npage = self._getaddr(addr, nbytes) # No of bytes that fit on current chip
|
||||||
if read:
|
if read:
|
||||||
self._i2c.writeto(self._i2c_addr, self._addrbuf)
|
self._i2c.writeto(self._i2c_addr, self._addrbuf)
|
||||||
self._i2c.readfrom_into(self._i2c_addr, mvb[start : start + npage]) # Sequential read
|
self._i2c.readfrom_into(
|
||||||
|
self._i2c_addr, mvb[start : start + npage]
|
||||||
|
) # Sequential read
|
||||||
else:
|
else:
|
||||||
self._i2c.writevto(self._i2c_addr, (self._addrbuf, buf[start: start + npage]))
|
self._i2c.writevto(
|
||||||
|
self._i2c_addr, (self._addrbuf, buf[start : start + npage])
|
||||||
|
)
|
||||||
nbytes -= npage
|
nbytes -= npage
|
||||||
start += npage
|
start += npage
|
||||||
addr += npage
|
addr += npage
|
||||||
|
|
|
@ -10,6 +10,7 @@
|
||||||
|
|
||||||
from micropython import const
|
from micropython import const
|
||||||
from bdevice import BlockDevice
|
from bdevice import BlockDevice
|
||||||
|
|
||||||
# import time # for sleep command
|
# import time # for sleep command
|
||||||
|
|
||||||
# Command set
|
# Command set
|
||||||
|
@ -19,15 +20,15 @@ _RDSR = const(5) # Read status reg
|
||||||
_WRSR = const(1)
|
_WRSR = const(1)
|
||||||
_READ = const(3)
|
_READ = const(3)
|
||||||
_WRITE = const(2)
|
_WRITE = const(2)
|
||||||
_RDID = const(0x9f)
|
_RDID = const(0x9F)
|
||||||
# _FSTRD = const(0x0b) No obvious difference to _READ
|
# _FSTRD = const(0x0b) No obvious difference to _READ
|
||||||
_SLEEP = const(0xb9)
|
_SLEEP = const(0xB9)
|
||||||
|
|
||||||
|
|
||||||
class FRAM(BlockDevice):
|
class FRAM(BlockDevice):
|
||||||
def __init__(self, spi, cspins, size=512, verbose=True, block_size=9):
|
def __init__(self, spi, cspins, size=512, verbose=True, block_size=9):
|
||||||
if size not in (256, 512):
|
if size not in (256, 512):
|
||||||
raise ValueError('FRAM size must be 256 or 512')
|
raise ValueError("FRAM size must be 256 or 512")
|
||||||
super().__init__(block_size, len(cspins), size * 1024)
|
super().__init__(block_size, len(cspins), size * 1024)
|
||||||
self._spi = spi
|
self._spi = spi
|
||||||
self._cspins = cspins
|
self._cspins = cspins
|
||||||
|
@ -38,20 +39,20 @@ class FRAM(BlockDevice):
|
||||||
# Check hardware
|
# Check hardware
|
||||||
density = 8 if size == 256 else 9
|
density = 8 if size == 256 else 9
|
||||||
for n, cs in enumerate(cspins):
|
for n, cs in enumerate(cspins):
|
||||||
mvp[:] = b'\0\0\0\0\0'
|
mvp[:] = b"\0\0\0\0\0"
|
||||||
mvp[0] = _RDID
|
mvp[0] = _RDID
|
||||||
cs(0)
|
cs(0)
|
||||||
self._spi.write_readinto(mvp, mvp)
|
self._spi.write_readinto(mvp, mvp)
|
||||||
cs(1)
|
cs(1)
|
||||||
# Ignore bits labelled "proprietary"
|
# Ignore bits labelled "proprietary"
|
||||||
if mvp[1] != 4 or mvp[2] != 0x7f:
|
if mvp[1] != 4 or mvp[2] != 0x7F:
|
||||||
s = 'FRAM not found at cspins[{}].'
|
s = "FRAM not found at cspins[{}]."
|
||||||
raise RuntimeError(s.format(n))
|
raise RuntimeError(s.format(n))
|
||||||
if (mvp[3] & 0x1f) != density:
|
if (mvp[3] & 0x1F) != density:
|
||||||
s = 'FRAM at cspins[{}] is incorrect size.'
|
s = "FRAM at cspins[{}] is incorrect size."
|
||||||
raise RuntimeError(s.format(n))
|
raise RuntimeError(s.format(n))
|
||||||
if verbose:
|
if verbose:
|
||||||
s = 'Total FRAM size {} bytes in {} devices.'
|
s = "Total FRAM size {} bytes in {} devices."
|
||||||
print(s.format(self._a_bytes, n + 1))
|
print(s.format(self._a_bytes, n + 1))
|
||||||
# Set up status register on each chip
|
# Set up status register on each chip
|
||||||
for cs in cspins:
|
for cs in cspins:
|
||||||
|
@ -69,7 +70,7 @@ class FRAM(BlockDevice):
|
||||||
self._spi.write_readinto(mvp[:2], mvp[:2])
|
self._spi.write_readinto(mvp[:2], mvp[:2])
|
||||||
cs(1)
|
cs(1)
|
||||||
if mvp[1]:
|
if mvp[1]:
|
||||||
s = 'FRAM has bad status at cspins[{}].'
|
s = "FRAM has bad status at cspins[{}]."
|
||||||
raise RuntimeError(s.format(n))
|
raise RuntimeError(s.format(n))
|
||||||
|
|
||||||
def _wrctrl(self, cs, en): # Enable/Disable device write
|
def _wrctrl(self, cs, en): # Enable/Disable device write
|
||||||
|
@ -79,16 +80,16 @@ class FRAM(BlockDevice):
|
||||||
self._spi.write(mvp[:1])
|
self._spi.write(mvp[:1])
|
||||||
cs(1)
|
cs(1)
|
||||||
|
|
||||||
#def sleep(self, on):
|
# def sleep(self, on):
|
||||||
#mvp = self._mvp
|
# mvp = self._mvp
|
||||||
#mvp[0] = _SLEEP
|
# mvp[0] = _SLEEP
|
||||||
#for cs in self._cspins:
|
# for cs in self._cspins:
|
||||||
#cs(0)
|
# cs(0)
|
||||||
#if on:
|
# if on:
|
||||||
#self._spi.write(mvp[:1])
|
# self._spi.write(mvp[:1])
|
||||||
#else:
|
# else:
|
||||||
#time.sleep_us(500)
|
# time.sleep_us(500)
|
||||||
#cs(1)
|
# cs(1)
|
||||||
|
|
||||||
# Given an address, set current chip select and address buffer.
|
# Given an address, set current chip select and address buffer.
|
||||||
# Return the number of bytes that can be processed in the current page.
|
# Return the number of bytes that can be processed in the current page.
|
||||||
|
@ -100,9 +101,9 @@ class FRAM(BlockDevice):
|
||||||
self._ccs = self._cspins[ca] # Current chip select
|
self._ccs = self._cspins[ca] # Current chip select
|
||||||
mvp = self._mvp
|
mvp = self._mvp
|
||||||
mvp[1] = la >> 16
|
mvp[1] = la >> 16
|
||||||
mvp[2] = (la >> 8) & 0xff
|
mvp[2] = (la >> 8) & 0xFF
|
||||||
mvp[3] = la & 0xff
|
mvp[3] = la & 0xFF
|
||||||
pe = (addr & ~0xff) + 0x100 # byte 0 of next page
|
pe = (addr & ~0xFF) + 0x100 # byte 0 of next page
|
||||||
return min(nbytes, pe - la)
|
return min(nbytes, pe - la)
|
||||||
|
|
||||||
# Interface to bdevice
|
# Interface to bdevice
|
||||||
|
@ -125,7 +126,7 @@ class FRAM(BlockDevice):
|
||||||
mvp[0] = _WRITE
|
mvp[0] = _WRITE
|
||||||
cs(0)
|
cs(0)
|
||||||
self._spi.write(mvp[:4])
|
self._spi.write(mvp[:4])
|
||||||
self._spi.write(mvb[start: start + npage])
|
self._spi.write(mvb[start : start + npage])
|
||||||
cs(1)
|
cs(1)
|
||||||
self._wrctrl(cs, False)
|
self._wrctrl(cs, False)
|
||||||
nbytes -= npage
|
nbytes -= npage
|
||||||
|
|
|
@ -12,51 +12,56 @@ cspins = (Pin(Pin.board.Y5, Pin.OUT, value=1), Pin(Pin.board.Y4, Pin.OUT, value=
|
||||||
|
|
||||||
# Return an FRAM array. Adapt for platforms other than Pyboard.
|
# Return an FRAM array. Adapt for platforms other than Pyboard.
|
||||||
def get_fram():
|
def get_fram():
|
||||||
if uos.uname().machine.split(' ')[0][:4] == 'PYBD':
|
if uos.uname().machine.split(" ")[0][:4] == "PYBD":
|
||||||
Pin.board.EN_3V3.value(1)
|
Pin.board.EN_3V3.value(1)
|
||||||
time.sleep(0.1) # Allow decouplers to charge
|
time.sleep(0.1) # Allow decouplers to charge
|
||||||
fram = FRAM(SPI(2, baudrate=25_000_000), cspins, size=512) # Change size as required
|
fram = FRAM(
|
||||||
print('Instantiated FRAM')
|
SPI(2, baudrate=25_000_000), cspins, size=512
|
||||||
|
) # Change size as required
|
||||||
|
print("Instantiated FRAM")
|
||||||
return fram
|
return fram
|
||||||
|
|
||||||
|
|
||||||
# Dumb file copy utility to help with managing FRAM contents at the REPL.
|
# Dumb file copy utility to help with managing FRAM contents at the REPL.
|
||||||
def cp(source, dest):
|
def cp(source, dest):
|
||||||
if dest.endswith('/'): # minimal way to allow
|
if dest.endswith("/"): # minimal way to allow
|
||||||
dest = ''.join((dest, source.split('/')[-1])) # cp /sd/file /fram/
|
dest = "".join((dest, source.split("/")[-1])) # cp /sd/file /fram/
|
||||||
with open(source, 'rb') as infile: # Caller should handle any OSError
|
with open(source, "rb") as infile: # Caller should handle any OSError
|
||||||
with open(dest,'wb') as outfile: # e.g file not found
|
with open(dest, "wb") as outfile: # e.g file not found
|
||||||
while True:
|
while True:
|
||||||
buf = infile.read(100)
|
buf = infile.read(100)
|
||||||
outfile.write(buf)
|
outfile.write(buf)
|
||||||
if len(buf) < 100:
|
if len(buf) < 100:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
||||||
# ***** TEST OF DRIVER *****
|
# ***** TEST OF DRIVER *****
|
||||||
def _testblock(eep, bs):
|
def _testblock(eep, bs):
|
||||||
d0 = b'this >'
|
d0 = b"this >"
|
||||||
d1 = b'<is the boundary'
|
d1 = b"<is the boundary"
|
||||||
d2 = d0 + d1
|
d2 = d0 + d1
|
||||||
garbage = b'xxxxxxxxxxxxxxxxxxx'
|
garbage = b"xxxxxxxxxxxxxxxxxxx"
|
||||||
start = bs - len(d0)
|
start = bs - len(d0)
|
||||||
end = start + len(garbage)
|
end = start + len(garbage)
|
||||||
eep[start : end] = garbage
|
eep[start:end] = garbage
|
||||||
res = eep[start : end]
|
res = eep[start:end]
|
||||||
if res != garbage:
|
if res != garbage:
|
||||||
return 'Block test fail 1:' + str(list(res))
|
return "Block test fail 1:" + str(list(res))
|
||||||
end = start + len(d0)
|
end = start + len(d0)
|
||||||
eep[start : end] = d0
|
eep[start:end] = d0
|
||||||
end = start + len(garbage)
|
end = start + len(garbage)
|
||||||
res = eep[start : end]
|
res = eep[start:end]
|
||||||
if res != b'this >xxxxxxxxxxxxx':
|
if res != b"this >xxxxxxxxxxxxx":
|
||||||
return 'Block test fail 2:' + str(list(res))
|
return "Block test fail 2:" + str(list(res))
|
||||||
start = bs
|
start = bs
|
||||||
end = bs + len(d1)
|
end = bs + len(d1)
|
||||||
eep[start : end] = d1
|
eep[start:end] = d1
|
||||||
start = bs - len(d0)
|
start = bs - len(d0)
|
||||||
end = start + len(d2)
|
end = start + len(d2)
|
||||||
res = eep[start : end]
|
res = eep[start:end]
|
||||||
if res != d2:
|
if res != d2:
|
||||||
return 'Block test fail 3:' + str(list(res))
|
return "Block test fail 3:" + str(list(res))
|
||||||
|
|
||||||
|
|
||||||
def test():
|
def test():
|
||||||
fram = get_fram()
|
fram = get_fram()
|
||||||
|
@ -65,57 +70,64 @@ def test():
|
||||||
fram[sa + v] = v
|
fram[sa + v] = v
|
||||||
for v in range(256):
|
for v in range(256):
|
||||||
if fram[sa + v] != v:
|
if fram[sa + v] != v:
|
||||||
print('Fail at address {} data {} should be {}'.format(sa + v, fram[sa + v], v))
|
print(
|
||||||
|
"Fail at address {} data {} should be {}".format(
|
||||||
|
sa + v, fram[sa + v], v
|
||||||
|
)
|
||||||
|
)
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
print('Test of byte addressing passed')
|
print("Test of byte addressing passed")
|
||||||
data = uos.urandom(30)
|
data = uos.urandom(30)
|
||||||
sa = 2000
|
sa = 2000
|
||||||
fram[sa:sa + 30] = data
|
fram[sa : sa + 30] = data
|
||||||
if fram[sa:sa + 30] == data:
|
if fram[sa : sa + 30] == data:
|
||||||
print('Test of slice readback passed')
|
print("Test of slice readback passed")
|
||||||
# On FRAM the only meaningful block test is on a chip boundary.
|
# On FRAM the only meaningful block test is on a chip boundary.
|
||||||
block = fram._c_bytes
|
block = fram._c_bytes
|
||||||
if fram._a_bytes > block:
|
if fram._a_bytes > block:
|
||||||
res = _testblock(fram, block)
|
res = _testblock(fram, block)
|
||||||
if res is None:
|
if res is None:
|
||||||
print('Test chip boundary {} passed'.format(block))
|
print("Test chip boundary {} passed".format(block))
|
||||||
else:
|
else:
|
||||||
print('Test chip boundary {} fail'.format(block))
|
print("Test chip boundary {} fail".format(block))
|
||||||
print(res)
|
print(res)
|
||||||
else:
|
else:
|
||||||
print('Test chip boundary skipped: only one chip!')
|
print("Test chip boundary skipped: only one chip!")
|
||||||
|
|
||||||
|
|
||||||
# ***** TEST OF FILESYSTEM MOUNT *****
|
# ***** TEST OF FILESYSTEM MOUNT *****
|
||||||
def fstest(format=False):
|
def fstest(format=False):
|
||||||
fram = get_fram()
|
fram = get_fram()
|
||||||
if format:
|
if format:
|
||||||
uos.VfsFat.mkfs(fram)
|
uos.VfsFat.mkfs(fram)
|
||||||
vfs=uos.VfsFat(fram)
|
vfs = uos.VfsFat(fram)
|
||||||
try:
|
try:
|
||||||
uos.mount(vfs,'/fram')
|
uos.mount(vfs, "/fram")
|
||||||
except OSError: # Already mounted
|
except OSError: # Already mounted
|
||||||
pass
|
pass
|
||||||
print('Contents of "/": {}'.format(uos.listdir('/')))
|
print('Contents of "/": {}'.format(uos.listdir("/")))
|
||||||
print('Contents of "/fram": {}'.format(uos.listdir('/fram')))
|
print('Contents of "/fram": {}'.format(uos.listdir("/fram")))
|
||||||
print(uos.statvfs('/fram'))
|
print(uos.statvfs("/fram"))
|
||||||
|
|
||||||
|
|
||||||
def cptest():
|
def cptest():
|
||||||
fram = get_fram()
|
fram = get_fram()
|
||||||
if 'fram' in uos.listdir('/'):
|
if "fram" in uos.listdir("/"):
|
||||||
print('Device already mounted.')
|
print("Device already mounted.")
|
||||||
else:
|
else:
|
||||||
vfs=uos.VfsFat(fram)
|
vfs = uos.VfsFat(fram)
|
||||||
try:
|
try:
|
||||||
uos.mount(vfs,'/fram')
|
uos.mount(vfs, "/fram")
|
||||||
except OSError:
|
except OSError:
|
||||||
print('Fail mounting device. Have you formatted it?')
|
print("Fail mounting device. Have you formatted it?")
|
||||||
return
|
return
|
||||||
print('Mounted device.')
|
print("Mounted device.")
|
||||||
cp('fram_spi_test.py', '/fram/')
|
cp("fram_spi_test.py", "/fram/")
|
||||||
cp('fram_spi.py', '/fram/')
|
cp("fram_spi.py", "/fram/")
|
||||||
print('Contents of "/fram": {}'.format(uos.listdir('/fram')))
|
print('Contents of "/fram": {}'.format(uos.listdir("/fram")))
|
||||||
print(uos.statvfs('/fram'))
|
print(uos.statvfs("/fram"))
|
||||||
|
|
||||||
|
|
||||||
# ***** TEST OF HARDWARE *****
|
# ***** TEST OF HARDWARE *****
|
||||||
def full_test():
|
def full_test():
|
||||||
|
@ -123,9 +135,9 @@ def full_test():
|
||||||
page = 0
|
page = 0
|
||||||
for sa in range(0, len(fram), 256):
|
for sa in range(0, len(fram), 256):
|
||||||
data = uos.urandom(256)
|
data = uos.urandom(256)
|
||||||
fram[sa:sa + 256] = data
|
fram[sa : sa + 256] = data
|
||||||
if fram[sa:sa + 256] == data:
|
if fram[sa : sa + 256] == data:
|
||||||
print('Page {} passed'.format(page))
|
print("Page {} passed".format(page))
|
||||||
else:
|
else:
|
||||||
print('Page {} readback failed.'.format(page))
|
print("Page {} readback failed.".format(page))
|
||||||
page += 1
|
page += 1
|
||||||
|
|
|
@ -10,51 +10,54 @@ from fram_i2c import FRAM
|
||||||
|
|
||||||
# Return an FRAM array. Adapt for platforms other than Pyboard.
|
# Return an FRAM array. Adapt for platforms other than Pyboard.
|
||||||
def get_fram():
|
def get_fram():
|
||||||
if uos.uname().machine.split(' ')[0][:4] == 'PYBD':
|
if uos.uname().machine.split(" ")[0][:4] == "PYBD":
|
||||||
Pin.board.EN_3V3.value(1)
|
Pin.board.EN_3V3.value(1)
|
||||||
time.sleep(0.1) # Allow decouplers to charge
|
time.sleep(0.1) # Allow decouplers to charge
|
||||||
fram = FRAM(I2C(2))
|
fram = FRAM(I2C(2))
|
||||||
print('Instantiated FRAM')
|
print("Instantiated FRAM")
|
||||||
return fram
|
return fram
|
||||||
|
|
||||||
|
|
||||||
# Dumb file copy utility to help with managing FRAM contents at the REPL.
|
# Dumb file copy utility to help with managing FRAM contents at the REPL.
|
||||||
def cp(source, dest):
|
def cp(source, dest):
|
||||||
if dest.endswith('/'): # minimal way to allow
|
if dest.endswith("/"): # minimal way to allow
|
||||||
dest = ''.join((dest, source.split('/')[-1])) # cp /sd/file /fram/
|
dest = "".join((dest, source.split("/")[-1])) # cp /sd/file /fram/
|
||||||
with open(source, 'rb') as infile: # Caller should handle any OSError
|
with open(source, "rb") as infile: # Caller should handle any OSError
|
||||||
with open(dest,'wb') as outfile: # e.g file not found
|
with open(dest, "wb") as outfile: # e.g file not found
|
||||||
while True:
|
while True:
|
||||||
buf = infile.read(100)
|
buf = infile.read(100)
|
||||||
outfile.write(buf)
|
outfile.write(buf)
|
||||||
if len(buf) < 100:
|
if len(buf) < 100:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
||||||
# ***** TEST OF DRIVER *****
|
# ***** TEST OF DRIVER *****
|
||||||
def _testblock(eep, bs):
|
def _testblock(eep, bs):
|
||||||
d0 = b'this >'
|
d0 = b"this >"
|
||||||
d1 = b'<is the boundary'
|
d1 = b"<is the boundary"
|
||||||
d2 = d0 + d1
|
d2 = d0 + d1
|
||||||
garbage = b'xxxxxxxxxxxxxxxxxxx'
|
garbage = b"xxxxxxxxxxxxxxxxxxx"
|
||||||
start = bs - len(d0)
|
start = bs - len(d0)
|
||||||
end = start + len(garbage)
|
end = start + len(garbage)
|
||||||
eep[start : end] = garbage
|
eep[start:end] = garbage
|
||||||
res = eep[start : end]
|
res = eep[start:end]
|
||||||
if res != garbage:
|
if res != garbage:
|
||||||
return 'Block test fail 1:' + str(list(res))
|
return "Block test fail 1:" + str(list(res))
|
||||||
end = start + len(d0)
|
end = start + len(d0)
|
||||||
eep[start : end] = d0
|
eep[start:end] = d0
|
||||||
end = start + len(garbage)
|
end = start + len(garbage)
|
||||||
res = eep[start : end]
|
res = eep[start:end]
|
||||||
if res != b'this >xxxxxxxxxxxxx':
|
if res != b"this >xxxxxxxxxxxxx":
|
||||||
return 'Block test fail 2:' + str(list(res))
|
return "Block test fail 2:" + str(list(res))
|
||||||
start = bs
|
start = bs
|
||||||
end = bs + len(d1)
|
end = bs + len(d1)
|
||||||
eep[start : end] = d1
|
eep[start:end] = d1
|
||||||
start = bs - len(d0)
|
start = bs - len(d0)
|
||||||
end = start + len(d2)
|
end = start + len(d2)
|
||||||
res = eep[start : end]
|
res = eep[start:end]
|
||||||
if res != d2:
|
if res != d2:
|
||||||
return 'Block test fail 3:' + str(list(res))
|
return "Block test fail 3:" + str(list(res))
|
||||||
|
|
||||||
|
|
||||||
def test():
|
def test():
|
||||||
fram = get_fram()
|
fram = get_fram()
|
||||||
|
@ -63,57 +66,64 @@ def test():
|
||||||
fram[sa + v] = v
|
fram[sa + v] = v
|
||||||
for v in range(256):
|
for v in range(256):
|
||||||
if fram[sa + v] != v:
|
if fram[sa + v] != v:
|
||||||
print('Fail at address {} data {} should be {}'.format(sa + v, fram[sa + v], v))
|
print(
|
||||||
|
"Fail at address {} data {} should be {}".format(
|
||||||
|
sa + v, fram[sa + v], v
|
||||||
|
)
|
||||||
|
)
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
print('Test of byte addressing passed')
|
print("Test of byte addressing passed")
|
||||||
data = uos.urandom(30)
|
data = uos.urandom(30)
|
||||||
sa = 2000
|
sa = 2000
|
||||||
fram[sa:sa + 30] = data
|
fram[sa : sa + 30] = data
|
||||||
if fram[sa:sa + 30] == data:
|
if fram[sa : sa + 30] == data:
|
||||||
print('Test of slice readback passed')
|
print("Test of slice readback passed")
|
||||||
# On FRAM the only meaningful block test is on a chip boundary.
|
# On FRAM the only meaningful block test is on a chip boundary.
|
||||||
block = fram._c_bytes
|
block = fram._c_bytes
|
||||||
if fram._a_bytes > block:
|
if fram._a_bytes > block:
|
||||||
res = _testblock(fram, block)
|
res = _testblock(fram, block)
|
||||||
if res is None:
|
if res is None:
|
||||||
print('Test chip boundary {} passed'.format(block))
|
print("Test chip boundary {} passed".format(block))
|
||||||
else:
|
else:
|
||||||
print('Test chip boundary {} fail'.format(block))
|
print("Test chip boundary {} fail".format(block))
|
||||||
print(res)
|
print(res)
|
||||||
else:
|
else:
|
||||||
print('Test chip boundary skipped: only one chip!')
|
print("Test chip boundary skipped: only one chip!")
|
||||||
|
|
||||||
|
|
||||||
# ***** TEST OF FILESYSTEM MOUNT *****
|
# ***** TEST OF FILESYSTEM MOUNT *****
|
||||||
def fstest(format=False):
|
def fstest(format=False):
|
||||||
fram = get_fram()
|
fram = get_fram()
|
||||||
if format:
|
if format:
|
||||||
uos.VfsFat.mkfs(fram)
|
uos.VfsFat.mkfs(fram)
|
||||||
vfs=uos.VfsFat(fram)
|
vfs = uos.VfsFat(fram)
|
||||||
try:
|
try:
|
||||||
uos.mount(vfs,'/fram')
|
uos.mount(vfs, "/fram")
|
||||||
except OSError: # Already mounted
|
except OSError: # Already mounted
|
||||||
pass
|
pass
|
||||||
print('Contents of "/": {}'.format(uos.listdir('/')))
|
print('Contents of "/": {}'.format(uos.listdir("/")))
|
||||||
print('Contents of "/fram": {}'.format(uos.listdir('/fram')))
|
print('Contents of "/fram": {}'.format(uos.listdir("/fram")))
|
||||||
print(uos.statvfs('/fram'))
|
print(uos.statvfs("/fram"))
|
||||||
|
|
||||||
|
|
||||||
def cptest():
|
def cptest():
|
||||||
fram = get_fram()
|
fram = get_fram()
|
||||||
if 'fram' in uos.listdir('/'):
|
if "fram" in uos.listdir("/"):
|
||||||
print('Device already mounted.')
|
print("Device already mounted.")
|
||||||
else:
|
else:
|
||||||
vfs=uos.VfsFat(fram)
|
vfs = uos.VfsFat(fram)
|
||||||
try:
|
try:
|
||||||
uos.mount(vfs,'/fram')
|
uos.mount(vfs, "/fram")
|
||||||
except OSError:
|
except OSError:
|
||||||
print('Fail mounting device. Have you formatted it?')
|
print("Fail mounting device. Have you formatted it?")
|
||||||
return
|
return
|
||||||
print('Mounted device.')
|
print("Mounted device.")
|
||||||
cp('fram_test.py', '/fram/')
|
cp("fram_test.py", "/fram/")
|
||||||
cp('fram_i2c.py', '/fram/')
|
cp("fram_i2c.py", "/fram/")
|
||||||
print('Contents of "/fram": {}'.format(uos.listdir('/fram')))
|
print('Contents of "/fram": {}'.format(uos.listdir("/fram")))
|
||||||
print(uos.statvfs('/fram'))
|
print(uos.statvfs("/fram"))
|
||||||
|
|
||||||
|
|
||||||
# ***** TEST OF HARDWARE *****
|
# ***** TEST OF HARDWARE *****
|
||||||
def full_test():
|
def full_test():
|
||||||
|
@ -121,9 +131,9 @@ def full_test():
|
||||||
page = 0
|
page = 0
|
||||||
for sa in range(0, len(fram), 256):
|
for sa in range(0, len(fram), 256):
|
||||||
data = uos.urandom(256)
|
data = uos.urandom(256)
|
||||||
fram[sa:sa + 256] = data
|
fram[sa : sa + 256] = data
|
||||||
if fram[sa:sa + 256] == data:
|
if fram[sa : sa + 256] == data:
|
||||||
print('Page {} passed'.format(page))
|
print("Page {} passed".format(page))
|
||||||
else:
|
else:
|
||||||
print('Page {} readback failed.'.format(page))
|
print("Page {} readback failed.".format(page))
|
||||||
page += 1
|
page += 1
|
||||||
|
|
|
@ -7,31 +7,34 @@ import os
|
||||||
from machine import SPI, Pin
|
from machine import SPI, Pin
|
||||||
from spiram_test import get_spiram
|
from spiram_test import get_spiram
|
||||||
|
|
||||||
directory = '/ram'
|
directory = "/ram"
|
||||||
a = bytearray(range(256))
|
a = bytearray(range(256))
|
||||||
b = bytearray(256)
|
b = bytearray(256)
|
||||||
files = {} # n:length
|
files = {} # n:length
|
||||||
errors = 0
|
errors = 0
|
||||||
|
|
||||||
|
|
||||||
def fname(n):
|
def fname(n):
|
||||||
return '{}/{:05d}'.format(directory, n + 1) # Names start 00001
|
return "{}/{:05d}".format(directory, n + 1) # Names start 00001
|
||||||
|
|
||||||
|
|
||||||
def fcreate(n): # Create a binary file of random length
|
def fcreate(n): # Create a binary file of random length
|
||||||
length = int.from_bytes(os.urandom(2), 'little') + 1 # 1-65536 bytes
|
length = int.from_bytes(os.urandom(2), "little") + 1 # 1-65536 bytes
|
||||||
length &= 0x3ff # 1-1023 for FRAM
|
length &= 0x3FF # 1-1023 for FRAM
|
||||||
linit = length
|
linit = length
|
||||||
with open(fname(n), 'wb') as f:
|
with open(fname(n), "wb") as f:
|
||||||
while(length):
|
while length:
|
||||||
nw = min(length, 256)
|
nw = min(length, 256)
|
||||||
f.write(a[:nw])
|
f.write(a[:nw])
|
||||||
length -= nw
|
length -= nw
|
||||||
files[n] = length
|
files[n] = length
|
||||||
return linit
|
return linit
|
||||||
|
|
||||||
|
|
||||||
def fcheck(n):
|
def fcheck(n):
|
||||||
length = files[n]
|
length = files[n]
|
||||||
with open(fname(n), 'rb') as f:
|
with open(fname(n), "rb") as f:
|
||||||
while(length):
|
while length:
|
||||||
nr = f.readinto(b)
|
nr = f.readinto(b)
|
||||||
if not nr:
|
if not nr:
|
||||||
return False
|
return False
|
||||||
|
@ -40,39 +43,42 @@ def fcheck(n):
|
||||||
length -= nr
|
length -= nr
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def check_all():
|
def check_all():
|
||||||
global errors
|
global errors
|
||||||
for n in files:
|
for n in files:
|
||||||
if fcheck(n):
|
if fcheck(n):
|
||||||
print('File {:d} OK'.format(n))
|
print("File {:d} OK".format(n))
|
||||||
else:
|
else:
|
||||||
print('Error in file', n)
|
print("Error in file", n)
|
||||||
errors += 1
|
errors += 1
|
||||||
print('Total errors:', errors)
|
print("Total errors:", errors)
|
||||||
|
|
||||||
|
|
||||||
def remove_all():
|
def remove_all():
|
||||||
for n in files:
|
for n in files:
|
||||||
os.remove(fname(n))
|
os.remove(fname(n))
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
ram = get_spiram()
|
ram = get_spiram()
|
||||||
os.VfsLfs2.mkfs(ram) # Format littlefs
|
os.VfsLfs2.mkfs(ram) # Format littlefs
|
||||||
try:
|
try:
|
||||||
os.mount(ram,'/ram')
|
os.mount(ram, "/ram")
|
||||||
except OSError: # Already mounted
|
except OSError: # Already mounted
|
||||||
pass
|
pass
|
||||||
for n in range(128):
|
for n in range(128):
|
||||||
length = fcreate(n)
|
length = fcreate(n)
|
||||||
print('Created', n, length)
|
print("Created", n, length)
|
||||||
print('Created files', files)
|
print("Created files", files)
|
||||||
check_all()
|
check_all()
|
||||||
for _ in range(100):
|
for _ in range(100):
|
||||||
for x in range(5): # Rewrite 5 files with new lengths
|
for x in range(5): # Rewrite 5 files with new lengths
|
||||||
n = int.from_bytes(os.urandom(1), 'little') & 0x7f
|
n = int.from_bytes(os.urandom(1), "little") & 0x7F
|
||||||
length = fcreate(n)
|
length = fcreate(n)
|
||||||
print('Rewrote', n, length)
|
print("Rewrote", n, length)
|
||||||
check_all()
|
check_all()
|
||||||
remove_all()
|
remove_all()
|
||||||
|
|
||||||
print('main() to run littlefs test. Erases any data on RAM.')
|
|
||||||
|
print("main() to run littlefs test. Erases any data on RAM.")
|
||||||
|
|
|
@ -15,13 +15,13 @@ _WRITE = const(2)
|
||||||
_READ = const(3)
|
_READ = const(3)
|
||||||
_RSTEN = const(0x66)
|
_RSTEN = const(0x66)
|
||||||
_RESET = const(0x99)
|
_RESET = const(0x99)
|
||||||
_RDID = const(0x9f)
|
_RDID = const(0x9F)
|
||||||
|
|
||||||
|
|
||||||
class SPIRAM(BlockDevice):
|
class SPIRAM(BlockDevice):
|
||||||
def __init__(self, spi, cspins, size=8192, verbose=True, block_size=9):
|
def __init__(self, spi, cspins, size=8192, verbose=True, block_size=9):
|
||||||
if size != 8192:
|
if size != 8192:
|
||||||
print('SPIRAM size other than 8192KiB may not work.')
|
print("SPIRAM size other than 8192KiB may not work.")
|
||||||
super().__init__(block_size, len(cspins), size * 1024)
|
super().__init__(block_size, len(cspins), size * 1024)
|
||||||
self._spi = spi
|
self._spi = spi
|
||||||
self._cspins = cspins
|
self._cspins = cspins
|
||||||
|
@ -31,18 +31,17 @@ class SPIRAM(BlockDevice):
|
||||||
self._mvp = mvp
|
self._mvp = mvp
|
||||||
# Check hardware
|
# Check hardware
|
||||||
for n, cs in enumerate(cspins):
|
for n, cs in enumerate(cspins):
|
||||||
mvp[:] = b'\0\0\0\0\0\0'
|
mvp[:] = b"\0\0\0\0\0\0"
|
||||||
mvp[0] = _RDID
|
mvp[0] = _RDID
|
||||||
cs(0)
|
cs(0)
|
||||||
self._spi.write_readinto(mvp, mvp)
|
self._spi.write_readinto(mvp, mvp)
|
||||||
cs(1)
|
cs(1)
|
||||||
if mvp[4] != 0x0d or mvp[5] != 0x5d:
|
if mvp[4] != 0x0D or mvp[5] != 0x5D:
|
||||||
print("Warning: expected manufacturer ID not found.")
|
print("Warning: expected manufacturer ID not found.")
|
||||||
|
|
||||||
if verbose:
|
|
||||||
s = 'Total SPIRAM size {} KiB in {} devices.'
|
|
||||||
print(s.format(self._a_bytes//1024, n + 1))
|
|
||||||
|
|
||||||
|
if verbose:
|
||||||
|
s = "Total SPIRAM size {} KiB in {} devices."
|
||||||
|
print(s.format(self._a_bytes // 1024, n + 1))
|
||||||
|
|
||||||
# Given an address, set current chip select and address buffer.
|
# Given an address, set current chip select and address buffer.
|
||||||
# Return the number of bytes that can be processed in the current chip.
|
# Return the number of bytes that can be processed in the current chip.
|
||||||
|
@ -53,8 +52,8 @@ class SPIRAM(BlockDevice):
|
||||||
self._ccs = self._cspins[ca] # Current chip select
|
self._ccs = self._cspins[ca] # Current chip select
|
||||||
mvp = self._mvp
|
mvp = self._mvp
|
||||||
mvp[1] = la >> 16
|
mvp[1] = la >> 16
|
||||||
mvp[2] = (la >> 8) & 0xff
|
mvp[2] = (la >> 8) & 0xFF
|
||||||
mvp[3] = la & 0xff
|
mvp[3] = la & 0xFF
|
||||||
pe = (addr & -self._c_bytes) + self._c_bytes # Byte 0 of next chip
|
pe = (addr & -self._c_bytes) + self._c_bytes # Byte 0 of next chip
|
||||||
return min(nbytes, pe - la)
|
return min(nbytes, pe - la)
|
||||||
|
|
||||||
|
@ -77,20 +76,21 @@ class SPIRAM(BlockDevice):
|
||||||
mvp[0] = _WRITE
|
mvp[0] = _WRITE
|
||||||
cs(0)
|
cs(0)
|
||||||
self._spi.write(mvp[:4])
|
self._spi.write(mvp[:4])
|
||||||
self._spi.write(mvb[start: start + nchip])
|
self._spi.write(mvb[start : start + nchip])
|
||||||
cs(1)
|
cs(1)
|
||||||
nbytes -= nchip
|
nbytes -= nchip
|
||||||
start += nchip
|
start += nchip
|
||||||
addr += nchip
|
addr += nchip
|
||||||
return buf
|
return buf
|
||||||
|
|
||||||
|
|
||||||
# Reset is unnecessary because it restores the default power-up state.
|
# Reset is unnecessary because it restores the default power-up state.
|
||||||
#def _reset(self, cs, bufr = bytearray(1)):
|
# def _reset(self, cs, bufr = bytearray(1)):
|
||||||
#cs(0)
|
# cs(0)
|
||||||
#bufr[0] = _RSTEN
|
# bufr[0] = _RSTEN
|
||||||
#self._spi.write(bufr)
|
# self._spi.write(bufr)
|
||||||
#cs(1)
|
# cs(1)
|
||||||
#cs(0)
|
# cs(0)
|
||||||
#bufr[0] = _RESET
|
# bufr[0] = _RESET
|
||||||
#self._spi.write(bufr)
|
# self._spi.write(bufr)
|
||||||
#cs(1)
|
# cs(1)
|
||||||
|
|
|
@ -14,51 +14,54 @@ cspins = (Pin(Pin.board.Y5, Pin.OUT, value=1), Pin(Pin.board.Y4, Pin.OUT, value=
|
||||||
|
|
||||||
# Return an RAM array. Adapt for platforms other than Pyboard.
|
# Return an RAM array. Adapt for platforms other than Pyboard.
|
||||||
def get_spiram():
|
def get_spiram():
|
||||||
if os.uname().machine.split(' ')[0][:4] == 'PYBD':
|
if os.uname().machine.split(" ")[0][:4] == "PYBD":
|
||||||
Pin.board.EN_3V3.value(1)
|
Pin.board.EN_3V3.value(1)
|
||||||
time.sleep(0.1) # Allow decouplers to charge
|
time.sleep(0.1) # Allow decouplers to charge
|
||||||
ram = SPIRAM(SPI(2, baudrate=25_000_000), cspins)
|
ram = SPIRAM(SPI(2, baudrate=25_000_000), cspins)
|
||||||
print('Instantiated RAM')
|
print("Instantiated RAM")
|
||||||
return ram
|
return ram
|
||||||
|
|
||||||
|
|
||||||
# Dumb file copy utility to help with managing FRAM contents at the REPL.
|
# Dumb file copy utility to help with managing FRAM contents at the REPL.
|
||||||
def cp(source, dest):
|
def cp(source, dest):
|
||||||
if dest.endswith('/'): # minimal way to allow
|
if dest.endswith("/"): # minimal way to allow
|
||||||
dest = ''.join((dest, source.split('/')[-1])) # cp /sd/file /ram/
|
dest = "".join((dest, source.split("/")[-1])) # cp /sd/file /ram/
|
||||||
with open(source, 'rb') as infile: # Caller should handle any OSError
|
with open(source, "rb") as infile: # Caller should handle any OSError
|
||||||
with open(dest,'wb') as outfile: # e.g file not found
|
with open(dest, "wb") as outfile: # e.g file not found
|
||||||
while True:
|
while True:
|
||||||
buf = infile.read(100)
|
buf = infile.read(100)
|
||||||
outfile.write(buf)
|
outfile.write(buf)
|
||||||
if len(buf) < 100:
|
if len(buf) < 100:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
||||||
# ***** TEST OF DRIVER *****
|
# ***** TEST OF DRIVER *****
|
||||||
def _testblock(eep, bs):
|
def _testblock(eep, bs):
|
||||||
d0 = b'this >'
|
d0 = b"this >"
|
||||||
d1 = b'<is the boundary'
|
d1 = b"<is the boundary"
|
||||||
d2 = d0 + d1
|
d2 = d0 + d1
|
||||||
garbage = b'xxxxxxxxxxxxxxxxxxx'
|
garbage = b"xxxxxxxxxxxxxxxxxxx"
|
||||||
start = bs - len(d0)
|
start = bs - len(d0)
|
||||||
end = start + len(garbage)
|
end = start + len(garbage)
|
||||||
eep[start : end] = garbage
|
eep[start:end] = garbage
|
||||||
res = eep[start : end]
|
res = eep[start:end]
|
||||||
if res != garbage:
|
if res != garbage:
|
||||||
return 'Block test fail 1:' + str(list(res))
|
return "Block test fail 1:" + str(list(res))
|
||||||
end = start + len(d0)
|
end = start + len(d0)
|
||||||
eep[start : end] = d0
|
eep[start:end] = d0
|
||||||
end = start + len(garbage)
|
end = start + len(garbage)
|
||||||
res = eep[start : end]
|
res = eep[start:end]
|
||||||
if res != b'this >xxxxxxxxxxxxx':
|
if res != b"this >xxxxxxxxxxxxx":
|
||||||
return 'Block test fail 2:' + str(list(res))
|
return "Block test fail 2:" + str(list(res))
|
||||||
start = bs
|
start = bs
|
||||||
end = bs + len(d1)
|
end = bs + len(d1)
|
||||||
eep[start : end] = d1
|
eep[start:end] = d1
|
||||||
start = bs - len(d0)
|
start = bs - len(d0)
|
||||||
end = start + len(d2)
|
end = start + len(d2)
|
||||||
res = eep[start : end]
|
res = eep[start:end]
|
||||||
if res != d2:
|
if res != d2:
|
||||||
return 'Block test fail 3:' + str(list(res))
|
return "Block test fail 3:" + str(list(res))
|
||||||
|
|
||||||
|
|
||||||
def test():
|
def test():
|
||||||
ram = get_spiram()
|
ram = get_spiram()
|
||||||
|
@ -67,51 +70,56 @@ def test():
|
||||||
ram[sa + v] = v
|
ram[sa + v] = v
|
||||||
for v in range(256):
|
for v in range(256):
|
||||||
if ram[sa + v] != v:
|
if ram[sa + v] != v:
|
||||||
print('Fail at address {} data {} should be {}'.format(sa + v, ram[sa + v], v))
|
print(
|
||||||
|
"Fail at address {} data {} should be {}".format(sa + v, ram[sa + v], v)
|
||||||
|
)
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
print('Test of byte addressing passed')
|
print("Test of byte addressing passed")
|
||||||
data = os.urandom(30)
|
data = os.urandom(30)
|
||||||
sa = 2000
|
sa = 2000
|
||||||
ram[sa:sa + 30] = data
|
ram[sa : sa + 30] = data
|
||||||
if ram[sa:sa + 30] == data:
|
if ram[sa : sa + 30] == data:
|
||||||
print('Test of slice readback passed')
|
print("Test of slice readback passed")
|
||||||
# On SPIRAM the only meaningful block test is on a chip boundary.
|
# On SPIRAM the only meaningful block test is on a chip boundary.
|
||||||
block = ram._c_bytes
|
block = ram._c_bytes
|
||||||
if ram._a_bytes > block:
|
if ram._a_bytes > block:
|
||||||
res = _testblock(ram, block)
|
res = _testblock(ram, block)
|
||||||
if res is None:
|
if res is None:
|
||||||
print('Test chip boundary {} passed'.format(block))
|
print("Test chip boundary {} passed".format(block))
|
||||||
else:
|
else:
|
||||||
print('Test chip boundary {} fail'.format(block))
|
print("Test chip boundary {} fail".format(block))
|
||||||
print(res)
|
print(res)
|
||||||
else:
|
else:
|
||||||
print('Test chip boundary skipped: only one chip!')
|
print("Test chip boundary skipped: only one chip!")
|
||||||
|
|
||||||
|
|
||||||
# ***** TEST OF FILESYSTEM MOUNT *****
|
# ***** TEST OF FILESYSTEM MOUNT *****
|
||||||
def fstest():
|
def fstest():
|
||||||
ram = get_spiram()
|
ram = get_spiram()
|
||||||
os.VfsLfs2.mkfs(ram) # Format littlefs
|
os.VfsLfs2.mkfs(ram) # Format littlefs
|
||||||
try:
|
try:
|
||||||
os.mount(ram,'/ram')
|
os.mount(ram, "/ram")
|
||||||
except OSError: # Already mounted
|
except OSError: # Already mounted
|
||||||
pass
|
pass
|
||||||
print('Contents of "/": {}'.format(os.listdir('/')))
|
print('Contents of "/": {}'.format(os.listdir("/")))
|
||||||
print('Contents of "/ram": {}'.format(os.listdir('/ram')))
|
print('Contents of "/ram": {}'.format(os.listdir("/ram")))
|
||||||
print(os.statvfs('/ram'))
|
print(os.statvfs("/ram"))
|
||||||
|
|
||||||
|
|
||||||
def cptest():
|
def cptest():
|
||||||
ram = get_spiram()
|
ram = get_spiram()
|
||||||
if 'ram' in os.listdir('/'):
|
if "ram" in os.listdir("/"):
|
||||||
print('Device already mounted.')
|
print("Device already mounted.")
|
||||||
else:
|
else:
|
||||||
os.VfsLfs2.mkfs(ram) # Format littlefs
|
os.VfsLfs2.mkfs(ram) # Format littlefs
|
||||||
os.mount(ram,'/ram')
|
os.mount(ram, "/ram")
|
||||||
print('Formatted and mounted device.')
|
print("Formatted and mounted device.")
|
||||||
cp('/sd/spiram_test.py', '/ram/')
|
cp("/sd/spiram_test.py", "/ram/")
|
||||||
cp('/sd/spiram.py', '/ram/')
|
cp("/sd/spiram.py", "/ram/")
|
||||||
print('Contents of "/ram": {}'.format(os.listdir('/ram')))
|
print('Contents of "/ram": {}'.format(os.listdir("/ram")))
|
||||||
print(os.statvfs('/ram'))
|
print(os.statvfs("/ram"))
|
||||||
|
|
||||||
|
|
||||||
# ***** TEST OF HARDWARE *****
|
# ***** TEST OF HARDWARE *****
|
||||||
def full_test():
|
def full_test():
|
||||||
|
@ -120,9 +128,9 @@ def full_test():
|
||||||
page = 0
|
page = 0
|
||||||
for sa in range(0, len(ram), bsize):
|
for sa in range(0, len(ram), bsize):
|
||||||
data = os.urandom(bsize)
|
data = os.urandom(bsize)
|
||||||
ram[sa:sa + bsize] = data
|
ram[sa : sa + bsize] = data
|
||||||
if ram[sa:sa + bsize] == data:
|
if ram[sa : sa + bsize] == data:
|
||||||
print('Page {} passed'.format(page))
|
print("Page {} passed".format(page))
|
||||||
else:
|
else:
|
||||||
print('Page {} readback failed.'.format(page))
|
print("Page {} readback failed.".format(page))
|
||||||
page += 1
|
page += 1
|
||||||
|
|
Ładowanie…
Reference in New Issue