tools/pyboard.py: Add fs_{listdir,readfile,writefile,stat}.

These are for working with the filesystem when using pyboard.py as a
library, rather than at the command line.

- fs_listdir returns a list of tuples, in the same format as os.ilistdir().
- fs_readfile returns the contents of a file as a bytes object.
- fs_writefile allows writing a bytes object to a file.
- fs_stat returns an os.statresult.

All raise FileNotFoundError (or OSError(ENOENT) on Python 2) if the file is
not found (or PyboardError on other errors).

Updated fs_cp and fs_get to use fs_stat to compute file size.

This work was funded through GitHub Sponsors.

Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
pull/10286/head
Jim Mussared 2022-12-21 11:36:18 +11:00 zatwierdzone przez Damien George
rodzic 6013d27dd5
commit aa64280666
1 zmienionych plików z 66 dodań i 3 usunięć

Wyświetl plik

@ -73,6 +73,8 @@ import struct
import sys
import time
from collections import namedtuple
try:
stdout = sys.stdout.buffer
except AttributeError:
@ -87,7 +89,15 @@ def stdout_write_bytes(b):
class PyboardError(Exception):
pass
def convert(self, info):
if len(self.args) >= 3:
if b"OSError" in self.args[2] and b"ENOENT" in self.args[2]:
return OSError(errno.ENOENT, info)
return self
listdir_result = namedtuple("dir_result", ["name", "st_mode", "st_ino", "st_size"])
class TelnetToSerial:
@ -467,6 +477,7 @@ class Pyboard:
ret = ret.strip()
return ret
# In Python3, call as pyboard.exec(), see the setattr call below.
def exec_(self, command, data_consumer=None):
ret, ret_err = self.exec_raw(command, data_consumer=data_consumer)
if ret_err:
@ -497,6 +508,34 @@ class Pyboard:
)
self.exec_(cmd, data_consumer=stdout_write_bytes)
def fs_listdir(self, src=""):
buf = bytearray()
def repr_consumer(b):
buf.extend(b.replace(b"\x04", b""))
cmd = "import uos\nfor f in uos.ilistdir(%s):\n" " print(repr(f), end=',')" % (
("'%s'" % src) if src else ""
)
try:
buf.extend(b"[")
self.exec_(cmd, data_consumer=repr_consumer)
buf.extend(b"]")
except PyboardError as e:
raise e.convert(src)
return [
listdir_result(*f) if len(f) == 4 else listdir_result(*(f + (0,)))
for f in ast.literal_eval(buf.decode())
]
def fs_stat(self, src):
try:
self.exec_("import uos")
return os.stat_result(self.eval("uos.stat(%s)" % (("'%s'" % src)), parse=True))
except PyboardError as e:
raise e.convert(src)
def fs_cat(self, src, chunk_size=256):
cmd = (
"with open('%s') as f:\n while 1:\n"
@ -504,9 +543,33 @@ class Pyboard:
)
self.exec_(cmd, data_consumer=stdout_write_bytes)
def fs_readfile(self, src, chunk_size=256):
buf = bytearray()
def repr_consumer(b):
buf.extend(b.replace(b"\x04", b""))
cmd = (
"with open('%s', 'rb') as f:\n while 1:\n"
" b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size)
)
try:
self.exec_(cmd, data_consumer=repr_consumer)
except PyboardError as e:
raise e.convert(src)
return ast.literal_eval(buf.decode())
def fs_writefile(self, dest, data, chunk_size=256):
self.exec_("f=open('%s','wb')\nw=f.write" % dest)
while data:
chunk = data[:chunk_size]
self.exec_("w(" + repr(chunk) + ")")
data = data[len(chunk) :]
self.exec_("f.close()")
def fs_cp(self, src, dest, chunk_size=256, progress_callback=None):
if progress_callback:
src_size = int(self.exec_("import os\nprint(os.stat('%s')[6])" % src))
src_size = self.fs_stat(src).st_size
written = 0
self.exec_("fr=open('%s','rb')\nr=fr.read\nfw=open('%s','wb')\nw=fw.write" % (src, dest))
while True:
@ -520,7 +583,7 @@ class Pyboard:
def fs_get(self, src, dest, chunk_size=256, progress_callback=None):
if progress_callback:
src_size = int(self.exec_("import os\nprint(os.stat('%s')[6])" % src))
src_size = self.fs_stat(src).st_size
written = 0
self.exec_("f=open('%s','rb')\nr=f.read" % src)
with open(dest, "wb") as f: