pull/1022/merge
Andrew Leech 2025-06-25 07:12:53 +00:00 zatwierdzone przez GitHub
commit b376e745f9
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: B5690EEEBB952194
15 zmienionych plików z 1729 dodań i 0 usunięć

Wyświetl plik

@ -0,0 +1,172 @@
# MicroPython debugpy
A minimal implementation of debugpy for MicroPython, enabling remote debugging
such as VS Code debugging support.
## Features
- Debug Adapter Protocol (DAP) support for VS Code integration
- Basic debugging operations:
- Breakpoints
- Step over/into/out
- Stack trace inspection
- Variable inspection (globals, locals generally not supported)
- Expression evaluation
- Pause/continue execution
## Requirements
- MicroPython with `sys.settrace` support (enabled with `MICROPY_PY_SYS_SETTRACE`)
- Socket support for network communication
- JSON support for DAP message parsing
## Usage
### Basic Usage
```python
import debugpy
# Start listening for debugger connections
host, port = debugpy.listen() # Default: 127.0.0.1:5678
print(f"Debugger listening on {host}:{port}")
# Enable debugging for current thread
debugpy.debug_this_thread()
# Your code here...
def my_function():
x = 10
y = 20
result = x + y # Set breakpoint here in VS Code
return result
result = my_function()
print(f"Result: {result}")
# Manual breakpoint
debugpy.breakpoint()
```
### VS Code Configuration
Create a `.vscode/launch.json` file in your project:
```json
{
"version": "0.2.0",
"configurations": [
{
"name": "Attach to MicroPython",
"type": "python",
"request": "attach",
"connect": {
"host": "127.0.0.1",
"port": 5678
},
"pathMappings": [
{
"localRoot": "${workspaceFolder}",
"remoteRoot": "."
}
],
"justMyCode": false
}
]
}
```
### Testing
1. Build the MicroPython Unix coverage port:
```bash
cd ports/unix
make CFLAGS_EXTRA="-DMICROPY_PY_SYS_SETTRACE=1"
```
2. Run the test script:
```bash
cd lib/micropython-lib/python-ecosys/debugpy
../../../../ports/unix/build-coverage/micropython test_debugpy.py
```
3. In VS Code, open the debugpy folder and press F5 to attach the debugger
4. Set breakpoints in the test script and observe debugging functionality
## API Reference
### `debugpy.listen(port=5678, host="127.0.0.1")`
Start listening for debugger connections.
**Parameters:**
- `port`: Port number to listen on (default: 5678)
- `host`: Host address to bind to (default: "127.0.0.1")
**Returns:** Tuple of (host, port) actually used
### `debugpy.debug_this_thread()`
Enable debugging for the current thread by installing the trace function.
### `debugpy.breakpoint()`
Trigger a manual breakpoint that will pause execution if a debugger is attached.
### `debugpy.wait_for_client()`
Wait for the debugger client to connect and initialize.
### `debugpy.is_client_connected()`
Check if a debugger client is currently connected.
**Returns:** Boolean indicating connection status
### `debugpy.disconnect()`
Disconnect from the debugger client and clean up resources.
## Architecture
The implementation consists of several key components:
1. **Public API** (`public_api.py`): Main entry points for users
2. **Debug Session** (`server/debug_session.py`): Handles DAP protocol communication
3. **PDB Adapter** (`server/pdb_adapter.py`): Bridges DAP and MicroPython's trace system
4. **Messaging** (`common/messaging.py`): JSON message handling for DAP
5. **Constants** (`common/constants.py`): DAP protocol constants
## Limitations
This is a minimal implementation with the following limitations:
- Single-threaded debugging only
- No conditional breakpoints
- No function breakpoints
- Limited variable inspection (no nested object expansion)
- No step back functionality
- No hot code reloading
- Simplified stepping implementation
## Compatibility
Tested with:
- MicroPython Unix port
- VS Code with Python/debugpy extension
- CPython 3.x (for comparison)
## Contributing
This implementation provides a foundation for MicroPython debugging. Contributions are welcome to add:
- Conditional breakpoint support
- Better variable inspection
- Multi-threading support
- Performance optimizations
- Additional DAP features
## License
MIT License - see the MicroPython project license for details.

Wyświetl plik

@ -0,0 +1,175 @@
#!/usr/bin/env python3
"""DAP protocol monitor - sits between VS Code and MicroPython debugpy."""
import socket
import threading
import json
import time
import sys
class DAPMonitor:
def __init__(self, listen_port=5679, target_host='127.0.0.1', target_port=5678):
self.disconnect = False
self.listen_port = listen_port
self.target_host = target_host
self.target_port = target_port
self.client_sock = None
self.server_sock = None
def start(self):
"""Start the DAP monitor proxy."""
print(f"DAP Monitor starting on port {self.listen_port}")
print(f"Will forward to {self.target_host}:{self.target_port}")
print("Start MicroPython debugpy server first, then connect VS Code to port 5679")
# Create listening socket
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.bind(('127.0.0.1', self.listen_port))
listener.listen(1)
print(f"Listening for VS Code connection on port {self.listen_port}...")
try:
# Wait for VS Code to connect
self.client_sock, client_addr = listener.accept()
print(f"VS Code connected from {client_addr}")
# Connect to MicroPython debugpy server
self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_sock.connect((self.target_host, self.target_port))
print(f"Connected to MicroPython debugpy at {self.target_host}:{self.target_port}")
# Start forwarding threads
threading.Thread(target=self.forward_client_to_server, daemon=True).start()
threading.Thread(target=self.forward_server_to_client, daemon=True).start()
print("DAP Monitor active - press Ctrl+C to stop")
while not self.disconnect:
time.sleep(1)
except KeyboardInterrupt:
print("\nStopping DAP Monitor...")
except Exception as e:
print(f"Error: {e}")
finally:
self.cleanup()
def forward_client_to_server(self):
"""Forward messages from VS Code client to MicroPython server."""
try:
while True:
data = self.receive_dap_message(self.client_sock, "VS Code")
if data is None:
break
self.send_raw_data(self.server_sock, data)
except Exception as e:
print(f"Client->Server forwarding error: {e}")
def forward_server_to_client(self):
"""Forward messages from MicroPython server to VS Code client."""
try:
while True:
data = self.receive_dap_message(self.server_sock, "MicroPython")
if data is None:
break
self.send_raw_data(self.client_sock, data)
except Exception as e:
print(f"Server->Client forwarding error: {e}")
def receive_dap_message(self, sock, source):
"""Receive and log a DAP message."""
try:
# Read headers
header = b""
while b"\r\n\r\n" not in header:
byte = sock.recv(1)
if not byte:
return None
header += byte
# Parse content length
header_str = header.decode('utf-8')
content_length = 0
for line in header_str.split('\r\n'):
if line.startswith('Content-Length:'):
content_length = int(line.split(':', 1)[1].strip())
break
if content_length == 0:
return None
# Read content
content = b""
while len(content) < content_length:
chunk = sock.recv(content_length - len(content))
if not chunk:
return None
content += chunk
# Parse and Log the message
message = self.parse_dap(source, content)
self.log_dap_message(source, message)
# Check for disconnect command
if message:
if "disconnect" == message.get('command', message.get('event', 'unknown')):
print(f"\n[{source}] Disconnect command received, stopping monitor.")
self.disconnect = True
return header + content
except Exception as e:
print(f"Error receiving from {source}: {e}")
return None
def parse_dap(self, source, content):
"""Parse DAP message and log it."""
try:
message = json.loads(content.decode('utf-8'))
return message
except json.JSONDecodeError:
print(f"\n[{source}] Invalid JSON: {content}")
return None
def log_dap_message(self, source, message):
"""Log DAP message details."""
msg_type = message.get('type', 'unknown')
command = message.get('command', message.get('event', 'unknown'))
seq = message.get('seq', 0)
print(f"\n[{source}] {msg_type.upper()}: {command} (seq={seq})")
if msg_type == 'request':
args = message.get('arguments', {})
if args:
print(f" Arguments: {json.dumps(args, indent=2)}")
elif msg_type == 'response':
success = message.get('success', False)
req_seq = message.get('request_seq', 0)
print(f" Success: {success}, Request Seq: {req_seq}")
body = message.get('body')
if body:
print(f" Body: {json.dumps(body, indent=2)}")
msg = message.get('message')
if msg:
print(f" Message: {msg}")
elif msg_type == 'event':
body = message.get('body', {})
if body:
print(f" Body: {json.dumps(body, indent=2)}")
def send_raw_data(self, sock, data):
"""Send raw data to socket."""
try:
sock.send(data)
except Exception as e:
print(f"Error sending data: {e}")
def cleanup(self):
"""Clean up sockets."""
if self.client_sock:
self.client_sock.close()
if self.server_sock:
self.server_sock.close()
if __name__ == "__main__":
monitor = DAPMonitor()
monitor.start()

Wyświetl plik

@ -0,0 +1,20 @@
"""MicroPython debugpy implementation.
A minimal port of debugpy for MicroPython to enable VS Code debugging support.
This implementation focuses on the core DAP (Debug Adapter Protocol) functionality
needed for basic debugging operations like breakpoints, stepping, and variable inspection.
"""
__version__ = "0.1.0"
from .public_api import listen, wait_for_client, breakpoint, debug_this_thread
from .common.constants import DEFAULT_HOST, DEFAULT_PORT
__all__ = [
"listen",
"wait_for_client",
"breakpoint",
"debug_this_thread",
"DEFAULT_HOST",
"DEFAULT_PORT",
]

Wyświetl plik

@ -0,0 +1 @@
# Common utilities and constants for debugpy

Wyświetl plik

@ -0,0 +1,60 @@
"""Constants used throughout debugpy."""
# Default networking settings
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 5678
# DAP message types
MSG_TYPE_REQUEST = "request"
MSG_TYPE_RESPONSE = "response"
MSG_TYPE_EVENT = "event"
# DAP events
EVENT_INITIALIZED = "initialized"
EVENT_STOPPED = "stopped"
EVENT_CONTINUED = "continued"
EVENT_THREAD = "thread"
EVENT_BREAKPOINT = "breakpoint"
EVENT_OUTPUT = "output"
EVENT_TERMINATED = "terminated"
EVENT_EXITED = "exited"
# DAP commands
CMD_INITIALIZE = "initialize"
CMD_LAUNCH = "launch"
CMD_ATTACH = "attach"
CMD_SET_BREAKPOINTS = "setBreakpoints"
CMD_CONTINUE = "continue"
CMD_NEXT = "next"
CMD_STEP_IN = "stepIn"
CMD_STEP_OUT = "stepOut"
CMD_PAUSE = "pause"
CMD_STACK_TRACE = "stackTrace"
CMD_SCOPES = "scopes"
CMD_VARIABLES = "variables"
CMD_EVALUATE = "evaluate"
CMD_DISCONNECT = "disconnect"
CMD_CONFIGURATION_DONE = "configurationDone"
CMD_THREADS = "threads"
CMD_SOURCE = "source"
# Stop reasons
STOP_REASON_STEP = "step"
STOP_REASON_BREAKPOINT = "breakpoint"
STOP_REASON_EXCEPTION = "exception"
STOP_REASON_PAUSE = "pause"
STOP_REASON_ENTRY = "entry"
# Thread reasons
THREAD_REASON_STARTED = "started"
THREAD_REASON_EXITED = "exited"
# Trace events
TRACE_CALL = "call"
TRACE_LINE = "line"
TRACE_RETURN = "return"
TRACE_EXCEPTION = "exception"
# Scope types
SCOPE_LOCALS = "locals"
SCOPE_GLOBALS = "globals"

Wyświetl plik

@ -0,0 +1,154 @@
"""JSON message handling for DAP protocol."""
import json
from .constants import MSG_TYPE_REQUEST, MSG_TYPE_RESPONSE, MSG_TYPE_EVENT
class JsonMessageChannel:
"""Handles JSON message communication over a socket using DAP format."""
def __init__(self, sock, debug_callback=None):
self.sock = sock
self.seq = 0
self.closed = False
self._recv_buffer = b""
self._debug_print = debug_callback or (lambda x: None) # Default to no-op
def send_message(self, msg_type, command=None, **kwargs):
"""Send a DAP message."""
if self.closed:
return
self.seq += 1
message = {
"seq": self.seq,
"type": msg_type,
}
if command:
if msg_type == MSG_TYPE_REQUEST:
message["command"] = command
if kwargs:
message["arguments"] = kwargs
elif msg_type == MSG_TYPE_RESPONSE:
message["command"] = command
message["request_seq"] = kwargs.get("request_seq", 0)
message["success"] = kwargs.get("success", True)
if "body" in kwargs:
message["body"] = kwargs["body"]
if "message" in kwargs:
message["message"] = kwargs["message"]
elif msg_type == MSG_TYPE_EVENT:
message["event"] = command
if kwargs:
message["body"] = kwargs
json_str = json.dumps(message)
content = json_str.encode("utf-8")
header = f"Content-Length: {len(content)}\r\n\r\n".encode("utf-8")
try:
self.sock.send(header + content)
except OSError:
self.closed = True
def send_request(self, command, **kwargs):
"""Send a request message."""
self.send_message(MSG_TYPE_REQUEST, command, **kwargs)
def send_response(self, command, request_seq, success=True, body=None, message=None):
"""Send a response message."""
kwargs = {"request_seq": request_seq, "success": success}
if body is not None:
kwargs["body"] = body
if message is not None:
kwargs["message"] = message
self._debug_print(f"[DAP] SEND: response {command} (req_seq={request_seq}, success={success})")
if body:
self._debug_print(f"[DAP] body: {body}")
if message:
self._debug_print(f"[DAP] message: {message}")
self.send_message(MSG_TYPE_RESPONSE, command, **kwargs)
def send_event(self, event, **kwargs):
"""Send an event message."""
self._debug_print(f"[DAP] SEND: event {event}")
if kwargs:
self._debug_print(f"[DAP] body: {kwargs}")
self.send_message(MSG_TYPE_EVENT, event, **kwargs)
def recv_message(self):
"""Receive a DAP message."""
if self.closed:
return None
try:
# Read headers
while b"\r\n\r\n" not in self._recv_buffer:
try:
data = self.sock.recv(1024)
if not data:
self.closed = True
return None
self._recv_buffer += data
except OSError as e:
# Handle timeout and other socket errors
if hasattr(e, 'errno') and e.errno in (11, 35): # EAGAIN, EWOULDBLOCK
return None # No data available
self.closed = True
return None
header_end = self._recv_buffer.find(b"\r\n\r\n")
header_str = self._recv_buffer[:header_end].decode("utf-8")
self._recv_buffer = self._recv_buffer[header_end + 4:]
# Parse Content-Length
content_length = 0
for line in header_str.split("\r\n"):
if line.startswith("Content-Length:"):
content_length = int(line.split(":", 1)[1].strip())
break
if content_length == 0:
return None
# Read body
while len(self._recv_buffer) < content_length:
try:
data = self.sock.recv(content_length - len(self._recv_buffer))
if not data:
self.closed = True
return None
self._recv_buffer += data
except OSError as e:
if hasattr(e, 'errno') and e.errno in (11, 35): # EAGAIN, EWOULDBLOCK
return None
self.closed = True
return None
body = self._recv_buffer[:content_length]
self._recv_buffer = self._recv_buffer[content_length:]
# Parse JSON
try:
message = json.loads(body.decode("utf-8"))
self._debug_print(f"[DAP] Successfully received message: {message.get('type')} {message.get('command', message.get('event', 'unknown'))}")
return message
except (ValueError, UnicodeDecodeError) as e:
print(f"[DAP] JSON parse error: {e}")
return None
except OSError as e:
print(f"[DAP] Socket error in recv_message: {e}")
self.closed = True
return None
def close(self):
"""Close the channel."""
self.closed = True
try:
self.sock.close()
except OSError:
pass

Wyświetl plik

@ -0,0 +1,126 @@
"""Public API for debugpy."""
import socket
import sys
from .common.constants import DEFAULT_HOST, DEFAULT_PORT
from .server.debug_session import DebugSession
_debug_session = None
def listen(port=DEFAULT_PORT, host=DEFAULT_HOST):
"""Start listening for debugger connections.
Args:
port: Port number to listen on (default: 5678)
host: Host address to bind to (default: "127.0.0.1")
Returns:
(host, port) tuple of the actual listening address
"""
global _debug_session
if _debug_session is not None:
raise RuntimeError("Already listening for debugger")
# Create listening socket
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
except:
pass # Not supported in MicroPython
# Use getaddrinfo for MicroPython compatibility
addr_info = socket.getaddrinfo(host, port)
addr = addr_info[0][-1] # Get the sockaddr
listener.bind(addr)
listener.listen(1)
# getsockname not available in MicroPython, use original values
print(f"Debugpy listening on {host}:{port}")
# Wait for connection
client_sock = None
try:
client_sock, client_addr = listener.accept()
print(f"Debugger connected from {client_addr}")
# Create debug session
_debug_session = DebugSession(client_sock)
# Handle just the initialize request, then return immediately
print("[DAP] Waiting for initialize request...")
init_message = _debug_session.channel.recv_message()
if init_message and init_message.get('command') == 'initialize':
_debug_session._handle_message(init_message)
print("[DAP] Initialize request handled - returning control immediately")
else:
print(f"[DAP] Warning: Expected initialize, got {init_message}")
# Set socket to non-blocking for subsequent message processing
_debug_session.channel.sock.settimeout(0.001)
print("[DAP] Debug session ready - all other messages will be handled in trace function")
except Exception as e:
print(f"[DAP] Connection error: {e}")
if client_sock:
client_sock.close()
_debug_session = None
finally:
# Only close the listener, not the client connection
listener.close()
return (host, port)
def wait_for_client():
"""Wait for the debugger client to connect and initialize."""
global _debug_session
if _debug_session:
_debug_session.wait_for_client()
def breakpoint():
"""Trigger a breakpoint in the debugger."""
global _debug_session
if _debug_session:
_debug_session.trigger_breakpoint()
else:
# Fallback to built-in breakpoint if available
if hasattr(__builtins__, 'breakpoint'):
__builtins__.breakpoint()
def debug_this_thread():
"""Enable debugging for the current thread."""
global _debug_session
if _debug_session:
_debug_session.debug_this_thread()
else:
# Install trace function even if no session yet
if hasattr(sys, 'settrace'):
sys.settrace(_default_trace_func)
else:
raise RuntimeError("MICROPY_PY_SYS_SETTRACE required")
def _default_trace_func(frame, event, arg):
"""Default trace function when no debug session is active."""
# Just return None to continue execution
return None
def is_client_connected():
"""Check if a debugger client is connected."""
global _debug_session
return _debug_session is not None and _debug_session.is_connected()
def disconnect():
"""Disconnect from the debugger client."""
global _debug_session
if _debug_session:
_debug_session.disconnect()
_debug_session = None

