micropython/net: Add "webrepl" server from main repo.

Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
pull/525/head
Jim Mussared 2022-09-06 13:25:21 +10:00
rodzic cf5ed97b4d
commit cc2cdeb94b
3 zmienionych plików z 286 dodań i 0 usunięć

Wyświetl plik

@ -0,0 +1,2 @@
module("webrepl.py", opt=3)
module("webrepl_setup.py", opt=3)

Wyświetl plik

@ -0,0 +1,177 @@
# This module should be imported from REPL, not run from command line.
import binascii
import hashlib
import network
import os
import socket
import sys
import websocket
import _webrepl
listen_s = None
client_s = None
DEBUG = 0
_DEFAULT_STATIC_HOST = const("https://micropython.org/webrepl/")
static_host = _DEFAULT_STATIC_HOST
def server_handshake(cl):
req = cl.makefile("rwb", 0)
# Skip HTTP GET line.
l = req.readline()
if DEBUG:
sys.stdout.write(repr(l))
webkey = None
upgrade = False
websocket = False
while True:
l = req.readline()
if not l:
# EOF in headers.
return False
if l == b"\r\n":
break
if DEBUG:
sys.stdout.write(l)
h, v = [x.strip() for x in l.split(b":", 1)]
if DEBUG:
print((h, v))
if h == b"Sec-WebSocket-Key":
webkey = v
elif h == b"Connection" and b"Upgrade" in v:
upgrade = True
elif h == b"Upgrade" and v == b"websocket":
websocket = True
if not (upgrade and websocket and webkey):
return False
if DEBUG:
print("Sec-WebSocket-Key:", webkey, len(webkey))
d = hashlib.sha1(webkey)
d.update(b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11")
respkey = d.digest()
respkey = binascii.b2a_base64(respkey)[:-1]
if DEBUG:
print("respkey:", respkey)
cl.send(
b"""\
HTTP/1.1 101 Switching Protocols\r
Upgrade: websocket\r
Connection: Upgrade\r
Sec-WebSocket-Accept: """
)
cl.send(respkey)
cl.send("\r\n\r\n")
return True
def send_html(cl):
cl.send(
b"""\
HTTP/1.0 200 OK\r
\r
<base href=\""""
)
cl.send(static_host)
cl.send(
b"""\"></base>\r
<script src="webrepl_content.js"></script>\r
"""
)
cl.close()
def setup_conn(port, accept_handler):
global listen_s
listen_s = socket.socket()
listen_s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ai = socket.getaddrinfo("0.0.0.0", port)
addr = ai[0][4]
listen_s.bind(addr)
listen_s.listen(1)
if accept_handler:
listen_s.setsockopt(socket.SOL_SOCKET, 20, accept_handler)
for i in (network.AP_IF, network.STA_IF):
iface = network.WLAN(i)
if iface.active():
print("WebREPL server started on http://%s:%d/" % (iface.ifconfig()[0], port))
return listen_s
def accept_conn(listen_sock):
global client_s
cl, remote_addr = listen_sock.accept()
if not server_handshake(cl):
send_html(cl)
return False
prev = os.dupterm(None)
os.dupterm(prev)
if prev:
print("\nConcurrent WebREPL connection from", remote_addr, "rejected")
cl.close()
return False
print("\nWebREPL connection from:", remote_addr)
client_s = cl
ws = websocket.websocket(cl, True)
ws = _webrepl._webrepl(ws)
cl.setblocking(False)
# notify REPL on socket incoming data (ESP32/ESP8266-only)
if hasattr(os, "dupterm_notify"):
cl.setsockopt(socket.SOL_SOCKET, 20, os.dupterm_notify)
os.dupterm(ws)
return True
def stop():
global listen_s, client_s
os.dupterm(None)
if client_s:
client_s.close()
if listen_s:
listen_s.close()
def start(port=8266, password=None, accept_handler=accept_conn):
global static_host
stop()
webrepl_pass = password
if webrepl_pass is None:
try:
import webrepl_cfg
webrepl_pass = webrepl_cfg.PASS
if hasattr(webrepl_cfg, "BASE"):
static_host = webrepl_cfg.BASE
except:
print("WebREPL is not configured, run 'import webrepl_setup'")
_webrepl.password(webrepl_pass)
s = setup_conn(port, accept_handler)
if accept_handler is None:
print("Starting webrepl in foreground mode")
# Run accept_conn to serve HTML until we get a websocket connection.
while not accept_conn(s):
pass
elif password is None:
print("Started webrepl in normal mode")
else:
print("Started webrepl in manual override mode")
def start_foreground(port=8266, password=None):
start(port, password, None)

Wyświetl plik

@ -0,0 +1,107 @@
import sys
import os
import machine
RC = "./boot.py"
CONFIG = "./webrepl_cfg.py"
def input_choice(prompt, choices):
while 1:
resp = input(prompt)
if resp in choices:
return resp
def getpass(prompt):
return input(prompt)
def input_pass():
while 1:
passwd1 = getpass("New password (4-9 chars): ")
if len(passwd1) < 4 or len(passwd1) > 9:
print("Invalid password length")
continue
passwd2 = getpass("Confirm password: ")
if passwd1 == passwd2:
return passwd1
print("Passwords do not match")
def exists(fname):
try:
with open(fname):
pass
return True
except OSError:
return False
def get_daemon_status():
with open(RC) as f:
for l in f:
if "webrepl" in l:
if l.startswith("#"):
return False
return True
return None
def change_daemon(action):
LINES = ("import webrepl", "webrepl.start()")
with open(RC) as old_f, open(RC + ".tmp", "w") as new_f:
found = False
for l in old_f:
for patt in LINES:
if patt in l:
found = True
if action and l.startswith("#"):
l = l[1:]
elif not action and not l.startswith("#"):
l = "#" + l
new_f.write(l)
if not found:
new_f.write("import webrepl\nwebrepl.start()\n")
# FatFs rename() is not POSIX compliant, will raise OSError if
# dest file exists.
os.remove(RC)
os.rename(RC + ".tmp", RC)
def main():
status = get_daemon_status()
print("WebREPL daemon auto-start status:", "enabled" if status else "disabled")
print("\nWould you like to (E)nable or (D)isable it running on boot?")
print("(Empty line to quit)")
resp = input("> ").upper()
if resp == "E":
if exists(CONFIG):
resp2 = input_choice(
"Would you like to change WebREPL password? (y/n) ", ("y", "n", "")
)
else:
print("To enable WebREPL, you must set password for it")
resp2 = "y"
if resp2 == "y":
passwd = input_pass()
with open(CONFIG, "w") as f:
f.write("PASS = %r\n" % passwd)
if resp not in ("D", "E") or (resp == "D" and not status) or (resp == "E" and status):
print("No further action required")
sys.exit()
change_daemon(resp == "E")
print("Changes will be activated after reboot")
resp = input_choice("Would you like to reboot now? (y/n) ", ("y", "n", ""))
if resp == "y":
machine.reset()
main()