felixdoerre 2025-04-27 14:28:24 +00:00 zatwierdzone przez GitHub
commit 88c1f05a9c
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: B5690EEEBB952194
3 zmienionych plików z 160 dodań i 27 usunięć

Wyświetl plik

@ -0,0 +1,53 @@
class LegacyFileTransfer:
def __init__(self):
self.opbuf = bytearray(82)
self.opptr = 0
self.op = 0
def handle(self, buf, sock):
if self.op == 2:
import struct
ret = self.file.readinto(memoryview(self.filebuf)[2:])
memoryview(self.filebuf)[0:2] = struct.pack("<h", ret)
sock.ioctl(9, 2)
sock.write(memoryview(self.filebuf)[0 : (2 + ret)])
if ret == 0:
sock.write(b"WB\x00\x00")
self.op = 0
self.filebuf = None
sock.ioctl(9, 1)
return
self.opbuf[self.opptr] = buf[0]
self.opptr += 1
if self.opptr != 82: # or bytes(buf[0:2]) != b"WA":
return
self.opptr = 0
sock.ioctl(9, 2)
sock.write(b"WB\x00\x00")
sock.ioctl(9, 1)
type = self.opbuf[2]
if type == 2: # GET_FILE
self.op = type
name = self.opbuf[18:82].rstrip(b"\x00")
self.filebuf = bytearray(2 + 256)
self.file = open(name.decode(), "rb")
elif type == 1: # PUT_FILE
import struct
name = self.opbuf[18:82].rstrip(b"\x00")
size = struct.unpack("<I", self.opbuf[12:16])[0]
filebuf = bytearray(512)
with open(name.decode(), "wb") as file:
while size > 0:
ret = sock.readinto(filebuf)
if ret is None:
continue
if ret > 0:
file.write(memoryview(filebuf)[0:ret])
size -= ret
elif ret < 0:
break
sock.ioctl(9, 2)
sock.write(b"WB\x00\x00")
sock.ioctl(9, 1)

Wyświetl plik

@ -1,4 +1,5 @@
metadata(description="WebREPL server.", version="0.1.0") metadata(description="WebREPL server.", version="1.0.0")
module("webrepl.py", opt=3) module("webrepl.py", opt=3)
module("legacy_file_transfer.py", opt=3)
module("webrepl_setup.py", opt=3) module("webrepl_setup.py", opt=3)

Wyświetl plik