Wyświetl plik

@ -0,0 +1 @@
# Debug server components

Wyświetl plik

@ -0,0 +1,429 @@
"""Main debug session handling DAP protocol communication."""
import sys
from ..common.messaging import JsonMessageChannel
from ..common.constants import (
CMD_INITIALIZE, CMD_LAUNCH, CMD_ATTACH, CMD_SET_BREAKPOINTS,
CMD_CONTINUE, CMD_NEXT, CMD_STEP_IN, CMD_STEP_OUT, CMD_PAUSE,
CMD_STACK_TRACE, CMD_SCOPES, CMD_VARIABLES, CMD_EVALUATE, CMD_DISCONNECT,
CMD_CONFIGURATION_DONE, CMD_THREADS, CMD_SOURCE, EVENT_INITIALIZED, EVENT_STOPPED, EVENT_CONTINUED, EVENT_TERMINATED,
STOP_REASON_BREAKPOINT, STOP_REASON_STEP, STOP_REASON_PAUSE,
TRACE_CALL, TRACE_LINE, TRACE_RETURN, TRACE_EXCEPTION
)
from .pdb_adapter import PdbAdapter
class DebugSession:
"""Manages a debugging session with a DAP client."""
def __init__(self, client_socket):
self.debug_logging = False # Initialize first
self.channel = JsonMessageChannel(client_socket, self._debug_print)
self.pdb = PdbAdapter()
self.pdb._debug_session = self # Allow PDB to process messages during wait
self.initialized = False
self.connected = True
self.thread_id = 1 # Simple single-thread model
self.stepping = False
self.paused = False
def _debug_print(self, message):
"""Print debug message only if debug logging is enabled."""
if self.debug_logging:
print(message)
def start(self):
"""Start the debug session message loop."""
try:
while self.connected and not self.channel.closed:
message = self.channel.recv_message()
if message is None:
break
self._handle_message(message)
except Exception as e:
print(f"Debug session error: {e}")
finally:
self.disconnect()
def initialize_connection(self):
"""Initialize the connection - handle just the essential initial messages then return."""
# Note: debug_logging not available yet during init, so we always show these messages
print("[DAP] Processing initial DAP messages...")
try:
# Process initial messages quickly and return control to main thread
# We'll handle ongoing messages in the trace function
attached = False
message_count = 0
max_init_messages = 6 # Just handle the first few essential messages
while message_count < max_init_messages and not attached:
try:
# Short timeout - don't block the main thread for long
self.channel.sock.settimeout(1.0)
message = self.channel.recv_message()
if message is None:
print(f"[DAP] No more messages in initial batch")
break
print(f"[DAP] Initial message #{message_count + 1}: {message.get('command')}")
self._handle_message(message)
message_count += 1
# Just wait for attach, then we can return control
if message.get('command') == 'attach':
attached = True
print("[DAP] ✅ Attach received - returning control to main thread")
break
except Exception as e:
print(f"[DAP] Exception in initial processing: {e}")
break
finally:
self.channel.sock.settimeout(None)
# After attach, continue processing a few more messages quickly
if attached:
self._debug_print("[DAP] Processing remaining setup messages...")
additional_count = 0
while additional_count < 4: # Just a few more
try:
self.channel.sock.settimeout(0.5) # Short timeout
message = self.channel.recv_message()
if message is None:
break
self._debug_print(f"[DAP] Setup message: {message.get('command')}")
self._handle_message(message)
additional_count += 1
except:
break
finally:
self.channel.sock.settimeout(None)
print(f"[DAP] Initial setup complete - main thread can continue")
except Exception as e:
print(f"[DAP] Initialization error: {e}")
def process_pending_messages(self):
"""Process any pending DAP messages without blocking."""
try:
# Set socket to non-blocking mode for message processing
self.channel.sock.settimeout(0.001) # Very short timeout
while True:
message = self.channel.recv_message()
if message is None:
break
self._handle_message(message)
except Exception:
# No messages available or socket error
pass
finally:
# Reset to blocking mode
self.channel.sock.settimeout(None)
def _handle_message(self, message):
"""Handle incoming DAP messages."""
msg_type = message.get("type")
command = message.get("command", message.get("event", "unknown"))
seq = message.get("seq", 0)
self._debug_print(f"[DAP] RECV: {msg_type} {command} (seq={seq})")
if message.get("arguments"):
self._debug_print(f"[DAP] args: {message['arguments']}")
if msg_type == "request":
self._handle_request(message)
elif msg_type == "response":
# We don't expect responses from client
self._debug_print(f"[DAP] Unexpected response from client: {message}")
elif msg_type == "event":
# We don't expect events from client
self._debug_print(f"[DAP] Unexpected event from client: {message}")
def _handle_request(self, message):
"""Handle DAP request messages."""
command = message.get("command")
seq = message.get("seq", 0)
args = message.get("arguments", {})
try:
if command == CMD_INITIALIZE:
self._handle_initialize(seq, args)
elif command == CMD_LAUNCH:
self._handle_launch(seq, args)
elif command == CMD_ATTACH:
self._handle_attach(seq, args)
elif command == CMD_SET_BREAKPOINTS:
self._handle_set_breakpoints(seq, args)
elif command == CMD_CONTINUE:
self._handle_continue(seq, args)
elif command == CMD_NEXT:
self._handle_next(seq, args)
elif command == CMD_STEP_IN:
self._handle_step_in(seq, args)
elif command == CMD_STEP_OUT:
self._handle_step_out(seq, args)
elif command == CMD_PAUSE:
self._handle_pause(seq, args)
elif command == CMD_STACK_TRACE:
self._handle_stack_trace(seq, args)
elif command == CMD_SCOPES:
self._handle_scopes(seq, args)
elif command == CMD_VARIABLES:
self._handle_variables(seq, args)
elif command == CMD_EVALUATE:
self._handle_evaluate(seq, args)
elif command == CMD_DISCONNECT:
self._handle_disconnect(seq, args)
elif command == CMD_CONFIGURATION_DONE:
self._handle_configuration_done(seq, args)
elif command == CMD_THREADS:
self._handle_threads(seq, args)
elif command == CMD_SOURCE:
self._handle_source(seq, args)
else:
self.channel.send_response(command, seq, success=False,
message=f"Unknown command: {command}")
except Exception as e:
self.channel.send_response(command, seq, success=False,
message=str(e))
def _handle_initialize(self, seq, args):
"""Handle initialize request."""
capabilities = {
"supportsConfigurationDoneRequest": True,
"supportsFunctionBreakpoints": False,
"supportsConditionalBreakpoints": False,
"supportsHitConditionalBreakpoints": False,
"supportsEvaluateForHovers": True,
"supportsStepBack": False,
"supportsSetVariable": False,
"supportsRestartFrame": False,
"supportsGotoTargetsRequest": False,
"supportsStepInTargetsRequest": False,
"supportsCompletionsRequest": False,
"supportsModulesRequest": False,
"additionalModuleColumns": [],
"supportedChecksumAlgorithms": [],
"supportsRestartRequest": False,
"supportsExceptionOptions": False,
"supportsValueFormattingOptions": False,
"supportsExceptionInfoRequest": False,
"supportTerminateDebuggee": True,
"supportSuspendDebuggee": True,
"supportsDelayedStackTraceLoading": False,
"supportsLoadedSourcesRequest": False,
"supportsLogPoints": False,
"supportsTerminateThreadsRequest": False,
"supportsSetExpression": False,
"supportsTerminateRequest": True,
"supportsDataBreakpoints": False,
"supportsReadMemoryRequest": False,
"supportsWriteMemoryRequest": False,
"supportsDisassembleRequest": False,
"supportsCancelRequest": False,
"supportsBreakpointLocationsRequest": False,
"supportsClipboardContext": False,
}
self.channel.send_response(CMD_INITIALIZE, seq, body=capabilities)
self.channel.send_event(EVENT_INITIALIZED)
self.initialized = True
def _handle_launch(self, seq, args):
"""Handle launch request."""
# For attach-mode debugging, we don't need to launch anything
self.channel.send_response(CMD_LAUNCH, seq)
def _handle_attach(self, seq, args):
"""Handle attach request."""
# Check if debug logging should be enabled
self.debug_logging = args.get("logToFile", False)
self._debug_print(f"[DAP] Processing attach request with args: {args}")
print(f"[DAP] Debug logging {'enabled' if self.debug_logging else 'disabled'} (logToFile={self.debug_logging})")
# Enable trace function
self.pdb.set_trace_function(self._trace_function)
self.channel.send_response(CMD_ATTACH, seq)
# After successful attach, we might need to send additional events
# Some debuggers expect a 'process' event or thread events
self._debug_print("[DAP] Attach completed, debugging is now active")
def _handle_set_breakpoints(self, seq, args):
"""Handle setBreakpoints request."""
source = args.get("source", {})
filename = source.get("path", "<unknown>")
breakpoints = args.get("breakpoints", [])
# Debug log the source information
self._debug_print(f"[DAP] setBreakpoints source info: {source}")
# Set breakpoints in pdb adapter
actual_breakpoints = self.pdb.set_breakpoints(filename, breakpoints)
self.channel.send_response(CMD_SET_BREAKPOINTS, seq,
body={"breakpoints": actual_breakpoints})
def _handle_continue(self, seq, args):
"""Handle continue request."""
self.stepping = False
self.paused = False
self.pdb.continue_execution()
self.channel.send_response(CMD_CONTINUE, seq)
def _handle_next(self, seq, args):
"""Handle next (step over) request."""
self.stepping = True
self.paused = False
self.pdb.step_over()
self.channel.send_response(CMD_NEXT, seq)
def _handle_step_in(self, seq, args):
"""Handle stepIn request."""
self.stepping = True
self.paused = False
self.pdb.step_into()
self.channel.send_response(CMD_STEP_IN, seq)
def _handle_step_out(self, seq, args):
"""Handle stepOut request."""
self.stepping = True
self.paused = False
self.pdb.step_out()
self.channel.send_response(CMD_STEP_OUT, seq)
def _handle_pause(self, seq, args):
"""Handle pause request."""
self.paused = True
self.pdb.pause()
self.channel.send_response(CMD_PAUSE, seq)
def _handle_stack_trace(self, seq, args):
"""Handle stackTrace request."""
stack_frames = self.pdb.get_stack_trace()
self.channel.send_response(CMD_STACK_TRACE, seq,
body={"stackFrames": stack_frames, "totalFrames": len(stack_frames)})
def _handle_scopes(self, seq, args):
"""Handle scopes request."""
frame_id = args.get("frameId", 0)
self._debug_print(f"[DAP] Processing scopes request for frameId={frame_id}")
scopes = self.pdb.get_scopes(frame_id)
self._debug_print(f"[DAP] Generated scopes: {scopes}")
self.channel.send_response(CMD_SCOPES, seq, body={"scopes": scopes})
def _handle_variables(self, seq, args):
"""Handle variables request."""
variables_ref = args.get("variablesReference", 0)
variables = self.pdb.get_variables(variables_ref)
self.channel.send_response(CMD_VARIABLES, seq, body={"variables": variables})
def _handle_evaluate(self, seq, args):
"""Handle evaluate request."""
expression = args.get("expression", "")
frame_id = args.get("frameId")
context = args.get("context", "watch")
if not expression:
self.channel.send_response(CMD_EVALUATE, seq, success=False,
message="No expression provided")
return
try:
result = self.pdb.evaluate_expression(expression, frame_id)
self.channel.send_response(CMD_EVALUATE, seq, body={
"result": str(result),
"variablesReference": 0
})
except Exception as e:
self.channel.send_response(CMD_EVALUATE, seq, success=False,
message=str(e))
def _handle_disconnect(self, seq, args):
"""Handle disconnect request."""
self.channel.send_response(CMD_DISCONNECT, seq)
self.disconnect()
def _handle_configuration_done(self, seq, args):
"""Handle configurationDone request."""
# This indicates that the client has finished configuring breakpoints
# and is ready to start debugging
self.channel.send_response(CMD_CONFIGURATION_DONE, seq)
def _handle_threads(self, seq, args):
"""Handle threads request."""
# MicroPython is single-threaded, so return one thread
threads = [{
"id": self.thread_id,
"name": "main"
}]
self.channel.send_response(CMD_THREADS, seq, body={"threads": threads})
def _handle_source(self, seq, args):
"""Handle source request."""
source = args.get("source", {})
source_path = source.get("path", "")
try:
# Try to read the source file
with open(source_path, 'r') as f:
content = f.read()
self.channel.send_response(CMD_SOURCE, seq, body={"content": content})
except Exception as e:
self.channel.send_response(CMD_SOURCE, seq, success=False,
message=f"Could not read source: {e}")
def _trace_function(self, frame, event, arg):
"""Trace function called by sys.settrace."""
# Process any pending DAP messages frequently
self.process_pending_messages()
# Handle breakpoints and stepping
if self.pdb.should_stop(frame, event, arg):
self._send_stopped_event(STOP_REASON_BREAKPOINT if self.pdb.hit_breakpoint else
STOP_REASON_STEP if self.stepping else STOP_REASON_PAUSE)
# Wait for continue command
self.pdb.wait_for_continue()
return self._trace_function
def _send_stopped_event(self, reason):
"""Send stopped event to client."""
self.channel.send_event(EVENT_STOPPED,
reason=reason,
threadId=self.thread_id,
allThreadsStopped=True)
def wait_for_client(self):
"""Wait for client to initialize."""
# This is a simplified version - in a real implementation
# we might want to wait for specific initialization steps
pass
def trigger_breakpoint(self):
"""Trigger a manual breakpoint."""
if self.initialized:
self._send_stopped_event(STOP_REASON_BREAKPOINT)
def debug_this_thread(self):
"""Enable debugging for current thread."""
if hasattr(sys, 'settrace'):
sys.settrace(self._trace_function)
def is_connected(self):
"""Check if client is connected."""
return self.connected and not self.channel.closed
def disconnect(self):
"""Disconnect from client."""
self.connected = False
if hasattr(sys, 'settrace'):
sys.settrace(None)
self.pdb.cleanup()
self.channel.close()

