tools/mpremote: Detach mpremote from pyboard.py.

This commit just takes the necessary parts of pyboard.py and merges them
with pyboardextended.py to make a new transport_serial.py, and updates the
rest of mpremote to use this instead.

It is difficult to continue to add features to mpremote (which usually
requires modification to pyboard.py) while also maintaining backwards
compatibility for pyboard.py.

The idea is that this provides a starting point for further refactoring of
mpremote to allow different transports (webrepl, BLE, etc).

This work was funded through GitHub Sponsors.

Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
pull/11669/head
Jim Mussared 2023-05-31 14:19:14 +10:00 zatwierdzone przez Damien George
rodzic bd5d0163c4
commit b4d785fa20
7 zmienionych plików z 708 dodań i 188 usunięć

Wyświetl plik

@ -4,7 +4,8 @@ import tempfile
import serial.tools.list_ports
from . import pyboardextended as pyboard
from .transport import TransportError
from .transport_serial import SerialTransport
class CommandError(Exception):
@ -36,28 +37,28 @@ def do_connect(state, args=None):
for p in sorted(serial.tools.list_ports.comports()):
if p.vid is not None and p.pid is not None:
try:
state.pyb = pyboard.PyboardExtended(p.device, baudrate=115200)
state.transport = SerialTransport(p.device, baudrate=115200)
return
except pyboard.PyboardError as er:
except TransportError as er:
if not er.args[0].startswith("failed to access"):
raise er
raise pyboard.PyboardError("no device found")
raise TransportError("no device found")
elif dev.startswith("id:"):
# Search for a device with the given serial number.
serial_number = dev[len("id:") :]
dev = None
for p in serial.tools.list_ports.comports():
if p.serial_number == serial_number:
state.pyb = pyboard.PyboardExtended(p.device, baudrate=115200)
state.transport = SerialTransport(p.device, baudrate=115200)
return
raise pyboard.PyboardError("no device with serial number {}".format(serial_number))
raise TransportError("no device with serial number {}".format(serial_number))
else:
# Connect to the given device.
if dev.startswith("port:"):
dev = dev[len("port:") :]
state.pyb = pyboard.PyboardExtended(dev, baudrate=115200)
state.transport = SerialTransport(dev, baudrate=115200)
return
except pyboard.PyboardError as er:
except TransportError as er:
msg = er.args[0]
if msg.startswith("failed to access"):
msg += " (it may be in use by another program)"
@ -66,23 +67,23 @@ def do_connect(state, args=None):
def do_disconnect(state, _args=None):
if not state.pyb:
if not state.transport:
return
try:
if state.pyb.mounted:
if not state.pyb.in_raw_repl:
state.pyb.enter_raw_repl(soft_reset=False)
state.pyb.umount_local()
if state.pyb.in_raw_repl:
state.pyb.exit_raw_repl()
if state.transport.mounted:
if not state.transport.in_raw_repl:
state.transport.enter_raw_repl(soft_reset=False)
state.transport.umount_local()
if state.transport.in_raw_repl:
state.transport.exit_raw_repl()
except OSError:
# Ignore any OSError exceptions when shutting down, eg:
# - pyboard.filesystem_command will close the connection if it had an error
# - umounting will fail if serial port disappeared
pass
state.pyb.close()
state.pyb = None
state.transport.close()
state.transport = None
state._auto_soft_reset = True
@ -136,16 +137,17 @@ def do_filesystem(state, args):
raise CommandError("'cp -r' source files must be local")
_list_recursive(src_files, path)
known_dirs = {""}
state.pyb.exec_("import uos")
state.transport.exec_("import uos")
for dir, file in src_files:
dir_parts = dir.split("/")
for i in range(len(dir_parts)):
d = "/".join(dir_parts[: i + 1])
if d not in known_dirs:
state.pyb.exec_("try:\n uos.mkdir('%s')\nexcept OSError as e:\n print(e)" % d)
state.transport.exec_(
"try:\n uos.mkdir('%s')\nexcept OSError as e:\n print(e)" % d
)
known_dirs.add(d)
pyboard.filesystem_command(
state.pyb,
state.transport.filesystem_command(
["cp", "/".join((dir, file)), ":" + dir + "/"],
progress_callback=show_progress_bar,
verbose=verbose,
@ -154,8 +156,8 @@ def do_filesystem(state, args):
if args.recursive:
raise CommandError("'-r' only supported for 'cp'")
try:
pyboard.filesystem_command(
state.pyb, [command] + paths, progress_callback=show_progress_bar, verbose=verbose
state.transport.filesystem_command(
[command] + paths, progress_callback=show_progress_bar, verbose=verbose
)
except OSError as er:
raise CommandError(er)
@ -166,17 +168,17 @@ def do_edit(state, args):
state.did_action()
if not os.getenv("EDITOR"):
raise pyboard.PyboardError("edit: $EDITOR not set")
raise TransportError("edit: $EDITOR not set")
for src in args.files:
src = src.lstrip(":")
dest_fd, dest = tempfile.mkstemp(suffix=os.path.basename(src))
try:
print("edit :%s" % (src,))
os.close(dest_fd)
state.pyb.fs_touch(src)
state.pyb.fs_get(src, dest, progress_callback=show_progress_bar)
state.transport.fs_touch(src)
state.transport.fs_get(src, dest, progress_callback=show_progress_bar)
if os.system('%s "%s"' % (os.getenv("EDITOR"), dest)) == 0:
state.pyb.fs_put(dest, src, progress_callback=show_progress_bar)
state.transport.fs_put(dest, src, progress_callback=show_progress_bar)
finally:
os.unlink(dest)
@ -186,13 +188,15 @@ def _do_execbuffer(state, buf, follow):
state.did_action()
try:
state.pyb.exec_raw_no_follow(buf)
state.transport.exec_raw_no_follow(buf)
if follow:
ret, ret_err = state.pyb.follow(timeout=None, data_consumer=pyboard.stdout_write_bytes)
ret, ret_err = state.transport.follow(
timeout=None, data_consumer=pyboard.stdout_write_bytes
)
if ret_err:
pyboard.stdout_write_bytes(ret_err)
sys.exit(1)
except pyboard.PyboardError as er:
except TransportError as er:
print(er)
sys.exit(1)
except KeyboardInterrupt:
@ -221,13 +225,13 @@ def do_run(state, args):
def do_mount(state, args):
state.ensure_raw_repl()
path = args.path[0]
state.pyb.mount_local(path, unsafe_links=args.unsafe_links)
state.transport.mount_local(path, unsafe_links=args.unsafe_links)
print(f"Local directory {path} is mounted at /remote")
def do_umount(state, path):
state.ensure_raw_repl()
state.pyb.umount_local()
state.transport.umount_local()
def do_resume(state, _args=None):

Wyświetl plik

@ -449,7 +449,7 @@ def do_command_expansion(args):
class State:
def __init__(self):
self.pyb = None
self.transport = None
self._did_action = False
self._auto_soft_reset = True
@ -460,20 +460,20 @@ class State:
return not self._did_action
def ensure_connected(self):
if self.pyb is None:
if self.transport is None:
do_connect(self)
def ensure_raw_repl(self, soft_reset=None):
self.ensure_connected()
soft_reset = self._auto_soft_reset if soft_reset is None else soft_reset
if soft_reset or not self.pyb.in_raw_repl:
self.pyb.enter_raw_repl(soft_reset=soft_reset)
if soft_reset or not self.transport.in_raw_repl:
self.transport.enter_raw_repl(soft_reset=soft_reset)
self._auto_soft_reset = False
def ensure_friendly_repl(self):
self.ensure_connected()
if self.pyb.in_raw_repl:
self.pyb.exit_raw_repl()
if self.transport.in_raw_repl:
self.transport.exit_raw_repl()
def main():

Wyświetl plik

@ -16,7 +16,7 @@ _CHUNK_SIZE = 128
# This implements os.makedirs(os.dirname(path))
def _ensure_path_exists(pyb, path):
def _ensure_path_exists(transport, path):
import os
split = path.split("/")
@ -29,8 +29,8 @@ def _ensure_path_exists(pyb, path):
prefix = ""
for i in range(len(split) - 1):
prefix += split[i]
if not pyb.fs_exists(prefix):
pyb.fs_mkdir(prefix)
if not transport.fs_exists(prefix):
transport.fs_mkdir(prefix)
prefix += "/"
@ -68,7 +68,7 @@ def _rewrite_url(url, branch=None):
return url
def _download_file(pyb, url, dest):
def _download_file(transport, url, dest):
try:
with urllib.request.urlopen(url) as src:
fd, path = tempfile.mkstemp()
@ -76,8 +76,8 @@ def _download_file(pyb, url, dest):
print("Installing:", dest)
with os.fdopen(fd, "wb") as f:
_chunk(src, f.write, src.length)
_ensure_path_exists(pyb, dest)
pyb.fs_put(path, dest, progress_callback=show_progress_bar)
_ensure_path_exists(transport, dest)
transport.fs_put(path, dest, progress_callback=show_progress_bar)
finally:
os.unlink(path)
except urllib.error.HTTPError as e:
@ -89,7 +89,7 @@ def _download_file(pyb, url, dest):
raise CommandError(f"{e.reason} requesting {url}")
def _install_json(pyb, package_json_url, index, target, version, mpy):
def _install_json(transport, package_json_url, index, target, version, mpy):
try:
with urllib.request.urlopen(_rewrite_url(package_json_url, version)) as response:
package_json = json.load(response)
@ -103,15 +103,15 @@ def _install_json(pyb, package_json_url, index, target, version, mpy):
for target_path, short_hash in package_json.get("hashes", ()):
fs_target_path = target + "/" + target_path
file_url = f"{index}/file/{short_hash[:2]}/{short_hash}"
_download_file(pyb, file_url, fs_target_path)
_download_file(transport, file_url, fs_target_path)
for target_path, url in package_json.get("urls", ()):
fs_target_path = target + "/" + target_path
_download_file(pyb, _rewrite_url(url, version), fs_target_path)
_download_file(transport, _rewrite_url(url, version), fs_target_path)
for dep, dep_version in package_json.get("deps", ()):
_install_package(pyb, dep, index, target, dep_version, mpy)
_install_package(transport, dep, index, target, dep_version, mpy)
def _install_package(pyb, package, index, target, version, mpy):
def _install_package(transport, package, index, target, version, mpy):
if (
package.startswith("http://")
or package.startswith("https://")
@ -120,7 +120,7 @@ def _install_package(pyb, package, index, target, version, mpy):
if package.endswith(".py") or package.endswith(".mpy"):
print(f"Downloading {package} to {target}")
_download_file(
pyb, _rewrite_url(package, version), target + "/" + package.rsplit("/")[-1]
transport, _rewrite_url(package, version), target + "/" + package.rsplit("/")[-1]
)
return
else:
@ -136,14 +136,15 @@ def _install_package(pyb, package, index, target, version, mpy):
mpy_version = "py"
if mpy:
pyb.exec("import sys")
transport.exec("import sys")
mpy_version = (
int(pyb.eval("getattr(sys.implementation, '_mpy', 0) & 0xFF").decode()) or "py"
int(transport.eval("getattr(sys.implementation, '_mpy', 0) & 0xFF").decode())
or "py"
)
package = f"{index}/package/{mpy_version}/{package}/{version}.json"
_install_json(pyb, package, index, target, version, mpy)
_install_json(transport, package, index, target, version, mpy)
def do_mip(state, args):
@ -163,9 +164,9 @@ def do_mip(state, args):
args.index = _PACKAGE_INDEX
if args.target is None:
state.pyb.exec("import sys")
state.transport.exec("import sys")
lib_paths = (
state.pyb.eval("'\\n'.join(p for p in sys.path if p.endswith('/lib'))")
state.transport.eval("'\\n'.join(p for p in sys.path if p.endswith('/lib'))")
.decode()
.split("\n")
)
@ -181,7 +182,12 @@ def do_mip(state, args):
try:
_install_package(
state.pyb, package, args.index.rstrip("/"), args.target, version, args.mpy
state.transport,
package,
args.index.rstrip("/"),
args.target,
version,
args.mpy,
)
except CommandError:
print("Package may be partially installed")

Wyświetl plik

@ -1,45 +1,45 @@
from .console import Console, ConsolePosix
from . import pyboardextended as pyboard
from .transport import TransportError
def do_repl_main_loop(
state, console_in, console_out_write, *, escape_non_printable, code_to_inject, file_to_inject
):
while True:
console_in.waitchar(state.pyb.serial)
console_in.waitchar(state.transport.serial)
c = console_in.readchar()
if c:
if c in (b"\x1d", b"\x18"): # ctrl-] or ctrl-x, quit
break
elif c == b"\x04": # ctrl-D
# special handling needed for ctrl-D if filesystem is mounted
state.pyb.write_ctrl_d(console_out_write)
state.transport.write_ctrl_d(console_out_write)
elif c == b"\x0a" and code_to_inject is not None: # ctrl-j, inject code
state.pyb.serial.write(code_to_inject)
state.transport.serial.write(code_to_inject)
elif c == b"\x0b" and file_to_inject is not None: # ctrl-k, inject script
console_out_write(bytes("Injecting %s\r\n" % file_to_inject, "utf8"))
state.pyb.enter_raw_repl(soft_reset=False)
state.transport.enter_raw_repl(soft_reset=False)
with open(file_to_inject, "rb") as f:
pyfile = f.read()
try:
state.pyb.exec_raw_no_follow(pyfile)
except pyboard.PyboardError as er:
state.transport.exec_raw_no_follow(pyfile)
except TransportError as er:
console_out_write(b"Error:\r\n")
console_out_write(er)
state.pyb.exit_raw_repl()
state.transport.exit_raw_repl()
else:
state.pyb.serial.write(c)
state.transport.serial.write(c)
try:
n = state.pyb.serial.inWaiting()
n = state.transport.serial.inWaiting()
except OSError as er:
if er.args[0] == 5: # IO error, device disappeared
print("device disconnected")
break
if n > 0:
dev_data_in = state.pyb.serial.read(n)
dev_data_in = state.transport.serial.read(n)
if dev_data_in is not None:
if escape_non_printable:
# Pass data through to the console, with escaping of non-printables.
@ -63,7 +63,7 @@ def do_repl(state, args):
code_to_inject = args.inject_code
file_to_inject = args.inject_file
print("Connected to MicroPython at %s" % state.pyb.device_name)
print("Connected to MicroPython at %s" % state.transport.device_name)
print("Use Ctrl-] or Ctrl-x to exit this shell")
if escape_non_printable:
print("Escaping non-printable bytes/characters by printing their hex code")

Wyświetl plik

@ -0,0 +1,33 @@
#!/usr/bin/env python3
#
# This file is part of the MicroPython project, http://micropython.org/
#
# The MIT License (MIT)
#
# Copyright (c) 2023 Jim Mussared
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
class TransportError(Exception):
pass
class Transport:
pass

Wyświetl plik

@ -1,14 +1,604 @@
import io, os, re, struct, time
#!/usr/bin/env python3
#
# This file is part of the MicroPython project, http://micropython.org/
#
# The MIT License (MIT)
#
# Copyright (c) 2014-2021 Damien P. George
# Copyright (c) 2017 Paul Sokolovsky
# Copyright (c) 2023 Jim Mussared
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# This is based on the serial-only parts of tools/pyboard.py, with Python 2
# support removed, and is currently in the process of being refactored to
# support multiple transports (webrepl, socket, BLE, etc). At the moment,
# SerialTransport is just the old Pyboard+PyboardExtended class without any
# of this refactoring. The API is going to change significantly.
# Once the API is stabilised, the idea is that mpremote can be used both
# as a command line tool and a library for interacting with devices.
import ast, io, errno, os, re, struct, sys, time
from collections import namedtuple
from errno import EPERM
from .console import VT_ENABLED
from .transport import TransportError, Transport
try:
from .pyboard import Pyboard, PyboardError, stdout_write_bytes, filesystem_command
except ImportError:
import sys
sys.path.append(os.path.dirname(__file__) + "/../..")
from pyboard import Pyboard, PyboardError, stdout_write_bytes, filesystem_command
def stdout_write_bytes(b):
b = b.replace(b"\x04", b"")
sys.stdout.buffer.write(b)
sys.stdout.buffer.flush()
listdir_result = namedtuple("dir_result", ["name", "st_mode", "st_ino", "st_size"])
def reraise_filesystem_error(e, info):
if len(e.args) >= 3:
if b"OSError" in e.args[2] and b"ENOENT" in e.args[2]:
raise FileNotFoundError(info)
raise
class SerialTransport(Transport):
def __init__(self, device, baudrate=115200, wait=0, exclusive=True):
self.in_raw_repl = False
self.use_raw_paste = True
self.device_name = device
self.mounted = False
import serial
import serial.tools.list_ports
# Set options, and exclusive if pyserial supports it
serial_kwargs = {"baudrate": baudrate, "interCharTimeout": 1}
if serial.__version__ >= "3.3":
serial_kwargs["exclusive"] = exclusive
delayed = False
for attempt in range(wait + 1):
try:
if os.name == "nt":
self.serial = serial.Serial(**serial_kwargs)
self.serial.port = device
portinfo = list(serial.tools.list_ports.grep(device)) # type: ignore
if portinfo and portinfo[0].manufacturer != "Microsoft":
# ESP8266/ESP32 boards use RTS/CTS for flashing and boot mode selection.
# DTR False: to avoid using the reset button will hang the MCU in bootloader mode
# RTS False: to prevent pulses on rts on serial.close() that would POWERON_RESET an ESPxx
self.serial.dtr = False # DTR False = gpio0 High = Normal boot
self.serial.rts = False # RTS False = EN High = MCU enabled
self.serial.open()
else:
self.serial = serial.Serial(device, **serial_kwargs)
break
except OSError:
if wait == 0:
continue
if attempt == 0:
sys.stdout.write("Waiting {} seconds for pyboard ".format(wait))
delayed = True
time.sleep(1)
sys.stdout.write(".")
sys.stdout.flush()
else:
if delayed:
print("")
raise TransportError("failed to access " + device)
if delayed:
print("")
def close(self):
self.serial.close()
def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None):
# if data_consumer is used then data is not accumulated and the ending must be 1 byte long
assert data_consumer is None or len(ending) == 1
data = self.serial.read(min_num_bytes)
if data_consumer:
data_consumer(data)
timeout_count = 0
while True:
if data.endswith(ending):
break
elif self.serial.inWaiting() > 0:
new_data = self.serial.read(1)
if data_consumer:
data_consumer(new_data)
data = new_data
else:
data = data + new_data
timeout_count = 0
else:
timeout_count += 1
if timeout is not None and timeout_count >= 100 * timeout:
break
time.sleep(0.01)
return data
def enter_raw_repl(self, soft_reset=True):
self.serial.write(b"\r\x03\x03") # ctrl-C twice: interrupt any running program
# flush input (without relying on serial.flushInput())
n = self.serial.inWaiting()
while n > 0:
self.serial.read(n)
n = self.serial.inWaiting()
self.serial.write(b"\r\x01") # ctrl-A: enter raw REPL
if soft_reset:
data = self.read_until(1, b"raw REPL; CTRL-B to exit\r\n>")
if not data.endswith(b"raw REPL; CTRL-B to exit\r\n>"):
print(data)
raise TransportError("could not enter raw repl")
self.serial.write(b"\x04") # ctrl-D: soft reset
# Waiting for "soft reboot" independently to "raw REPL" (done below)
# allows boot.py to print, which will show up after "soft reboot"
# and before "raw REPL".
data = self.read_until(1, b"soft reboot\r\n")
if not data.endswith(b"soft reboot\r\n"):
print(data)
raise TransportError("could not enter raw repl")
data = self.read_until(1, b"raw REPL; CTRL-B to exit\r\n")
if not data.endswith(b"raw REPL; CTRL-B to exit\r\n"):
print(data)
raise TransportError("could not enter raw repl")
self.in_raw_repl = True
def exit_raw_repl(self):
self.serial.write(b"\r\x02") # ctrl-B: enter friendly REPL
self.in_raw_repl = False
def follow(self, timeout, data_consumer=None):
# wait for normal output
data = self.read_until(1, b"\x04", timeout=timeout, data_consumer=data_consumer)
if not data.endswith(b"\x04"):
raise TransportError("timeout waiting for first EOF reception")
data = data[:-1]
# wait for error output
data_err = self.read_until(1, b"\x04", timeout=timeout)
if not data_err.endswith(b"\x04"):
raise TransportError("timeout waiting for second EOF reception")
data_err = data_err[:-1]
# return normal and error output
return data, data_err
def raw_paste_write(self, command_bytes):
# Read initial header, with window size.
data = self.serial.read(2)
window_size = struct.unpack("<H", data)[0]
window_remain = window_size
# Write out the command_bytes data.
i = 0
while i < len(command_bytes):
while window_remain == 0 or self.serial.inWaiting():
data = self.serial.read(1)
if data == b"\x01":
# Device indicated that a new window of data can be sent.
window_remain += window_size
elif data == b"\x04":
# Device indicated abrupt end. Acknowledge it and finish.
self.serial.write(b"\x04")
return
else:
# Unexpected data from device.
raise TransportError("unexpected read during raw paste: {}".format(data))
# Send out as much data as possible that fits within the allowed window.
b = command_bytes[i : min(i + window_remain, len(command_bytes))]
self.serial.write(b)
window_remain -= len(b)
i += len(b)
# Indicate end of data.
self.serial.write(b"\x04")
# Wait for device to acknowledge end of data.
data = self.read_until(1, b"\x04")
if not data.endswith(b"\x04"):
raise TransportError("could not complete raw paste: {}".format(data))
def exec_raw_no_follow(self, command):
if isinstance(command, bytes):
command_bytes = command
else:
command_bytes = bytes(command, encoding="utf8")
# check we have a prompt
data = self.read_until(1, b">")
if not data.endswith(b">"):
raise TransportError("could not enter raw repl")
if self.use_raw_paste:
# Try to enter raw-paste mode.
self.serial.write(b"\x05A\x01")
data = self.serial.read(2)
if data == b"R\x00":
# Device understood raw-paste command but doesn't support it.
pass
elif data == b"R\x01":
# Device supports raw-paste mode, write out the command using this mode.
return self.raw_paste_write(command_bytes)
else:
# Device doesn't support raw-paste, fall back to normal raw REPL.
data = self.read_until(1, b"w REPL; CTRL-B to exit\r\n>")
if not data.endswith(b"w REPL; CTRL-B to exit\r\n>"):
print(data)
raise TransportError("could not enter raw repl")
# Don't try to use raw-paste mode again for this connection.
self.use_raw_paste = False
# Write command using standard raw REPL, 256 bytes every 10ms.
for i in range(0, len(command_bytes), 256):
self.serial.write(command_bytes[i : min(i + 256, len(command_bytes))])
time.sleep(0.01)
self.serial.write(b"\x04")
# check if we could exec command
data = self.serial.read(2)
if data != b"OK":
raise TransportError("could not exec command (response: %r)" % data)
def exec_raw(self, command, timeout=10, data_consumer=None):
self.exec_raw_no_follow(command)
return self.follow(timeout, data_consumer)
def eval(self, expression, parse=False):
if parse:
ret = self.exec("print(repr({}))".format(expression))
ret = ret.strip()
return ast.literal_eval(ret.decode())
else:
ret = self.exec("print({})".format(expression))
ret = ret.strip()
return ret
def exec(self, command, data_consumer=None):
ret, ret_err = self.exec_raw(command, data_consumer=data_consumer)
if ret_err:
raise TransportError("exception", ret, ret_err)
return ret
def execfile(self, filename):
with open(filename, "rb") as f:
pyfile = f.read()
return self.exec(pyfile)
def fs_exists(self, src):
try:
self.exec("import uos\nuos.stat(%s)" % (("'%s'" % src) if src else ""))
return True
except TransportError:
return False
def fs_ls(self, src):
cmd = (
"import uos\nfor f in uos.ilistdir(%s):\n"
" print('{:12} {}{}'.format(f[3]if len(f)>3 else 0,f[0],'/'if f[1]&0x4000 else ''))"
% (("'%s'" % src) if src else "")
)
self.exec(cmd, data_consumer=stdout_write_bytes)
def fs_listdir(self, src=""):
buf = bytearray()
def repr_consumer(b):
buf.extend(b.replace(b"\x04", b""))
cmd = "import uos\nfor f in uos.ilistdir(%s):\n" " print(repr(f), end=',')" % (
("'%s'" % src) if src else ""
)
try:
buf.extend(b"[")
self.exec(cmd, data_consumer=repr_consumer)
buf.extend(b"]")
except TransportError as e:
reraise_filesystem_error(e, src)
return [
listdir_result(*f) if len(f) == 4 else listdir_result(*(f + (0,)))
for f in ast.literal_eval(buf.decode())
]
def fs_stat(self, src):
try:
self.exec("import uos")
return os.stat_result(self.eval("uos.stat(%s)" % (("'%s'" % src)), parse=True))
except TransportError as e:
reraise_filesystem_error(e, src)
def fs_cat(self, src, chunk_size=256):
cmd = (
"with open('%s') as f:\n while 1:\n"
" b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size)
)
self.exec(cmd, data_consumer=stdout_write_bytes)
def fs_readfile(self, src, chunk_size=256):
buf = bytearray()
def repr_consumer(b):
buf.extend(b.replace(b"\x04", b""))
cmd = (
"with open('%s', 'rb') as f:\n while 1:\n"
" b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size)
)
try:
self.exec(cmd, data_consumer=repr_consumer)
except TransportError as e:
reraise_filesystem_error(e, src)
return ast.literal_eval(buf.decode())
def fs_writefile(self, dest, data, chunk_size=256):
self.exec("f=open('%s','wb')\nw=f.write" % dest)
while data:
chunk = data[:chunk_size]
self.exec("w(" + repr(chunk) + ")")
data = data[len(chunk) :]
self.exec("f.close()")
def fs_cp(self, src, dest, chunk_size=256, progress_callback=None):
if progress_callback:
src_size = self.fs_stat(src).st_size
written = 0
self.exec("fr=open('%s','rb')\nr=fr.read\nfw=open('%s','wb')\nw=fw.write" % (src, dest))
while True:
data_len = int(self.exec("d=r(%u)\nw(d)\nprint(len(d))" % chunk_size))
if not data_len:
break
if progress_callback:
written += data_len
progress_callback(written, src_size)
self.exec("fr.close()\nfw.close()")
def fs_get(self, src, dest, chunk_size=256, progress_callback=None):
if progress_callback:
src_size = self.fs_stat(src).st_size
written = 0
self.exec("f=open('%s','rb')\nr=f.read" % src)
with open(dest, "wb") as f:
while True:
data = bytearray()
self.exec("print(r(%u))" % chunk_size, data_consumer=lambda d: data.extend(d))
assert data.endswith(b"\r\n\x04")
try:
data = ast.literal_eval(str(data[:-3], "ascii"))
if not isinstance(data, bytes):
raise ValueError("Not bytes")
except (UnicodeError, ValueError) as e:
raise TransportError("fs_get: Could not interpret received data: %s" % str(e))
if not data:
break
f.write(data)
if progress_callback:
written += len(data)
progress_callback(written, src_size)
self.exec("f.close()")
def fs_put(self, src, dest, chunk_size=256, progress_callback=None):
if progress_callback:
src_size = os.path.getsize(src)
written = 0
self.exec("f=open('%s','wb')\nw=f.write" % dest)
with open(src, "rb") as f:
while True:
data = f.read(chunk_size)
if not data:
break
if sys.version_info < (3,):
self.exec("w(b" + repr(data) + ")")
else:
self.exec("w(" + repr(data) + ")")
if progress_callback:
written += len(data)
progress_callback(written, src_size)
self.exec("f.close()")
def fs_mkdir(self, dir):
self.exec("import uos\nuos.mkdir('%s')" % dir)
def fs_rmdir(self, dir):
self.exec("import uos\nuos.rmdir('%s')" % dir)
def fs_rm(self, src):
self.exec("import uos\nuos.remove('%s')" % src)
def fs_touch(self, src):
self.exec("f=open('%s','a')\nf.close()" % src)
def filesystem_command(self, args, progress_callback=None, verbose=False):
def fname_remote(src):
if src.startswith(":"):
src = src[1:]
# Convert all path separators to "/", because that's what a remote device uses.
return src.replace(os.path.sep, "/")
def fname_cp_dest(src, dest):
_, src = os.path.split(src)
if dest is None or dest == "":
dest = src
elif dest == ".":
dest = "./" + src
elif dest.endswith("/"):
dest += src
return dest
cmd = args[0]
args = args[1:]
try:
if cmd == "cp":
srcs = args[:-1]
dest = args[-1]
if dest.startswith(":"):
op_remote_src = self.fs_cp
op_local_src = self.fs_put
else:
op_remote_src = self.fs_get
op_local_src = lambda src, dest, **_: __import__("shutil").copy(src, dest)
for src in srcs:
if verbose:
print("cp %s %s" % (src, dest))
if src.startswith(":"):
op = op_remote_src
else:
op = op_local_src
src2 = fname_remote(src)
dest2 = fname_cp_dest(src2, fname_remote(dest))
op(src2, dest2, progress_callback=progress_callback)
else:
ops = {
"cat": self.fs_cat,
"ls": self.fs_ls,
"mkdir": self.fs_mkdir,
"rm": self.fs_rm,
"rmdir": self.fs_rmdir,
"touch": self.fs_touch,
}
if cmd not in ops:
raise TransportError("'{}' is not a filesystem command".format(cmd))
if cmd == "ls" and not args:
args = [""]
for src in args:
src = fname_remote(src)
if verbose:
print("%s :%s" % (cmd, src))
ops[cmd](src)
except TransportError as er:
if len(er.args) > 1:
print(str(er.args[2], "ascii"))
else:
print(er)
self.exit_raw_repl()
self.close()
sys.exit(1)
def mount_local(self, path, unsafe_links=False):
fout = self.serial
if self.eval('"RemoteFS" in globals()') == b"False":
self.exec(fs_hook_code)
self.exec("__mount()")
self.mounted = True
self.cmd = PyboardCommand(self.serial, fout, path, unsafe_links=unsafe_links)
self.serial = SerialIntercept(self.serial, self.cmd)
def write_ctrl_d(self, out_callback):
self.serial.write(b"\x04")
if not self.mounted:
return
# Read response from the device until it is quiet (with a timeout).
INITIAL_TIMEOUT = 0.5
BANNER_TIMEOUT = 2
QUIET_TIMEOUT = 0.1
FULL_TIMEOUT = 5
t_start = t_last_activity = time.monotonic()
data_all = b""
soft_reboot_started = False
soft_reboot_banner = False
while True:
t = time.monotonic()
n = self.serial.inWaiting()
if n > 0:
data = self.serial.read(n)
out_callback(data)
data_all += data
t_last_activity = t
else:
if len(data_all) == 0:
if t - t_start > INITIAL_TIMEOUT:
return
else:
if t - t_start > FULL_TIMEOUT:
if soft_reboot_started:
break
return
next_data_timeout = QUIET_TIMEOUT
if not soft_reboot_started and data_all.find(b"MPY: soft reboot") != -1:
soft_reboot_started = True
if soft_reboot_started and not soft_reboot_banner:
# Once soft reboot has been initiated, give some more time for the startup
# banner to be shown
if data_all.find(b"\nMicroPython ") != -1:
soft_reboot_banner = True
elif data_all.find(b"\nraw REPL; CTRL-B to exit\r\n") != -1:
soft_reboot_banner = True
else:
next_data_timeout = BANNER_TIMEOUT
if t - t_last_activity > next_data_timeout:
break
if not soft_reboot_started:
return
if not soft_reboot_banner:
out_callback(b"Warning: Could not remount local filesystem\r\n")
return
# Determine type of prompt
if data_all.endswith(b">"):
in_friendly_repl = False
prompt = b">"
else:
in_friendly_repl = True
prompt = data_all.rsplit(b"\r\n", 1)[-1]
# Clear state while board remounts, it will be re-set once mounted.
self.mounted = False
self.serial = self.serial.orig_serial
# Provide a message about the remount.
out_callback(bytes(f"\r\nRemount local directory {self.cmd.root} at /remote\r\n", "utf8"))
# Enter raw REPL and re-mount the remote filesystem.
self.serial.write(b"\x01")
self.exec(fs_hook_code)
self.exec("__mount()")
self.mounted = True
# Exit raw REPL if needed, and wait for the friendly REPL prompt.
if in_friendly_repl:
self.exit_raw_repl()
self.read_until(len(prompt), prompt)
out_callback(prompt)
self.serial = SerialIntercept(self.serial, self.cmd)
def umount_local(self):
if self.mounted:
self.exec('uos.umount("/remote")')
self.mounted = False
self.serial = self.serial.orig_serial
fs_hook_cmds = {
"CMD_STAT": 1,
@ -617,110 +1207,3 @@ class SerialIntercept:
def write(self, buf):
self.orig_serial.write(buf)
class PyboardExtended(Pyboard):
def __init__(self, dev, *args, **kwargs):
super().__init__(dev, *args, **kwargs)
self.device_name = dev
self.mounted = False
def mount_local(self, path, unsafe_links=False):
fout = self.serial
if self.eval('"RemoteFS" in globals()') == b"False":
self.exec_(fs_hook_code)
self.exec_("__mount()")
self.mounted = True
self.cmd = PyboardCommand(self.serial, fout, path, unsafe_links=unsafe_links)
self.serial = SerialIntercept(self.serial, self.cmd)
def write_ctrl_d(self, out_callback):
self.serial.write(b"\x04")
if not self.mounted:
return
# Read response from the device until it is quiet (with a timeout).
INITIAL_TIMEOUT = 0.5
BANNER_TIMEOUT = 2
QUIET_TIMEOUT = 0.1
FULL_TIMEOUT = 5
t_start = t_last_activity = time.monotonic()
data_all = b""
soft_reboot_started = False
soft_reboot_banner = False
while True:
t = time.monotonic()
n = self.serial.inWaiting()
if n > 0:
data = self.serial.read(n)
out_callback(data)
data_all += data
t_last_activity = t
else:
if len(data_all) == 0:
if t - t_start > INITIAL_TIMEOUT:
return
else:
if t - t_start > FULL_TIMEOUT:
if soft_reboot_started:
break
return
next_data_timeout = QUIET_TIMEOUT
if not soft_reboot_started and data_all.find(b"MPY: soft reboot") != -1:
soft_reboot_started = True
if soft_reboot_started and not soft_reboot_banner:
# Once soft reboot has been initiated, give some more time for the startup
# banner to be shown
if data_all.find(b"\nMicroPython ") != -1:
soft_reboot_banner = True
elif data_all.find(b"\nraw REPL; CTRL-B to exit\r\n") != -1:
soft_reboot_banner = True
else:
next_data_timeout = BANNER_TIMEOUT
if t - t_last_activity > next_data_timeout:
break
if not soft_reboot_started:
return
if not soft_reboot_banner:
out_callback(b"Warning: Could not remount local filesystem\r\n")
return
# Determine type of prompt
if data_all.endswith(b">"):
in_friendly_repl = False
prompt = b">"
else:
in_friendly_repl = True
prompt = data_all.rsplit(b"\r\n", 1)[-1]
# Clear state while board remounts, it will be re-set once mounted.
self.mounted = False
self.serial = self.serial.orig_serial
# Provide a message about the remount.
out_callback(bytes(f"\r\nRemount local directory {self.cmd.root} at /remote\r\n", "utf8"))
# Enter raw REPL and re-mount the remote filesystem.
self.serial.write(b"\x01")
self.exec_(fs_hook_code)
self.exec_("__mount()")
self.mounted = True
# Exit raw REPL if needed, and wait for the friendly REPL prompt.
if in_friendly_repl:
self.exit_raw_repl()
self.read_until(len(prompt), prompt)
out_callback(prompt)
self.serial = SerialIntercept(self.serial, self.cmd)
def umount_local(self):
if self.mounted:
self.exec_('uos.umount("/remote")')
self.mounted = False
self.serial = self.serial.orig_serial

Wyświetl plik

@ -44,11 +44,5 @@ raw-options = { root = "../..", version_scheme = "post-release" }
[tool.hatch.build]
packages = ["mpremote"]
# Also grab pyboard.py from /tools and add it to the package for both wheel and sdist.
[tool.hatch.build.force-include]
"../pyboard.py" = "mpremote/pyboard.py"
# Workaround to allow `python -m build` to work.
[tool.hatch.build.targets.sdist.force-include]
"../pyboard.py" = "mpremote/pyboard.py"
"requirements.txt" = "requirements.txt"