@ -1,13 +1,14 @@
# This module should be imported from REPL, not run from command line. # This module should be imported from REPL, not run from command line.
import binascii import binascii
import hashlib import hashlib
from micropython import const
import network import network
import os import os
import socket import socket
import sys import sys
import websocket import websocket
import _webrepl import io
from micropython import const
from legacy_file_transfer import LegacyFileTransfer
listen_s = None listen_s = None
client_s = None client_s = None
@ -15,11 +16,81 @@ client_s = None
DEBUG = 0 DEBUG = 0
_DEFAULT_STATIC_HOST = const("https://micropython.org/webrepl/") _DEFAULT_STATIC_HOST = const("https://micropython.org/webrepl/")
_WELCOME_PROMPT = const("\r\nWebREPL connected\r\n>>> ")
static_host = _DEFAULT_STATIC_HOST static_host = _DEFAULT_STATIC_HOST
webrepl_pass = None
legacy = LegacyFileTransfer()
def server_handshake(cl): class WebreplWrapper(io.IOBase):
req = cl.makefile("rwb", 0) def __init__(self, sock):
self.sock = sock
self.sock.ioctl(9, 1 if legacy else 2)
if webrepl_pass is not None:
self.pw = bytearray(16)
self.pwPos = 0
self.sock.write("Password: ")
else:
self.pw = None
self.sock.write(_WELCOME_PROMPT)
def readinto(self, buf):
if self.pw is not None:
buf1 = bytearray(1)
while True:
l = self.sock.readinto(buf1)
if l is None:
continue
if l <= 0:
return l
if buf1[0] == 10 or buf1[0] == 13:
print("Authenticating with:")
print(self.pw[0 : self.pwPos])
if bytes(self.pw[0 : self.pwPos]) == webrepl_pass:
self.pw = None
del self.pwPos
self.sock.write(_WELCOME_PROMPT)
break
else:
print(bytes(self.pw[0 : self.pwPos]))
print(webrepl_pass)
self.sock.write("\r\nAccess denied\r\n")
return 0
else:
if self.pwPos < len(self.pw):
self.pw[self.pwPos] = buf1[0]
self.pwPos = self.pwPos + 1
ret = None
while True:
ret = self.sock.readinto(buf)
if ret is None or ret <= 0:
break
# ignore any non-data frames
if self.sock.ioctl(8) >= 8:
continue
if self.sock.ioctl(8) == 2 and legacy:
legacy.handle(buf, self.sock)
continue
break
return ret
def write(self, buf):
if self.pw is not None:
return len(buf)
return self.sock.write(buf)
def ioctl(self, kind, arg):
if kind == 4:
self.sock.close()
return 0
return -1
def close(self):
self.sock.close()
def server_handshake(req):
# Skip HTTP GET line. # Skip HTTP GET line.
l = req.readline() l = req.readline()
if DEBUG: if DEBUG:
@ -61,30 +132,35 @@ def server_handshake(cl):
if DEBUG: if DEBUG:
print("respkey:", respkey) print("respkey:", respkey)
cl.send( req.write(
b"""\ b"""\
HTTP/1.1 101 Switching Protocols\r HTTP/1.1 101 Switching Protocols\r
Upgrade: websocket\r Upgrade: websocket\r
Connection: Upgrade\r Connection: Upgrade\r
Sec-WebSocket-Accept: """ Sec-WebSocket-Accept: """
) )
cl.send(respkey) req.write(respkey)
cl.send("\r\n\r\n") req.write("\r\n\r\n")
return True return True
def send_html(cl): def send_html(cl):
cl.send( cl.write(
b"""\ b"""\
HTTP/1.0 200 OK\r HTTP/1.0 200 OK\r
\r \r
<base href=\"""" <base href=\""""
) )
cl.send(static_host) cl.write(static_host)
cl.send( cl.write(
b"""\"></base>\r b"""\"></base>\r
<script src="webrepl_content.js"></script>\r <script src="webrepl"""
)
if not legacy:
cl.write("v2")
cl.write(
b"""_content.js"></script>\r
""" """
) )
cl.close() cl.close()
@ -95,10 +171,7 @@ def setup_conn(port, accept_handler):
listen_s = socket.socket() listen_s = socket.socket()
listen_s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) listen_s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ai = socket.getaddrinfo("0.0.0.0", port) listen_s.bind(("", port))
addr = ai[0][4]
listen_s.bind(addr)
listen_s.listen(1) listen_s.listen(1)
if accept_handler: if accept_handler:
listen_s.setsockopt(socket.SOL_SOCKET, 20, accept_handler) listen_s.setsockopt(socket.SOL_SOCKET, 20, accept_handler)
@ -110,11 +183,14 @@ def setup_conn(port, accept_handler):
def accept_conn(listen_sock): def accept_conn(listen_sock):
global client_s global client_s, webrepl_ssl_context
cl, remote_addr = listen_sock.accept() cl, remote_addr = listen_sock.accept()
sock = cl
if webrepl_ssl_context is not None:
sock = webrepl_ssl_context.wrap_socket(sock)
if not server_handshake(cl): if not server_handshake(cl):
send_html(cl) send_html(sock)
return False return False
prev = os.dupterm(None) prev = os.dupterm(None)
@ -126,13 +202,13 @@ def accept_conn(listen_sock):
print("\nWebREPL connection from:", remote_addr) print("\nWebREPL connection from:", remote_addr)
client_s = cl client_s = cl
ws = websocket.websocket(cl, True) sock = websocket.websocket(sock)
ws = _webrepl._webrepl(ws) sock = WebreplWrapper(sock)
cl.setblocking(False) cl.setblocking(False)
# notify REPL on socket incoming data (ESP32/ESP8266-only) # notify REPL on socket incoming data (ESP32/ESP8266-only)
if hasattr(os, "dupterm_notify"): if hasattr(os, "dupterm_notify"):
cl.setsockopt(socket.SOL_SOCKET, 20, os.dupterm_notify) cl.setsockopt(socket.SOL_SOCKET, 20, os.dupterm_notify)
os.dupterm(ws) os.dupterm(sock)
return True return True
@ -146,11 +222,12 @@ def stop():
listen_s.close() listen_s.close()
def start(port=8266, password=None, accept_handler=accept_conn): def start(port=8266, password=None, ssl_context=None, accept_handler=accept_conn):
global static_host global static_host, webrepl_pass, webrepl_ssl_context
stop() stop()
webrepl_ssl_context = ssl_context
webrepl_pass = password webrepl_pass = password
if webrepl_pass is None: if password is None:
try: try:
import webrepl_cfg import webrepl_cfg
@ -160,7 +237,9 @@ def start(port=8266, password=None, accept_handler=accept_conn):
except: except:
print("WebREPL is not configured, run 'import webrepl_setup'") print("WebREPL is not configured, run 'import webrepl_setup'")
_webrepl.password(webrepl_pass) if webrepl_pass is not None:
webrepl_pass = webrepl_pass.encode()
s = setup_conn(port, accept_handler) s = setup_conn(port, accept_handler)
if accept_handler is None: if accept_handler is None:
@ -174,5 +253,5 @@ def start(port=8266, password=None, accept_handler=accept_conn):
print("Started webrepl in manual override mode") print("Started webrepl in manual override mode")
def start_foreground(port=8266, password=None): def start_foreground(port=8266, password=None, ssl_context=None):
start(port, password, None) start(port, password, ssl_context=ssl_context, accept_handler=None)