Wyświetl plik

@ -0,0 +1,333 @@
"""PDB adapter for integrating with MicroPython's trace system."""
import sys
import time
import os
from ..common.constants import (
TRACE_CALL, TRACE_LINE, TRACE_RETURN, TRACE_EXCEPTION,
SCOPE_LOCALS, SCOPE_GLOBALS
)
class PdbAdapter:
"""Adapter between DAP protocol and MicroPython's sys.settrace functionality."""
def __init__(self):
self.breakpoints = {} # filename -> {line_no: breakpoint_info}
self.current_frame = None
self.step_mode = None # None, 'over', 'into', 'out'
self.step_frame = None
self.step_depth = 0
self.hit_breakpoint = False
self.continue_event = False
self.variables_cache = {} # frameId -> variables
self.frame_id_counter = 1
self.path_mapping = {} # runtime_path -> vscode_path mapping
def _debug_print(self, message):
"""Print debug message only if debug logging is enabled."""
if hasattr(self, '_debug_session') and self._debug_session.debug_logging:
print(message)
def _normalize_path(self, path):
"""Normalize a file path for consistent comparisons."""
# Convert to absolute path if possible
try:
if hasattr(os.path, 'abspath'):
path = os.path.abspath(path)
elif hasattr(os.path, 'realpath'):
path = os.path.realpath(path)
except:
pass
# Ensure consistent separators
path = path.replace('\\', '/')
return path
def set_trace_function(self, trace_func):
"""Install the trace function."""
if hasattr(sys, 'settrace'):
sys.settrace(trace_func)
else:
raise RuntimeError("sys.settrace not available")
def set_breakpoints(self, filename, breakpoints):
"""Set breakpoints for a file."""
self.breakpoints[filename] = {}
actual_breakpoints = []
# Debug log the breakpoint path
self._debug_print(f"[PDB] Setting breakpoints for file: {filename}")
for bp in breakpoints:
line = bp.get("line")
if line:
self.breakpoints[filename][line] = {
"line": line,
"verified": True,
"source": {"path": filename}
}
actual_breakpoints.append({
"line": line,
"verified": True,
"source": {"path": filename}
})
return actual_breakpoints
def should_stop(self, frame, event, arg):
"""Determine if execution should stop at this point."""
self.current_frame = frame
self.hit_breakpoint = False
# Get frame information
filename = frame.f_code.co_filename
lineno = frame.f_lineno
# Debug: print filename and line for debugging
if event == TRACE_LINE and lineno in [20, 21, 22, 23, 24]: # Only log lines near our breakpoints
self._debug_print(f"[PDB] Checking {filename}:{lineno} (event={event})")
self._debug_print(f"[PDB] Available breakpoint files: {list(self.breakpoints.keys())}")
# Check for exact filename match first
if filename in self.breakpoints:
if lineno in self.breakpoints[filename]:
self._debug_print(f"[PDB] HIT BREAKPOINT (exact match) at {filename}:{lineno}")
# Record the path mapping (in this case, they're already the same)
self.path_mapping[filename] = filename
self.hit_breakpoint = True
return True
# Also try checking by basename for path mismatches
def basename(path):
return path.split('/')[-1] if '/' in path else path
# Check if this might be a relative path match
def ends_with_path(full_path, relative_path):
"""Check if full_path ends with relative_path components."""
full_parts = full_path.replace('\\', '/').split('/')
rel_parts = relative_path.replace('\\', '/').split('/')
if len(rel_parts) > len(full_parts):
return False
return full_parts[-len(rel_parts):] == rel_parts
file_basename = basename(filename)
self._debug_print(f"[PDB] Fallback basename match: '{file_basename}' vs available files")
for bp_file in self.breakpoints:
bp_basename = basename(bp_file)
self._debug_print(f"[PDB] Comparing '{file_basename}' == '{bp_basename}' ?")
if bp_basename == file_basename:
self._debug_print(f"[PDB] Basename match found! Checking line {lineno} in {list(self.breakpoints[bp_file].keys())}")
if lineno in self.breakpoints[bp_file]:
self._debug_print(f"[PDB] HIT BREAKPOINT (fallback basename match) at {filename}:{lineno} -> {bp_file}")
# Record the path mapping so we can report the correct path in stack traces
self.path_mapping[filename] = bp_file
self.hit_breakpoint = True
return True
# Also check if the runtime path might be relative and the breakpoint path absolute
if ends_with_path(bp_file, filename):
self._debug_print(f"[PDB] Relative path match: {bp_file} ends with {filename}")
if lineno in self.breakpoints[bp_file]:
self._debug_print(f"[PDB] HIT BREAKPOINT (relative path match) at {filename}:{lineno} -> {bp_file}")
# Record the path mapping so we can report the correct path in stack traces
self.path_mapping[filename] = bp_file
self.hit_breakpoint = True
return True
# Check stepping
if self.step_mode == 'into':
if event in (TRACE_CALL, TRACE_LINE):
self.step_mode = None
return True
elif self.step_mode == 'over':
if event == TRACE_LINE and frame == self.step_frame:
self.step_mode = None
return True
elif event == TRACE_RETURN and frame == self.step_frame:
# Continue stepping in caller
if hasattr(frame, 'f_back') and frame.f_back:
self.step_frame = frame.f_back
else:
self.step_mode = None
elif self.step_mode == 'out':
if event == TRACE_RETURN and frame == self.step_frame:
self.step_mode = None
return True
return False
def continue_execution(self):
"""Continue execution."""
self.step_mode = None
self.continue_event = True
def step_over(self):
"""Step over (next line)."""
self.step_mode = 'over'
self.step_frame = self.current_frame
self.continue_event = True
def step_into(self):
"""Step into function calls."""
self.step_mode = 'into'
self.continue_event = True
def step_out(self):
"""Step out of current function."""
self.step_mode = 'out'
self.step_frame = self.current_frame
self.continue_event = True
def pause(self):
"""Pause execution at next opportunity."""
# This is handled by the debug session
pass
def wait_for_continue(self):
"""Wait for continue command (simplified implementation)."""
# In a real implementation, this would block until continue
# For MicroPython, we'll use a simple polling approach
self.continue_event = False
# Process DAP messages while waiting for continue
self._debug_print("[PDB] Waiting for continue command...")
while not self.continue_event:
# Process any pending DAP messages (scopes, variables, etc.)
if hasattr(self, '_debug_session'):
self._debug_session.process_pending_messages()
time.sleep(0.01)
def get_stack_trace(self):
"""Get the current stack trace."""
if not self.current_frame:
return []
frames = []
frame = self.current_frame
frame_id = 0
while frame:
filename = frame.f_code.co_filename
name = frame.f_code.co_name
line = frame.f_lineno
# Use the VS Code path if we have a mapping, otherwise use the original path
display_path = self.path_mapping.get(filename, filename)
if filename != display_path:
self._debug_print(f"[PDB] Stack trace path mapping: {filename} -> {display_path}")
# Create frame info
frames.append({
"id": frame_id,
"name": name,
"source": {"path": display_path},
"line": line,
"column": 1,
"endLine": line,
"endColumn": 1
})
# Cache frame for variable access
self.variables_cache[frame_id] = frame
# MicroPython doesn't have f_back attribute
if hasattr(frame, 'f_back'):
frame = frame.f_back
else:
# Only return the current frame for MicroPython
break
frame_id += 1
return frames
def get_scopes(self, frame_id):
"""Get variable scopes for a frame."""
scopes = [
{
"name": "Locals",
"variablesReference": frame_id * 1000 + 1,
"expensive": False
},
{
"name": "Globals",
"variablesReference": frame_id * 1000 + 2,
"expensive": False
}
]
return scopes
def get_variables(self, variables_ref):
"""Get variables for a scope."""
frame_id = variables_ref // 1000
scope_type = variables_ref % 1000
if frame_id not in self.variables_cache:
return []
frame = self.variables_cache[frame_id]
variables = []
if scope_type == 1: # Locals
var_dict = frame.f_locals if hasattr(frame, 'f_locals') else {}
elif scope_type == 2: # Globals
var_dict = frame.f_globals if hasattr(frame, 'f_globals') else {}
else:
return []
for name, value in var_dict.items():
# Skip private/internal variables
if name.startswith('__') and name.endswith('__'):
continue
try:
value_str = repr(value)
type_str = type(value).__name__
variables.append({
"name": name,
"value": value_str,
"type": type_str,
"variablesReference": 0 # Simple implementation - no nested objects
})
except Exception:
variables.append({
"name": name,
"value": "<error>",
"type": "unknown",
"variablesReference": 0
})
return variables
def evaluate_expression(self, expression, frame_id=None):
"""Evaluate an expression in the context of a frame."""
if frame_id is not None and frame_id in self.variables_cache:
frame = self.variables_cache[frame_id]
globals_dict = frame.f_globals if hasattr(frame, 'f_globals') else {}
locals_dict = frame.f_locals if hasattr(frame, 'f_locals') else {}
else:
# Use current frame
frame = self.current_frame
if frame:
globals_dict = frame.f_globals if hasattr(frame, 'f_globals') else {}
locals_dict = frame.f_locals if hasattr(frame, 'f_locals') else {}
else:
globals_dict = globals()
locals_dict = {}
try:
# Evaluate the expression
result = eval(expression, globals_dict, locals_dict)
return result
except Exception as e:
raise Exception(f"Evaluation error: {e}")
def cleanup(self):
"""Clean up resources."""
self.variables_cache.clear()
self.breakpoints.clear()
if hasattr(sys, 'settrace'):
sys.settrace(None)

