# bdevice.py Hardware-agnostic base classes. # BlockDevice Base class for general block devices e.g. EEPROM, FRAM. # FlashDevice Base class for generic Flash memory (subclass of BlockDevice). # Released under the MIT License (MIT). See LICENSE. # Copyright (c) 2019 Peter Hinch # BlockDevice: hardware-independent class implementing the uos.AbstractBlockDev # protocol with extended interface. It supports littlefs. # http://docs.micropython.org/en/latest/reference/filesystem.html#custom-block-devices # The subclass must implement .readwrite which can read or write arbitrary amounts # of data to arbitrary addresses. IOW .readwrite handles physical block structure # while ioctl supports a virtual block size. class BlockDevice: def __init__(self, nbits, nchips, chip_size): self._c_bytes = chip_size # Size of chip in bytes self._a_bytes = chip_size * nchips # Size of array self._nbits = nbits # Block size in bits self._block_size = 2**nbits self._rwbuf = bytearray(1) def __len__(self): return self._a_bytes def __setitem__(self, addr, value): if isinstance(addr, slice): return self._wslice(addr, value) self._rwbuf[0] = value self.readwrite(addr, self._rwbuf, False) def __getitem__(self, addr): if isinstance(addr, slice): return self._rslice(addr) return self.readwrite(addr, self._rwbuf, True)[0] # Handle special cases of a slice. Always return a pair of positive indices. def _do_slice(self, addr): if not (addr.step is None or addr.step == 1): raise NotImplementedError('only slices with step=1 (aka None) are supported') start = addr.start if addr.start is not None else 0 stop = addr.stop if addr.stop is not None else self._a_bytes start = start if start >= 0 else self._a_bytes + start stop = stop if stop >= 0 else self._a_bytes + stop return start, stop def _wslice(self, addr, value): start, stop = self._do_slice(addr) try: if len(value) == (stop - start): res = self.readwrite(start, value, False) else: raise RuntimeError('Slice must have same length as data') except TypeError: raise RuntimeError('Can only assign bytes/bytearray to a slice') return res def _rslice(self, addr): start, stop = self._do_slice(addr) buf = bytearray(stop - start) return self.readwrite(start, buf, True) # IOCTL protocol. def readblocks(self, blocknum, buf, offset=0): self.readwrite(offset + (blocknum << self._nbits), buf, True) def writeblocks(self, blocknum, buf, offset=None): offset = 0 if offset is None else offset self.readwrite(offset + (blocknum << self._nbits), buf, False) def ioctl(self, op, arg): if op == 4: # BP_IOCTL_SEC_COUNT return self._a_bytes >> self._nbits if op == 5: # BP_IOCTL_SEC_SIZE return self._block_size if op == 6: # Erase return 0 # Hardware agnostic base class for flash memory, where a single sector is cached. # This minimises RAM usage. Under FAT wear is reduced if you cache at least two # sectors. This driver is primarily intended for littlefs which has no such issue. # Subclass must provide these hardware-dependent methods: # .rdchip(addr, mvb) Read from chip into memoryview: data guaranteed not to be cached. # .flush(cache, addr) Erase physical sector and write out an entire cached sector. # .readwrite As per base class. class FlashDevice(BlockDevice): def __init__(self, nbits, nchips, chip_size, sec_size): super().__init__(nbits, nchips, chip_size) self.sec_size = sec_size self._cache_mask = sec_size - 1 # For 4K sector size: 0xfff self._fmask = self._cache_mask ^ 0x3fffffff # 4K -> 0x3ffff000 self._cache = bytearray(sec_size) # Cache always contains one sector self._mvd = memoryview(self._cache) self._acache = 0 # Address in chip of byte 0 of current cached sector def read(self, addr, mvb): nbytes = len(mvb) next_sec = self._acache + self.sec_size # Start of next sector if addr >= next_sec or addr + nbytes <= self._acache: self.rdchip(addr, mvb) # No data is cached: just read from device else: # Some of address range is cached boff = 0 # Offset into buf if addr < self._acache: # Read data prior to cache from chip nr = self._acache - addr self.rdchip(addr, mvb[:nr]) addr = self._acache # Start of cached data nbytes -= nr boff += nr # addr now >= self._acache: read from cache. sa = addr - self._acache # Offset into cache nr = min(nbytes, self._acache + self.sec_size - addr) # No of bytes to read from cache mvb[boff : boff + nr] = self._mvd[sa : sa + nr] if nbytes - nr: # Get any remaining data from chip self.rdchip(addr + nr, mvb[boff + nr : ]) return mvb def synchronise(self): # print('SYNCHRONISE') self.flush(self._mvd, self._acache) # Write out old data # TODO Performance enhancement: if cache intersects address range, update it first. # Currently in this case it would be written twice. def write(self, addr, mvb): nbytes = len(mvb) acache = self._acache boff = 0 # Offset into buf. while nbytes: if (addr & self._fmask) != acache: self.synchronise() # Erase sector and write out old data self._fill_cache(addr) # Cache sector which includes addr offs = addr & self._cache_mask # Offset into cache npage = min(nbytes, self.sec_size - offs) # No. of bytes in current sector self._mvd[offs : offs + npage] = mvb[boff : boff + npage] nbytes -= npage boff += npage addr += npage return mvb # Cache the sector which contains a given byte addresss. Save sector # start address. def _fill_cache(self, addr): addr &= self._fmask self.rdchip(addr, self._mvd) self._acache = addr def initialise(self): self._fill_cache(0)