Porównaj commity

...

16 Commity

Autor SHA1 Wiadomość Data
Andrew Leech 7f10dc86cf
Merge f2c95fda80 into 583bc0da70 2024-04-30 09:06:17 -04:00
Angus Gratton 583bc0da70 usb: Add USB device support packages.
These packages build on top of machine.USBDevice() to provide high level
and flexible support for implementing USB devices in Python code.

Additional credits, as per included copyright notices:

- CDC support based on initial implementation by @hoihu with fixes by
  @linted.

- MIDI support based on initial implementation by @paulhamsh.

- HID keypad example based on work by @turmoni.

- Everyone who tested and provided feedback on early versions of these
  packages.

This work was funded through GitHub Sponsors.

Signed-off-by: Angus Gratton <angus@redyak.com.au>
2024-04-30 15:57:50 +10:00
Andrew Leech f2c95fda80 os.path: Add stub realpath() function. 2023-08-21 12:27:59 +09:30
Andrew Leech c3870d36fb pdb: Initial micropython support. 2023-08-21 12:27:59 +09:30
Andrew Leech 109711d911 cmd: Add (unused) completekey arg for compatibility. 2023-08-21 12:27:59 +09:30
Oliver Robson 7444302ac6 codeop: Initial micropython support. 2023-08-21 12:27:59 +09:30
Oliver Robson ae19d103b3 code: Initial micropython support. 2023-08-21 12:27:59 +09:30
Andrew Leech eb30d317f3 pdb: Add module from cpython 3.11.
source: https://github.com/python/cpython/blob/3.11/Lib/pdb.py
2023-08-21 12:27:59 +09:30
Oliver Robson de6b87184b codeop: Add module from cpython 3.11.
source: https://github.com/python/cpython/blob/3.11/Lib/codeop.py
2023-08-21 12:27:59 +09:30
Oliver Robson 979819a82b code: Add module from cpython 3.11.
source: https://github.com/python/cpython/blob/3.11/Lib/code.py
2023-08-21 12:27:59 +09:30
Andrew Leech 448c38618f bdb: Add initial micropython support.
Requires micropython to be compiled with MICROPY_PY_SYS_SETTRACE.
Also requires https://github.com/micropython/micropython/pull/8767
2023-08-21 12:27:59 +09:30
Andrew Leech 1fe75b8544 tokenize: Add minimal stub to support linecache. 2023-08-21 12:27:59 +09:30
Andrew Leech 796a5b93f5 linecache: Add module from cpython 3.11.
source: https://github.com/python/cpython/blob/3.11/Lib/linecache.py
2023-08-21 12:27:59 +09:30
Andrew Leech 023c58bb59 bdb: Add bdb (debugging) module from cpython 3.10.
Not yet updated for micropython.
Source: https://raw.githubusercontent.com/python/cpython/3.10/Lib/bdb.py
2023-08-21 12:27:59 +09:30
Andrew Leech aac74a8b0a stdlib/os: Provide namedtuple response for os.stat(). 2023-08-21 12:13:56 +09:30
Andrew Leech 82501d721f stdlib/os: Update method of extending builtin os. 2023-08-21 12:08:30 +09:30
40 zmienionych plików z 6382 dodań i 3 usunięć

Wyświetl plik