Wyświetl plik

@ -0,0 +1,68 @@
#!/usr/bin/env python3
"""Simple demo of MicroPython debugpy functionality."""
import sys
sys.path.insert(0, '.')
import debugpy
def simple_function(a, b):
"""A simple function to demonstrate debugging."""
result = a + b
print(f"Computing {a} + {b} = {result}")
return result
def main():
print("MicroPython debugpy Demo")
print("========================")
print()
# Demonstrate trace functionality
print("1. Testing trace functionality:")
def trace_function(frame, event, arg):
if event == 'call':
print(f" -> Entering function: {frame.f_code.co_name}")
elif event == 'line':
print(f" -> Executing line {frame.f_lineno} in {frame.f_code.co_name}")
elif event == 'return':
print(f" -> Returning from {frame.f_code.co_name} with value: {arg}")
return trace_function
# Enable tracing
sys.settrace(trace_function)
# Execute traced function
result = simple_function(5, 3)
# Disable tracing
sys.settrace(None)
print(f"Result: {result}")
print()
# Demonstrate debugpy components
print("2. Testing debugpy components:")
# Test PDB adapter
from debugpy.server.pdb_adapter import PdbAdapter
pdb = PdbAdapter()
# Set some mock breakpoints
breakpoints = pdb.set_breakpoints("demo.py", [{"line": 10}, {"line": 15}])
print(f" Set breakpoints: {len(breakpoints)} breakpoints")
# Test messaging
from debugpy.common.messaging import JsonMessageChannel
print(" JsonMessageChannel available")
print()
print("3. debugpy is ready for VS Code integration!")
print(" To use with VS Code:")
print(" - Import debugpy in your script")
print(" - Call debugpy.listen() to start the debug server")
print(" - Connect VS Code using the 'Attach to MicroPython' configuration")
print(" - Set breakpoints and debug normally")
if __name__ == "__main__":
main()

Wyświetl plik

@ -0,0 +1,84 @@
# Debugging MicroPython debugpy with VS Code
## Method 1: Direct Connection with Enhanced Logging
1. **Start MicroPython with enhanced logging:**
```bash
~/micropython2/ports/unix/build-standard/micropython test_vscode.py
```
This will now show detailed DAP protocol messages like:
```
[DAP] RECV: request initialize (seq=1)
[DAP] args: {...}
[DAP] SEND: response initialize (req_seq=1, success=True)
```
2. **Connect VS Code debugger:**
- Use the launch configuration in `.vscode/launch.json`
- Or manually attach to `127.0.0.1:5678`
3. **Look for issues in the terminal output** - you'll see all DAP message exchanges
## Method 2: Using DAP Monitor (Recommended for detailed analysis)
1. **Start MicroPython debugpy server:**
```bash
~/micropython2/ports/unix/build-standard/micropython test_vscode.py
```
2. **In another terminal, start the DAP monitor:**
```bash
python3 dap_monitor.py
```
The monitor listens on port 5679 and forwards to port 5678
3. **Connect VS Code to the monitor:**
- Modify your VS Code launch config to connect to port `5679` instead of `5678`
- Or create a new launch config:
```json
{
"name": "Debug via Monitor",
"type": "python",
"request": "attach",
"connect": {
"host": "127.0.0.1",
"port": 5679
}
}
```
4. **Analyze the complete DAP conversation** in the monitor terminal
## VS Code Debug Logging
Enable VS Code's built-in DAP logging:
1. **Open VS Code settings** (Ctrl+,)
2. **Search for:** `debug.console.verbosity`
3. **Set to:** `verbose`
4. **Also set:** `debug.allowBreakpointsEverywhere` to `true`
## Common Issues to Look For
1. **Missing required DAP capabilities** - check the `initialize` response
2. **Breakpoint verification failures** - look for `setBreakpoints` exchanges
3. **Thread/stack frame issues** - check `stackTrace` and `scopes` responses
4. **Evaluation problems** - monitor `evaluate` request/response pairs
## Expected DAP Sequence
A successful debug session should show this sequence:
1. `initialize` request → response with capabilities
2. `initialized` event
3. `setBreakpoints` request → response with verified breakpoints
4. `configurationDone` request → response
5. `attach` request → response
6. When execution hits breakpoint: `stopped` event
7. `stackTrace` request → response with frames
8. `scopes` request → response with local/global scopes
9. `continue` request → response to resume
If any step fails or is missing, that's where the issue lies.