@ -0,0 +1,136 @@
# USB Drivers
These packages allow implementing USB functionality on a MicroPython system
using pure Python code.
Currently only USB device is implemented, not USB host.
## USB Device support
### Support
USB Device support depends on the low-level
[machine.USBDevice](https://docs.micropython.org/en/latest/library/machine.USBDevice.html)
class. This class is new and not supported on all ports, so please check the
documentation for your MicroPython version. It is possible to implement a USB
device using only the low-level USBDevice class. However, the packages here are
higher level and easier to use.
For more information about how to install packages, or "freeze" them into a
firmware image, consult the [MicroPython documentation on "Package
management"](https://docs.micropython.org/en/latest/reference/packages.html).
### Examples
The [examples/device](examples/device) directory in this repo has a range of
examples. After installing necessary packages, you can download an example and
run it with `mpremote run EXAMPLE_FILENAME.py` ([mpremote
docs](https://docs.micropython.org/en/latest/reference/mpremote.html#mpremote-command-run)).
#### Unexpected serial disconnects
If you normally connect to your MicroPython device over a USB serial port ("USB
CDC"), then running a USB example will disconnect mpremote when the new USB
device configuration activates and the serial port has to temporarily
disconnect. It is likely that mpremote will print an error. The example should
still start running, if necessary then you can reconnect with mpremote and type
Ctrl-B to restore the MicroPython REPL and/or Ctrl-C to stop the running
example.
If you use `mpremote run` again while a different USB device configuration is
already active, then the USB serial port may disconnect immediately before the
example runs. This is because mpremote has to soft-reset MicroPython, and when
the existing USB device is reset then the entire USB port needs to reset. If
this happens, run the same `mpremote run` command again.
We plan to add features to `mpremote` so that this limitation is less
disruptive. Other tools that communicate with MicroPython over the serial port
will encounter similar issues when runtime USB is in use.
### Initialising runtime USB
The overall pattern for enabling USB devices at runtime is:
1. Instantiate the Interface objects for your desired USB device.
2. Call `usb.device.get()` to get the singleton object for the high-level USB device.
3. Call `init(...)` to pass the desired interfaces as arguments, plus any custom
keyword arguments to configure the overall device.
An example, similar to [mouse_example.py](examples/device/mouse_example.py):
```py
m = usb.device.mouse.MouseInterface()
usb.device.get().init(m, builtin_driver=True)
```
Setting `builtin_driver=True` means that any built-in USB serial port will still
be available. Otherwise, you may permanently lose access to MicroPython until
the next time the device resets.
See [Unexpected serial disconnects](#Unexpected-serial-disconnects), above, for
an explanation of possible errors or disconnects when the runtime USB device
initialises.
Placing the call to `usb.device.get().init()` into the `boot.py` of the
MicroPython file system allows the runtime USB device to initialise immediately
on boot, before any built-in USB. This is a feature (not a bug) and allows you
full control over the USB device, for example to only enable USB HID and prevent
REPL access to the system.
However, note that calling this function on boot without `builtin_driver=True`
will make the MicroPython USB serial interface permanently inaccessible until
you "safe mode boot" (on supported boards) or completely erase the flash of your
device.
### Package usb-device
This base package contains the common implementation components for the other
packages, and can be used to implement new and different USB interface support.
All of the other `usb-device-<name>` packages depend on this package, and it
will be automatically installed as needed.
Specicially, this package provides the `usb.device.get()` function for accessing
the Device singleton object, and the `usb.device.core` module which contains the
low-level classes and utility functions for implementing new USB interface
drivers in Python. The best examples of how to use the core classes is the
source code of the other USB device packages.
### Package usb-device-keyboard
This package provides the `usb.device.keyboard` module. See
[keyboard_example.py](examples/device/keyboard_example.py) for an example
program.
### Package usb-device-mouse
This package provides the `usb.device.mouse` module. See
[mouse_example.py](examples/device/mouse_example.py) for an example program.
### Package usb-device-hid
This package provides the `usb.device.hid` module. USB HID (Human Interface
Device) class allows creating a wide variety of device types. The most common
are mouse and keyboard, which have their own packages in micropython-lib.
However, using the usb-device-hid package directly allows creation of any kind
of HID device.
See [hid_custom_keypad_example.py](examples/device/hid_custom_keypad_example.py)
for an example of a Keypad HID device with a custom HID descriptor.
### Package usb-device-cdc
This package provides the `usb.device.cdc` module. USB CDC (Communications
Device Class) is most commonly used for virtual serial port USB interfaces, and
that is what is supported here.
The example [cdc_repl_example.py](examples/device/cdc_repl_example.py)
demonstrates how to add a second USB serial interface and duplicate the
MicroPython REPL between the two.
### Package usb-device-midi
This package provides the `usb.device.midi` module. This allows implementing
USB MIDI devices in MicroPython.
The example [midi_example.py](examples/device/midi_example.py) demonstrates how
to create a simple MIDI device to send MIDI data to and from the USB host.

Wyświetl plik

@ -0,0 +1,47 @@
# MicroPython USB CDC REPL example
#
# Example demonstrating how to use os.dupterm() to provide the
# MicroPython REPL on a dynamic CDCInterface() serial port.
#
# To run this example:
#
# 1. Make sure `usb-device-cdc` is installed via: mpremote mip install usb-device-cdc
#
# 2. Run the example via: mpremote run cdc_repl_example.py
#
# 3. mpremote will exit with an error after the previous step, because when the
# example runs the existing USB device disconnects and then re-enumerates with
# the second serial port. If you check (for example by running mpremote connect
# list) then you should now see two USB serial devices.
#
# 4. Connect to one of the new ports: mpremote connect PORTNAME
#
# It may be necessary to type Ctrl-B to exit the raw REPL mode and resume the
# interactive REPL after mpremote connects.
#
# MIT license; Copyright (c) 2023-2024 Angus Gratton
import os
import time
import usb.device
from usb.device.cdc import CDCInterface
cdc = CDCInterface()
cdc.init(timeout=0) # zero timeout makes this non-blocking, suitable for os.dupterm()
# pass builtin_driver=True so that we get the built-in USB-CDC alongside,
# if it's available.
usb.device.get().init(cdc, builtin_driver=True)
print("Waiting for USB host to configure the interface...")
# wait for host enumerate as a CDC device...
while not cdc.is_open():
time.sleep_ms(100)
# Note: This example doesn't wait for the host to access the new CDC port,
# which could be done by polling cdc.dtr, as this will block the REPL
# from resuming while this code is still executing.
print("CDC port enumerated, duplicating REPL...")
old_term = os.dupterm(cdc)

Wyświetl plik

@ -0,0 +1,144 @@
# MicroPython USB HID custom Keypad example
#
# This example demonstrates creating a custom HID device with its own
# HID descriptor, in this case for a USB number keypad.
#
# For higher level examples that require less code to use, see mouse_example.py
# and keyboard_example.py
#
# To run this example:
#
# 1. Make sure `usb-device-hid` is installed via: mpremote mip install usb-device-hid
#
# 2. Run the example via: mpremote run hid_custom_keypad_example.py
#
# 3. mpremote will exit with an error after the previous step, because when the
# example runs the existing USB device disconnects and then re-enumerates with
# the custom HID interface present. At this point, the example is running.
#
# 4. To see output from the example, re-connect: mpremote connect PORTNAME
#
# MIT license; Copyright (c) 2023 Dave Wickham, 2023-2024 Angus Gratton
from micropython import const
import time
import usb.device
from usb.device.hid import HIDInterface
_INTERFACE_PROTOCOL_KEYBOARD = const(0x01)
def keypad_example():
k = KeypadInterface()
usb.device.get().init(k, builtin_driver=True)
while not k.is_open():
time.sleep_ms(100)
while True:
time.sleep(2)
print("Press NumLock...")
k.send_key("<NumLock>")
time.sleep_ms(100)
k.send_key()
time.sleep(1)
# continue
print("Press ...")
for _ in range(3):
time.sleep(0.1)
k.send_key(".")
time.sleep(0.1)
k.send_key()
print("Starting again...")
class KeypadInterface(HIDInterface):
# Very basic synchronous USB keypad HID interface
def __init__(self):
super().__init__(
_KEYPAD_REPORT_DESC,
set_report_buf=bytearray(1),
protocol=_INTERFACE_PROTOCOL_KEYBOARD,
interface_str="MicroPython Keypad",
)
self.numlock = False
def on_set_report(self, report_data, _report_id, _report_type):
report = report_data[0]
b = bool(report & 1)
if b != self.numlock:
print("Numlock: ", b)
self.numlock = b
def send_key(self, key=None):
if key is None:
self.send_report(b"\x00")
else:
self.send_report(_key_to_id(key).to_bytes(1, "big"))
# See HID Usages and Descriptions 1.4, section 10 Keyboard/Keypad Page (0x07)
#
# This keypad example has a contiguous series of keys (KEYPAD_KEY_IDS) starting
# from the NumLock/Clear keypad key (0x53), but you can send any Key IDs from
# the table in the HID Usages specification.
_KEYPAD_KEY_OFFS = const(0x53)
_KEYPAD_KEY_IDS = [
"<NumLock>",
"/",
"*",
"-",
"+",
"<Enter>",
"1",
"2",
"3",
"4",
"5",
"6",
"7",
"8",
"9",
"0",
".",
]
def _key_to_id(key):
# This is a little slower than making a dict for lookup, but uses
# less memory and O(n) can be fast enough when n is small.
return _KEYPAD_KEY_IDS.index(key) + _KEYPAD_KEY_OFFS
# HID Report descriptor for a numeric keypad
#
# fmt: off
_KEYPAD_REPORT_DESC = (
b'\x05\x01' # Usage Page (Generic Desktop)
b'\x09\x07' # Usage (Keypad)
b'\xA1\x01' # Collection (Application)
b'\x05\x07' # Usage Page (Keypad)
b'\x19\x00' # Usage Minimum (0)
b'\x29\xFF' # Usage Maximum (ff)
b'\x15\x00' # Logical Minimum (0)
b'\x25\xFF' # Logical Maximum (ff)
b'\x95\x01' # Report Count (1),
b'\x75\x08' # Report Size (8),
b'\x81\x00' # Input (Data, Array, Absolute)
b'\x05\x08' # Usage page (LEDs)
b'\x19\x01' # Usage Minimum (1)
b'\x29\x01' # Usage Maximum (1),
b'\x95\x01' # Report Count (1),
b'\x75\x01' # Report Size (1),
b'\x91\x02' # Output (Data, Variable, Absolute)
b'\x95\x01' # Report Count (1),
b'\x75\x07' # Report Size (7),
b'\x91\x01' # Output (Constant) - padding bits
b'\xC0' # End Collection
)
# fmt: on
keypad_example()

Wyświetl plik

@ -0,0 +1,97 @@
# MicroPython USB Keyboard example
#
# To run this example:
#
# 1. Check the KEYS assignment below, and connect buttons or switches to the
# assigned GPIOs. You can change the entries as needed, look up the reference
# for your board to see what pins are available. Note that the example uses
# "active low" logic, so pressing a switch or button should switch the
# connected pin to Ground (0V).
#
# 2. Make sure `usb-device-keyboard` is installed via: mpremote mip install usb-device-keyboard
#
# 3. Run the example via: mpremote run keyboard_example.py
#
# 4. mpremote will exit with an error after the previous step, because when the
# example runs the existing USB device disconnects and then re-enumerates with
# the keyboard interface present. At this point, the example is running.
#
# 5. The example doesn't print anything to the serial port, but to stop it first
# re-connect: mpremote connect PORTNAME
#
# 6. Type Ctrl-C to interrupt the running example and stop it. You may have to
# also type Ctrl-B to restore the interactive REPL.
#
# To implement a keyboard with different USB HID characteristics, copy the
# usb-device-keyboard/usb/device/keyboard.py file into your own project and modify
# KeyboardInterface.
#
# MIT license; Copyright (c) 2024 Angus Gratton
import usb.device
from usb.device.keyboard import KeyboardInterface, KeyCode, LEDCode
from machine import Pin
import time
# Tuples mapping Pin inputs to the KeyCode each input generates
#
# (Big keyboards usually multiplex multiple keys per input with a scan matrix,
# but this is a simple example.)
KEYS = (
(Pin.cpu.GPIO10, KeyCode.CAPS_LOCK),
(Pin.cpu.GPIO11, KeyCode.LEFT_SHIFT),
(Pin.cpu.GPIO12, KeyCode.M),
(Pin.cpu.GPIO13, KeyCode.P),
# ... add more pin to KeyCode mappings here if needed
)
# Tuples mapping Pin outputs to the LEDCode that turns the output on
LEDS = (
(Pin.board.LED, LEDCode.CAPS_LOCK),
# ... add more pin to LEDCode mappings here if needed
)
class ExampleKeyboard(KeyboardInterface):
def on_led_update(self, led_mask):
# print(hex(led_mask))
for pin, code in LEDS:
# Set the pin high if 'code' bit is set in led_mask
pin(code & led_mask)
def keyboard_example():
# Initialise all the pins as active-low inputs with pullup resistors
for pin, _ in KEYS:
pin.init(Pin.IN, Pin.PULL_UP)
# Initialise all the LEDs as active-high outputs
for pin, _ in LEDS:
pin.init(Pin.OUT, value=0)
# Register the keyboard interface and re-enumerate
k = ExampleKeyboard()
usb.device.get().init(k, builtin_driver=True)
print("Entering keyboard loop...")
keys = [] # Keys held down, reuse the same list object
prev_keys = [None] # Previous keys, starts with a dummy value so first
# iteration will always send
while True:
if k.is_open():
keys.clear()
for pin, code in KEYS:
if not pin(): # active-low
keys.append(code)
if keys != prev_keys:
# print(keys)
k.send_keys(keys)
prev_keys.clear()
prev_keys.extend(keys)
# This simple example scans each input in an infinite loop, but a more
# complex implementation would probably use a timer or similar.
time.sleep_ms(1)
keyboard_example()

Wyświetl plik

@ -0,0 +1,78 @@
# MicroPython USB MIDI example
#
# This example demonstrates creating a custom MIDI device.
#
# To run this example:
#
# 1. Make sure `usb-device-midi` is installed via: mpremote mip install usb-device-midi
#
# 2. Run the example via: mpremote run midi_example.py
#
# 3. mpremote will exit with an error after the previous step, because when the
# example runs the existing USB device disconnects and then re-enumerates with
# the MIDI interface present. At this point, the example is running.
#
# 4. To see output from the example, re-connect: mpremote connect PORTNAME
#
#
# MIT license; Copyright (c) 2023-2024 Angus Gratton
import usb.device
from usb.device.midi import MIDIInterface
import time
class MIDIExample(MIDIInterface):
# Very simple example event handler functions, showing how to receive note
# and control change messages sent from the host to the device.
#
# If you need to send MIDI data to the host, then it's fine to instantiate
# MIDIInterface class directly.
def on_open(self):
super().on_open()
print("Device opened by host")
def on_note_on(self, channel, pitch, vel):
print(f"RX Note On channel {channel} pitch {pitch} velocity {vel}")
def on_note_off(self, channel, pitch, vel):
print(f"RX Note Off channel {channel} pitch {pitch} velocity {vel}")
def on_control_change(self, channel, controller, value):
print(f"RX Control channel {channel} controller {controller} value {value}")
m = MIDIExample()
# Remove builtin_driver=True if you don't want the MicroPython serial REPL available.
usb.device.get().init(m, builtin_driver=True)
print("Waiting for USB host to configure the interface...")
while not m.is_open():
time.sleep_ms(100)
print("Starting MIDI loop...")
# TX constants
CHANNEL = 0
PITCH = 60
CONTROLLER = 64
control_val = 0
while m.is_open():
time.sleep(1)
print(f"TX Note On channel {CHANNEL} pitch {PITCH}")
m.note_on(CHANNEL, PITCH) # Velocity is an optional third argument
time.sleep(0.5)
print(f"TX Note Off channel {CHANNEL} pitch {PITCH}")
m.note_off(CHANNEL, PITCH)
time.sleep(1)
print(f"TX Control channel {CHANNEL} controller {CONTROLLER} value {control_val}")
m.control_change(CHANNEL, CONTROLLER, control_val)
control_val += 1
if control_val == 0x7F:
control_val = 0
time.sleep(1)
print("USB host has reset device, example done.")

Wyświetl plik

@ -0,0 +1,52 @@
# MicroPython USB Mouse example
#
# To run this example:
#
# 1. Make sure `usb-device-mouse` is installed via: mpremote mip install usb-device-mouse
#
# 2. Run the example via: mpremote run mouse_example.py
#
# 3. mpremote will exit with an error after the previous step, because when the
# example runs the existing USB device disconnects and then re-enumerates with
# the mouse interface present. At this point, the example is running.
#
# 4. You should see the mouse move and right click. At this point, the example
# is finished executing.
#
# To implement a more complex mouse with more buttons or other custom interface
# features, copy the usb-device-mouse/usb/device/mouse.py file into your own
# project and modify MouseInterface.
#
# MIT license; Copyright (c) 2023-2024 Angus Gratton
import time
import usb.device
from usb.device.mouse import MouseInterface
def mouse_example():
m = MouseInterface()
# Note: builtin_driver=True means that if there's a USB-CDC REPL
# available then it will appear as well as the HID device.
usb.device.get().init(m, builtin_driver=True)
# wait for host to enumerate as a HID device...
while not m.is_open():
time.sleep_ms(100)
time.sleep_ms(2000)
print("Moving...")
m.move_by(-100, 0)
m.move_by(-100, 0)
time.sleep_ms(500)
print("Clicking...")
m.click_right(True)
time.sleep_ms(200)
m.click_right(False)
print("Done!")
mouse_example()

Wyświetl plik

@ -0,0 +1,3 @@
metadata(version="0.1.0")
require("usb-device")
package("usb")

Wyświetl plik

@ -0,0 +1,437 @@
# MicroPython USB CDC module
# MIT license; Copyright (c) 2022 Martin Fischer, 2023-2024 Angus Gratton
import io
import time
import errno
import machine
import struct
from micropython import const
from .core import Interface, Buffer, split_bmRequestType
_EP_IN_FLAG = const(1 << 7)
# Control transfer stages
_STAGE_IDLE = const(0)
_STAGE_SETUP = const(1)
_STAGE_DATA = const(2)
_STAGE_ACK = const(3)
# Request types
_REQ_TYPE_STANDARD = const(0x0)
_REQ_TYPE_CLASS = const(0x1)
_REQ_TYPE_VENDOR = const(0x2)
_REQ_TYPE_RESERVED = const(0x3)
_DEV_CLASS_MISC = const(0xEF)
_CS_DESC_TYPE = const(0x24) # CS Interface type communication descriptor
# CDC control interface definitions
_INTERFACE_CLASS_CDC = const(2)
_INTERFACE_SUBCLASS_CDC = const(2) # Abstract Control Mode
_PROTOCOL_NONE = const(0) # no protocol
# CDC descriptor subtype
# see also CDC120.pdf, table 13
_CDC_FUNC_DESC_HEADER = const(0)
_CDC_FUNC_DESC_CALL_MANAGEMENT = const(1)
_CDC_FUNC_DESC_ABSTRACT_CONTROL = const(2)
_CDC_FUNC_DESC_UNION = const(6)
# CDC class requests, table 13, PSTN subclass
_SET_LINE_CODING_REQ = const(0x20)
_GET_LINE_CODING_REQ = const(0x21)
_SET_CONTROL_LINE_STATE = const(0x22)
_SEND_BREAK_REQ = const(0x23)
_LINE_CODING_STOP_BIT_1 = const(0)
_LINE_CODING_STOP_BIT_1_5 = const(1)
_LINE_CODING_STOP_BIT_2 = const(2)
_LINE_CODING_PARITY_NONE = const(0)
_LINE_CODING_PARITY_ODD = const(1)
_LINE_CODING_PARITY_EVEN = const(2)
_LINE_CODING_PARITY_MARK = const(3)
_LINE_CODING_PARITY_SPACE = const(4)
_LINE_STATE_DTR = const(1)
_LINE_STATE_RTS = const(2)
_PARITY_BITS_REPR = "NOEMS"
_STOP_BITS_REPR = ("1", "1.5", "2")
# Other definitions
_CDC_VERSION = const(0x0120) # release number in binary-coded decimal
# Number of endpoints in each interface
_CDC_CONTROL_EP_NUM = const(1)
_CDC_DATA_EP_NUM = const(2)
# CDC data interface definitions
_CDC_ITF_DATA_CLASS = const(0xA)
_CDC_ITF_DATA_SUBCLASS = const(0)
_CDC_ITF_DATA_PROT = const(0) # no protocol
# Length of the bulk transfer endpoints. Maybe should be configurable?
_BULK_EP_LEN = const(64)
# MicroPython error constants (negated as IOBase.ioctl uses negative return values for error codes)
# these must match values in py/mperrno.h
_MP_EINVAL = const(-22)
_MP_ETIMEDOUT = const(-110)
# MicroPython stream ioctl requests, same as py/stream.h
_MP_STREAM_FLUSH = const(1)
_MP_STREAM_POLL = const(3)
# MicroPython ioctl poll values, same as py/stream.h
_MP_STREAM_POLL_WR = const(0x04)
_MP_STREAM_POLL_RD = const(0x01)
_MP_STREAM_POLL_HUP = const(0x10)
class CDCInterface(io.IOBase, Interface):
# USB CDC serial device class, designed to resemble machine.UART
# with some additional methods.
#
# Relies on multiple inheritance so it can be an io.IOBase for stream
# functions and also a Interface (actually an Interface Association
# Descriptor holding two interfaces.)
def __init__(self, **kwargs):
# io.IOBase has no __init__()
Interface.__init__(self)
# Callbacks for particular control changes initiated by the host
self.break_cb = None # Host sent a "break" condition
self.line_state_cb = None
self.line_coding_cb = None
self._line_state = 0 # DTR & RTS
# Set a default line coding of 115200/8N1
self._line_coding = bytearray(b"\x00\xc2\x01\x00\x00\x00\x08")
self._wb = () # Optional write Buffer (IN endpoint), set by CDC.init()
self._rb = () # Optional read Buffer (OUT endpoint), set by CDC.init()
self._timeout = 1000 # set from CDC.init() as well
# one control interface endpoint, two data interface endpoints
self.ep_c_in = self.ep_d_in = self.ep_d_out = None
self._c_itf = None # Number of control interface, data interface is one more
self.init(**kwargs)
def init(
self, baudrate=9600, bits=8, parity="N", stop=1, timeout=None, txbuf=256, rxbuf=256, flow=0
):
# Configure the CDC serial port. Note that many of these settings like
# baudrate, bits, parity, stop don't change the USB-CDC device behavior
# at all, only the "line coding" as communicated from/to the USB host.
# Store initial line coding parameters in the USB CDC binary format
# (there is nothing implemented to further change these from Python
# code, the USB host sets them.)
struct.pack_into(
"<LBBB",
self._line_coding,
0,
baudrate,
_STOP_BITS_REPR.index(str(stop)),
_PARITY_BITS_REPR.index(parity),
bits,
)
if flow != 0:
raise NotImplementedError # UART flow control currently not supported
if not (txbuf and rxbuf):
raise ValueError # Buffer sizes are required
self._timeout = timeout
self._wb = Buffer(txbuf)
self._rb = Buffer(rxbuf)
###
### Line State & Line Coding State property getters
###
@property
def rts(self):
return bool(self._line_state & _LINE_STATE_RTS)
@property
def dtr(self):
return bool(self._line_state & _LINE_STATE_DTR)
# Line Coding Representation
# Byte 0-3 Byte 4 Byte 5 Byte 6
# dwDTERate bCharFormat bParityType bDataBits
@property
def baudrate(self):
return struct.unpack("<LBBB", self._line_coding)[0]
@property
def stop_bits(self):
return _STOP_BITS_REPR[self._line_coding[4]]
@property
def parity(self):
return _PARITY_BITS_REPR[self._line_coding[5]]
@property
def data_bits(self):
return self._line_coding[6]
def __repr__(self):
return f"{self.baudrate}/{self.data_bits}{self.parity}{self.stop_bits} rts={self.rts} dtr={self.dtr}"
###
### Set callbacks for operations initiated by the host
###
def set_break_cb(self, cb):
self.break_cb = cb
def set_line_state_cb(self, cb):
self.line_state_cb = cb
def set_line_coding_cb(self, cb):
self.line_coding_cb = cb
###
### USB Interface Implementation
###
def desc_cfg(self, desc, itf_num, ep_num, strs):
# CDC needs a Interface Association Descriptor (IAD) wrapping two interfaces: Control & Data interfaces
desc.interface_assoc(itf_num, 2, _INTERFACE_CLASS_CDC, _INTERFACE_SUBCLASS_CDC)
# Now add the Control interface descriptor
self._c_itf = itf_num
desc.interface(itf_num, _CDC_CONTROL_EP_NUM, _INTERFACE_CLASS_CDC, _INTERFACE_SUBCLASS_CDC)
# Append the CDC class-specific interface descriptor
# see CDC120-track, p20
desc.pack(
"<BBBH",
5, # bFunctionLength
_CS_DESC_TYPE, # bDescriptorType
_CDC_FUNC_DESC_HEADER, # bDescriptorSubtype
_CDC_VERSION, # cdc version
)
# CDC-PSTN table3 "Call Management"
# set to No
desc.pack(
"<BBBBB",
5, # bFunctionLength
_CS_DESC_TYPE, # bDescriptorType
_CDC_FUNC_DESC_CALL_MANAGEMENT, # bDescriptorSubtype
0, # bmCapabilities - XXX no call managment so far
itf_num + 1, # bDataInterface - interface 1
)
# CDC-PSTN table4 "Abstract Control"
# set to support line_coding and send_break
desc.pack(
"<BBBB",
4, # bFunctionLength
_CS_DESC_TYPE, # bDescriptorType
_CDC_FUNC_DESC_ABSTRACT_CONTROL, # bDescriptorSubtype
0x6, # bmCapabilities D1, D2
)
# CDC-PSTN "Union"
# set control interface / data interface number
desc.pack(
"<BBBBB",
5, # bFunctionLength
_CS_DESC_TYPE, # bDescriptorType
_CDC_FUNC_DESC_UNION, # bDescriptorSubtype
itf_num, # bControlInterface
itf_num + 1, # bSubordinateInterface0 (data class itf number)
)
# Single control IN endpoint (currently unused in this implementation)
self.ep_c_in = ep_num | _EP_IN_FLAG
desc.endpoint(self.ep_c_in, "interrupt", 8, 16)
# Now add the data interface
desc.interface(
itf_num + 1,
_CDC_DATA_EP_NUM,
_CDC_ITF_DATA_CLASS,
_CDC_ITF_DATA_SUBCLASS,
_CDC_ITF_DATA_PROT,
)
# Two data endpoints, bulk OUT and IN
self.ep_d_out = ep_num + 1
self.ep_d_in = (ep_num + 1) | _EP_IN_FLAG
desc.endpoint(self.ep_d_out, "bulk", _BULK_EP_LEN, 0)
desc.endpoint(self.ep_d_in, "bulk", _BULK_EP_LEN, 0)
def num_itfs(self):
return 2
def num_eps(self):
return 2 # total after masking out _EP_IN_FLAG
def on_open(self):
super().on_open()
# kick off any transfers that may have queued while the device was not open
self._rd_xfer()
self._wr_xfer()
def on_interface_control_xfer(self, stage, request):
# Handle class-specific interface control transfers
bmRequestType, bRequest, wValue, wIndex, wLength = struct.unpack("BBHHH", request)
recipient, req_type, req_dir = split_bmRequestType(bmRequestType)
if wIndex != self._c_itf:
return False # Only for the control interface (may be redundant check?)
if req_type != _REQ_TYPE_CLASS:
return False # Unsupported request type
if stage == _STAGE_SETUP:
if bRequest in (_SET_LINE_CODING_REQ, _GET_LINE_CODING_REQ):
return self._line_coding # Buffer to read or write
# Continue on other supported requests, stall otherwise
return bRequest in (_SET_CONTROL_LINE_STATE, _SEND_BREAK_REQ)
if stage == _STAGE_ACK:
if bRequest == _SET_LINE_CODING_REQ:
if self.line_coding_cb:
self.line_coding_cb(self._line_coding)
elif bRequest == _SET_CONTROL_LINE_STATE:
self._line_state = wValue
if self.line_state_cb:
self.line_state_cb(wValue)
elif bRequest == _SEND_BREAK_REQ:
if self.break_cb:
self.break_cb(wValue)
return True # allow DATA/ACK stages to complete normally
def _wr_xfer(self):
# Submit a new data IN transfer from the _wb buffer, if needed
if self.is_open() and not self.xfer_pending(self.ep_d_in) and self._wb.readable():
self.submit_xfer(self.ep_d_in, self._wb.pend_read(), self._wr_cb)
def _wr_cb(self, ep, res, num_bytes):
# Whenever a data IN transfer ends
if res == 0:
self._wb.finish_read(num_bytes)
self._wr_xfer()
def _rd_xfer(self):
# Keep an active data OUT transfer to read data from the host,
# whenever the receive buffer has room for new data
if self.is_open() and not self.xfer_pending(self.ep_d_out) and self._rb.writable():
# Can only submit up to the endpoint length per transaction, otherwise we won't
# get any transfer callback until the full transaction completes.
self.submit_xfer(self.ep_d_out, self._rb.pend_write(_BULK_EP_LEN), self._rd_cb)
def _rd_cb(self, ep, res, num_bytes):
# Whenever a data OUT transfer ends
if res == 0:
self._rb.finish_write(num_bytes)
self._rd_xfer()
###
### io.IOBase stream implementation
###
def write(self, buf):
# use a memoryview to track how much of 'buf' we've written so far
# (unfortunately, this means a 1 block allocation for each write, but it's otherwise allocation free.)
start = time.ticks_ms()
mv = memoryview(buf)
while True:
# Keep pushing buf into _wb into it's all gone
nbytes = self._wb.write(mv)
self._wr_xfer() # make sure a transfer is running from _wb
if nbytes == len(mv):
return len(buf) # Success
mv = mv[nbytes:]
# check for timeout
if time.ticks_diff(time.ticks_ms(), start) >= self._timeout:
return len(buf) - len(mv)
machine.idle()
def read(self, size):
start = time.ticks_ms()
# Allocate a suitable buffer to read into
if size >= 0:
b = bytearray(size)
else:
# for size == -1, return however many bytes are ready
b = bytearray(self._rb.readable())
n = self._readinto(b, start)
if not n:
return None
if n < len(b):
return b[:n]
return b
def readinto(self, b):
return self._readinto(b, time.ticks_ms())
def _readinto(self, b, start):
if len(b) == 0:
return 0
n = 0
m = memoryview(b)
while n < len(b):
# copy out of the read buffer if there is anything available
if self._rb.readable():
n += self._rb.readinto(m if n == 0 else m[n:])
self._rd_xfer() # if _rd was previously full, no transfer will be running
if n == len(b):
break # Done, exit before we call machine.idle()
if time.ticks_diff(time.ticks_ms(), start) >= self._timeout:
break # Timed out
machine.idle()
return n or None
def ioctl(self, req, arg):
if req == _MP_STREAM_POLL:
return (
(_MP_STREAM_POLL_WR if (arg & _MP_STREAM_POLL_WR) and self._wb.writable() else 0)
| (_MP_STREAM_POLL_RD if (arg & _MP_STREAM_POLL_RD) and self._rb.readable() else 0)
|
# using the USB level "open" (i.e. connected to host) for !HUP, not !DTR (port is open)
(_MP_STREAM_POLL_HUP if (arg & _MP_STREAM_POLL_HUP) and not self.is_open() else 0)
)
elif req == _MP_STREAM_FLUSH:
start = time.ticks_ms()
# Wait until write buffer contains no bytes for the lower TinyUSB layer to "read"
while self._wb.readable():
if not self.is_open():
return _MP_EINVAL
if time.ticks_diff(time.ticks_ms(), start) > self._timeout:
return _MP_ETIMEDOUT
machine.idle()
return 0
return _MP_EINVAL
def flush(self):
# a C implementation of this exists in stream.c, but it's not in io.IOBase
# and can't immediately be called from here (AFAIK)
r = self.ioctl(_MP_STREAM_FLUSH, 0)
if r:
raise OSError(r)

Wyświetl plik

@ -0,0 +1,3 @@
metadata(version="0.1.0")
require("usb-device")
package("usb")

Wyświetl plik

@ -0,0 +1,232 @@
# MicroPython USB hid module
#
# This implements a base HIDInterface class that can be used directly,
# or subclassed into more specific HID interface types.
#
# MIT license; Copyright (c) 2023 Angus Gratton
from micropython import const
import machine
import struct
import time
from .core import Interface, Descriptor, split_bmRequestType
_EP_IN_FLAG = const(1 << 7)
# Control transfer stages
_STAGE_IDLE = const(0)
_STAGE_SETUP = const(1)
_STAGE_DATA = const(2)
_STAGE_ACK = const(3)
# Request types
_REQ_TYPE_STANDARD = const(0x0)
_REQ_TYPE_CLASS = const(0x1)
_REQ_TYPE_VENDOR = const(0x2)
_REQ_TYPE_RESERVED = const(0x3)
# Descriptor types
_DESC_HID_TYPE = const(0x21)
_DESC_REPORT_TYPE = const(0x22)
_DESC_PHYSICAL_TYPE = const(0x23)
# Interface and protocol identifiers
_INTERFACE_CLASS = const(0x03)
_INTERFACE_SUBCLASS_NONE = const(0x00)
_INTERFACE_SUBCLASS_BOOT = const(0x01)
_INTERFACE_PROTOCOL_NONE = const(0x00)
_INTERFACE_PROTOCOL_KEYBOARD = const(0x01)
_INTERFACE_PROTOCOL_MOUSE = const(0x02)
# bRequest values for HID control requests
_REQ_CONTROL_GET_REPORT = const(0x01)
_REQ_CONTROL_GET_IDLE = const(0x02)
_REQ_CONTROL_GET_PROTOCOL = const(0x03)
_REQ_CONTROL_GET_DESCRIPTOR = const(0x06)
_REQ_CONTROL_SET_REPORT = const(0x09)
_REQ_CONTROL_SET_IDLE = const(0x0A)
_REQ_CONTROL_SET_PROTOCOL = const(0x0B)
# Standard descriptor lengths
_STD_DESC_INTERFACE_LEN = const(9)
_STD_DESC_ENDPOINT_LEN = const(7)
class HIDInterface(Interface):
# Abstract base class to implement a USB device HID interface in Python.
def __init__(
self,
report_descriptor,
extra_descriptors=[],
set_report_buf=None,
protocol=_INTERFACE_PROTOCOL_NONE,
interface_str=None,
):
# Construct a new HID interface.
#
# - report_descriptor is the only mandatory argument, which is the binary
# data consisting of the HID Report Descriptor. See Device Class
# Definition for Human Interface Devices (HID) v1.11 section 6.2.2 Report
# Descriptor, p23.
#
# - extra_descriptors is an optional argument holding additional HID
# descriptors, to append after the mandatory report descriptor. Most
# HID devices do not use these.
#
# - set_report_buf is an optional writable buffer object (i.e.
# bytearray), where SET_REPORT requests from the host can be
# written. Only necessary if the report_descriptor contains Output
# entries. If set, the size must be at least the size of the largest
# Output entry.
#
# - protocol can be set to a specific value as per HID v1.11 section 4.3 Protocols, p9.
#
# - interface_str is an optional string descriptor to associate with the HID USB interface.
super().__init__()
self.report_descriptor = report_descriptor
self.extra_descriptors = extra_descriptors
self._set_report_buf = set_report_buf
self.protocol = protocol
self.interface_str = interface_str
self._int_ep = None # set during enumeration
def get_report(self):
return False
def on_set_report(self, report_data, report_id, report_type):
# Override this function in order to handle SET REPORT requests from the host,
# where it sends data to the HID device.
#
# This function will only be called if the Report descriptor contains at least one Output entry,
# and the set_report_buf argument is provided to the constructor.
#
# Return True to complete the control transfer normally, False to abort it.
return True
def busy(self):
# Returns True if the interrupt endpoint is busy (i.e. existing transfer is pending)
return self.is_open() and self.xfer_pending(self._int_ep)
def send_report(self, report_data, timeout_ms=100):
# Helper function to send a HID report in the typical USB interrupt
# endpoint associated with a HID interface.
#
# Returns True if successful, False if HID device is not active or timeout
# is reached without being able to queue the report for sending.
deadline = time.ticks_add(time.ticks_ms(), timeout_ms)
while self.busy():
if time.ticks_diff(deadline, time.ticks_ms()) <= 0:
return False
machine.idle()
if not self.is_open():
return False
self.submit_xfer(self._int_ep, report_data)
def desc_cfg(self, desc, itf_num, ep_num, strs):
# Add the standard interface descriptor
desc.interface(
itf_num,
1,
_INTERFACE_CLASS,
_INTERFACE_SUBCLASS_NONE,
self.protocol,
len(strs) if self.interface_str else 0,
)
if self.interface_str:
strs.append(self.interface_str)
# As per HID v1.11 section 7.1 Standard Requests, return the contents of
# the standard HID descriptor before the associated endpoint descriptor.
self.get_hid_descriptor(desc)
# Add the typical single USB interrupt endpoint descriptor associated
# with a HID interface.
self._int_ep = ep_num | _EP_IN_FLAG
desc.endpoint(self._int_ep, "interrupt", 8, 8)
self.idle_rate = 0
self.protocol = 0
def num_eps(self):
return 1
def get_hid_descriptor(self, desc=None):
# Append a full USB HID descriptor from the object's report descriptor
# and optional additional descriptors.
#
# See HID Specification Version 1.1, Section 6.2.1 HID Descriptor p22
l = 9 + 3 * len(self.extra_descriptors) # total length
if desc is None:
desc = Descriptor(bytearray(l))
desc.pack(
"<BBHBBBH",
l, # bLength
_DESC_HID_TYPE, # bDescriptorType
0x111, # bcdHID
0, # bCountryCode
len(self.extra_descriptors) + 1, # bNumDescriptors
0x22, # bDescriptorType, Report
len(self.report_descriptor), # wDescriptorLength, Report
)
# Fill in any additional descriptor type/length pairs
#
# TODO: unclear if this functionality is ever used, may be easier to not
# support in base class
for dt, dd in self.extra_descriptors:
desc.pack("<BH", dt, len(dd))
return desc.b
def on_interface_control_xfer(self, stage, request):
# Handle standard and class-specific interface control transfers for HID devices.
bmRequestType, bRequest, wValue, _, wLength = struct.unpack("BBHHH", request)
recipient, req_type, _ = split_bmRequestType(bmRequestType)
if stage == _STAGE_SETUP:
if req_type == _REQ_TYPE_STANDARD:
# HID Spec p48: 7.1 Standard Requests
if bRequest == _REQ_CONTROL_GET_DESCRIPTOR:
desc_type = wValue >> 8
if desc_type == _DESC_HID_TYPE:
return self.get_hid_descriptor()
if desc_type == _DESC_REPORT_TYPE:
return self.report_descriptor
elif req_type == _REQ_TYPE_CLASS:
# HID Spec p50: 7.2 Class-Specific Requests
if bRequest == _REQ_CONTROL_GET_REPORT:
print("GET_REPORT?")
return False # Unsupported for now
if bRequest == _REQ_CONTROL_GET_IDLE:
return bytes([self.idle_rate])
if bRequest == _REQ_CONTROL_GET_PROTOCOL:
return bytes([self.protocol])
if bRequest in (_REQ_CONTROL_SET_IDLE, _REQ_CONTROL_SET_PROTOCOL):
return True
if bRequest == _REQ_CONTROL_SET_REPORT:
return self._set_report_buf # If None, request will stall
return False # Unsupported request
if stage == _STAGE_ACK:
if req_type == _REQ_TYPE_CLASS:
if bRequest == _REQ_CONTROL_SET_IDLE:
self.idle_rate = wValue >> 8
elif bRequest == _REQ_CONTROL_SET_PROTOCOL:
self.protocol = wValue
elif bRequest == _REQ_CONTROL_SET_REPORT:
report_id = wValue & 0xFF
report_type = wValue >> 8
report_data = self._set_report_buf
if wLength < len(report_data):
# need to truncate the response in the callback if we got less bytes
# than allowed for in the buffer
report_data = memoryview(self._set_report_buf)[:wLength]
self.on_set_report(report_data, report_id, report_type)
return True # allow DATA/ACK stages to complete normally

Wyświetl plik

@ -0,0 +1,3 @@
metadata(version="0.1.0")
require("usb-device-hid")
package("usb")

Wyświetl plik

@ -0,0 +1,233 @@
# MIT license; Copyright (c) 2023-2024 Angus Gratton
from micropython import const
import time
import usb.device
from usb.device.hid import HIDInterface
_INTERFACE_PROTOCOL_KEYBOARD = const(0x01)
_KEY_ARRAY_LEN = const(6) # Size of HID key array, must match report descriptor
_KEY_REPORT_LEN = const(_KEY_ARRAY_LEN + 2) # Modifier Byte + Reserved Byte + Array entries
class KeyboardInterface(HIDInterface):
# Synchronous USB keyboard HID interface
def __init__(self):
super().__init__(
_KEYBOARD_REPORT_DESC,
set_report_buf=bytearray(1),
protocol=_INTERFACE_PROTOCOL_KEYBOARD,
interface_str="MicroPython Keyboard",
)
self._key_reports = [
bytearray(_KEY_REPORT_LEN),
bytearray(_KEY_REPORT_LEN),
] # Ping/pong report buffers
self.numlock = False
def on_set_report(self, report_data, _report_id, _report_type):
self.on_led_update(report_data[0])
def on_led_update(self, led_mask):
# Override to handle keyboard LED updates. led_mask is bitwise ORed
# together values as defined in LEDCode.
pass
def send_keys(self, down_keys, timeout_ms=100):
# Update the state of the keyboard by sending a report with down_keys
# set, where down_keys is an iterable (list or similar) of integer
# values such as the values defined in KeyCode.
#
# Will block for up to timeout_ms if a previous report is still
# pending to be sent to the host. Returns True on success.
r, s = self._key_reports # next report buffer to send, spare report buffer
r[0] = 0 # modifier byte
i = 2 # index for next key array item to write to
for k in down_keys:
if k < 0: # Modifier key
r[0] |= -k
elif i < _KEY_REPORT_LEN:
r[i] = k
i += 1
else: # Excess rollover! Can't report
r[0] = 0
for i in range(2, _KEY_REPORT_LEN):
r[i] = 0xFF
break
while i < _KEY_REPORT_LEN:
r[i] = 0
i += 1
if self.send_report(r, timeout_ms):
# Swap buffers if the previous one is newly queued to send, so
# any subsequent call can't modify that buffer mid-send
self._key_reports[0] = s
self._key_reports[1] = r
return True
return False
# HID keyboard report descriptor
#
# From p69 of http://www.usb.org/developers/devclass_docs/HID1_11.pdf
#
# fmt: off
_KEYBOARD_REPORT_DESC = (
b'\x05\x01' # Usage Page (Generic Desktop),
b'\x09\x06' # Usage (Keyboard),
b'\xA1\x01' # Collection (Application),
b'\x05\x07' # Usage Page (Key Codes);
b'\x19\xE0' # Usage Minimum (224),
b'\x29\xE7' # Usage Maximum (231),
b'\x15\x00' # Logical Minimum (0),
b'\x25\x01' # Logical Maximum (1),
b'\x75\x01' # Report Size (1),
b'\x95\x08' # Report Count (8),
b'\x81\x02' # Input (Data, Variable, Absolute), ;Modifier byte
b'\x95\x01' # Report Count (1),
b'\x75\x08' # Report Size (8),
b'\x81\x01' # Input (Constant), ;Reserved byte
b'\x95\x05' # Report Count (5),
b'\x75\x01' # Report Size (1),
b'\x05\x08' # Usage Page (Page# for LEDs),
b'\x19\x01' # Usage Minimum (1),
b'\x29\x05' # Usage Maximum (5),
b'\x91\x02' # Output (Data, Variable, Absolute), ;LED report
b'\x95\x01' # Report Count (1),
b'\x75\x03' # Report Size (3),
b'\x91\x01' # Output (Constant), ;LED report padding
b'\x95\x06' # Report Count (6),
b'\x75\x08' # Report Size (8),
b'\x15\x00' # Logical Minimum (0),
b'\x25\x65' # Logical Maximum(101),
b'\x05\x07' # Usage Page (Key Codes),
b'\x19\x00' # Usage Minimum (0),
b'\x29\x65' # Usage Maximum (101),
b'\x81\x00' # Input (Data, Array), ;Key arrays (6 bytes)
b'\xC0' # End Collection
)
# fmt: on
# Standard HID keycodes, as a pseudo-enum class for easy access
#
# Modifier keys are encoded as negative values
class KeyCode:
A = 4
B = 5
C = 6
D = 7
E = 8
F = 9
G = 10
H = 11
I = 12
J = 13
K = 14
L = 15
M = 16
N = 17
O = 18
P = 19
Q = 20
R = 21
S = 22
T = 23
U = 24
V = 25
W = 26
X = 27
Y = 28
Z = 29
N1 = 30 # Standard number row keys
N2 = 31
N3 = 32
N4 = 33
N5 = 34
N6 = 35
N7 = 36
N8 = 37
N9 = 38
N0 = 39
ENTER = 40
ESCAPE = 41
BACKSPACE = 42
TAB = 43
SPACE = 44
MINUS = 45 # - _
EQUAL = 46 # = +
OPEN_BRACKET = 47 # [ {
CLOSE_BRACKET = 48 # ] }
BACKSLASH = 49 # \ |
HASH = 50 # # ~
COLON = 51 # ; :
QUOTE = 52 # ' "
TILDE = 53 # ` ~
COMMA = 54 # , <
DOT = 55 # . >
SLASH = 56 # / ?
CAPS_LOCK = 57
F1 = 58
F2 = 59
F3 = 60
F4 = 61
F5 = 62
F6 = 63
F7 = 64
F8 = 65
F9 = 66
F10 = 67
F11 = 68
F12 = 69
PRINTSCREEN = 70
SCROLL_LOCK = 71
PAUSE = 72
INSERT = 73
HOME = 74
PAGEUP = 75
DELETE = 76
END = 77
PAGEDOWN = 78
RIGHT = 79 # Arrow keys
LEFT = 80
DOWN = 81
UP = 82
KP_NUM_LOCK = 83
KP_DIVIDE = 84
KP_AT = 85
KP_MULTIPLY = 85
KP_MINUS = 86
KP_PLUS = 87
KP_ENTER = 88
KP_1 = 89
KP_2 = 90
KP_3 = 91
KP_4 = 92
KP_5 = 93
KP_6 = 94
KP_7 = 95
KP_8 = 96
KP_9 = 97
KP_0 = 98
# HID modifier values (negated to allow them to be passed along with the normal keys)
LEFT_CTRL = -0x01
LEFT_SHIFT = -0x02
LEFT_ALT = -0x04
LEFT_UI = -0x08
RIGHT_CTRL = -0x10
RIGHT_SHIFT = -0x20
RIGHT_ALT = -0x40
RIGHT_UI = -0x80
# HID LED values
class LEDCode:
NUM_LOCK = 0x01
CAPS_LOCK = 0x02
SCROLL_LOCK = 0x04
COMPOSE = 0x08
KANA = 0x10

Wyświetl plik

@ -0,0 +1,3 @@
metadata(version="0.1.0")
require("usb-device")
package("usb")

Wyświetl plik

@ -0,0 +1,306 @@
# MicroPython USB MIDI module
# MIT license; Copyright (c) 2023 Paul Hamshere, 2023-2024 Angus Gratton
from micropython import const, schedule
import struct
from .core import Interface, Buffer
_EP_IN_FLAG = const(1 << 7)
_INTERFACE_CLASS_AUDIO = const(0x01)
_INTERFACE_SUBCLASS_AUDIO_CONTROL = const(0x01)
_INTERFACE_SUBCLASS_AUDIO_MIDISTREAMING = const(0x03)
# Audio subclass extends the standard endpoint descriptor
# with two extra bytes
_STD_DESC_AUDIO_ENDPOINT_LEN = const(9)
_CLASS_DESC_ENDPOINT_LEN = const(5)
_STD_DESC_ENDPOINT_TYPE = const(0x5)
_JACK_TYPE_EMBEDDED = const(0x01)
_JACK_TYPE_EXTERNAL = const(0x02)
_JACK_IN_DESC_LEN = const(6)
_JACK_OUT_DESC_LEN = const(9)
# MIDI Status bytes. For Channel messages these are only the upper 4 bits, ORed with the channel number.
# As per https://www.midi.org/specifications-old/item/table-1-summary-of-midi-message
_MIDI_NOTE_OFF = const(0x80)
_MIDI_NOTE_ON = const(0x90)
_MIDI_POLY_KEYPRESS = const(0xA0)
_MIDI_CONTROL_CHANGE = const(0xB0)
# USB-MIDI CINs (Code Index Numbers), as per USB MIDI Table 4-1
_CIN_SYS_COMMON_2BYTE = const(0x2)
_CIN_SYS_COMMON_3BYTE = const(0x3)
_CIN_SYSEX_START = const(0x4)
_CIN_SYSEX_END_1BYTE = const(0x5)
_CIN_SYSEX_END_2BYTE = const(0x6)
_CIN_SYSEX_END_3BYTE = const(0x7)
_CIN_NOTE_OFF = const(0x8)
_CIN_NOTE_ON = const(0x9)
_CIN_POLY_KEYPRESS = const(0xA)
_CIN_CONTROL_CHANGE = const(0xB)
_CIN_PROGRAM_CHANGE = const(0xC)
_CIN_CHANNEL_PRESSURE = const(0xD)
_CIN_PITCH_BEND = const(0xE)
_CIN_SINGLE_BYTE = const(0xF) # Not currently supported
# Jack IDs for a simple bidrectional MIDI device(!)
_EMB_IN_JACK_ID = const(1)
_EXT_IN_JACK_ID = const(2)
_EMB_OUT_JACK_ID = const(3)
_EXT_OUT_JACK_ID = const(4)
# Data flows, as modelled by USB-MIDI and this hypothetical interface, are as follows:
# Device RX = USB OUT EP => _EMB_IN_JACK => _EMB_OUT_JACK
# Device TX = _EXT_IN_JACK => _EMB_OUT_JACK => USB IN EP
class MIDIInterface(Interface):
# Base class to implement a USB MIDI device in Python.
#
# To be compliant this also regisers a dummy USB Audio interface, but that
# interface isn't otherwise used.
def __init__(self, rxlen=16, txlen=16):
# Arguments are size of transmit and receive buffers in bytes.
super().__init__()
self.ep_out = None # Set during enumeration. RX direction (host to device)
self.ep_in = None # TX direction (device to host)
self._rx = Buffer(rxlen)
self._tx = Buffer(txlen)
# Callbacks for handling received MIDI messages.
#
# Subclasses can choose between overriding on_midi_event
# and handling all MIDI events manually, or overriding the
# functions for note on/off and control change, only.
def on_midi_event(self, cin, midi0, midi1, midi2):
ch = midi0 & 0x0F
if cin == _CIN_NOTE_ON:
self.on_note_on(ch, midi1, midi2)
elif cin == _CIN_NOTE_OFF:
self.on_note_off(ch, midi1, midi2)
elif cin == _CIN_CONTROL_CHANGE:
self.on_control_change(ch, midi1, midi2)
def on_note_on(self, channel, pitch, vel):
pass # Override to handle Note On messages
def on_note_off(self, channel, pitch, vel):
pass # Override to handle Note On messages
def on_control_change(self, channel, controller, value):
pass # Override to handle Control Change messages
# Helper functions for sending common MIDI messages
def note_on(self, channel, pitch, vel=0x40):
self.send_event(_CIN_NOTE_ON, _MIDI_NOTE_ON | channel, pitch, vel)
def note_off(self, channel, pitch, vel=0x40):
self.send_event(_CIN_NOTE_OFF, _MIDI_NOTE_OFF | channel, pitch, vel)
def control_change(self, channel, controller, value):
self.send_event(_CIN_CONTROL_CHANGE, _MIDI_CONTROL_CHANGE | channel, controller, value)
def send_event(self, cin, midi0, midi1=0, midi2=0):
# Queue a MIDI Event Packet to send to the host.
#
# CIN = USB-MIDI Code Index Number, see USB MIDI 1.0 section 4 "USB-MIDI Event Packets"
#
# Remaining arguments are 0-3 MIDI data bytes.
#
# Note this function returns when the MIDI Event Packet has been queued,
# not when it's been received by the host.
#
# Returns False if the TX buffer is full and the MIDI Event could not be queued.
w = self._tx.pend_write()
if len(w) < 4:
return False # TX buffer is full. TODO: block here?
w[0] = cin # leave cable number as 0?
w[1] = midi0
w[2] = midi1
w[3] = midi2
self._tx.finish_write(4)
self._tx_xfer()
return True
def _tx_xfer(self):
# Keep an active IN transfer to send data to the host, whenever
# there is data to send.
if self.is_open() and not self.xfer_pending(self.ep_in) and self._tx.readable():
self.submit_xfer(self.ep_in, self._tx.pend_read(), self._tx_cb)
def _tx_cb(self, ep, res, num_bytes):
if res == 0:
self._tx.finish_read(num_bytes)
self._tx_xfer()
def _rx_xfer(self):
# Keep an active OUT transfer to receive MIDI events from the host
if self.is_open() and not self.xfer_pending(self.ep_out) and self._rx.writable():
self.submit_xfer(self.ep_out, self._rx.pend_write(), self._rx_cb)
def _rx_cb(self, ep, res, num_bytes):
if res == 0:
self._rx.finish_write(num_bytes)
schedule(self._on_rx, None)
self._rx_xfer()
def on_open(self):
super().on_open()
# kick off any transfers that may have queued while the device was not open
self._tx_xfer()
self._rx_xfer()
def _on_rx(self, _):
# Receive MIDI events. Called via micropython.schedule, outside of the USB callback function.
m = self._rx.pend_read()
i = 0
while i <= len(m) - 4:
cin = m[i] & 0x0F
self.on_midi_event(cin, m[i + 1], m[i + 2], m[i + 3])
i += 4
self._rx.finish_read(i)
def desc_cfg(self, desc, itf_num, ep_num, strs):
# Start by registering a USB Audio Control interface, that is required to point to the
# actual MIDI interface
desc.interface(itf_num, 0, _INTERFACE_CLASS_AUDIO, _INTERFACE_SUBCLASS_AUDIO_CONTROL)
# Append the class-specific AudioControl interface descriptor
desc.pack(
"<BBBHHBB",
9, # bLength
0x24, # bDescriptorType CS_INTERFACE
0x01, # bDescriptorSubtype MS_HEADER
0x0100, # BcdADC
0x0009, # wTotalLength
0x01, # bInCollection,
itf_num + 1, # baInterfaceNr - points to the MIDI Streaming interface
)
# Next add the MIDI Streaming interface descriptor
desc.interface(
itf_num + 1, 2, _INTERFACE_CLASS_AUDIO, _INTERFACE_SUBCLASS_AUDIO_MIDISTREAMING
)
# Append the class-specific interface descriptors
# Midi Streaming interface descriptor
desc.pack(
"<BBBHH",
7, # bLength
0x24, # bDescriptorType CS_INTERFACE
0x01, # bDescriptorSubtype MS_HEADER
0x0100, # BcdADC
# wTotalLength: of all class-specific descriptors
7
+ 2
* (
_JACK_IN_DESC_LEN
+ _JACK_OUT_DESC_LEN
+ _STD_DESC_AUDIO_ENDPOINT_LEN
+ _CLASS_DESC_ENDPOINT_LEN
),
)
# The USB MIDI standard 1.0 allows modelling a baffling range of MIDI
# devices with different permutations of Jack descriptors, with a lot of
# scope for indicating internal connections in the device (as
# "virtualised" by the USB MIDI standard). Much of the options don't
# really change the USB behaviour but provide metadata to the host.
#
# As observed elsewhere online, the standard ends up being pretty
# complex and unclear in parts, but there is a clear simple example in
# an Appendix. So nearly everyone implements the device from the
# Appendix as-is, even when it's not a good fit for their application,
# and ignores the rest of the standard.
#
# For now, this is what this class does as well.
_jack_in_desc(desc, _JACK_TYPE_EMBEDDED, _EMB_IN_JACK_ID)
_jack_in_desc(desc, _JACK_TYPE_EXTERNAL, _EXT_IN_JACK_ID)
_jack_out_desc(desc, _JACK_TYPE_EMBEDDED, _EMB_OUT_JACK_ID, _EXT_IN_JACK_ID, 1)
_jack_out_desc(desc, _JACK_TYPE_EXTERNAL, _EXT_OUT_JACK_ID, _EMB_IN_JACK_ID, 1)
# One MIDI endpoint in each direction, plus the
# associated CS descriptors
self.ep_out = ep_num
self.ep_in = ep_num | _EP_IN_FLAG
# rx side, USB "in" endpoint and embedded MIDI IN Jacks
_audio_endpoint(desc, self.ep_in, _EMB_OUT_JACK_ID)
# tx side, USB "out" endpoint and embedded MIDI OUT jacks
_audio_endpoint(desc, self.ep_out, _EMB_IN_JACK_ID)
def num_itfs(self):
return 2
def num_eps(self):
return 1
def _jack_in_desc(desc, bJackType, bJackID):
# Utility function appends a "JACK IN" descriptor with
# specified bJackType and bJackID
desc.pack(
"<BBBBBB",
_JACK_IN_DESC_LEN, # bLength
0x24, # bDescriptorType CS_INTERFACE
0x02, # bDescriptorSubtype MIDI_IN_JACK
bJackType,
bJackID,
0x00, # iJack, no string descriptor support yet
)
def _jack_out_desc(desc, bJackType, bJackID, bSourceId, bSourcePin):
# Utility function appends a "JACK IN" descriptor with
# specified bJackType and bJackID
desc.pack(
"<BBBBBBBBB",
_JACK_OUT_DESC_LEN, # bLength
0x24, # bDescriptorType CS_INTERFACE
0x03, # bDescriptorSubtype MIDI_OUT_JACK
bJackType,
bJackID,
0x01, # bNrInputPins
bSourceId, # baSourceID(1)
bSourcePin, # baSourcePin(1)
0x00, # iJack, no string descriptor support yet
)
def _audio_endpoint(desc, bEndpointAddress, emb_jack_id):
# Append a standard USB endpoint descriptor and the USB class endpoint descriptor
# for this endpoint.
#
# Audio Class devices extend the standard endpoint descriptor with two extra bytes,
# so we can't easily call desc.endpoint() for the first part.
desc.pack(
# Standard USB endpoint descriptor (plus audio tweaks)
"<BBBBHBBB"
# Class endpoint descriptor
"BBBBB",
_STD_DESC_AUDIO_ENDPOINT_LEN, # wLength
_STD_DESC_ENDPOINT_TYPE, # bDescriptorType
bEndpointAddress,
2, # bmAttributes, bulk
64, # wMaxPacketSize
0, # bInterval
0, # bRefresh (unused)
0, # bSynchInterval (unused)
_CLASS_DESC_ENDPOINT_LEN, # bLength
0x25, # bDescriptorType CS_ENDPOINT
0x01, # bDescriptorSubtype MS_GENERAL
1, # bNumEmbMIDIJack
emb_jack_id, # BaAssocJackID(1)
)

Wyświetl plik

@ -0,0 +1,3 @@
metadata(version="0.1.0")
require("usb-device-hid")
package("usb")

Wyświetl plik

@ -0,0 +1,100 @@
# MicroPython USB Mouse module
#
# MIT license; Copyright (c) 2023-2024 Angus Gratton
from micropython import const
import struct
import machine
from usb.device.hid import HIDInterface
_INTERFACE_PROTOCOL_MOUSE = const(0x02)
class MouseInterface(HIDInterface):
# A basic three button USB mouse HID interface
def __init__(self, interface_str="MicroPython Mouse"):
super().__init__(
_MOUSE_REPORT_DESC,
protocol=_INTERFACE_PROTOCOL_MOUSE,
interface_str=interface_str,
)
self._l = False # Left button
self._m = False # Middle button
self._r = False # Right button
self._buf = bytearray(3)
def send_report(self, dx=0, dy=0):
b = 0
if self._l:
b |= 1 << 0
if self._r:
b |= 1 << 1
if self._m:
b |= 1 << 2
# Wait for any pending report to be sent to the host
# before updating contents of _buf.
#
# This loop can be removed if you don't care about possibly missing a
# transient report, the final report buffer contents will always be the
# last one sent to the host (it just might lose one of the ones in the
# middle).
while self.busy():
machine.idle()
struct.pack_into("Bbb", self._buf, 0, b, dx, dy)
return super().send_report(self._buf)
def click_left(self, down=True):
self._l = down
return self.send_report()
def click_middle(self, down=True):
self._m = down
return self.send_report()
def click_right(self, down=True):
self._r = down
return self.send_report()
def move_by(self, dx, dy):
if not -127 <= dx <= 127:
raise ValueError("dx")
if not -127 <= dy <= 127:
raise ValueError("dy")
return self.send_report(dx, dy)
# Basic 3-button mouse HID Report Descriptor.
# This is based on Appendix E.10 of the HID v1.11 document.
# fmt: off
_MOUSE_REPORT_DESC = (
b'\x05\x01' # Usage Page (Generic Desktop)
b'\x09\x02' # Usage (Mouse)
b'\xA1\x01' # Collection (Application)
b'\x09\x01' # Usage (Pointer)
b'\xA1\x00' # Collection (Physical)
b'\x05\x09' # Usage Page (Buttons)
b'\x19\x01' # Usage Minimum (01),
b'\x29\x03' # Usage Maximun (03),
b'\x15\x00' # Logical Minimum (0),
b'\x25\x01' # Logical Maximum (1),
b'\x95\x03' # Report Count (3),
b'\x75\x01' # Report Size (1),
b'\x81\x02' # Input (Data, Variable, Absolute), ;3 button bits
b'\x95\x01' # Report Count (1),
b'\x75\x05' # Report Size (5),
b'\x81\x01' # Input (Constant), ;5 bit padding
b'\x05\x01' # Usage Page (Generic Desktop),
b'\x09\x30' # Usage (X),
b'\x09\x31' # Usage (Y),
b'\x15\x81' # Logical Minimum (-127),
b'\x25\x7F' # Logical Maximum (127),
b'\x75\x08' # Report Size (8),
b'\x95\x02' # Report Count (2),
b'\x81\x06' # Input (Data, Variable, Relative), ;2 position bytes (X & Y)
b'\xC0' # End Collection
b'\xC0' # End Collection
)
# fmt: on

Wyświetl plik

@ -0,0 +1,2 @@
metadata(version="0.1.0")
package("usb")

Wyświetl plik

@ -0,0 +1,97 @@
# Tests for the Buffer class included in usb.device.core
#
# The easiest way to run this is using unix port. From the parent usb-device
# directory, run as:
#
# $ micropython -m tests.test_core_buffer
#
import micropython
from usb.device import core
if not hasattr(core.machine, "disable_irq"):
# Inject a fake machine module which allows testing on the unix port, and as
# a bonus have tests fail if the buffer allocates inside a critical section.
class FakeMachine:
def disable_irq(self):
return micropython.heap_lock()
def enable_irq(self, was_locked):
if not was_locked:
micropython.heap_unlock()
core.machine = FakeMachine()
b = core.Buffer(16)
# Check buffer is empty
assert b.readable() == 0
assert b.writable() == 16
# Single write then read
w = b.pend_write()
assert len(w) == 16
w[:8] = b"12345678"
b.finish_write(8)
# Empty again
assert b.readable() == 8
assert b.writable() == 8
r = b.pend_read()
assert len(r) == 8
assert r == b"12345678"
b.finish_read(8)
# Empty buffer again
assert b.readable() == 0
assert b.writable() == 16
# Single write then split reads
b.write(b"abcdefghijklmnop")
assert b.writable() == 0 # full buffer
r = b.pend_read()
assert r == b"abcdefghijklmnop"
b.finish_read(2)
r = b.pend_read()
assert r == b"cdefghijklmnop"
b.finish_read(3)
# write to end of buffer
b.write(b"AB")
r = b.pend_read()
assert r == b"fghijklmnopAB"
# write while a read is pending
b.write(b"XY")
# previous pend_read() memoryview should be the same
assert r == b"fghijklmnopAB"
b.finish_read(4)
r = b.pend_read()
assert r == b"jklmnopABXY" # four bytes consumed from head, one new byte at tail
# read while a write is pending
w = b.pend_write()
assert len(w) == 5
r = b.pend_read()
assert len(r) == 11
b.finish_read(3)
w[:2] = b"12"
b.finish_write(2)
# Expected final state of buffer
tmp = bytearray(b.readable())
assert b.readinto(tmp) == len(tmp)
assert tmp == b"mnopABXY12"
# Now buffer is empty again
assert b.readable() == 0
assert b.readinto(tmp) == 0
assert b.writable() == 16
print("All Buffer tests passed")

Wyświetl plik

@ -0,0 +1,2 @@
from . import core
from .core import get # Singleton _Device getter

Wyświetl plik

@ -0,0 +1,851 @@
# MicroPython Library runtime USB device implementation
#
# These contain the classes and utilities that are needed to
# implement a USB device, not any complete USB drivers.
#
# MIT license; Copyright (c) 2022-2024 Angus Gratton
from micropython import const
import machine
import struct
_EP_IN_FLAG = const(1 << 7)
# USB descriptor types
_STD_DESC_DEV_TYPE = const(0x1)
_STD_DESC_CONFIG_TYPE = const(0x2)
_STD_DESC_STRING_TYPE = const(0x3)
_STD_DESC_INTERFACE_TYPE = const(0x4)
_STD_DESC_ENDPOINT_TYPE = const(0x5)
_STD_DESC_INTERFACE_ASSOC = const(0xB)
_ITF_ASSOCIATION_DESC_TYPE = const(0xB) # Interface Association descriptor
# Standard USB descriptor lengths
_STD_DESC_CONFIG_LEN = const(9)
_STD_DESC_ENDPOINT_LEN = const(7)
_STD_DESC_INTERFACE_LEN = const(9)
_DESC_OFFSET_LEN = const(0)
_DESC_OFFSET_TYPE = const(1)
_DESC_OFFSET_INTERFACE_NUM = const(2) # for _STD_DESC_INTERFACE_TYPE
_DESC_OFFSET_ENDPOINT_NUM = const(2) # for _STD_DESC_ENDPOINT_TYPE
# Standard control request bmRequest fields, can extract by calling split_bmRequestType()
_REQ_RECIPIENT_DEVICE = const(0x0)
_REQ_RECIPIENT_INTERFACE = const(0x1)
_REQ_RECIPIENT_ENDPOINT = const(0x2)
_REQ_RECIPIENT_OTHER = const(0x3)
# Offsets into the standard configuration descriptor, to fixup
_OFFS_CONFIG_iConfiguration = const(6)
_INTERFACE_CLASS_VENDOR = const(0xFF)
_INTERFACE_SUBCLASS_NONE = const(0x00)
_PROTOCOL_NONE = const(0x00)
# These need to match the constants in tusb_config.h
_USB_STR_MANUF = const(0x01)
_USB_STR_PRODUCT = const(0x02)
_USB_STR_SERIAL = const(0x03)
# Error constant to match mperrno.h
_MP_EINVAL = const(22)
_dev = None # Singleton _Device instance
def get():
# Getter to access the singleton instance of the
# MicroPython _Device object
#
# (note this isn't the low-level machine.USBDevice object, the low-level object is
# get()._usbd.)
global _dev
if not _dev:
_dev = _Device()
return _dev
class _Device:
# Class that implements the Python parts of the MicroPython USBDevice.
#
# This class should only be instantiated by the singleton getter
# function usb.device.get(), never directly.
def __init__(self):
self._itfs = {} # Mapping from interface number to interface object, set by init()
self._eps = {} # Mapping from endpoint address to interface object, set by _open_cb()
self._ep_cbs = {} # Mapping from endpoint address to Optional[xfer callback]
self._usbd = machine.USBDevice() # low-level API
def init(self, *itfs, **kwargs):
# Helper function to configure the USB device and activate it in a single call
self.active(False)
self.config(*itfs, **kwargs)
self.active(True)
def config( # noqa: PLR0913
self,
*itfs,
builtin_driver=False,
manufacturer_str=None,
product_str=None,
serial_str=None,
configuration_str=None,
id_vendor=None,
id_product=None,
bcd_device=None,
device_class=0,
device_subclass=0,
device_protocol=0,
config_str=None,
max_power_ma=None,
):
# Configure the USB device with a set of interfaces, and optionally reconfiguring the
# device and configuration descriptor fields
_usbd = self._usbd
if self.active():
raise OSError(_MP_EINVAL) # Must set active(False) first
# Convenience: Allow builtin_driver to be True, False or one of
# the machine.USBDevice.BUILTIN_ constants
if isinstance(builtin_driver, bool):
builtin_driver = _usbd.BUILTIN_DEFAULT if builtin_driver else _usbd.BUILTIN_NONE
_usbd.builtin_driver = builtin_driver
# Putting None for any strings that should fall back to the "built-in" value
# Indexes in this list depends on _USB_STR_MANUF, _USB_STR_PRODUCT, _USB_STR_SERIAL
strs = [None, manufacturer_str, product_str, serial_str]
# Build the device descriptor
FMT = "<BBHBBBBHHHBBBB"
# read the static descriptor fields
f = struct.unpack(FMT, builtin_driver.desc_dev)
def maybe_set(value, idx):
# Override a numeric descriptor value or keep builtin value f[idx] if 'value' is None
if value is not None:
return value
return f[idx]
# Either copy each descriptor field directly from the builtin device descriptor, or 'maybe'
# set it to the custom value from the object
desc_dev = struct.pack(
FMT,
f[0], # bLength
f[1], # bDescriptorType
f[2], # bcdUSB
device_class, # bDeviceClass
device_subclass, # bDeviceSubClass
device_protocol, # bDeviceProtocol
f[6], # bMaxPacketSize0, TODO: allow overriding this value?
maybe_set(id_vendor, 7), # idVendor
maybe_set(id_product, 8), # idProduct
maybe_set(bcd_device, 9), # bcdDevice
_USB_STR_MANUF, # iManufacturer
_USB_STR_PRODUCT, # iProduct
_USB_STR_SERIAL, # iSerialNumber
1,
) # bNumConfigurations
# Iterate interfaces to build the configuration descriptor
# Keep track of the interface and endpoint indexes
itf_num = builtin_driver.itf_max
ep_num = max(builtin_driver.ep_max, 1) # Endpoint 0 always reserved for control
while len(strs) < builtin_driver.str_max:
strs.append(None) # Reserve other string indexes used by builtin drivers
initial_cfg = builtin_driver.desc_cfg or (b"\x00" * _STD_DESC_CONFIG_LEN)
self._itfs = {}
# Determine the total length of the configuration descriptor, by making dummy
# calls to build the config descriptor
desc = Descriptor(None)
desc.extend(initial_cfg)
for itf in itfs:
itf.desc_cfg(desc, 0, 0, [])
# Allocate the real Descriptor helper to write into it, starting
# after the standard configuration descriptor
desc = Descriptor(bytearray(desc.o))
desc.extend(initial_cfg)
for itf in itfs:
itf.desc_cfg(desc, itf_num, ep_num, strs)
for _ in range(itf.num_itfs()):
self._itfs[itf_num] = itf # Mapping from interface numbers to interfaces
itf_num += 1
ep_num += itf.num_eps()
# Go back and update the Standard Configuration Descriptor
# header at the start with values based on the complete
# descriptor.
#
# See USB 2.0 specification section 9.6.3 p264 for details.
bmAttributes = (
(1 << 7) # Reserved
| (0 if max_power_ma else (1 << 6)) # Self-Powered
# Remote Wakeup not currently supported
)
# Configuration string is optional but supported
iConfiguration = 0
if configuration_str:
iConfiguration = len(strs)
strs.append(configuration_str)
if max_power_ma is not None:
# Convert from mA to the units used in the descriptor
max_power_ma //= 2
else:
try:
# Default to whatever value the builtin driver reports
max_power_ma = _usbd.BUILTIN_DEFAULT.desc_cfg[8]
except IndexError:
# If no built-in driver, default to 250mA
max_power_ma = 125
desc.pack_into(
"<BBHBBBBB",
0,
_STD_DESC_CONFIG_LEN, # bLength
_STD_DESC_CONFIG_TYPE, # bDescriptorType
len(desc.b), # wTotalLength
itf_num,
1, # bConfigurationValue
iConfiguration,
bmAttributes,
max_power_ma,
)
_usbd.config(
desc_dev,
desc.b,
strs,
self._open_itf_cb,
self._reset_cb,
self._control_xfer_cb,
self._xfer_cb,
)
def active(self, *optional_value):
# Thin wrapper around the USBDevice active() function.
#
# Note: active only means the USB device is available, not that it has
# actually been connected to and configured by a USB host. Use the
# Interface.is_open() function to check if the host has configured an
# interface of the device.
return self._usbd.active(*optional_value)
def _open_itf_cb(self, desc):
# Singleton callback from TinyUSB custom class driver, when USB host does
# Set Configuration. Called once per interface or IAD.
# Note that even if the configuration descriptor contains an IAD, 'desc'
# starts from the first interface descriptor in the IAD and not the IAD
# descriptor.
itf_num = desc[_DESC_OFFSET_INTERFACE_NUM]
itf = self._itfs[itf_num]
# Scan the full descriptor:
# - Build _eps and _ep_addr from the endpoint descriptors
# - Find the highest numbered interface provided to the callback
# (which will be the first interface, unless we're scanning
# multiple interfaces inside an IAD.)
offs = 0
max_itf = itf_num
while offs < len(desc):
dl = desc[offs + _DESC_OFFSET_LEN]
dt = desc[offs + _DESC_OFFSET_TYPE]
if dt == _STD_DESC_ENDPOINT_TYPE:
ep_addr = desc[offs + _DESC_OFFSET_ENDPOINT_NUM]
self._eps[ep_addr] = itf
self._ep_cbs[ep_addr] = None
elif dt == _STD_DESC_INTERFACE_TYPE:
max_itf = max(max_itf, desc[offs + _DESC_OFFSET_INTERFACE_NUM])
offs += dl
# If 'desc' is not the inside of an Interface Association Descriptor but
# 'itf' object still represents multiple USB interfaces (i.e. MIDI),
# defer calling 'itf.on_open()' until this callback fires for the
# highest numbered USB interface.
#
# This means on_open() is only called once, and that it can
# safely submit transfers on any of the USB interfaces' endpoints.
if self._itfs.get(max_itf + 1, None) != itf:
itf.on_open()
def _reset_cb(self):
# Callback when the USB device is reset by the host
# Allow interfaces to respond to the reset
for itf in self._itfs.values():
itf.on_reset()
# Rebuilt when host re-enumerates
self._eps = {}
self._ep_cbs = {}
def _submit_xfer(self, ep_addr, data, done_cb=None):
# Singleton function to submit a USB transfer (of any type except control).
#
# Generally, drivers should call Interface.submit_xfer() instead. See
# that function for documentation about the possible parameter values.
if ep_addr not in self._eps:
raise ValueError("ep_addr")
if self._ep_cbs[ep_addr]:
raise RuntimeError("xfer_pending")
# USBDevice callback may be called immediately, before Python execution
# continues, so set it first.
#
# To allow xfer_pending checks to work, store True instead of None.
self._ep_cbs[ep_addr] = done_cb or True
return self._usbd.submit_xfer(ep_addr, data)
def _xfer_cb(self, ep_addr, result, xferred_bytes):
# Singleton callback from TinyUSB custom class driver when a transfer completes.
cb = self._ep_cbs.get(ep_addr, None)
self._ep_cbs[ep_addr] = None
if callable(cb):
cb(ep_addr, result, xferred_bytes)
def _control_xfer_cb(self, stage, request):
# Singleton callback from TinyUSB custom class driver when a control
# transfer is in progress.
#
# stage determines appropriate responses (possible values
# utils.STAGE_SETUP, utils.STAGE_DATA, utils.STAGE_ACK).
#
# The TinyUSB class driver framework only calls this function for
# particular types of control transfer, other standard control transfers
# are handled by TinyUSB itself.
wIndex = request[4] + (request[5] << 8)
recipient, _, _ = split_bmRequestType(request[0])
itf = None
result = None
if recipient == _REQ_RECIPIENT_DEVICE:
itf = self._itfs.get(wIndex & 0xFFFF, None)
if itf:
result = itf.on_device_control_xfer(stage, request)
elif recipient == _REQ_RECIPIENT_INTERFACE:
itf = self._itfs.get(wIndex & 0xFFFF, None)
if itf:
result = itf.on_interface_control_xfer(stage, request)
elif recipient == _REQ_RECIPIENT_ENDPOINT:
ep_num = wIndex & 0xFFFF
itf = self._eps.get(ep_num, None)
if itf:
result = itf.on_endpoint_control_xfer(stage, request)
if not itf:
# At time this code was written, only the control transfers shown
# above are passed to the class driver callback. See
# invoke_class_control() in tinyusb usbd.c
raise RuntimeError(f"Unexpected control request type {request[0]:#x}")
# Expecting any of the following possible replies from
# on_NNN_control_xfer():
#
# True - Continue transfer, no data
# False - STALL transfer
# Object with buffer interface - submit this data for the control transfer
return result
class Interface:
# Abstract base class to implement USB Interface (and associated endpoints),
# or a collection of USB Interfaces, in Python
#
# (Despite the name an object of type Interface can represent multiple
# associated interfaces, with or without an Interface Association Descriptor
# prepended to them. Override num_itfs() if assigning >1 USB interface.)
def __init__(self):
self._open = False
def desc_cfg(self, desc, itf_num, ep_num, strs):
# Function to build configuration descriptor contents for this interface
# or group of interfaces. This is called on each interface from
# USBDevice.init().
#
# This function should insert:
#
# - At least one standard Interface descriptor (can call
# - desc.interface()).
#
# Plus, optionally:
#
# - One or more endpoint descriptors (can call desc.endpoint()).
# - An Interface Association Descriptor, prepended before.
# - Other class-specific configuration descriptor data.
#
# This function is called twice per call to USBDevice.init(). The first
# time the values of all arguments are dummies that are used only to
# calculate the total length of the descriptor. Therefore, anything this
# function does should be idempotent and it should add the same
# descriptors each time. If saving interface numbers or endpoint numbers
# for later
#
# Parameters:
#
# - desc - Descriptor helper to write the configuration descriptor bytes into.
# The first time this function is called 'desc' is a dummy object
# with no backing buffer (exists to count the number of bytes needed).
#
# - itf_num - First bNumInterfaces value to assign. The descriptor
# should contain the same number of interfaces returned by num_itfs(),
# starting from this value.
#
# - ep_num - Address of the first available endpoint number to use for
# endpoint descriptor addresses. Subclasses should save the
# endpoint addresses selected, to look up later (although note the first
# time this function is called, the values will be dummies.)
#
# - strs - list of string descriptors for this USB device. This function
# can append to this list, and then insert the index of the new string
# in the list into the configuration descriptor.
raise NotImplementedError
def num_itfs(self):
# Return the number of actual USB Interfaces represented by this object
# (as set in desc_cfg().)
#
# Only needs to be overriden if implementing a Interface class that
# represents more than one USB Interface descriptor (i.e. MIDI), or an
# Interface Association Descriptor (i.e. USB-CDC).
return 1
def num_eps(self):
# Return the number of USB Endpoint numbers represented by this object
# (as set in desc_cfg().)
#
# Note for each count returned by this function, the interface may
# choose to have both an IN and OUT endpoint (i.e. IN flag is not
# considered a value here.)
#
# This value can be zero, if the USB Host only communicates with this
# interface using control transfers.
return 0
def on_open(self):
# Callback called when the USB host accepts the device configuration.
#
# Override this function to initiate any operations that the USB interface
# should do when the USB device is configured to the host.
self._open = True
def on_reset(self):
# Callback called on every registered interface when the USB device is
# reset by the host. This can happen when the USB device is unplugged,
# or if the host triggers a reset for some other reason.
#
# Override this function to cancel any pending operations specific to
# the interface (outstanding USB transfers are already cancelled).
#
# At this point, no USB functionality is available - on_open() will
# be called later if/when the USB host re-enumerates and configures the
# interface.
self._open = False
def is_open(self):
# Returns True if the interface has been configured by the host and is in
# active use.
return self._open
def on_device_control_xfer(self, stage, request):
# Control transfer callback. Override to handle a non-standard device
# control transfer where bmRequestType Recipient is Device, Type is
# utils.REQ_TYPE_CLASS, and the lower byte of wIndex indicates this interface.
#
# (See USB 2.0 specification 9.4 Standard Device Requests, p250).
#
# This particular request type seems pretty uncommon for a device class
# driver to need to handle, most hosts will not send this so most
# implementations won't need to override it.
#
# Parameters:
#
# - stage is one of utils.STAGE_SETUP, utils.STAGE_DATA, utils.STAGE_ACK.
#
# - request is a memoryview into a USB request packet, as per USB 2.0
# specification 9.3 USB Device Requests, p250. the memoryview is only
# valid while the callback is running.
#
# The function can call split_bmRequestType(request[0]) to split
# bmRequestType into (Recipient, Type, Direction).
#
# Result, any of:
#
# - True to continue the request, False to STALL the endpoint.
# - Buffer interface object to provide a buffer to the host as part of the
# transfer, if applicable.
return False
def on_interface_control_xfer(self, stage, request):
# Control transfer callback. Override to handle a device control
# transfer where bmRequestType Recipient is Interface, and the lower byte
# of wIndex indicates this interface.
#
# (See USB 2.0 specification 9.4 Standard Device Requests, p250).
#
# bmRequestType Type field may have different values. It's not necessary
# to handle the mandatory Standard requests (bmRequestType Type ==
# utils.REQ_TYPE_STANDARD), if the driver returns False in these cases then
# TinyUSB will provide the necessary responses.
#
# See on_device_control_xfer() for a description of the arguments and
# possible return values.
return False
def on_endpoint_control_xfer(self, stage, request):
# Control transfer callback. Override to handle a device
# control transfer where bmRequestType Recipient is Endpoint and
# the lower byte of wIndex indicates an endpoint address associated
# with this interface.
#
# bmRequestType Type will generally have any value except
# utils.REQ_TYPE_STANDARD, as Standard endpoint requests are handled by
# TinyUSB. The exception is the the Standard "Set Feature" request. This
# is handled by Tiny USB but also passed through to the driver in case it
# needs to change any internal state, but most drivers can ignore and
# return False in this case.
#
# (See USB 2.0 specification 9.4 Standard Device Requests, p250).
#
# See on_device_control_xfer() for a description of the parameters and
# possible return values.
return False
def xfer_pending(self, ep_addr):
# Return True if a transfer is already pending on ep_addr.
#
# Only one transfer can be submitted at a time.
return _dev and bool(_dev._ep_cbs[ep_addr])
def submit_xfer(self, ep_addr, data, done_cb=None):
# Submit a USB transfer (of any type except control)
#
# Parameters:
#
# - ep_addr. Address of the endpoint to submit the transfer on. Caller is
# responsible for ensuring that ep_addr is correct and belongs to this
# interface. Only one transfer can be active at a time on each endpoint.
#
# - data. Buffer containing data to send, or for data to be read into
# (depending on endpoint direction).
#
# - done_cb. Optional callback function for when the transfer
# completes. The callback is called with arguments (ep_addr, result,
# xferred_bytes) where result is one of xfer_result_t enum (see top of
# this file), and xferred_bytes is an integer.
#
# If the function returns, the transfer is queued.
#
# The function will raise RuntimeError under the following conditions:
#
# - The interface is not "open" (i.e. has not been enumerated and configured
# by the host yet.)
#
# - A transfer is already pending on this endpoint (use xfer_pending() to check
# before sending if needed.)
#
# - A DCD error occurred when queueing the transfer on the hardware.
#
#
# Will raise TypeError if 'data' isn't he correct type of buffer for the
# endpoint transfer direction.
#
# Note that done_cb may be called immediately, possibly before this
# function has returned to the caller.
if not self._open:
raise RuntimeError("Not open")
_dev._submit_xfer(ep_addr, data, done_cb)
def stall(self, ep_addr, *args):
# Set or get the endpoint STALL state.
#
# To get endpoint stall stage, call with a single argument.
# To set endpoint stall state, call with an additional boolean
# argument to set or clear.
#
# Generally endpoint STALL is handled automatically, but there are some
# device classes that need to explicitly stall or unstall an endpoint
# under certain conditions.
if not self._open or ep_addr not in self._eps:
raise RuntimeError
_dev._usbd.stall(ep_addr, *args)
class Descriptor:
# Wrapper class for writing a descriptor in-place into a provided buffer
#
# Doesn't resize the buffer.
#
# Can be initialised with b=None to perform a dummy pass that calculates the
# length needed for the buffer.
def __init__(self, b):
self.b = b
self.o = 0 # offset of data written to the buffer
def pack(self, fmt, *args):
# Utility function to pack new data into the descriptor
# buffer, starting at the current offset.
#
# Arguments are the same as struct.pack(), but it fills the
# pre-allocated descriptor buffer (growing if needed), instead of
# returning anything.
self.pack_into(fmt, self.o, *args)
def pack_into(self, fmt, offs, *args):
# Utility function to pack new data into the descriptor at offset 'offs'.
#
# If the data written is before 'offs' then self.o isn't incremented,
# otherwise it's incremented to point at the end of the written data.
end = offs + struct.calcsize(fmt)
if self.b:
struct.pack_into(fmt, self.b, offs, *args)
self.o = max(self.o, end)
def extend(self, a):
# Extend the descriptor with some bytes-like data
if self.b:
self.b[self.o : self.o + len(a)] = a
self.o += len(a)
# TODO: At the moment many of these arguments are named the same as the relevant field
# in the spec, as this is easier to understand. Can save some code size by collapsing them
# down.
def interface(
self,
bInterfaceNumber,
bNumEndpoints,
bInterfaceClass=_INTERFACE_CLASS_VENDOR,
bInterfaceSubClass=_INTERFACE_SUBCLASS_NONE,
bInterfaceProtocol=_PROTOCOL_NONE,
iInterface=0,
):
# Utility function to append a standard Interface descriptor, with
# the properties specified in the parameter list.
#
# Defaults for bInterfaceClass, SubClass and Protocol are a "vendor"
# device.
#
# Note that iInterface is a string index number. If set, it should be set
# by the caller Interface to the result of self._get_str_index(s),
# where 's' is a string found in self.strs.
self.pack(
"BBBBBBBBB",
_STD_DESC_INTERFACE_LEN, # bLength
_STD_DESC_INTERFACE_TYPE, # bDescriptorType
bInterfaceNumber,
0, # bAlternateSetting, not currently supported
bNumEndpoints,
bInterfaceClass,
bInterfaceSubClass,
bInterfaceProtocol,
iInterface,
)
def endpoint(self, bEndpointAddress, bmAttributes, wMaxPacketSize, bInterval=1):
# Utility function to append a standard Endpoint descriptor, with
# the properties specified in the parameter list.
#
# See USB 2.0 specification section 9.6.6 Endpoint p269
#
# As well as a numeric value, bmAttributes can be a string value to represent
# common endpoint types: "control", "bulk", "interrupt".
if bmAttributes == "control":
bmAttributes = 0
elif bmAttributes == "bulk":
bmAttributes = 2
elif bmAttributes == "interrupt":
bmAttributes = 3
self.pack(
"<BBBBHB",
_STD_DESC_ENDPOINT_LEN,
_STD_DESC_ENDPOINT_TYPE,
bEndpointAddress,
bmAttributes,
wMaxPacketSize,
bInterval,
)
def interface_assoc(
self,
bFirstInterface,
bInterfaceCount,
bFunctionClass,
bFunctionSubClass,
bFunctionProtocol=_PROTOCOL_NONE,
iFunction=0,
):
# Utility function to append an Interface Association descriptor,
# with the properties specified in the parameter list.
#
# See USB ECN: Interface Association Descriptor.
self.pack(
"<BBBBBBBB",
8,
_ITF_ASSOCIATION_DESC_TYPE,
bFirstInterface,
bInterfaceCount,
bFunctionClass,
bFunctionSubClass,
bFunctionProtocol,
iFunction,
)
def split_bmRequestType(bmRequestType):
# Utility function to split control transfer field bmRequestType into a tuple of 3 fields:
#
# Recipient
# Type
# Data transfer direction
#
# See USB 2.0 specification section 9.3 USB Device Requests and 9.3.1 bmRequestType, p248.
return (
bmRequestType & 0x1F,
(bmRequestType >> 5) & 0x03,
(bmRequestType >> 7) & 0x01,
)
class Buffer:
# An interrupt-safe producer/consumer buffer that wraps a bytearray object.
#
# Kind of like a ring buffer, but supports the idea of returning a
# memoryview for either read or write of multiple bytes (suitable for
# passing to a buffer function without needing to allocate another buffer to
# read into.)
#
# Consumer can call pend_read() to get a memoryview to read from, and then
# finish_read(n) when done to indicate it read 'n' bytes from the
# memoryview. There is also a readinto() convenience function.
#
# Producer must call pend_write() to get a memorybuffer to write into, and
# then finish_write(n) when done to indicate it wrote 'n' bytes into the
# memoryview. There is also a normal write() convenience function.
#
# - Only one producer and one consumer is supported.
#
# - Calling pend_read() and pend_write() is effectively idempotent, they can be
# called more than once without a corresponding finish_x() call if necessary
# (provided only one thread does this, as per the previous point.)
#
# - Calling finish_write() and finish_read() is hard interrupt safe (does
# not allocate). pend_read() and pend_write() each allocate 1 block for
# the memoryview that is returned.
#
# The buffer contents are always laid out as:
#
# - Slice [:_n] = bytes of valid data waiting to read
# - Slice [_n:_w] = unused space
# - Slice [_w:] = bytes of pending write buffer waiting to be written
#
# This buffer should be fast when most reads and writes are balanced and use
# the whole buffer. When this doesn't happen, performance degrades to
# approximate a Python-based single byte ringbuffer.
#
def __init__(self, length):
self._b = memoryview(bytearray(length))
# number of bytes in buffer read to read, starting at index 0. Updated
# by both producer & consumer.
self._n = 0
# start index of a pending write into the buffer, if any. equals
# len(self._b) if no write is pending. Updated by producer only.
self._w = length
def writable(self):
# Number of writable bytes in the buffer. Assumes no pending write is outstanding.
return len(self._b) - self._n
def readable(self):
# Number of readable bytes in the buffer. Assumes no pending read is outstanding.
return self._n
def pend_write(self, wmax=None):
# Returns a memoryview that the producer can write bytes into.
# start the write at self._n, the end of data waiting to read
#
# If wmax is set then the memoryview is pre-sliced to be at most
# this many bytes long.
#
# (No critical section needed as self._w is only updated by the producer.)
self._w = self._n
end = (self._w + wmax) if wmax else len(self._b)
return self._b[self._w : end]
def finish_write(self, nbytes):
# Called by the producer to indicate it wrote nbytes into the buffer.
ist = machine.disable_irq()
try:
assert nbytes <= len(self._b) - self._w # can't say we wrote more than was pended
if self._n == self._w:
# no data was read while the write was happening, so the buffer is already in place
# (this is the fast path)
self._n += nbytes
else:
# Slow path: data was read while the write was happening, so
# shuffle the newly written bytes back towards index 0 to avoid fragmentation
#
# As this updates self._n we have to do it in the critical
# section, so do it byte by byte to avoid allocating.
while nbytes > 0:
self._b[self._n] = self._b[self._w]
self._n += 1
self._w += 1
nbytes -= 1
self._w = len(self._b)
finally:
machine.enable_irq(ist)
def write(self, w):
# Helper method for the producer to write into the buffer in one call
pw = self.pend_write()
to_w = min(len(w), len(pw))
if to_w:
pw[:to_w] = w[:to_w]
self.finish_write(to_w)
return to_w
def pend_read(self):
# Return a memoryview slice that the consumer can read bytes from
return self._b[: self._n]
def finish_read(self, nbytes):
# Called by the consumer to indicate it read nbytes from the buffer.
if not nbytes:
return
ist = machine.disable_irq()
try:
assert nbytes <= self._n # can't say we read more than was available
i = 0
self._n -= nbytes
while i < self._n:
# consumer only read part of the buffer, so shuffle remaining
# read data back towards index 0 to avoid fragmentation
self._b[i] = self._b[i + nbytes]
i += 1
finally:
machine.enable_irq(ist)
def readinto(self, b):
# Helper method for the consumer to read out of the buffer in one call
pr = self.pend_read()
to_r = min(len(pr), len(b))
if to_r:
b[:to_r] = pr[:to_r]
self.finish_read(to_r)
return to_r

Wyświetl plik

@ -0,0 +1,900 @@
"""Debugger basics"""
# This is originally from cpython 3.10: https://raw.githubusercontent.com/python/cpython/3.10/Lib/bdb.py
# Patches for micropython have been commented as such.
import fnmatch
import sys
import os
## MPY: no inspect module avaialble
# from inspect import CO_GENERATOR, CO_COROUTINE, CO_ASYNC_GENERATOR
__all__ = ["BdbQuit", "Bdb", "Breakpoint"]
## MPY: These flags currently don't exist
# GENERATOR_AND_COROUTINE_FLAGS = CO_GENERATOR | CO_COROUTINE | CO_ASYNC_GENERATOR
class BdbQuit(Exception):
"""Exception to give up completely."""
class Bdb:
"""Generic Python debugger base class.
This class takes care of details of the trace facility;
a derived class should implement user interaction.
The standard debugger class (pdb.Pdb) is an example.
The optional skip argument must be an iterable of glob-style
module name patterns. The debugger will not step into frames
that originate in a module that matches one of these patterns.
Whether a frame is considered to originate in a certain module
is determined by the __name__ in the frame globals.
"""
def __init__(self, skip=None):
self.skip = set(skip) if skip else None
self.breaks = {}
self.fncache = {}
self.frame_returning = None
self._load_breaks()
def canonic(self, filename):
"""Return canonical form of filename.
For real filenames, the canonical form is a case-normalized (on
case insensitive filesystems) absolute path. 'Filenames' with
angle brackets, such as "<stdin>", generated in interactive
mode, are returned unchanged.
"""
if filename == "<" + filename[1:-1] + ">":
return filename
canonic = self.fncache.get(filename)
if not canonic:
canonic = os.path.abspath(filename)
canonic = os.path.normcase(canonic)
self.fncache[filename] = canonic
return canonic
def reset(self):
"""Set values of attributes as ready to start debugging."""
import linecache
linecache.checkcache()
self.botframe = None
self._set_stopinfo(None, None)
def trace_dispatch(self, frame, event, arg):
"""Dispatch a trace function for debugged frames based on the event.
This function is installed as the trace function for debugged
frames. Its return value is the new trace function, which is
usually itself. The default implementation decides how to
dispatch a frame, depending on the type of event (passed in as a
string) that is about to be executed.
The event can be one of the following:
line: A new line of code is going to be executed.
call: A function is about to be called or another code block
is entered.
return: A function or other code block is about to return.
exception: An exception has occurred.
c_call: A C function is about to be called.
c_return: A C function has returned.
c_exception: A C function has raised an exception.
For the Python events, specialized functions (see the dispatch_*()
methods) are called. For the C events, no action is taken.
The arg parameter depends on the previous event.
"""
if self.quitting:
return # None
if event == 'line':
return self.dispatch_line(frame)
if event == 'call':
return self.dispatch_call(frame, arg)
if event == 'return':
return self.dispatch_return(frame, arg)
if event == 'exception':
return self.dispatch_exception(frame, arg)
if event == 'c_call':
return self.trace_dispatch
if event == 'c_exception':
return self.trace_dispatch
if event == 'c_return':
return self.trace_dispatch
print('bdb.Bdb.dispatch: unknown debugging event:', repr(event))
return self.trace_dispatch
def dispatch_line(self, frame):
"""Invoke user function and return trace function for line event.
If the debugger stops on the current line, invoke
self.user_line(). Raise BdbQuit if self.quitting is set.
Return self.trace_dispatch to continue tracing in this scope.
"""
if self.stop_here(frame) or self.break_here(frame):
self.user_line(frame)
if self.quitting: raise BdbQuit
return self.trace_dispatch
def is_coroutine(self, frame):
## MPY: co_flags attrib not available, compatible method of detecting coroutine TBD
# return frame.f_code.co_flags & GENERATOR_AND_COROUTINE_FLAGS
return False
def dispatch_call(self, frame, arg):
"""Invoke user function and return trace function for call event.
If the debugger stops on this function call, invoke
self.user_call(). Raise BdbQuit if self.quitting is set.
Return self.trace_dispatch to continue tracing in this scope.
"""
# XXX 'arg' is no longer used
if self.botframe is None:
# First call of dispatch since reset()
self.botframe = frame.f_back # (CT) Note that this may also be None!
return self.trace_dispatch
if not (self.stop_here(frame) or self.break_anywhere(frame)):
# No need to trace this function
return # None
# Ignore call events in generator except when stepping.
if self.stopframe and self.is_coroutine(frame):
return self.trace_dispatch
self.user_call(frame, arg)
if self.quitting: raise BdbQuit
return self.trace_dispatch
def dispatch_return(self, frame, arg):
"""Invoke user function and return trace function for return event.
If the debugger stops on this function return, invoke
self.user_return(). Raise BdbQuit if self.quitting is set.
Return self.trace_dispatch to continue tracing in this scope.
"""
if self.stop_here(frame) or frame == self.returnframe:
# Ignore return events in generator except when stepping.
if self.stopframe and self.is_coroutine(frame):
return self.trace_dispatch
try:
self.frame_returning = frame
self.user_return(frame, arg)
finally:
self.frame_returning = None
if self.quitting: raise BdbQuit
# The user issued a 'next' or 'until' command.
if self.stopframe is frame and self.stoplineno != -1:
self._set_stopinfo(None, None)
return self.trace_dispatch
def dispatch_exception(self, frame, arg):
"""Invoke user function and return trace function for exception event.
If the debugger stops on this exception, invoke
self.user_exception(). Raise BdbQuit if self.quitting is set.
Return self.trace_dispatch to continue tracing in this scope.
"""
if self.stop_here(frame):
# When stepping with next/until/return in a generator frame, skip
# the internal StopIteration exception (with no traceback)
# triggered by a subiterator run with the 'yield from' statement.
if not (self.is_coroutine(frame)
and arg[0] is StopIteration and arg[2] is None):
self.user_exception(frame, arg)
if self.quitting: raise BdbQuit
# Stop at the StopIteration or GeneratorExit exception when the user
# has set stopframe in a generator by issuing a return command, or a
# next/until command at the last statement in the generator before the
# exception.
elif (self.stopframe and frame is not self.stopframe
and self.is_coroutine(self.stopframe)
and arg[0] in (StopIteration, GeneratorExit)):
self.user_exception(frame, arg)
if self.quitting: raise BdbQuit
return self.trace_dispatch
# Normally derived classes don't override the following
# methods, but they may if they want to redefine the
# definition of stopping and breakpoints.
def is_skipped_module(self, module_name):
"Return True if module_name matches any skip pattern."
if module_name is None: # some modules do not have names
return False
for pattern in self.skip:
if fnmatch.fnmatch(module_name, pattern):
return True
return False
def stop_here(self, frame):
"Return True if frame is below the starting frame in the stack."
# (CT) stopframe may now also be None, see dispatch_call.
# (CT) the former test for None is therefore removed from here.
if self.skip and \
self.is_skipped_module(frame.f_globals.get('__name__')):
return False
if frame is self.stopframe:
if self.stoplineno == -1:
return False
return frame.f_lineno >= self.stoplineno
if not self.stopframe:
return True
return False
def break_here(self, frame):
"""Return True if there is an effective breakpoint for this line.
Check for line or function breakpoint and if in effect.
Delete temporary breakpoints if effective() says to.
"""
filename = self.canonic(frame.f_code.co_filename)
if filename not in self.breaks:
return False
lineno = frame.f_lineno
if lineno not in self.breaks[filename]:
# The line itself has no breakpoint, but maybe the line is the
# first line of a function with breakpoint set by function name.
lineno = frame.f_code.co_firstlineno
if lineno not in self.breaks[filename]:
return False
# flag says ok to delete temp. bp
(bp, flag) = effective(filename, lineno, frame)
if bp:
self.currentbp = bp.number
if (flag and bp.temporary):
self.do_clear(str(bp.number))
return True
else:
return False
def do_clear(self, arg):
"""Remove temporary breakpoint.
Must implement in derived classes or get NotImplementedError.
"""
raise NotImplementedError("subclass of bdb must implement do_clear()")
def break_anywhere(self, frame):
"""Return True if there is any breakpoint for frame's filename.
"""
return self.canonic(frame.f_code.co_filename) in self.breaks
# Derived classes should override the user_* methods
# to gain control.
def user_call(self, frame, argument_list):
"""Called if we might stop in a function."""
pass
def user_line(self, frame):
"""Called when we stop or break at a line."""
pass
def user_return(self, frame, return_value):
"""Called when a return trap is set here."""
pass
def user_exception(self, frame, exc_info):
"""Called when we stop on an exception."""
pass
def _set_stopinfo(self, stopframe, returnframe, stoplineno=0):
"""Set the attributes for stopping.
If stoplineno is greater than or equal to 0, then stop at line
greater than or equal to the stopline. If stoplineno is -1, then
don't stop at all.
"""
self.stopframe = stopframe
self.returnframe = returnframe
self.quitting = False
# stoplineno >= 0 means: stop at line >= the stoplineno
# stoplineno -1 means: don't stop at all
self.stoplineno = stoplineno
# Derived classes and clients can call the following methods
# to affect the stepping state.
def set_until(self, frame, lineno=None):
"""Stop when the line with the lineno greater than the current one is
reached or when returning from current frame."""
# the name "until" is borrowed from gdb
if lineno is None:
lineno = frame.f_lineno + 1
self._set_stopinfo(frame, frame, lineno)
def set_step(self):
"""Stop after one line of code."""
# Issue #13183: pdb skips frames after hitting a breakpoint and running
# step commands.
# Restore the trace function in the caller (that may not have been set
# for performance reasons) when returning from the current frame.
if self.frame_returning:
caller_frame = self.frame_returning.f_back
if caller_frame and not caller_frame.f_trace:
caller_frame.f_trace = self.trace_dispatch
self._set_stopinfo(None, None)
def set_next(self, frame):
"""Stop on the next line in or below the given frame."""
self._set_stopinfo(frame, None)
def set_return(self, frame):
"""Stop when returning from the given frame."""
if self.is_coroutine(frame):
self._set_stopinfo(frame, None, -1)
else:
self._set_stopinfo(frame.f_back, frame)
def set_trace(self, frame=None):
"""Start debugging from frame.
If frame is not specified, debugging starts from caller's frame.
"""
if frame is None:
frame = sys._getframe().f_back
self.reset()
while frame:
frame.f_trace = self.trace_dispatch
self.botframe = frame
frame = frame.f_back
self.set_step()
sys.settrace(self.trace_dispatch)
def set_continue(self):
"""Stop only at breakpoints or when finished.
If there are no breakpoints, set the system trace function to None.
"""
# Don't stop except at breakpoints or when finished
self._set_stopinfo(self.botframe, None, -1)
if not self.breaks:
# no breakpoints; run without debugger overhead
sys.settrace(None)
frame = sys._getframe().f_back
while frame and frame is not self.botframe:
del frame.f_trace
frame = frame.f_back
def set_quit(self):
"""Set quitting attribute to True.
Raises BdbQuit exception in the next call to a dispatch_*() method.
"""
self.stopframe = self.botframe
self.returnframe = None
self.quitting = True
sys.settrace(None)
# Derived classes and clients can call the following methods
# to manipulate breakpoints. These methods return an
# error message if something went wrong, None if all is well.
# Set_break prints out the breakpoint line and file:lineno.
# Call self.get_*break*() to see the breakpoints or better
# for bp in Breakpoint.bpbynumber: if bp: bp.bpprint().
def _add_to_breaks(self, filename, lineno):
"""Add breakpoint to breaks, if not already there."""
bp_linenos = self.breaks.setdefault(filename, [])
if lineno not in bp_linenos:
bp_linenos.append(lineno)
def set_break(self, filename, lineno, temporary=False, cond=None,
funcname=None):
"""Set a new breakpoint for filename:lineno.
If lineno doesn't exist for the filename, return an error message.
The filename should be in canonical form.
"""
filename = self.canonic(filename)
import linecache # Import as late as possible
line = linecache.getline(filename, lineno)
if not line:
return 'Line %s:%d does not exist' % (filename, lineno)
self._add_to_breaks(filename, lineno)
bp = Breakpoint(filename, lineno, temporary, cond, funcname)
return None
def _load_breaks(self):
"""Apply all breakpoints (set in other instances) to this one.
Populates this instance's breaks list from the Breakpoint class's
list, which can have breakpoints set by another Bdb instance. This
is necessary for interactive sessions to keep the breakpoints
active across multiple calls to run().
"""
for (filename, lineno) in Breakpoint.bplist.keys():
self._add_to_breaks(filename, lineno)
def _prune_breaks(self, filename, lineno):
"""Prune breakpoints for filename:lineno.
A list of breakpoints is maintained in the Bdb instance and in
the Breakpoint class. If a breakpoint in the Bdb instance no
longer exists in the Breakpoint class, then it's removed from the
Bdb instance.
"""
if (filename, lineno) not in Breakpoint.bplist:
self.breaks[filename].remove(lineno)
if not self.breaks[filename]:
del self.breaks[filename]
def clear_break(self, filename, lineno):
"""Delete breakpoints for filename:lineno.
If no breakpoints were set, return an error message.
"""
filename = self.canonic(filename)
if filename not in self.breaks:
return 'There are no breakpoints in %s' % filename
if lineno not in self.breaks[filename]:
return 'There is no breakpoint at %s:%d' % (filename, lineno)
# If there's only one bp in the list for that file,line
# pair, then remove the breaks entry
for bp in Breakpoint.bplist[filename, lineno][:]:
bp.deleteMe()
self._prune_breaks(filename, lineno)
return None
def clear_bpbynumber(self, arg):
"""Delete a breakpoint by its index in Breakpoint.bpbynumber.
If arg is invalid, return an error message.
"""
try:
bp = self.get_bpbynumber(arg)
except ValueError as err:
return str(err)
bp.deleteMe()
self._prune_breaks(bp.file, bp.line)
return None
def clear_all_file_breaks(self, filename):
"""Delete all breakpoints in filename.
If none were set, return an error message.
"""
filename = self.canonic(filename)
if filename not in self.breaks:
return 'There are no breakpoints in %s' % filename
for line in self.breaks[filename]:
blist = Breakpoint.bplist[filename, line]
for bp in blist:
bp.deleteMe()
del self.breaks[filename]
return None
def clear_all_breaks(self):
"""Delete all existing breakpoints.
If none were set, return an error message.
"""
if not self.breaks:
return 'There are no breakpoints'
for bp in Breakpoint.bpbynumber:
if bp:
bp.deleteMe()
self.breaks = {}
return None
def get_bpbynumber(self, arg):
"""Return a breakpoint by its index in Breakpoint.bybpnumber.
For invalid arg values or if the breakpoint doesn't exist,
raise a ValueError.
"""
if not arg:
raise ValueError('Breakpoint number expected')
try:
number = int(arg)
except ValueError:
raise ValueError('Non-numeric breakpoint number %s' % arg) from None
try:
bp = Breakpoint.bpbynumber[number]
except IndexError:
raise ValueError('Breakpoint number %d out of range' % number) from None
if bp is None:
raise ValueError('Breakpoint %d already deleted' % number)
return bp
def get_break(self, filename, lineno):
"""Return True if there is a breakpoint for filename:lineno."""
filename = self.canonic(filename)
return filename in self.breaks and \
lineno in self.breaks[filename]
def get_breaks(self, filename, lineno):
"""Return all breakpoints for filename:lineno.
If no breakpoints are set, return an empty list.
"""
filename = self.canonic(filename)
return filename in self.breaks and \
lineno in self.breaks[filename] and \
Breakpoint.bplist[filename, lineno] or []
def get_file_breaks(self, filename):
"""Return all lines with breakpoints for filename.
If no breakpoints are set, return an empty list.
"""
filename = self.canonic(filename)
if filename in self.breaks:
return self.breaks[filename]
else:
return []
def get_all_breaks(self):
"""Return all breakpoints that are set."""
return self.breaks
# Derived classes and clients can call the following method
# to get a data structure representing a stack trace.
def get_stack(self, f, t):
"""Return a list of (frame, lineno) in a stack trace and a size.
List starts with original calling frame, if there is one.
Size may be number of frames above or below f.
"""
stack = []
if t and t.tb_frame is f:
t = t.tb_next
while f is not None:
stack.append((f, f.f_lineno))
if f is self.botframe:
break
f = f.f_back
stack.reverse()
i = max(0, len(stack) - 1)
while t is not None:
stack.append((t.tb_frame, t.tb_lineno))
t = t.tb_next
if f is None:
i = max(0, len(stack) - 1)
return stack, i
def format_stack_entry(self, frame_lineno, lprefix=': '):
"""Return a string with information about a stack entry.
The stack entry frame_lineno is a (frame, lineno) tuple. The
return string contains the canonical filename, the function name
or '<lambda>', the input arguments, the return value, and the
line of code (if it exists).
"""
## MPY: reprlib not yet available
import linecache #, reprlib
frame, lineno = frame_lineno
filename = self.canonic(frame.f_code.co_filename)
s = '%s(%r)' % (filename, lineno)
if frame.f_code.co_name:
s += frame.f_code.co_name
else:
s += "<lambda>"
s += '()'
if '__return__' in frame.f_locals:
rv = frame.f_locals['__return__']
s += '->'
s += repr(rv)
line = linecache.getline(filename, lineno, frame.f_globals)
if line:
s += lprefix + line.strip()
return s
# The following methods can be called by clients to use
# a debugger to debug a statement or an expression.
# Both can be given as a string, or a code object.
def run(self, cmd, globals=None, locals=None):
"""Debug a statement executed via the exec() function.
globals defaults to __main__.dict; locals defaults to globals.
"""
if globals is None:
import __main__
globals = __main__.__dict__
if locals is None:
locals = globals
self.reset()
if isinstance(cmd, str):
cmd = compile(cmd, "<string>", "exec")
sys.settrace(self.trace_dispatch)
try:
exec(cmd, globals, locals)
except BdbQuit:
pass
finally:
self.quitting = True
sys.settrace(None)
def runeval(self, expr, globals=None, locals=None):
"""Debug an expression executed via the eval() function.
globals defaults to __main__.dict; locals defaults to globals.
"""
if globals is None:
import __main__
globals = __main__.__dict__
if locals is None:
locals = globals
self.reset()
sys.settrace(self.trace_dispatch)
try:
return eval(expr, globals, locals)
except BdbQuit:
pass
finally:
self.quitting = True
sys.settrace(None)
def runctx(self, cmd, globals, locals):
"""For backwards-compatibility. Defers to run()."""
# B/W compatibility
self.run(cmd, globals, locals)
# This method is more useful to debug a single function call.
def runcall(self, func, *args, **kwds):
"""Debug a single function call.
Return the result of the function call.
"""
self.reset()
sys.settrace(self.trace_dispatch)
res = None
try:
res = func(*args, **kwds)
except BdbQuit:
pass
finally:
self.quitting = True
sys.settrace(None)
return res
def set_trace():
"""Start debugging with a Bdb instance from the caller's frame."""
Bdb().set_trace()
class Breakpoint:
"""Breakpoint class.
Implements temporary breakpoints, ignore counts, disabling and
(re)-enabling, and conditionals.
Breakpoints are indexed by number through bpbynumber and by
the (file, line) tuple using bplist. The former points to a
single instance of class Breakpoint. The latter points to a
list of such instances since there may be more than one
breakpoint per line.
When creating a breakpoint, its associated filename should be
in canonical form. If funcname is defined, a breakpoint hit will be
counted when the first line of that function is executed. A
conditional breakpoint always counts a hit.
"""
# XXX Keeping state in the class is a mistake -- this means
# you cannot have more than one active Bdb instance.
next = 1 # Next bp to be assigned
bplist = {} # indexed by (file, lineno) tuple
bpbynumber = [None] # Each entry is None or an instance of Bpt
# index 0 is unused, except for marking an
# effective break .... see effective()
def __init__(self, file, line, temporary=False, cond=None, funcname=None):
self.funcname = funcname
# Needed if funcname is not None.
self.func_first_executable_line = None
self.file = file # This better be in canonical form!
self.line = line
self.temporary = temporary
self.cond = cond
self.enabled = True
self.ignore = 0
self.hits = 0
self.number = Breakpoint.next
Breakpoint.next += 1
# Build the two lists
self.bpbynumber.append(self)
if (file, line) in self.bplist:
self.bplist[file, line].append(self)
else:
self.bplist[file, line] = [self]
@staticmethod
def clearBreakpoints():
Breakpoint.next = 1
Breakpoint.bplist = {}
Breakpoint.bpbynumber = [None]
def deleteMe(self):
"""Delete the breakpoint from the list associated to a file:line.
If it is the last breakpoint in that position, it also deletes
the entry for the file:line.
"""
index = (self.file, self.line)
self.bpbynumber[self.number] = None # No longer in list
self.bplist[index].remove(self)
if not self.bplist[index]:
# No more bp for this f:l combo
del self.bplist[index]
def enable(self):
"""Mark the breakpoint as enabled."""
self.enabled = True
def disable(self):
"""Mark the breakpoint as disabled."""
self.enabled = False
def bpprint(self, out=None):
"""Print the output of bpformat().
The optional out argument directs where the output is sent
and defaults to standard output.
"""
if out is None:
out = sys.stdout
print(self.bpformat(), file=out)
def bpformat(self):
"""Return a string with information about the breakpoint.
The information includes the breakpoint number, temporary
status, file:line position, break condition, number of times to
ignore, and number of times hit.
"""
if self.temporary:
disp = 'del '
else:
disp = 'keep '
if self.enabled:
disp = disp + 'yes '
else:
disp = disp + 'no '
ret = '%-4dbreakpoint %s at %s:%d' % (self.number, disp,
self.file, self.line)
if self.cond:
ret += '\n\tstop only if %s' % (self.cond,)
if self.ignore:
ret += '\n\tignore next %d hits' % (self.ignore,)
if self.hits:
if self.hits > 1:
ss = 's'
else:
ss = ''
ret += '\n\tbreakpoint already hit %d time%s' % (self.hits, ss)
return ret
def __str__(self):
"Return a condensed description of the breakpoint."
return 'breakpoint %s at %s:%s' % (self.number, self.file, self.line)
# -----------end of Breakpoint class----------
def checkfuncname(b, frame):
"""Return True if break should happen here.
Whether a break should happen depends on the way that b (the breakpoint)
was set. If it was set via line number, check if b.line is the same as
the one in the frame. If it was set via function name, check if this is
the right function and if it is on the first executable line.
"""
if not b.funcname:
# Breakpoint was set via line number.
if b.line != frame.f_lineno:
# Breakpoint was set at a line with a def statement and the function
# defined is called: don't break.
return False
return True
# Breakpoint set via function name.
if frame.f_code.co_name != b.funcname:
# It's not a function call, but rather execution of def statement.
return False
# We are in the right frame.
if not b.func_first_executable_line:
# The function is entered for the 1st time.
b.func_first_executable_line = frame.f_lineno
if b.func_first_executable_line != frame.f_lineno:
# But we are not at the first line number: don't break.
return False
return True
# Determines if there is an effective (active) breakpoint at this
# line of code. Returns breakpoint number or 0 if none
def effective(file, line, frame):
"""Determine which breakpoint for this file:line is to be acted upon.
Called only if we know there is a breakpoint at this location. Return
the breakpoint that was triggered and a boolean that indicates if it is
ok to delete a temporary breakpoint. Return (None, None) if there is no
matching breakpoint.
"""
possibles = Breakpoint.bplist[file, line]
for b in possibles:
if not b.enabled:
continue
if not checkfuncname(b, frame):
continue
# Count every hit when bp is enabled
b.hits += 1
if not b.cond:
# If unconditional, and ignoring go on to next, else break
if b.ignore > 0:
b.ignore -= 1
continue
else:
# breakpoint and marker that it's ok to delete if temporary
return (b, True)
else:
# Conditional bp.
# Ignore count applies only to those bpt hits where the
# condition evaluates to true.
try:
val = eval(b.cond, frame.f_globals, frame.f_locals)
if val:
if b.ignore > 0:
b.ignore -= 1
# continue
else:
return (b, True)
# else:
# continue
except:
# if eval fails, most conservative thing is to stop on
# breakpoint regardless of ignore count. Don't delete
# temporary, as another hint to user.
return (b, False)
return (None, None)
# -------------------- testing --------------------
class Tdb(Bdb):
def user_call(self, frame, args):
name = frame.f_code.co_name
if not name: name = '???'
print('+++ call', name, args)
def user_line(self, frame):
import linecache
name = frame.f_code.co_name
if not name: name = '???'
fn = self.canonic(frame.f_code.co_filename)
line = linecache.getline(fn, frame.f_lineno, frame.f_globals)
print('+++', fn, frame.f_lineno, name, ':', line.strip())
def user_return(self, frame, retval):
print('+++ return', retval)
def user_exception(self, frame, exc_stuff):
print('+++ exception', exc_stuff)
self.set_continue()
def foo(n):
print('foo(', n, ')')
x = bar(n*10)
print('bar returned', x)
def bar(a):
print('bar(', a, ')')
return a/2
def test():
t = Tdb()
t.run('import bdb; bdb.foo(10)')

Wyświetl plik

@ -0,0 +1,3 @@
srctype = cpython
type = module
version = 0.0.1

Wyświetl plik

@ -0,0 +1,24 @@
import sys
# Remove current dir from sys.path, otherwise setuptools will peek up our
# module instead of system's.
sys.path.pop(0)
from setuptools import setup
sys.path.append("..")
import sdist_upip
setup(
name="micropython-bdb",
version="0.0.1",
description="CPython bdb module ported to MicroPython",
long_description="This is a module ported from CPython standard library to be compatible with\nMicroPython interpreter. Usually, this means applying small patches for\nfeatures not supported (yet, or at all) in MicroPython. Sometimes, heavier\nchanges are required. Note that CPython modules are written with availability\nof vast resources in mind, and may not work for MicroPython ports with\nlimited heap. If you are affected by such a case, please help reimplement\nthe module from scratch.",
url="https://github.com/micropython/micropython-lib",
author="CPython Developers",
author_email="python-dev@python.org",
maintainer="micropython-lib Developers",
maintainer_email="micro-python@googlegroups.com",
license="Python",
cmdclass={"sdist": sdist_upip.sdist},
py_modules=["bdb"],
)

Wyświetl plik

@ -85,13 +85,16 @@ class Cmd:
nohelp = "*** No help on %s"
use_rawinput = 1
def __init__(self, stdin=None, stdout=None):
def __init__(self, completekey='tab', stdin=None, stdout=None):
"""Instantiate a line-oriented interpreter framework.
The optional arguments stdin and stdout
specify alternate input and output file objects; if not specified,
sys.stdin and sys.stdout are used.
The optional argument completekey is not used, but provided for
compatibility with calling libraries.
"""
if stdin is not None:
self.stdin = stdin

Wyświetl plik

@ -0,0 +1,315 @@
"""Utilities needed to emulate Python's interactive interpreter.
"""
# Inspired by similar code by Jeff Epler and Fredrik Lundh.
import sys
import traceback
from codeop import CommandCompiler, compile_command
__all__ = ["InteractiveInterpreter", "InteractiveConsole", "interact",
"compile_command"]
class InteractiveInterpreter:
"""Base class for InteractiveConsole.
This class deals with parsing and interpreter state (the user's
namespace); it doesn't deal with input buffering or prompting or
input file naming (the filename is always passed in explicitly).
"""
def __init__(self, locals=None):
"""Constructor.
The optional 'locals' argument specifies the dictionary in
which code will be executed; it defaults to a newly created
dictionary with key "__name__" set to "__console__" and key
"__doc__" set to None.
"""
if locals is None:
locals = {"__name__": "__console__", "__doc__": None}
self.locals = locals
self.compile = CommandCompiler()
def runsource(self, source, filename="<input>", symbol="single"):
"""Compile and run some source in the interpreter.
Arguments are as for compile_command().
One of several things can happen:
1) The input is incorrect; compile_command() raised an
exception (SyntaxError or OverflowError). A syntax traceback
will be printed by calling the showsyntaxerror() method.
2) The input is incomplete, and more input is required;
compile_command() returned None. Nothing happens.
3) The input is complete; compile_command() returned a code
object. The code is executed by calling self.runcode() (which
also handles run-time exceptions, except for SystemExit).
The return value is True in case 2, False in the other cases (unless
an exception is raised). The return value can be used to
decide whether to use sys.ps1 or sys.ps2 to prompt the next
line.
"""
try:
code = self.compile(source, filename, symbol)
except (OverflowError, SyntaxError, ValueError):
# Case 1
self.showsyntaxerror(filename)
return False
if code is None:
# Case 2
return True
# Case 3
self.runcode(code)
return False
def runcode(self, code):
"""Execute a code object.
When an exception occurs, self.showtraceback() is called to
display a traceback. All exceptions are caught except
SystemExit, which is reraised.
A note about KeyboardInterrupt: this exception may occur
elsewhere in this code, and may not always be caught. The
caller should be prepared to deal with it.
"""
try:
exec(code, self.locals)
except SystemExit:
raise
except:
self.showtraceback()
def showsyntaxerror(self, filename=None):
"""Display the syntax error that just occurred.
This doesn't display a stack trace because there isn't one.
If a filename is given, it is stuffed in the exception instead
of what was there before (because Python's parser always uses
"<string>" when reading from a string).
The output is written by self.write(), below.
"""
type, value, tb = sys.exc_info()
sys.last_type = type
sys.last_value = value
sys.last_traceback = tb
if filename and type is SyntaxError:
# Work hard to stuff the correct filename in the exception
try:
msg, (dummy_filename, lineno, offset, line) = value.args
except ValueError:
# Not the format we expect; leave it alone
pass
else:
# Stuff in the right filename
value = SyntaxError(msg, (filename, lineno, offset, line))
sys.last_value = value
if sys.excepthook is sys.__excepthook__:
lines = traceback.format_exception_only(type, value)
self.write(''.join(lines))
else:
# If someone has set sys.excepthook, we let that take precedence
# over self.write
sys.excepthook(type, value, tb)
def showtraceback(self):
"""Display the exception that just occurred.
We remove the first stack item because it is our own code.
The output is written by self.write(), below.
"""
sys.last_type, sys.last_value, last_tb = ei = sys.exc_info()
sys.last_traceback = last_tb
try:
lines = traceback.format_exception(ei[0], ei[1], last_tb.tb_next)
if sys.excepthook is sys.__excepthook__:
self.write(''.join(lines))
else:
# If someone has set sys.excepthook, we let that take precedence
# over self.write
sys.excepthook(ei[0], ei[1], last_tb)
finally:
last_tb = ei = None
def write(self, data):
"""Write a string.
The base implementation writes to sys.stderr; a subclass may
replace this with a different implementation.
"""
sys.stderr.write(data)
class InteractiveConsole(InteractiveInterpreter):
"""Closely emulate the behavior of the interactive Python interpreter.
This class builds on InteractiveInterpreter and adds prompting
using the familiar sys.ps1 and sys.ps2, and input buffering.
"""
def __init__(self, locals=None, filename="<console>"):
"""Constructor.
The optional locals argument will be passed to the
InteractiveInterpreter base class.
The optional filename argument should specify the (file)name
of the input stream; it will show up in tracebacks.
"""
InteractiveInterpreter.__init__(self, locals)
self.filename = filename
self.resetbuffer()
def resetbuffer(self):
"""Reset the input buffer."""
self.buffer = []
def interact(self, banner=None, exitmsg=None):
"""Closely emulate the interactive Python console.
The optional banner argument specifies the banner to print
before the first interaction; by default it prints a banner
similar to the one printed by the real Python interpreter,
followed by the current class name in parentheses (so as not
to confuse this with the real interpreter -- since it's so
close!).
The optional exitmsg argument specifies the exit message
printed when exiting. Pass the empty string to suppress
printing an exit message. If exitmsg is not given or None,
a default message is printed.
"""
## MPY: Older versions of mpy don't support setting ps1 & ps2.
ps1 = ">>> "
ps2 = "... "
try:
ps1 = sys.ps1
ps2 = sys.ps2
except:
pass
cprt = 'Type "help", "copyright", "credits" or "license" for more information.'
if banner is None:
self.write("Python %s on %s\n%s\n(%s)\n" %
(sys.version, sys.platform, cprt,
self.__class__.__name__))
elif banner:
self.write("%s\n" % str(banner))
more = 0
while 1:
try:
if more:
prompt = ps2
else:
prompt = ps1
try:
line = self.raw_input(prompt)
except EOFError:
self.write("\n")
break
else:
more = self.push(line)
except KeyboardInterrupt:
self.write("\nKeyboardInterrupt\n")
self.resetbuffer()
more = 0
if exitmsg is None:
self.write('now exiting %s...\n' % self.__class__.__name__)
elif exitmsg != '':
self.write('%s\n' % exitmsg)
def push(self, line):
"""Push a line to the interpreter.
The line should not have a trailing newline; it may have
internal newlines. The line is appended to a buffer and the
interpreter's runsource() method is called with the
concatenated contents of the buffer as source. If this
indicates that the command was executed or invalid, the buffer
is reset; otherwise, the command is incomplete, and the buffer
is left as it was after the line was appended. The return
value is 1 if more input is required, 0 if the line was dealt
with in some way (this is the same as runsource()).
"""
self.buffer.append(line)
source = "\n".join(self.buffer)
more = self.runsource(source, self.filename)
if not more:
self.resetbuffer()
return more
def raw_input(self, prompt=""):
"""Write a prompt and read a line.
The returned line does not include the trailing newline.
When the user enters the EOF key sequence, EOFError is raised.
The base implementation uses the built-in function
input(); a subclass may replace this with a different
implementation.
"""
return input(prompt)
def interact(banner=None, readfunc=None, local=None, exitmsg=None):
"""Closely emulate the interactive Python interpreter.
This is a backwards compatible interface to the InteractiveConsole
class. When readfunc is not specified, it attempts to import the
readline module to enable GNU readline if it is available.
Arguments (all optional, all default to None):
banner -- passed to InteractiveConsole.interact()
readfunc -- if not None, replaces InteractiveConsole.raw_input()
local -- passed to InteractiveInterpreter.__init__()
exitmsg -- passed to InteractiveConsole.interact()
"""
console = InteractiveConsole(local)
if readfunc is not None:
console.raw_input = readfunc
else:
try:
import readline
except ImportError:
pass
console.interact(banner, exitmsg)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('-q', action='store_true',
help="don't print version and copyright messages")
args = parser.parse_args()
if args.q or sys.flags.quiet:
banner = ''
else:
banner = None
interact(banner)

Wyświetl plik

@ -0,0 +1,3 @@
srctype = cpython
type = module
version = 0.0.1

Wyświetl plik

@ -0,0 +1,24 @@
import sys
# Remove current dir from sys.path, otherwise setuptools will peek up our
# module instead of system's.
sys.path.pop(0)
from setuptools import setup
sys.path.append("..")
import sdist_upip
setup(
name="micropython-code",
version="0.0.1",
description="CPython code module ported to MicroPython",
long_description="This is a module ported from CPython standard library to be compatible with\nMicroPython interpreter. Usually, this means applying small patches for\nfeatures not supported (yet, or at all) in MicroPython. Sometimes, heavier\nchanges are required. Note that CPython modules are written with availability\nof vast resources in mind, and may not work for MicroPython ports with\nlimited heap. If you are affected by such a case, please help reimplement\nthe module from scratch.",
url="https://github.com/micropython/micropython-lib",
author="CPython Developers",
author_email="python-dev@python.org",
maintainer="micropython-lib Developers",
maintainer_email="micro-python@googlegroups.com",
license="Python",
cmdclass={"sdist": sdist_upip.sdist},
py_modules=["code"],
)

Wyświetl plik

@ -0,0 +1,155 @@
r"""Utilities to compile possibly incomplete Python source code.
This module provides two interfaces, broadly similar to the builtin
function compile(), which take program text, a filename and a 'mode'
and:
- Return code object if the command is complete and valid
- Return None if the command is incomplete
- Raise SyntaxError, ValueError or OverflowError if the command is a
syntax error (OverflowError and ValueError can be produced by
malformed literals).
The two interfaces are:
compile_command(source, filename, symbol):
Compiles a single command in the manner described above.
CommandCompiler():
Instances of this class have __call__ methods identical in
signature to compile_command; the difference is that if the
instance compiles program text containing a __future__ statement,
the instance 'remembers' and compiles all subsequent program texts
with the statement in force.
The module also provides another class:
Compile():
Instances of this class act like the built-in function compile,
but with 'memory' in the sense described above.
"""
# import __future__
import warnings
## MPY: Future flags arent set on compiled code so just remove the checks
# _features = [getattr(__future__, fname)
# for fname in __future__.all_feature_names]
__all__ = ["compile_command", "Compile", "CommandCompiler"]
# The following flags match the values from Include/cpython/compile.h
# Caveat emptor: These flags are undocumented on purpose and depending
# on their effect outside the standard library is **unsupported**.
PyCF_DONT_IMPLY_DEDENT = 0x200
PyCF_ALLOW_INCOMPLETE_INPUT = 0x4000
def _maybe_compile(compiler, source, filename, symbol):
# Check for source consisting of only blank lines and comments.
for line in source.split("\n"):
line = line.strip()
if line and line[0] != '#':
break # Leave it alone.
else:
if symbol != "eval":
source = "pass" # Replace it with a 'pass' statement
try:
return compiler(source, filename, symbol)
except SyntaxError: # Let other compile() errors propagate.
pass
# Catch syntax warnings after the first compile
# to emit warnings (SyntaxWarning, DeprecationWarning) at most once.
with warnings.catch_warnings():
warnings.simplefilter("error")
try:
compiler(source + "\n", filename, symbol)
except SyntaxError as e:
if "incomplete input" in str(e):
return None
raise
def _is_syntax_error(err1, err2):
rep1 = repr(err1)
rep2 = repr(err2)
if "was never closed" in rep1 and "was never closed" in rep2:
return False
if rep1 == rep2:
return True
return False
def _compile(source, filename, symbol):
return compile(source, filename, symbol, PyCF_DONT_IMPLY_DEDENT | PyCF_ALLOW_INCOMPLETE_INPUT)
def compile_command(source, filename="<input>", symbol="single"):
r"""Compile a command and determine whether it is incomplete.
Arguments:
source -- the source string; may contain \n characters
filename -- optional filename from which source was read; default
"<input>"
symbol -- optional grammar start symbol; "single" (default), "exec"
or "eval"
Return value / exceptions raised:
- Return a code object if the command is complete and valid
- Return None if the command is incomplete
- Raise SyntaxError, ValueError or OverflowError if the command is a
syntax error (OverflowError and ValueError can be produced by
malformed literals).
"""
return _maybe_compile(_compile, source, filename, symbol)
class Compile:
"""Instances of this class behave much like the built-in compile
function, but if one is used to compile text containing a future
statement, it "remembers" and compiles all subsequent program texts
with the statement in force."""
def __init__(self):
self.flags = PyCF_DONT_IMPLY_DEDENT | PyCF_ALLOW_INCOMPLETE_INPUT
def __call__(self, source, filename, symbol):
codeob = compile(source, filename, symbol, self.flags, True)
## MPY: Future flags arent set on compiled code so just remove the checks
# for feature in _features:
# if codeob.co_flags & feature.compiler_flag:
# self.flags |= feature.compiler_flag
return codeob
class CommandCompiler:
"""Instances of this class have __call__ methods identical in
signature to compile_command; the difference is that if the
instance compiles program text containing a __future__ statement,
the instance 'remembers' and compiles all subsequent program texts
with the statement in force."""
def __init__(self,):
self.compiler = Compile()
def __call__(self, source, filename="<input>", symbol="single"):
r"""Compile a command and determine whether it is incomplete.
Arguments:
source -- the source string; may contain \n characters
filename -- optional filename from which source was read;
default "<input>"
symbol -- optional grammar start symbol; "single" (default) or
"eval"
Return value / exceptions raised:
- Return a code object if the command is complete and valid
- Return None if the command is incomplete
- Raise SyntaxError, ValueError or OverflowError if the command is a
syntax error (OverflowError and ValueError can be produced by
malformed literals).
"""
return _maybe_compile(self.compiler, source, filename, symbol)

Wyświetl plik

@ -0,0 +1,3 @@
srctype = cpython
type = module
version = 0.0.1

Wyświetl plik

@ -0,0 +1,24 @@
import sys
# Remove current dir from sys.path, otherwise setuptools will peek up our
# module instead of system's.
sys.path.pop(0)
from setuptools import setup
sys.path.append("..")
import sdist_upip
setup(
name="micropython-codeop",
version="0.0.1",
description="CPython codeop module ported to MicroPython",
long_description="This is a module ported from CPython standard library to be compatible with\nMicroPython interpreter. Usually, this means applying small patches for\nfeatures not supported (yet, or at all) in MicroPython. Sometimes, heavier\nchanges are required. Note that CPython modules are written with availability\nof vast resources in mind, and may not work for MicroPython ports with\nlimited heap. If you are affected by such a case, please help reimplement\nthe module from scratch.",
url="https://github.com/micropython/micropython-lib",
author="CPython Developers",
author_email="python-dev@python.org",
maintainer="micropython-lib Developers",
maintainer_email="micro-python@googlegroups.com",
license="Python",
cmdclass={"sdist": sdist_upip.sdist},
py_modules=["codeop"],
)

Wyświetl plik

@ -0,0 +1,182 @@
"""Cache lines from Python source files.
This is intended to read lines from modules imported -- hence if a filename
is not found, it will look down the module search path for a file by
that name.
"""
import functools
import sys
import os
import tokenize
__all__ = ["getline", "clearcache", "checkcache", "lazycache"]
# The cache. Maps filenames to either a thunk which will provide source code,
# or a tuple (size, mtime, lines, fullname) once loaded.
cache = {}
def clearcache():
"""Clear the cache entirely."""
cache.clear()
def getline(filename, lineno, module_globals=None):
"""Get a line for a Python source file from the cache.
Update the cache if it doesn't contain an entry for this file already."""
lines = getlines(filename, module_globals)
if 1 <= lineno <= len(lines):
return lines[lineno - 1]
return ''
def getlines(filename, module_globals=None):
"""Get the lines for a Python source file from the cache.
Update the cache if it doesn't contain an entry for this file already."""
if filename in cache:
entry = cache[filename]
if len(entry) != 1:
return cache[filename][2]
try:
return updatecache(filename, module_globals)
except MemoryError:
clearcache()
return []
def checkcache(filename=None):
"""Discard cache entries that are out of date.
(This is not checked upon each call!)"""
if filename is None:
filenames = list(cache.keys())
elif filename in cache:
filenames = [filename]
else:
return
for filename in filenames:
entry = cache[filename]
if len(entry) == 1:
# lazy cache entry, leave it lazy.
continue
size, mtime, lines, fullname = entry
if mtime is None:
continue # no-op for files loaded via a __loader__
try:
stat = os.stat(fullname)
except OSError:
cache.pop(filename, None)
continue
if size != stat.st_size or mtime != stat.st_mtime:
cache.pop(filename, None)
def updatecache(filename, module_globals=None):
"""Update a cache entry and return its list of lines.
If something's wrong, print a message, discard the cache entry,
and return an empty list."""
if filename in cache:
if len(cache[filename]) != 1:
cache.pop(filename, None)
if not filename or (filename.startswith('<') and filename.endswith('>')):
return []
fullname = filename
try:
stat = os.stat(fullname)
except OSError:
basename = filename
# Realise a lazy loader based lookup if there is one
# otherwise try to lookup right now.
if lazycache(filename, module_globals):
try:
data = cache[filename][0]()
except (ImportError, OSError):
pass
else:
if data is None:
# No luck, the PEP302 loader cannot find the source
# for this module.
return []
cache[filename] = (
len(data),
None,
[line + '\n' for line in data.splitlines()],
fullname
)
return cache[filename][2]
# Try looking through the module search path, which is only useful
# when handling a relative filename.
if os.path.isabs(filename):
return []
for dirname in sys.path:
try:
fullname = os.path.join(dirname, basename)
except (TypeError, AttributeError):
# Not sufficiently string-like to do anything useful with.
continue
try:
stat = os.stat(fullname)
break
except OSError:
pass
else:
return []
try:
with tokenize.open(fullname) as fp:
lines = fp.readlines()
except OSError:
return []
if lines and not lines[-1].endswith('\n'):
lines[-1] += '\n'
size, mtime = stat.st_size, stat.st_mtime
cache[filename] = size, mtime, lines, fullname
return lines
def lazycache(filename, module_globals):
"""Seed the cache for filename with module_globals.
The module loader will be asked for the source only when getlines is
called, not immediately.
If there is an entry in the cache already, it is not altered.
:return: True if a lazy load is registered in the cache,
otherwise False. To register such a load a module loader with a
get_source method must be found, the filename must be a cacheable
filename, and the filename must not be already cached.
"""
if filename in cache:
if len(cache[filename]) == 1:
return True
else:
return False
if not filename or (filename.startswith('<') and filename.endswith('>')):
return False
# Try for a __loader__, if available
if module_globals and '__name__' in module_globals:
name = module_globals['__name__']
if (loader := module_globals.get('__loader__')) is None:
if spec := module_globals.get('__spec__'):
try:
loader = spec.loader
except AttributeError:
pass
get_source = getattr(loader, 'get_source', None)
if name and get_source:
get_lines = functools.partial(get_source, name)
cache[filename] = (get_lines,)
return True
return False

Wyświetl plik

@ -0,0 +1,3 @@
srctype = cpython
type = module
version = 0.0.1

Wyświetl plik

@ -0,0 +1,24 @@
import sys
# Remove current dir from sys.path, otherwise setuptools will peek up our
# module instead of system's.
sys.path.pop(0)
from setuptools import setup
sys.path.append("..")
import sdist_upip
setup(
name="micropython-linecache",
version="0.0.1",
description="CPython linecache module ported to MicroPython",
long_description="This is a module ported from CPython standard library to be compatible with\nMicroPython interpreter. Usually, this means applying small patches for\nfeatures not supported (yet, or at all) in MicroPython. Sometimes, heavier\nchanges are required. Note that CPython modules are written with availability\nof vast resources in mind, and may not work for MicroPython ports with\nlimited heap. If you are affected by such a case, please help reimplement\nthe module from scratch.",
url="https://github.com/micropython/micropython-lib",
author="CPython Developers",
author_email="python-dev@python.org",
maintainer="micropython-lib Developers",
maintainer_email="micro-python@googlegroups.com",
license="Python",
cmdclass={"sdist": sdist_upip.sdist},
py_modules=["linecache"],
)

Wyświetl plik

@ -12,6 +12,10 @@ def normpath(s):
return s
def realpath(s):
return s
def abspath(s):
if s[0] != "/":
return os.getcwd() + "/" + s

Wyświetl plik

@ -1,8 +1,40 @@
# Replace built-in os module.
from uos import *
# Include built-in os module.
import sys
__path = sys.path
try:
sys.path.clear()
from os import *
finally:
sys.path.extend(__path)
# Provide optional dependencies (which may be installed separately).
try:
from . import path
except ImportError:
pass
from collections import namedtuple
# https://docs.python.org/3/library/os.html#os.stat_result
stat_result = namedtuple(
"stat_result",
(
"st_mode",
"st_ino",
"st_dev",
"st_nlink",
"st_uid",
"st_gid",
"st_size",
"st_atime",
"st_mtime",
"st_ctime",
),
)
__os_stat = stat
def stat(path):
return stat_result(*__os_stat(path))

Wyświetl plik

@ -0,0 +1,3 @@
srctype = cpython
type = module
version = 0.0.1

Plik diff jest za duży Load Diff

Wyświetl plik

@ -0,0 +1,24 @@
import sys
# Remove current dir from sys.path, otherwise setuptools will peek up our
# module instead of system's.
sys.path.pop(0)
from setuptools import setup
sys.path.append("..")
import sdist_upip
setup(
name="micropython-pdb",
version="0.0.1",
description="CPython pdb module ported to MicroPython",
long_description="This is a module ported from CPython standard library to be compatible with\nMicroPython interpreter. Usually, this means applying small patches for\nfeatures not supported (yet, or at all) in MicroPython. Sometimes, heavier\nchanges are required. Note that CPython modules are written with availability\nof vast resources in mind, and may not work for MicroPython ports with\nlimited heap. If you are affected by such a case, please help reimplement\nthe module from scratch.",
url="https://github.com/micropython/micropython-lib",
author="CPython Developers",
author_email="python-dev@python.org",
maintainer="micropython-lib Developers",
maintainer_email="micro-python@googlegroups.com",
license="Python",
cmdclass={"sdist": sdist_upip.sdist},
py_modules=["pdb"],
)

Wyświetl plik

@ -0,0 +1,5 @@
srctype = micropython-lib
type = module
version = 1.0
author = Andrew Leech
long_desc = Minimal tokenize stub supporting open() in utf8 encoding.

Wyświetl plik

@ -0,0 +1,8 @@
from builtins import open as _builtin_open
def open(filename):
"""Open a file in read only text mode using utf8.
"""
return _builtin_open(filename, "r", encoding="utf8")