Wyświetl plik

@ -0,0 +1,6 @@
metadata(
description="MicroPython implementation of debugpy for remote debugging",
version="0.1.0",
)
package("debugpy")

Wyświetl plik

@ -0,0 +1,78 @@
#!/usr/bin/env python3
"""Test script for VS Code debugging with MicroPython debugpy."""
import sys
sys.path.insert(0, '.')
import debugpy
foo = 42
bar = "Hello, MicroPython!"
def fibonacci(n):
"""Calculate fibonacci number (iterative for efficiency)."""
if n <= 1:
return n
a, b = 0, 1
for _ in range(2, n + 1):
a, b = b, a + b
return b
def debuggable_code():
"""The actual code we want to debug - wrapped in a function so sys.settrace will trace it."""
global foo
print("Starting debuggable code...")
# Test data - set breakpoint here (using smaller numbers to avoid slow fibonacci)
numbers = [3, 4, 5]
for i, num in enumerate(numbers):
print(f"Calculating fibonacci({num})...")
result = fibonacci(num) # <-- SET BREAKPOINT HERE (line 26)
foo += result # Modify foo to see if it gets traced
print(f"fibonacci({num}) = {result}")
print(sys.implementation)
import machine
print(dir(machine))
# Test manual breakpoint
print("\nTriggering manual breakpoint...")
debugpy.breakpoint()
print("Manual breakpoint triggered!")
print("Test completed successfully!")
def main():
print("MicroPython VS Code Debugging Test")
print("==================================")
# Start debug server
try:
debugpy.listen()
print("Debug server attached on 127.0.0.1:5678")
print("Connecting back to VS Code debugger now...")
# print("Set a breakpoint on line 26: 'result = fibonacci(num)'")
# print("Press Enter to continue after connecting debugger...")
# try:
# input()
# except:
# pass
# Enable debugging for this thread
debugpy.debug_this_thread()
# Give VS Code a moment to set breakpoints after attach
print("\nGiving VS Code time to set breakpoints...")
import time
time.sleep(2)
# Call the debuggable code function so it gets traced
debuggable_code()
except KeyboardInterrupt:
print("\nTest interrupted by user")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()

Wyświetl plik

@ -0,0 +1,22 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Attach to MicroPython",
"type": "python",
"request": "attach",
"connect": {
"host": "localhost",
"port": 5678
},
"pathMappings": [
{
"localRoot": "${workspaceFolder}",
"remoteRoot": "."
}
],
"logToFile": true,
"justMyCode": false
}
]
}