kopia lustrzana https://github.com/micropython/micropython-lib
Porównaj commity
3 Commity
fb56269b62
...
8d48f8c6b6
Autor | SHA1 | Data |
---|---|---|
stinos | 8d48f8c6b6 | |
Angus Gratton | 583bc0da70 | |
stijn | 5e939e6876 |
|
@ -0,0 +1,136 @@
|
|||
# USB Drivers
|
||||
|
||||
These packages allow implementing USB functionality on a MicroPython system
|
||||
using pure Python code.
|
||||
|
||||
Currently only USB device is implemented, not USB host.
|
||||
|
||||
## USB Device support
|
||||
|
||||
### Support
|
||||
|
||||
USB Device support depends on the low-level
|
||||
[machine.USBDevice](https://docs.micropython.org/en/latest/library/machine.USBDevice.html)
|
||||
class. This class is new and not supported on all ports, so please check the
|
||||
documentation for your MicroPython version. It is possible to implement a USB
|
||||
device using only the low-level USBDevice class. However, the packages here are
|
||||
higher level and easier to use.
|
||||
|
||||
For more information about how to install packages, or "freeze" them into a
|
||||
firmware image, consult the [MicroPython documentation on "Package
|
||||
management"](https://docs.micropython.org/en/latest/reference/packages.html).
|
||||
|
||||
### Examples
|
||||
|
||||
The [examples/device](examples/device) directory in this repo has a range of
|
||||
examples. After installing necessary packages, you can download an example and
|
||||
run it with `mpremote run EXAMPLE_FILENAME.py` ([mpremote
|
||||
docs](https://docs.micropython.org/en/latest/reference/mpremote.html#mpremote-command-run)).
|
||||
|
||||
#### Unexpected serial disconnects
|
||||
|
||||
If you normally connect to your MicroPython device over a USB serial port ("USB
|
||||
CDC"), then running a USB example will disconnect mpremote when the new USB
|
||||
device configuration activates and the serial port has to temporarily
|
||||
disconnect. It is likely that mpremote will print an error. The example should
|
||||
still start running, if necessary then you can reconnect with mpremote and type
|
||||
Ctrl-B to restore the MicroPython REPL and/or Ctrl-C to stop the running
|
||||
example.
|
||||
|
||||
If you use `mpremote run` again while a different USB device configuration is
|
||||
already active, then the USB serial port may disconnect immediately before the
|
||||
example runs. This is because mpremote has to soft-reset MicroPython, and when
|
||||
the existing USB device is reset then the entire USB port needs to reset. If
|
||||
this happens, run the same `mpremote run` command again.
|
||||
|
||||
We plan to add features to `mpremote` so that this limitation is less
|
||||
disruptive. Other tools that communicate with MicroPython over the serial port
|
||||
will encounter similar issues when runtime USB is in use.
|
||||
|
||||
### Initialising runtime USB
|
||||
|
||||
The overall pattern for enabling USB devices at runtime is:
|
||||
|
||||
1. Instantiate the Interface objects for your desired USB device.
|
||||
2. Call `usb.device.get()` to get the singleton object for the high-level USB device.
|
||||
3. Call `init(...)` to pass the desired interfaces as arguments, plus any custom
|
||||
keyword arguments to configure the overall device.
|
||||
|
||||
An example, similar to [mouse_example.py](examples/device/mouse_example.py):
|
||||
|
||||
```py
|
||||
m = usb.device.mouse.MouseInterface()
|
||||
usb.device.get().init(m, builtin_driver=True)
|
||||
```
|
||||
|
||||
Setting `builtin_driver=True` means that any built-in USB serial port will still
|
||||
be available. Otherwise, you may permanently lose access to MicroPython until
|
||||
the next time the device resets.
|
||||
|
||||
See [Unexpected serial disconnects](#Unexpected-serial-disconnects), above, for
|
||||
an explanation of possible errors or disconnects when the runtime USB device
|
||||
initialises.
|
||||
|
||||
Placing the call to `usb.device.get().init()` into the `boot.py` of the
|
||||
MicroPython file system allows the runtime USB device to initialise immediately
|
||||
on boot, before any built-in USB. This is a feature (not a bug) and allows you
|
||||
full control over the USB device, for example to only enable USB HID and prevent
|
||||
REPL access to the system.
|
||||
|
||||
However, note that calling this function on boot without `builtin_driver=True`
|
||||
will make the MicroPython USB serial interface permanently inaccessible until
|
||||
you "safe mode boot" (on supported boards) or completely erase the flash of your
|
||||
device.
|
||||
|
||||
### Package usb-device
|
||||
|
||||
This base package contains the common implementation components for the other
|
||||
packages, and can be used to implement new and different USB interface support.
|
||||
All of the other `usb-device-<name>` packages depend on this package, and it
|
||||
will be automatically installed as needed.
|
||||
|
||||
Specicially, this package provides the `usb.device.get()` function for accessing
|
||||
the Device singleton object, and the `usb.device.core` module which contains the
|
||||
low-level classes and utility functions for implementing new USB interface
|
||||
drivers in Python. The best examples of how to use the core classes is the
|
||||
source code of the other USB device packages.
|
||||
|
||||
### Package usb-device-keyboard
|
||||
|
||||
This package provides the `usb.device.keyboard` module. See
|
||||
[keyboard_example.py](examples/device/keyboard_example.py) for an example
|
||||
program.
|
||||
|
||||
### Package usb-device-mouse
|
||||
|
||||
This package provides the `usb.device.mouse` module. See
|
||||
[mouse_example.py](examples/device/mouse_example.py) for an example program.
|
||||
|
||||
### Package usb-device-hid
|
||||
|
||||
This package provides the `usb.device.hid` module. USB HID (Human Interface
|
||||
Device) class allows creating a wide variety of device types. The most common
|
||||
are mouse and keyboard, which have their own packages in micropython-lib.
|
||||
However, using the usb-device-hid package directly allows creation of any kind
|
||||
of HID device.
|
||||
|
||||
See [hid_custom_keypad_example.py](examples/device/hid_custom_keypad_example.py)
|
||||
for an example of a Keypad HID device with a custom HID descriptor.
|
||||
|
||||
### Package usb-device-cdc
|
||||
|
||||
This package provides the `usb.device.cdc` module. USB CDC (Communications
|
||||
Device Class) is most commonly used for virtual serial port USB interfaces, and
|
||||
that is what is supported here.
|
||||
|
||||
The example [cdc_repl_example.py](examples/device/cdc_repl_example.py)
|
||||
demonstrates how to add a second USB serial interface and duplicate the
|
||||
MicroPython REPL between the two.
|
||||
|
||||
### Package usb-device-midi
|
||||
|
||||
This package provides the `usb.device.midi` module. This allows implementing
|
||||
USB MIDI devices in MicroPython.
|
||||
|
||||
The example [midi_example.py](examples/device/midi_example.py) demonstrates how
|
||||
to create a simple MIDI device to send MIDI data to and from the USB host.
|
|
@ -0,0 +1,47 @@
|
|||
# MicroPython USB CDC REPL example
|
||||
#
|
||||
# Example demonstrating how to use os.dupterm() to provide the
|
||||
# MicroPython REPL on a dynamic CDCInterface() serial port.
|
||||
#
|
||||
# To run this example:
|
||||
#
|
||||
# 1. Make sure `usb-device-cdc` is installed via: mpremote mip install usb-device-cdc
|
||||
#
|
||||
# 2. Run the example via: mpremote run cdc_repl_example.py
|
||||
#
|
||||
# 3. mpremote will exit with an error after the previous step, because when the
|
||||
# example runs the existing USB device disconnects and then re-enumerates with
|
||||
# the second serial port. If you check (for example by running mpremote connect
|
||||
# list) then you should now see two USB serial devices.
|
||||
#
|
||||
# 4. Connect to one of the new ports: mpremote connect PORTNAME
|
||||
#
|
||||
# It may be necessary to type Ctrl-B to exit the raw REPL mode and resume the
|
||||
# interactive REPL after mpremote connects.
|
||||
#
|
||||
# MIT license; Copyright (c) 2023-2024 Angus Gratton
|
||||
import os
|
||||
import time
|
||||
import usb.device
|
||||
from usb.device.cdc import CDCInterface
|
||||
|
||||
cdc = CDCInterface()
|
||||
cdc.init(timeout=0) # zero timeout makes this non-blocking, suitable for os.dupterm()
|
||||
|
||||
# pass builtin_driver=True so that we get the built-in USB-CDC alongside,
|
||||
# if it's available.
|
||||
usb.device.get().init(cdc, builtin_driver=True)
|
||||
|
||||
print("Waiting for USB host to configure the interface...")
|
||||
|
||||
# wait for host enumerate as a CDC device...
|
||||
while not cdc.is_open():
|
||||
time.sleep_ms(100)
|
||||
|
||||
# Note: This example doesn't wait for the host to access the new CDC port,
|
||||
# which could be done by polling cdc.dtr, as this will block the REPL
|
||||
# from resuming while this code is still executing.
|
||||
|
||||
print("CDC port enumerated, duplicating REPL...")
|
||||
|
||||
old_term = os.dupterm(cdc)
|
|
@ -0,0 +1,144 @@
|
|||
# MicroPython USB HID custom Keypad example
|
||||
#
|
||||
# This example demonstrates creating a custom HID device with its own
|
||||
# HID descriptor, in this case for a USB number keypad.
|
||||
#
|
||||
# For higher level examples that require less code to use, see mouse_example.py
|
||||
# and keyboard_example.py
|
||||
#
|
||||
# To run this example:
|
||||
#
|
||||
# 1. Make sure `usb-device-hid` is installed via: mpremote mip install usb-device-hid
|
||||
#
|
||||
# 2. Run the example via: mpremote run hid_custom_keypad_example.py
|
||||
#
|
||||
# 3. mpremote will exit with an error after the previous step, because when the
|
||||
# example runs the existing USB device disconnects and then re-enumerates with
|
||||
# the custom HID interface present. At this point, the example is running.
|
||||
#
|
||||
# 4. To see output from the example, re-connect: mpremote connect PORTNAME
|
||||
#
|
||||
# MIT license; Copyright (c) 2023 Dave Wickham, 2023-2024 Angus Gratton
|
||||
from micropython import const
|
||||
import time
|
||||
import usb.device
|
||||
from usb.device.hid import HIDInterface
|
||||
|
||||
_INTERFACE_PROTOCOL_KEYBOARD = const(0x01)
|
||||
|
||||
|
||||
def keypad_example():
|
||||
k = KeypadInterface()
|
||||
|
||||
usb.device.get().init(k, builtin_driver=True)
|
||||
|
||||
while not k.is_open():
|
||||
time.sleep_ms(100)
|
||||
|
||||
while True:
|
||||
time.sleep(2)
|
||||
print("Press NumLock...")
|
||||
k.send_key("<NumLock>")
|
||||
time.sleep_ms(100)
|
||||
k.send_key()
|
||||
time.sleep(1)
|
||||
# continue
|
||||
print("Press ...")
|
||||
for _ in range(3):
|
||||
time.sleep(0.1)
|
||||
k.send_key(".")
|
||||
time.sleep(0.1)
|
||||
k.send_key()
|
||||
print("Starting again...")
|
||||
|
||||
|
||||
class KeypadInterface(HIDInterface):
|
||||
# Very basic synchronous USB keypad HID interface
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
_KEYPAD_REPORT_DESC,
|
||||
set_report_buf=bytearray(1),
|
||||
protocol=_INTERFACE_PROTOCOL_KEYBOARD,
|
||||
interface_str="MicroPython Keypad",
|
||||
)
|
||||
self.numlock = False
|
||||
|
||||
def on_set_report(self, report_data, _report_id, _report_type):
|
||||
report = report_data[0]
|
||||
b = bool(report & 1)
|
||||
if b != self.numlock:
|
||||
print("Numlock: ", b)
|
||||
self.numlock = b
|
||||
|
||||
def send_key(self, key=None):
|
||||
if key is None:
|
||||
self.send_report(b"\x00")
|
||||
else:
|
||||
self.send_report(_key_to_id(key).to_bytes(1, "big"))
|
||||
|
||||
|
||||
# See HID Usages and Descriptions 1.4, section 10 Keyboard/Keypad Page (0x07)
|
||||
#
|
||||
# This keypad example has a contiguous series of keys (KEYPAD_KEY_IDS) starting
|
||||
# from the NumLock/Clear keypad key (0x53), but you can send any Key IDs from
|
||||
# the table in the HID Usages specification.
|
||||
_KEYPAD_KEY_OFFS = const(0x53)
|
||||
|
||||
_KEYPAD_KEY_IDS = [
|
||||
"<NumLock>",
|
||||
"/",
|
||||
"*",
|
||||
"-",
|
||||
"+",
|
||||
"<Enter>",
|
||||
"1",
|
||||
"2",
|
||||
"3",
|
||||
"4",
|
||||
"5",
|
||||
"6",
|
||||
"7",
|
||||
"8",
|
||||
"9",
|
||||
"0",
|
||||
".",
|
||||
]
|
||||
|
||||
|
||||
def _key_to_id(key):
|
||||
# This is a little slower than making a dict for lookup, but uses
|
||||
# less memory and O(n) can be fast enough when n is small.
|
||||
return _KEYPAD_KEY_IDS.index(key) + _KEYPAD_KEY_OFFS
|
||||
|
||||
|
||||
# HID Report descriptor for a numeric keypad
|
||||
#
|
||||
# fmt: off
|
||||
_KEYPAD_REPORT_DESC = (
|
||||
b'\x05\x01' # Usage Page (Generic Desktop)
|
||||
b'\x09\x07' # Usage (Keypad)
|
||||
b'\xA1\x01' # Collection (Application)
|
||||
b'\x05\x07' # Usage Page (Keypad)
|
||||
b'\x19\x00' # Usage Minimum (0)
|
||||
b'\x29\xFF' # Usage Maximum (ff)
|
||||
b'\x15\x00' # Logical Minimum (0)
|
||||
b'\x25\xFF' # Logical Maximum (ff)
|
||||
b'\x95\x01' # Report Count (1),
|
||||
b'\x75\x08' # Report Size (8),
|
||||
b'\x81\x00' # Input (Data, Array, Absolute)
|
||||
b'\x05\x08' # Usage page (LEDs)
|
||||
b'\x19\x01' # Usage Minimum (1)
|
||||
b'\x29\x01' # Usage Maximum (1),
|
||||
b'\x95\x01' # Report Count (1),
|
||||
b'\x75\x01' # Report Size (1),
|
||||
b'\x91\x02' # Output (Data, Variable, Absolute)
|
||||
b'\x95\x01' # Report Count (1),
|
||||
b'\x75\x07' # Report Size (7),
|
||||
b'\x91\x01' # Output (Constant) - padding bits
|
||||
b'\xC0' # End Collection
|
||||
)
|
||||
# fmt: on
|
||||
|
||||
|
||||
keypad_example()
|
|
@ -0,0 +1,97 @@
|
|||
# MicroPython USB Keyboard example
|
||||
#
|
||||
# To run this example:
|
||||
#
|
||||
# 1. Check the KEYS assignment below, and connect buttons or switches to the
|
||||
# assigned GPIOs. You can change the entries as needed, look up the reference
|
||||
# for your board to see what pins are available. Note that the example uses
|
||||
# "active low" logic, so pressing a switch or button should switch the
|
||||
# connected pin to Ground (0V).
|
||||
#
|
||||
# 2. Make sure `usb-device-keyboard` is installed via: mpremote mip install usb-device-keyboard
|
||||
#
|
||||
# 3. Run the example via: mpremote run keyboard_example.py
|
||||
#
|
||||
# 4. mpremote will exit with an error after the previous step, because when the
|
||||
# example runs the existing USB device disconnects and then re-enumerates with
|
||||
# the keyboard interface present. At this point, the example is running.
|
||||
#
|
||||
# 5. The example doesn't print anything to the serial port, but to stop it first
|
||||
# re-connect: mpremote connect PORTNAME
|
||||
#
|
||||
# 6. Type Ctrl-C to interrupt the running example and stop it. You may have to
|
||||
# also type Ctrl-B to restore the interactive REPL.
|
||||
#
|
||||
# To implement a keyboard with different USB HID characteristics, copy the
|
||||
# usb-device-keyboard/usb/device/keyboard.py file into your own project and modify
|
||||
# KeyboardInterface.
|
||||
#
|
||||
# MIT license; Copyright (c) 2024 Angus Gratton
|
||||
import usb.device
|
||||
from usb.device.keyboard import KeyboardInterface, KeyCode, LEDCode
|
||||
from machine import Pin
|
||||
import time
|
||||
|
||||
# Tuples mapping Pin inputs to the KeyCode each input generates
|
||||
#
|
||||
# (Big keyboards usually multiplex multiple keys per input with a scan matrix,
|
||||
# but this is a simple example.)
|
||||
KEYS = (
|
||||
(Pin.cpu.GPIO10, KeyCode.CAPS_LOCK),
|
||||
(Pin.cpu.GPIO11, KeyCode.LEFT_SHIFT),
|
||||
(Pin.cpu.GPIO12, KeyCode.M),
|
||||
(Pin.cpu.GPIO13, KeyCode.P),
|
||||
# ... add more pin to KeyCode mappings here if needed
|
||||
)
|
||||
|
||||
# Tuples mapping Pin outputs to the LEDCode that turns the output on
|
||||
LEDS = (
|
||||
(Pin.board.LED, LEDCode.CAPS_LOCK),
|
||||
# ... add more pin to LEDCode mappings here if needed
|
||||
)
|
||||
|
||||
|
||||
class ExampleKeyboard(KeyboardInterface):
|
||||
def on_led_update(self, led_mask):
|
||||
# print(hex(led_mask))
|
||||
for pin, code in LEDS:
|
||||
# Set the pin high if 'code' bit is set in led_mask
|
||||
pin(code & led_mask)
|
||||
|
||||
|
||||
def keyboard_example():
|
||||
# Initialise all the pins as active-low inputs with pullup resistors
|
||||
for pin, _ in KEYS:
|
||||
pin.init(Pin.IN, Pin.PULL_UP)
|
||||
|
||||
# Initialise all the LEDs as active-high outputs
|
||||
for pin, _ in LEDS:
|
||||
pin.init(Pin.OUT, value=0)
|
||||
|
||||
# Register the keyboard interface and re-enumerate
|
||||
k = ExampleKeyboard()
|
||||
usb.device.get().init(k, builtin_driver=True)
|
||||
|
||||
print("Entering keyboard loop...")
|
||||
|
||||
keys = [] # Keys held down, reuse the same list object
|
||||
prev_keys = [None] # Previous keys, starts with a dummy value so first
|
||||
# iteration will always send
|
||||
while True:
|
||||
if k.is_open():
|
||||
keys.clear()
|
||||
for pin, code in KEYS:
|
||||
if not pin(): # active-low
|
||||
keys.append(code)
|
||||
if keys != prev_keys:
|
||||
# print(keys)
|
||||
k.send_keys(keys)
|
||||
prev_keys.clear()
|
||||
prev_keys.extend(keys)
|
||||
|
||||
# This simple example scans each input in an infinite loop, but a more
|
||||
# complex implementation would probably use a timer or similar.
|
||||
time.sleep_ms(1)
|
||||
|
||||
|
||||
keyboard_example()
|
|
@ -0,0 +1,78 @@
|
|||
# MicroPython USB MIDI example
|
||||
#
|
||||
# This example demonstrates creating a custom MIDI device.
|
||||
#
|
||||
# To run this example:
|
||||
#
|
||||
# 1. Make sure `usb-device-midi` is installed via: mpremote mip install usb-device-midi
|
||||
#
|
||||
# 2. Run the example via: mpremote run midi_example.py
|
||||
#
|
||||
# 3. mpremote will exit with an error after the previous step, because when the
|
||||
# example runs the existing USB device disconnects and then re-enumerates with
|
||||
# the MIDI interface present. At this point, the example is running.
|
||||
#
|
||||
# 4. To see output from the example, re-connect: mpremote connect PORTNAME
|
||||
#
|
||||
#
|
||||
# MIT license; Copyright (c) 2023-2024 Angus Gratton
|
||||
import usb.device
|
||||
from usb.device.midi import MIDIInterface
|
||||
import time
|
||||
|
||||
|
||||
class MIDIExample(MIDIInterface):
|
||||
# Very simple example event handler functions, showing how to receive note
|
||||
# and control change messages sent from the host to the device.
|
||||
#
|
||||
# If you need to send MIDI data to the host, then it's fine to instantiate
|
||||
# MIDIInterface class directly.
|
||||
|
||||
def on_open(self):
|
||||
super().on_open()
|
||||
print("Device opened by host")
|
||||
|
||||
def on_note_on(self, channel, pitch, vel):
|
||||
print(f"RX Note On channel {channel} pitch {pitch} velocity {vel}")
|
||||
|
||||
def on_note_off(self, channel, pitch, vel):
|
||||
print(f"RX Note Off channel {channel} pitch {pitch} velocity {vel}")
|
||||
|
||||
def on_control_change(self, channel, controller, value):
|
||||
print(f"RX Control channel {channel} controller {controller} value {value}")
|
||||
|
||||
|
||||
m = MIDIExample()
|
||||
# Remove builtin_driver=True if you don't want the MicroPython serial REPL available.
|
||||
usb.device.get().init(m, builtin_driver=True)
|
||||
|
||||
print("Waiting for USB host to configure the interface...")
|
||||
|
||||
while not m.is_open():
|
||||
time.sleep_ms(100)
|
||||
|
||||
print("Starting MIDI loop...")
|
||||
|
||||
# TX constants
|
||||
CHANNEL = 0
|
||||
PITCH = 60
|
||||
CONTROLLER = 64
|
||||
|
||||
control_val = 0
|
||||
|
||||
while m.is_open():
|
||||
time.sleep(1)
|
||||
print(f"TX Note On channel {CHANNEL} pitch {PITCH}")
|
||||
m.note_on(CHANNEL, PITCH) # Velocity is an optional third argument
|
||||
time.sleep(0.5)
|
||||
print(f"TX Note Off channel {CHANNEL} pitch {PITCH}")
|
||||
m.note_off(CHANNEL, PITCH)
|
||||
time.sleep(1)
|
||||
print(f"TX Control channel {CHANNEL} controller {CONTROLLER} value {control_val}")
|
||||
m.control_change(CHANNEL, CONTROLLER, control_val)
|
||||
control_val += 1
|
||||
if control_val == 0x7F:
|
||||
control_val = 0
|
||||
time.sleep(1)
|
||||
|
||||
print("USB host has reset device, example done.")
|
|
@ -0,0 +1,52 @@
|
|||
# MicroPython USB Mouse example
|
||||
#
|
||||
# To run this example:
|
||||
#
|
||||
# 1. Make sure `usb-device-mouse` is installed via: mpremote mip install usb-device-mouse
|
||||
#
|
||||
# 2. Run the example via: mpremote run mouse_example.py
|
||||
#
|
||||
# 3. mpremote will exit with an error after the previous step, because when the
|
||||
# example runs the existing USB device disconnects and then re-enumerates with
|
||||
# the mouse interface present. At this point, the example is running.
|
||||
#
|
||||
# 4. You should see the mouse move and right click. At this point, the example
|
||||
# is finished executing.
|
||||
#
|
||||
# To implement a more complex mouse with more buttons or other custom interface
|
||||
# features, copy the usb-device-mouse/usb/device/mouse.py file into your own
|
||||
# project and modify MouseInterface.
|
||||
#
|
||||
# MIT license; Copyright (c) 2023-2024 Angus Gratton
|
||||
import time
|
||||
import usb.device
|
||||
from usb.device.mouse import MouseInterface
|
||||
|
||||
|
||||
def mouse_example():
|
||||
m = MouseInterface()
|
||||
|
||||
# Note: builtin_driver=True means that if there's a USB-CDC REPL
|
||||
# available then it will appear as well as the HID device.
|
||||
usb.device.get().init(m, builtin_driver=True)
|
||||
|
||||
# wait for host to enumerate as a HID device...
|
||||
while not m.is_open():
|
||||
time.sleep_ms(100)
|
||||
|
||||
time.sleep_ms(2000)
|
||||
|
||||
print("Moving...")
|
||||
m.move_by(-100, 0)
|
||||
m.move_by(-100, 0)
|
||||
time.sleep_ms(500)
|
||||
|
||||
print("Clicking...")
|
||||
m.click_right(True)
|
||||
time.sleep_ms(200)
|
||||
m.click_right(False)
|
||||
|
||||
print("Done!")
|
||||
|
||||
|
||||
mouse_example()
|
|
@ -0,0 +1,3 @@
|
|||
metadata(version="0.1.0")
|
||||
require("usb-device")
|
||||
package("usb")
|
|
@ -0,0 +1,437 @@
|
|||
# MicroPython USB CDC module
|
||||
# MIT license; Copyright (c) 2022 Martin Fischer, 2023-2024 Angus Gratton
|
||||
import io
|
||||
import time
|
||||
import errno
|
||||
import machine
|
||||
import struct
|
||||
from micropython import const
|
||||
|
||||
from .core import Interface, Buffer, split_bmRequestType
|
||||
|
||||
_EP_IN_FLAG = const(1 << 7)
|
||||
|
||||
# Control transfer stages
|
||||
_STAGE_IDLE = const(0)
|
||||
_STAGE_SETUP = const(1)
|
||||
_STAGE_DATA = const(2)
|
||||
_STAGE_ACK = const(3)
|
||||
|
||||
# Request types
|
||||
_REQ_TYPE_STANDARD = const(0x0)
|
||||
_REQ_TYPE_CLASS = const(0x1)
|
||||
_REQ_TYPE_VENDOR = const(0x2)
|
||||
_REQ_TYPE_RESERVED = const(0x3)
|
||||
|
||||
_DEV_CLASS_MISC = const(0xEF)
|
||||
_CS_DESC_TYPE = const(0x24) # CS Interface type communication descriptor
|
||||
|
||||
# CDC control interface definitions
|
||||
_INTERFACE_CLASS_CDC = const(2)
|
||||
_INTERFACE_SUBCLASS_CDC = const(2) # Abstract Control Mode
|
||||
_PROTOCOL_NONE = const(0) # no protocol
|
||||
|
||||
# CDC descriptor subtype
|
||||
# see also CDC120.pdf, table 13
|
||||
_CDC_FUNC_DESC_HEADER = const(0)
|
||||
_CDC_FUNC_DESC_CALL_MANAGEMENT = const(1)
|
||||
_CDC_FUNC_DESC_ABSTRACT_CONTROL = const(2)
|
||||
_CDC_FUNC_DESC_UNION = const(6)
|
||||
|
||||
# CDC class requests, table 13, PSTN subclass
|
||||
_SET_LINE_CODING_REQ = const(0x20)
|
||||
_GET_LINE_CODING_REQ = const(0x21)
|
||||
_SET_CONTROL_LINE_STATE = const(0x22)
|
||||
_SEND_BREAK_REQ = const(0x23)
|
||||
|
||||
_LINE_CODING_STOP_BIT_1 = const(0)
|
||||
_LINE_CODING_STOP_BIT_1_5 = const(1)
|
||||
_LINE_CODING_STOP_BIT_2 = const(2)
|
||||
|
||||
_LINE_CODING_PARITY_NONE = const(0)
|
||||
_LINE_CODING_PARITY_ODD = const(1)
|
||||
_LINE_CODING_PARITY_EVEN = const(2)
|
||||
_LINE_CODING_PARITY_MARK = const(3)
|
||||
_LINE_CODING_PARITY_SPACE = const(4)
|
||||
|
||||
_LINE_STATE_DTR = const(1)
|
||||
_LINE_STATE_RTS = const(2)
|
||||
|
||||
_PARITY_BITS_REPR = "NOEMS"
|
||||
_STOP_BITS_REPR = ("1", "1.5", "2")
|
||||
|
||||
# Other definitions
|
||||
_CDC_VERSION = const(0x0120) # release number in binary-coded decimal
|
||||
|
||||
# Number of endpoints in each interface
|
||||
_CDC_CONTROL_EP_NUM = const(1)
|
||||
_CDC_DATA_EP_NUM = const(2)
|
||||
|
||||
# CDC data interface definitions
|
||||
_CDC_ITF_DATA_CLASS = const(0xA)
|
||||
_CDC_ITF_DATA_SUBCLASS = const(0)
|
||||
_CDC_ITF_DATA_PROT = const(0) # no protocol
|
||||
|
||||
# Length of the bulk transfer endpoints. Maybe should be configurable?
|
||||
_BULK_EP_LEN = const(64)
|
||||
|
||||
# MicroPython error constants (negated as IOBase.ioctl uses negative return values for error codes)
|
||||
# these must match values in py/mperrno.h
|
||||
_MP_EINVAL = const(-22)
|
||||
_MP_ETIMEDOUT = const(-110)
|
||||
|
||||
# MicroPython stream ioctl requests, same as py/stream.h
|
||||
_MP_STREAM_FLUSH = const(1)
|
||||
_MP_STREAM_POLL = const(3)
|
||||
|
||||
# MicroPython ioctl poll values, same as py/stream.h
|
||||
_MP_STREAM_POLL_WR = const(0x04)
|
||||
_MP_STREAM_POLL_RD = const(0x01)
|
||||
_MP_STREAM_POLL_HUP = const(0x10)
|
||||
|
||||
|
||||
class CDCInterface(io.IOBase, Interface):
|
||||
# USB CDC serial device class, designed to resemble machine.UART
|
||||
# with some additional methods.
|
||||
#
|
||||
# Relies on multiple inheritance so it can be an io.IOBase for stream
|
||||
# functions and also a Interface (actually an Interface Association
|
||||
# Descriptor holding two interfaces.)
|
||||
def __init__(self, **kwargs):
|
||||
# io.IOBase has no __init__()
|
||||
Interface.__init__(self)
|
||||
|
||||
# Callbacks for particular control changes initiated by the host
|
||||
self.break_cb = None # Host sent a "break" condition
|
||||
self.line_state_cb = None
|
||||
self.line_coding_cb = None
|
||||
|
||||
self._line_state = 0 # DTR & RTS
|
||||
# Set a default line coding of 115200/8N1
|
||||
self._line_coding = bytearray(b"\x00\xc2\x01\x00\x00\x00\x08")
|
||||
|
||||
self._wb = () # Optional write Buffer (IN endpoint), set by CDC.init()
|
||||
self._rb = () # Optional read Buffer (OUT endpoint), set by CDC.init()
|
||||
self._timeout = 1000 # set from CDC.init() as well
|
||||
|
||||
# one control interface endpoint, two data interface endpoints
|
||||
self.ep_c_in = self.ep_d_in = self.ep_d_out = None
|
||||
|
||||
self._c_itf = None # Number of control interface, data interface is one more
|
||||
|
||||
self.init(**kwargs)
|
||||
|
||||
def init(
|
||||
self, baudrate=9600, bits=8, parity="N", stop=1, timeout=None, txbuf=256, rxbuf=256, flow=0
|
||||
):
|
||||
# Configure the CDC serial port. Note that many of these settings like
|
||||
# baudrate, bits, parity, stop don't change the USB-CDC device behavior
|
||||
# at all, only the "line coding" as communicated from/to the USB host.
|
||||
|
||||
# Store initial line coding parameters in the USB CDC binary format
|
||||
# (there is nothing implemented to further change these from Python
|
||||
# code, the USB host sets them.)
|
||||
struct.pack_into(
|
||||
"<LBBB",
|
||||
self._line_coding,
|
||||
0,
|
||||
baudrate,
|
||||
_STOP_BITS_REPR.index(str(stop)),
|
||||
_PARITY_BITS_REPR.index(parity),
|
||||
bits,
|
||||
)
|
||||
|
||||
if flow != 0:
|
||||
raise NotImplementedError # UART flow control currently not supported
|
||||
|
||||
if not (txbuf and rxbuf):
|
||||
raise ValueError # Buffer sizes are required
|
||||
|
||||
self._timeout = timeout
|
||||
self._wb = Buffer(txbuf)
|
||||
self._rb = Buffer(rxbuf)
|
||||
|
||||
###
|
||||
### Line State & Line Coding State property getters
|
||||
###
|
||||
|
||||
@property
|
||||
def rts(self):
|
||||
return bool(self._line_state & _LINE_STATE_RTS)
|
||||
|
||||
@property
|
||||
def dtr(self):
|
||||
return bool(self._line_state & _LINE_STATE_DTR)
|
||||
|
||||
# Line Coding Representation
|
||||
# Byte 0-3 Byte 4 Byte 5 Byte 6
|
||||
# dwDTERate bCharFormat bParityType bDataBits
|
||||
|
||||
@property
|
||||
def baudrate(self):
|
||||
return struct.unpack("<LBBB", self._line_coding)[0]
|
||||
|
||||
@property
|
||||
def stop_bits(self):
|
||||
return _STOP_BITS_REPR[self._line_coding[4]]
|
||||
|
||||
@property
|
||||
def parity(self):
|
||||
return _PARITY_BITS_REPR[self._line_coding[5]]
|
||||
|
||||
@property
|
||||
def data_bits(self):
|
||||
return self._line_coding[6]
|
||||
|
||||
def __repr__(self):
|
||||
return f"{self.baudrate}/{self.data_bits}{self.parity}{self.stop_bits} rts={self.rts} dtr={self.dtr}"
|
||||
|
||||
###
|
||||
### Set callbacks for operations initiated by the host
|
||||
###
|
||||
|
||||
def set_break_cb(self, cb):
|
||||
self.break_cb = cb
|
||||
|
||||
def set_line_state_cb(self, cb):
|
||||
self.line_state_cb = cb
|
||||
|
||||
def set_line_coding_cb(self, cb):
|
||||
self.line_coding_cb = cb
|
||||
|
||||
###
|
||||
### USB Interface Implementation
|
||||
###
|
||||
|
||||
def desc_cfg(self, desc, itf_num, ep_num, strs):
|
||||
# CDC needs a Interface Association Descriptor (IAD) wrapping two interfaces: Control & Data interfaces
|
||||
desc.interface_assoc(itf_num, 2, _INTERFACE_CLASS_CDC, _INTERFACE_SUBCLASS_CDC)
|
||||
|
||||
# Now add the Control interface descriptor
|
||||
self._c_itf = itf_num
|
||||
desc.interface(itf_num, _CDC_CONTROL_EP_NUM, _INTERFACE_CLASS_CDC, _INTERFACE_SUBCLASS_CDC)
|
||||
|
||||
# Append the CDC class-specific interface descriptor
|
||||
# see CDC120-track, p20
|
||||
desc.pack(
|
||||
"<BBBH",
|
||||
5, # bFunctionLength
|
||||
_CS_DESC_TYPE, # bDescriptorType
|
||||
_CDC_FUNC_DESC_HEADER, # bDescriptorSubtype
|
||||
_CDC_VERSION, # cdc version
|
||||
)
|
||||
|
||||
# CDC-PSTN table3 "Call Management"
|
||||
# set to No
|
||||
desc.pack(
|
||||
"<BBBBB",
|
||||
5, # bFunctionLength
|
||||
_CS_DESC_TYPE, # bDescriptorType
|
||||
_CDC_FUNC_DESC_CALL_MANAGEMENT, # bDescriptorSubtype
|
||||
0, # bmCapabilities - XXX no call managment so far
|
||||
itf_num + 1, # bDataInterface - interface 1
|
||||
)
|
||||
|
||||
# CDC-PSTN table4 "Abstract Control"
|
||||
# set to support line_coding and send_break
|
||||
desc.pack(
|
||||
"<BBBB",
|
||||
4, # bFunctionLength
|
||||
_CS_DESC_TYPE, # bDescriptorType
|
||||
_CDC_FUNC_DESC_ABSTRACT_CONTROL, # bDescriptorSubtype
|
||||
0x6, # bmCapabilities D1, D2
|
||||
)
|
||||
|
||||
# CDC-PSTN "Union"
|
||||
# set control interface / data interface number
|
||||
desc.pack(
|
||||
"<BBBBB",
|
||||
5, # bFunctionLength
|
||||
_CS_DESC_TYPE, # bDescriptorType
|
||||
_CDC_FUNC_DESC_UNION, # bDescriptorSubtype
|
||||
itf_num, # bControlInterface
|
||||
itf_num + 1, # bSubordinateInterface0 (data class itf number)
|
||||
)
|
||||
|
||||
# Single control IN endpoint (currently unused in this implementation)
|
||||
self.ep_c_in = ep_num | _EP_IN_FLAG
|
||||
desc.endpoint(self.ep_c_in, "interrupt", 8, 16)
|
||||
|
||||
# Now add the data interface
|
||||
desc.interface(
|
||||
itf_num + 1,
|
||||
_CDC_DATA_EP_NUM,
|
||||
_CDC_ITF_DATA_CLASS,
|
||||
_CDC_ITF_DATA_SUBCLASS,
|
||||
_CDC_ITF_DATA_PROT,
|
||||
)
|
||||
|
||||
# Two data endpoints, bulk OUT and IN
|
||||
self.ep_d_out = ep_num + 1
|
||||
self.ep_d_in = (ep_num + 1) | _EP_IN_FLAG
|
||||
desc.endpoint(self.ep_d_out, "bulk", _BULK_EP_LEN, 0)
|
||||
desc.endpoint(self.ep_d_in, "bulk", _BULK_EP_LEN, 0)
|
||||
|
||||
def num_itfs(self):
|
||||
return 2
|
||||
|
||||
def num_eps(self):
|
||||
return 2 # total after masking out _EP_IN_FLAG
|
||||
|
||||
def on_open(self):
|
||||
super().on_open()
|
||||
# kick off any transfers that may have queued while the device was not open
|
||||
self._rd_xfer()
|
||||
self._wr_xfer()
|
||||
|
||||
def on_interface_control_xfer(self, stage, request):
|
||||
# Handle class-specific interface control transfers
|
||||
bmRequestType, bRequest, wValue, wIndex, wLength = struct.unpack("BBHHH", request)
|
||||
recipient, req_type, req_dir = split_bmRequestType(bmRequestType)
|
||||
|
||||
if wIndex != self._c_itf:
|
||||
return False # Only for the control interface (may be redundant check?)
|
||||
|
||||
if req_type != _REQ_TYPE_CLASS:
|
||||
return False # Unsupported request type
|
||||
|
||||
if stage == _STAGE_SETUP:
|
||||
if bRequest in (_SET_LINE_CODING_REQ, _GET_LINE_CODING_REQ):
|
||||
return self._line_coding # Buffer to read or write
|
||||
|
||||
# Continue on other supported requests, stall otherwise
|
||||
return bRequest in (_SET_CONTROL_LINE_STATE, _SEND_BREAK_REQ)
|
||||
|
||||
if stage == _STAGE_ACK:
|
||||
if bRequest == _SET_LINE_CODING_REQ:
|
||||
if self.line_coding_cb:
|
||||
self.line_coding_cb(self._line_coding)
|
||||
elif bRequest == _SET_CONTROL_LINE_STATE:
|
||||
self._line_state = wValue
|
||||
if self.line_state_cb:
|
||||
self.line_state_cb(wValue)
|
||||
elif bRequest == _SEND_BREAK_REQ:
|
||||
if self.break_cb:
|
||||
self.break_cb(wValue)
|
||||
|
||||
return True # allow DATA/ACK stages to complete normally
|
||||
|
||||
def _wr_xfer(self):
|
||||
# Submit a new data IN transfer from the _wb buffer, if needed
|
||||
if self.is_open() and not self.xfer_pending(self.ep_d_in) and self._wb.readable():
|
||||
self.submit_xfer(self.ep_d_in, self._wb.pend_read(), self._wr_cb)
|
||||
|
||||
def _wr_cb(self, ep, res, num_bytes):
|
||||
# Whenever a data IN transfer ends
|
||||
if res == 0:
|
||||
self._wb.finish_read(num_bytes)
|
||||
self._wr_xfer()
|
||||
|
||||
def _rd_xfer(self):
|
||||
# Keep an active data OUT transfer to read data from the host,
|
||||
# whenever the receive buffer has room for new data
|
||||
if self.is_open() and not self.xfer_pending(self.ep_d_out) and self._rb.writable():
|
||||
# Can only submit up to the endpoint length per transaction, otherwise we won't
|
||||
# get any transfer callback until the full transaction completes.
|
||||
self.submit_xfer(self.ep_d_out, self._rb.pend_write(_BULK_EP_LEN), self._rd_cb)
|
||||
|
||||
def _rd_cb(self, ep, res, num_bytes):
|
||||
# Whenever a data OUT transfer ends
|
||||
if res == 0:
|
||||
self._rb.finish_write(num_bytes)
|
||||
self._rd_xfer()
|
||||
|
||||
###
|
||||
### io.IOBase stream implementation
|
||||
###
|
||||
|
||||
def write(self, buf):
|
||||
# use a memoryview to track how much of 'buf' we've written so far
|
||||
# (unfortunately, this means a 1 block allocation for each write, but it's otherwise allocation free.)
|
||||
start = time.ticks_ms()
|
||||
mv = memoryview(buf)
|
||||
while True:
|
||||
# Keep pushing buf into _wb into it's all gone
|
||||
nbytes = self._wb.write(mv)
|
||||
self._wr_xfer() # make sure a transfer is running from _wb
|
||||
|
||||
if nbytes == len(mv):
|
||||
return len(buf) # Success
|
||||
|
||||
mv = mv[nbytes:]
|
||||
|
||||
# check for timeout
|
||||
if time.ticks_diff(time.ticks_ms(), start) >= self._timeout:
|
||||
return len(buf) - len(mv)
|
||||
|
||||
machine.idle()
|
||||
|
||||
def read(self, size):
|
||||
start = time.ticks_ms()
|
||||
|
||||
# Allocate a suitable buffer to read into
|
||||
if size >= 0:
|
||||
b = bytearray(size)
|
||||
else:
|
||||
# for size == -1, return however many bytes are ready
|
||||
b = bytearray(self._rb.readable())
|
||||
|
||||
n = self._readinto(b, start)
|
||||
if not n:
|
||||
return None
|
||||
if n < len(b):
|
||||
return b[:n]
|
||||
return b
|
||||
|
||||
def readinto(self, b):
|
||||
return self._readinto(b, time.ticks_ms())
|
||||
|
||||
def _readinto(self, b, start):
|
||||
if len(b) == 0:
|
||||
return 0
|
||||
|
||||
n = 0
|
||||
m = memoryview(b)
|
||||
while n < len(b):
|
||||
# copy out of the read buffer if there is anything available
|
||||
if self._rb.readable():
|
||||
n += self._rb.readinto(m if n == 0 else m[n:])
|
||||
self._rd_xfer() # if _rd was previously full, no transfer will be running
|
||||
if n == len(b):
|
||||
break # Done, exit before we call machine.idle()
|
||||
|
||||
if time.ticks_diff(time.ticks_ms(), start) >= self._timeout:
|
||||
break # Timed out
|
||||
|
||||
machine.idle()
|
||||
|
||||
return n or None
|
||||
|
||||
def ioctl(self, req, arg):
|
||||
if req == _MP_STREAM_POLL:
|
||||
return (
|
||||
(_MP_STREAM_POLL_WR if (arg & _MP_STREAM_POLL_WR) and self._wb.writable() else 0)
|
||||
| (_MP_STREAM_POLL_RD if (arg & _MP_STREAM_POLL_RD) and self._rb.readable() else 0)
|
||||
|
|
||||
# using the USB level "open" (i.e. connected to host) for !HUP, not !DTR (port is open)
|
||||
(_MP_STREAM_POLL_HUP if (arg & _MP_STREAM_POLL_HUP) and not self.is_open() else 0)
|
||||
)
|
||||
elif req == _MP_STREAM_FLUSH:
|
||||
start = time.ticks_ms()
|
||||
# Wait until write buffer contains no bytes for the lower TinyUSB layer to "read"
|
||||
while self._wb.readable():
|
||||
if not self.is_open():
|
||||
return _MP_EINVAL
|
||||
if time.ticks_diff(time.ticks_ms(), start) > self._timeout:
|
||||
return _MP_ETIMEDOUT
|
||||
machine.idle()
|
||||
return 0
|
||||
|
||||
return _MP_EINVAL
|
||||
|
||||
def flush(self):
|
||||
# a C implementation of this exists in stream.c, but it's not in io.IOBase
|
||||
# and can't immediately be called from here (AFAIK)
|
||||
r = self.ioctl(_MP_STREAM_FLUSH, 0)
|
||||
if r:
|
||||
raise OSError(r)
|
|
@ -0,0 +1,3 @@
|
|||
metadata(version="0.1.0")
|
||||
require("usb-device")
|
||||
package("usb")
|
|
@ -0,0 +1,232 @@
|
|||
# MicroPython USB hid module
|
||||
#
|
||||
# This implements a base HIDInterface class that can be used directly,
|
||||
# or subclassed into more specific HID interface types.
|
||||
#
|
||||
# MIT license; Copyright (c) 2023 Angus Gratton
|
||||
from micropython import const
|
||||
import machine
|
||||
import struct
|
||||
import time
|
||||
from .core import Interface, Descriptor, split_bmRequestType
|
||||
|
||||
_EP_IN_FLAG = const(1 << 7)
|
||||
|
||||
# Control transfer stages
|
||||
_STAGE_IDLE = const(0)
|
||||
_STAGE_SETUP = const(1)
|
||||
_STAGE_DATA = const(2)
|
||||
_STAGE_ACK = const(3)
|
||||
|
||||
# Request types
|
||||
_REQ_TYPE_STANDARD = const(0x0)
|
||||
_REQ_TYPE_CLASS = const(0x1)
|
||||
_REQ_TYPE_VENDOR = const(0x2)
|
||||
_REQ_TYPE_RESERVED = const(0x3)
|
||||
|
||||
# Descriptor types
|
||||
_DESC_HID_TYPE = const(0x21)
|
||||
_DESC_REPORT_TYPE = const(0x22)
|
||||
_DESC_PHYSICAL_TYPE = const(0x23)
|
||||
|
||||
# Interface and protocol identifiers
|
||||
_INTERFACE_CLASS = const(0x03)
|
||||
_INTERFACE_SUBCLASS_NONE = const(0x00)
|
||||
_INTERFACE_SUBCLASS_BOOT = const(0x01)
|
||||
|
||||
_INTERFACE_PROTOCOL_NONE = const(0x00)
|
||||
_INTERFACE_PROTOCOL_KEYBOARD = const(0x01)
|
||||
_INTERFACE_PROTOCOL_MOUSE = const(0x02)
|
||||
|
||||
# bRequest values for HID control requests
|
||||
_REQ_CONTROL_GET_REPORT = const(0x01)
|
||||
_REQ_CONTROL_GET_IDLE = const(0x02)
|
||||
_REQ_CONTROL_GET_PROTOCOL = const(0x03)
|
||||
_REQ_CONTROL_GET_DESCRIPTOR = const(0x06)
|
||||
_REQ_CONTROL_SET_REPORT = const(0x09)
|
||||
_REQ_CONTROL_SET_IDLE = const(0x0A)
|
||||
_REQ_CONTROL_SET_PROTOCOL = const(0x0B)
|
||||
|
||||
# Standard descriptor lengths
|
||||
_STD_DESC_INTERFACE_LEN = const(9)
|
||||
_STD_DESC_ENDPOINT_LEN = const(7)
|
||||
|
||||
|
||||
class HIDInterface(Interface):
|
||||
# Abstract base class to implement a USB device HID interface in Python.
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
report_descriptor,
|
||||
extra_descriptors=[],
|
||||
set_report_buf=None,
|
||||
protocol=_INTERFACE_PROTOCOL_NONE,
|
||||
interface_str=None,
|
||||
):
|
||||
# Construct a new HID interface.
|
||||
#
|
||||
# - report_descriptor is the only mandatory argument, which is the binary
|
||||
# data consisting of the HID Report Descriptor. See Device Class
|
||||
# Definition for Human Interface Devices (HID) v1.11 section 6.2.2 Report
|
||||
# Descriptor, p23.
|
||||
#
|
||||
# - extra_descriptors is an optional argument holding additional HID
|
||||
# descriptors, to append after the mandatory report descriptor. Most
|
||||
# HID devices do not use these.
|
||||
#
|
||||
# - set_report_buf is an optional writable buffer object (i.e.
|
||||
# bytearray), where SET_REPORT requests from the host can be
|
||||
# written. Only necessary if the report_descriptor contains Output
|
||||
# entries. If set, the size must be at least the size of the largest
|
||||
# Output entry.
|
||||
#
|
||||
# - protocol can be set to a specific value as per HID v1.11 section 4.3 Protocols, p9.
|
||||
#
|
||||
# - interface_str is an optional string descriptor to associate with the HID USB interface.
|
||||
super().__init__()
|
||||
self.report_descriptor = report_descriptor
|
||||
self.extra_descriptors = extra_descriptors
|
||||
self._set_report_buf = set_report_buf
|
||||
self.protocol = protocol
|
||||
self.interface_str = interface_str
|
||||
|
||||
self._int_ep = None # set during enumeration
|
||||
|
||||
def get_report(self):
|
||||
return False
|
||||
|
||||
def on_set_report(self, report_data, report_id, report_type):
|
||||
# Override this function in order to handle SET REPORT requests from the host,
|
||||
# where it sends data to the HID device.
|
||||
#
|
||||
# This function will only be called if the Report descriptor contains at least one Output entry,
|
||||
# and the set_report_buf argument is provided to the constructor.
|
||||
#
|
||||
# Return True to complete the control transfer normally, False to abort it.
|
||||
return True
|
||||
|
||||
def busy(self):
|
||||
# Returns True if the interrupt endpoint is busy (i.e. existing transfer is pending)
|
||||
return self.is_open() and self.xfer_pending(self._int_ep)
|
||||
|
||||
def send_report(self, report_data, timeout_ms=100):
|
||||
# Helper function to send a HID report in the typical USB interrupt
|
||||
# endpoint associated with a HID interface.
|
||||
#
|
||||
# Returns True if successful, False if HID device is not active or timeout
|
||||
# is reached without being able to queue the report for sending.
|
||||
deadline = time.ticks_add(time.ticks_ms(), timeout_ms)
|
||||
while self.busy():
|
||||
if time.ticks_diff(deadline, time.ticks_ms()) <= 0:
|
||||
return False
|
||||
machine.idle()
|
||||
if not self.is_open():
|
||||
return False
|
||||
self.submit_xfer(self._int_ep, report_data)
|
||||
|
||||
def desc_cfg(self, desc, itf_num, ep_num, strs):
|
||||
# Add the standard interface descriptor
|
||||
desc.interface(
|
||||
itf_num,
|
||||
1,
|
||||
_INTERFACE_CLASS,
|
||||
_INTERFACE_SUBCLASS_NONE,
|
||||
self.protocol,
|
||||
len(strs) if self.interface_str else 0,
|
||||
)
|
||||
|
||||
if self.interface_str:
|
||||
strs.append(self.interface_str)
|
||||
|
||||
# As per HID v1.11 section 7.1 Standard Requests, return the contents of
|
||||
# the standard HID descriptor before the associated endpoint descriptor.
|
||||
self.get_hid_descriptor(desc)
|
||||
|
||||
# Add the typical single USB interrupt endpoint descriptor associated
|
||||
# with a HID interface.
|
||||
self._int_ep = ep_num | _EP_IN_FLAG
|
||||
desc.endpoint(self._int_ep, "interrupt", 8, 8)
|
||||
|
||||
self.idle_rate = 0
|
||||
self.protocol = 0
|
||||
|
||||
def num_eps(self):
|
||||
return 1
|
||||
|
||||
def get_hid_descriptor(self, desc=None):
|
||||
# Append a full USB HID descriptor from the object's report descriptor
|
||||
# and optional additional descriptors.
|
||||
#
|
||||
# See HID Specification Version 1.1, Section 6.2.1 HID Descriptor p22
|
||||
|
||||
l = 9 + 3 * len(self.extra_descriptors) # total length
|
||||
|
||||
if desc is None:
|
||||
desc = Descriptor(bytearray(l))
|
||||
|
||||
desc.pack(
|
||||
"<BBHBBBH",
|
||||
l, # bLength
|
||||
_DESC_HID_TYPE, # bDescriptorType
|
||||
0x111, # bcdHID
|
||||
0, # bCountryCode
|
||||
len(self.extra_descriptors) + 1, # bNumDescriptors
|
||||
0x22, # bDescriptorType, Report
|
||||
len(self.report_descriptor), # wDescriptorLength, Report
|
||||
)
|
||||
# Fill in any additional descriptor type/length pairs
|
||||
#
|
||||
# TODO: unclear if this functionality is ever used, may be easier to not
|
||||
# support in base class
|
||||
for dt, dd in self.extra_descriptors:
|
||||
desc.pack("<BH", dt, len(dd))
|
||||
|
||||
return desc.b
|
||||
|
||||
def on_interface_control_xfer(self, stage, request):
|
||||
# Handle standard and class-specific interface control transfers for HID devices.
|
||||
bmRequestType, bRequest, wValue, _, wLength = struct.unpack("BBHHH", request)
|
||||
|
||||
recipient, req_type, _ = split_bmRequestType(bmRequestType)
|
||||
|
||||
if stage == _STAGE_SETUP:
|
||||
if req_type == _REQ_TYPE_STANDARD:
|
||||
# HID Spec p48: 7.1 Standard Requests
|
||||
if bRequest == _REQ_CONTROL_GET_DESCRIPTOR:
|
||||
desc_type = wValue >> 8
|
||||
if desc_type == _DESC_HID_TYPE:
|
||||
return self.get_hid_descriptor()
|
||||
if desc_type == _DESC_REPORT_TYPE:
|
||||
return self.report_descriptor
|
||||
elif req_type == _REQ_TYPE_CLASS:
|
||||
# HID Spec p50: 7.2 Class-Specific Requests
|
||||
if bRequest == _REQ_CONTROL_GET_REPORT:
|
||||
print("GET_REPORT?")
|
||||
return False # Unsupported for now
|
||||
if bRequest == _REQ_CONTROL_GET_IDLE:
|
||||
return bytes([self.idle_rate])
|
||||
if bRequest == _REQ_CONTROL_GET_PROTOCOL:
|
||||
return bytes([self.protocol])
|
||||
if bRequest in (_REQ_CONTROL_SET_IDLE, _REQ_CONTROL_SET_PROTOCOL):
|
||||
return True
|
||||
if bRequest == _REQ_CONTROL_SET_REPORT:
|
||||
return self._set_report_buf # If None, request will stall
|
||||
return False # Unsupported request
|
||||
|
||||
if stage == _STAGE_ACK:
|
||||
if req_type == _REQ_TYPE_CLASS:
|
||||
if bRequest == _REQ_CONTROL_SET_IDLE:
|
||||
self.idle_rate = wValue >> 8
|
||||
elif bRequest == _REQ_CONTROL_SET_PROTOCOL:
|
||||
self.protocol = wValue
|
||||
elif bRequest == _REQ_CONTROL_SET_REPORT:
|
||||
report_id = wValue & 0xFF
|
||||
report_type = wValue >> 8
|
||||
report_data = self._set_report_buf
|
||||
if wLength < len(report_data):
|
||||
# need to truncate the response in the callback if we got less bytes
|
||||
# than allowed for in the buffer
|
||||
report_data = memoryview(self._set_report_buf)[:wLength]
|
||||
self.on_set_report(report_data, report_id, report_type)
|
||||
|
||||
return True # allow DATA/ACK stages to complete normally
|
|
@ -0,0 +1,3 @@
|
|||
metadata(version="0.1.0")
|
||||
require("usb-device-hid")
|
||||
package("usb")
|
|
@ -0,0 +1,233 @@
|
|||
# MIT license; Copyright (c) 2023-2024 Angus Gratton
|
||||
from micropython import const
|
||||
import time
|
||||
import usb.device
|
||||
from usb.device.hid import HIDInterface
|
||||
|
||||
_INTERFACE_PROTOCOL_KEYBOARD = const(0x01)
|
||||
|
||||
_KEY_ARRAY_LEN = const(6) # Size of HID key array, must match report descriptor
|
||||
_KEY_REPORT_LEN = const(_KEY_ARRAY_LEN + 2) # Modifier Byte + Reserved Byte + Array entries
|
||||
|
||||
|
||||
class KeyboardInterface(HIDInterface):
|
||||
# Synchronous USB keyboard HID interface
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
_KEYBOARD_REPORT_DESC,
|
||||
set_report_buf=bytearray(1),
|
||||
protocol=_INTERFACE_PROTOCOL_KEYBOARD,
|
||||
interface_str="MicroPython Keyboard",
|
||||
)
|
||||
self._key_reports = [
|
||||
bytearray(_KEY_REPORT_LEN),
|
||||
bytearray(_KEY_REPORT_LEN),
|
||||
] # Ping/pong report buffers
|
||||
self.numlock = False
|
||||
|
||||
def on_set_report(self, report_data, _report_id, _report_type):
|
||||
self.on_led_update(report_data[0])
|
||||
|
||||
def on_led_update(self, led_mask):
|
||||
# Override to handle keyboard LED updates. led_mask is bitwise ORed
|
||||
# together values as defined in LEDCode.
|
||||
pass
|
||||
|
||||
def send_keys(self, down_keys, timeout_ms=100):
|
||||
# Update the state of the keyboard by sending a report with down_keys
|
||||
# set, where down_keys is an iterable (list or similar) of integer
|
||||
# values such as the values defined in KeyCode.
|
||||
#
|
||||
# Will block for up to timeout_ms if a previous report is still
|
||||
# pending to be sent to the host. Returns True on success.
|
||||
|
||||
r, s = self._key_reports # next report buffer to send, spare report buffer
|
||||
r[0] = 0 # modifier byte
|
||||
i = 2 # index for next key array item to write to
|
||||
for k in down_keys:
|
||||
if k < 0: # Modifier key
|
||||
r[0] |= -k
|
||||
elif i < _KEY_REPORT_LEN:
|
||||
r[i] = k
|
||||
i += 1
|
||||
else: # Excess rollover! Can't report
|
||||
r[0] = 0
|
||||
for i in range(2, _KEY_REPORT_LEN):
|
||||
r[i] = 0xFF
|
||||
break
|
||||
|
||||
while i < _KEY_REPORT_LEN:
|
||||
r[i] = 0
|
||||
i += 1
|
||||
|
||||
if self.send_report(r, timeout_ms):
|
||||
# Swap buffers if the previous one is newly queued to send, so
|
||||
# any subsequent call can't modify that buffer mid-send
|
||||
self._key_reports[0] = s
|
||||
self._key_reports[1] = r
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
# HID keyboard report descriptor
|
||||
#
|
||||
# From p69 of http://www.usb.org/developers/devclass_docs/HID1_11.pdf
|
||||
#
|
||||
# fmt: off
|
||||
_KEYBOARD_REPORT_DESC = (
|
||||
b'\x05\x01' # Usage Page (Generic Desktop),
|
||||
b'\x09\x06' # Usage (Keyboard),
|
||||
b'\xA1\x01' # Collection (Application),
|
||||
b'\x05\x07' # Usage Page (Key Codes);
|
||||
b'\x19\xE0' # Usage Minimum (224),
|
||||
b'\x29\xE7' # Usage Maximum (231),
|
||||
b'\x15\x00' # Logical Minimum (0),
|
||||
b'\x25\x01' # Logical Maximum (1),
|
||||
b'\x75\x01' # Report Size (1),
|
||||
b'\x95\x08' # Report Count (8),
|
||||
b'\x81\x02' # Input (Data, Variable, Absolute), ;Modifier byte
|
||||
b'\x95\x01' # Report Count (1),
|
||||
b'\x75\x08' # Report Size (8),
|
||||
b'\x81\x01' # Input (Constant), ;Reserved byte
|
||||
b'\x95\x05' # Report Count (5),
|
||||
b'\x75\x01' # Report Size (1),
|
||||
b'\x05\x08' # Usage Page (Page# for LEDs),
|
||||
b'\x19\x01' # Usage Minimum (1),
|
||||
b'\x29\x05' # Usage Maximum (5),
|
||||
b'\x91\x02' # Output (Data, Variable, Absolute), ;LED report
|
||||
b'\x95\x01' # Report Count (1),
|
||||
b'\x75\x03' # Report Size (3),
|
||||
b'\x91\x01' # Output (Constant), ;LED report padding
|
||||
b'\x95\x06' # Report Count (6),
|
||||
b'\x75\x08' # Report Size (8),
|
||||
b'\x15\x00' # Logical Minimum (0),
|
||||
b'\x25\x65' # Logical Maximum(101),
|
||||
b'\x05\x07' # Usage Page (Key Codes),
|
||||
b'\x19\x00' # Usage Minimum (0),
|
||||
b'\x29\x65' # Usage Maximum (101),
|
||||
b'\x81\x00' # Input (Data, Array), ;Key arrays (6 bytes)
|
||||
b'\xC0' # End Collection
|
||||
)
|
||||
# fmt: on
|
||||
|
||||
|
||||
# Standard HID keycodes, as a pseudo-enum class for easy access
|
||||
#
|
||||
# Modifier keys are encoded as negative values
|
||||
class KeyCode:
|
||||
A = 4
|
||||
B = 5
|
||||
C = 6
|
||||
D = 7
|
||||
E = 8
|
||||
F = 9
|
||||
G = 10
|
||||
H = 11
|
||||
I = 12
|
||||
J = 13
|
||||
K = 14
|
||||
L = 15
|
||||
M = 16
|
||||
N = 17
|
||||
O = 18
|
||||
P = 19
|
||||
Q = 20
|
||||
R = 21
|
||||
S = 22
|
||||
T = 23
|
||||
U = 24
|
||||
V = 25
|
||||
W = 26
|
||||
X = 27
|
||||
Y = 28
|
||||
Z = 29
|
||||
N1 = 30 # Standard number row keys
|
||||
N2 = 31
|
||||
N3 = 32
|
||||
N4 = 33
|
||||
N5 = 34
|
||||
N6 = 35
|
||||
N7 = 36
|
||||
N8 = 37
|
||||
N9 = 38
|
||||
N0 = 39
|
||||
ENTER = 40
|
||||
ESCAPE = 41
|
||||
BACKSPACE = 42
|
||||
TAB = 43
|
||||
SPACE = 44
|
||||
MINUS = 45 # - _
|
||||
EQUAL = 46 # = +
|
||||
OPEN_BRACKET = 47 # [ {
|
||||
CLOSE_BRACKET = 48 # ] }
|
||||
BACKSLASH = 49 # \ |
|
||||
HASH = 50 # # ~
|
||||
COLON = 51 # ; :
|
||||
QUOTE = 52 # ' "
|
||||
TILDE = 53 # ` ~
|
||||
COMMA = 54 # , <
|
||||
DOT = 55 # . >
|
||||
SLASH = 56 # / ?
|
||||
CAPS_LOCK = 57
|
||||
F1 = 58
|
||||
F2 = 59
|
||||
F3 = 60
|
||||
F4 = 61
|
||||
F5 = 62
|
||||
F6 = 63
|
||||
F7 = 64
|
||||
F8 = 65
|
||||
F9 = 66
|
||||
F10 = 67
|
||||
F11 = 68
|
||||
F12 = 69
|
||||
PRINTSCREEN = 70
|
||||
SCROLL_LOCK = 71
|
||||
PAUSE = 72
|
||||
INSERT = 73
|
||||
HOME = 74
|
||||
PAGEUP = 75
|
||||
DELETE = 76
|
||||
END = 77
|
||||
PAGEDOWN = 78
|
||||
RIGHT = 79 # Arrow keys
|
||||
LEFT = 80
|
||||
DOWN = 81
|
||||
UP = 82
|
||||
KP_NUM_LOCK = 83
|
||||
KP_DIVIDE = 84
|
||||
KP_AT = 85
|
||||
KP_MULTIPLY = 85
|
||||
KP_MINUS = 86
|
||||
KP_PLUS = 87
|
||||
KP_ENTER = 88
|
||||
KP_1 = 89
|
||||
KP_2 = 90
|
||||
KP_3 = 91
|
||||
KP_4 = 92
|
||||
KP_5 = 93
|
||||
KP_6 = 94
|
||||
KP_7 = 95
|
||||
KP_8 = 96
|
||||
KP_9 = 97
|
||||
KP_0 = 98
|
||||
|
||||
# HID modifier values (negated to allow them to be passed along with the normal keys)
|
||||
LEFT_CTRL = -0x01
|
||||
LEFT_SHIFT = -0x02
|
||||
LEFT_ALT = -0x04
|
||||
LEFT_UI = -0x08
|
||||
RIGHT_CTRL = -0x10
|
||||
RIGHT_SHIFT = -0x20
|
||||
RIGHT_ALT = -0x40
|
||||
RIGHT_UI = -0x80
|
||||
|
||||
|
||||
# HID LED values
|
||||
class LEDCode:
|
||||
NUM_LOCK = 0x01
|
||||
CAPS_LOCK = 0x02
|
||||
SCROLL_LOCK = 0x04
|
||||
COMPOSE = 0x08
|
||||
KANA = 0x10
|
|
@ -0,0 +1,3 @@
|
|||
metadata(version="0.1.0")
|
||||
require("usb-device")
|
||||
package("usb")
|
|
@ -0,0 +1,306 @@
|
|||
# MicroPython USB MIDI module
|
||||
# MIT license; Copyright (c) 2023 Paul Hamshere, 2023-2024 Angus Gratton
|
||||
from micropython import const, schedule
|
||||
import struct
|
||||
|
||||
from .core import Interface, Buffer
|
||||
|
||||
_EP_IN_FLAG = const(1 << 7)
|
||||
|
||||
_INTERFACE_CLASS_AUDIO = const(0x01)
|
||||
_INTERFACE_SUBCLASS_AUDIO_CONTROL = const(0x01)
|
||||
_INTERFACE_SUBCLASS_AUDIO_MIDISTREAMING = const(0x03)
|
||||
|
||||
# Audio subclass extends the standard endpoint descriptor
|
||||
# with two extra bytes
|
||||
_STD_DESC_AUDIO_ENDPOINT_LEN = const(9)
|
||||
_CLASS_DESC_ENDPOINT_LEN = const(5)
|
||||
|
||||
_STD_DESC_ENDPOINT_TYPE = const(0x5)
|
||||
|
||||
_JACK_TYPE_EMBEDDED = const(0x01)
|
||||
_JACK_TYPE_EXTERNAL = const(0x02)
|
||||
|
||||
_JACK_IN_DESC_LEN = const(6)
|
||||
_JACK_OUT_DESC_LEN = const(9)
|
||||
|
||||
# MIDI Status bytes. For Channel messages these are only the upper 4 bits, ORed with the channel number.
|
||||
# As per https://www.midi.org/specifications-old/item/table-1-summary-of-midi-message
|
||||
_MIDI_NOTE_OFF = const(0x80)
|
||||
_MIDI_NOTE_ON = const(0x90)
|
||||
_MIDI_POLY_KEYPRESS = const(0xA0)
|
||||
_MIDI_CONTROL_CHANGE = const(0xB0)
|
||||
|
||||
# USB-MIDI CINs (Code Index Numbers), as per USB MIDI Table 4-1
|
||||
_CIN_SYS_COMMON_2BYTE = const(0x2)
|
||||
_CIN_SYS_COMMON_3BYTE = const(0x3)
|
||||
_CIN_SYSEX_START = const(0x4)
|
||||
_CIN_SYSEX_END_1BYTE = const(0x5)
|
||||
_CIN_SYSEX_END_2BYTE = const(0x6)
|
||||
_CIN_SYSEX_END_3BYTE = const(0x7)
|
||||
_CIN_NOTE_OFF = const(0x8)
|
||||
_CIN_NOTE_ON = const(0x9)
|
||||
_CIN_POLY_KEYPRESS = const(0xA)
|
||||
_CIN_CONTROL_CHANGE = const(0xB)
|
||||
_CIN_PROGRAM_CHANGE = const(0xC)
|
||||
_CIN_CHANNEL_PRESSURE = const(0xD)
|
||||
_CIN_PITCH_BEND = const(0xE)
|
||||
_CIN_SINGLE_BYTE = const(0xF) # Not currently supported
|
||||
|
||||
# Jack IDs for a simple bidrectional MIDI device(!)
|
||||
_EMB_IN_JACK_ID = const(1)
|
||||
_EXT_IN_JACK_ID = const(2)
|
||||
_EMB_OUT_JACK_ID = const(3)
|
||||
_EXT_OUT_JACK_ID = const(4)
|
||||
|
||||
# Data flows, as modelled by USB-MIDI and this hypothetical interface, are as follows:
|
||||
# Device RX = USB OUT EP => _EMB_IN_JACK => _EMB_OUT_JACK
|
||||
# Device TX = _EXT_IN_JACK => _EMB_OUT_JACK => USB IN EP
|
||||
|
||||
|
||||
class MIDIInterface(Interface):
|
||||
# Base class to implement a USB MIDI device in Python.
|
||||
#
|
||||
# To be compliant this also regisers a dummy USB Audio interface, but that
|
||||
# interface isn't otherwise used.
|
||||
|
||||
def __init__(self, rxlen=16, txlen=16):
|
||||
# Arguments are size of transmit and receive buffers in bytes.
|
||||
|
||||
super().__init__()
|
||||
self.ep_out = None # Set during enumeration. RX direction (host to device)
|
||||
self.ep_in = None # TX direction (device to host)
|
||||
self._rx = Buffer(rxlen)
|
||||
self._tx = Buffer(txlen)
|
||||
|
||||
# Callbacks for handling received MIDI messages.
|
||||
#
|
||||
# Subclasses can choose between overriding on_midi_event
|
||||
# and handling all MIDI events manually, or overriding the
|
||||
# functions for note on/off and control change, only.
|
||||
|
||||
def on_midi_event(self, cin, midi0, midi1, midi2):
|
||||
ch = midi0 & 0x0F
|
||||
if cin == _CIN_NOTE_ON:
|
||||
self.on_note_on(ch, midi1, midi2)
|
||||
elif cin == _CIN_NOTE_OFF:
|
||||
self.on_note_off(ch, midi1, midi2)
|
||||
elif cin == _CIN_CONTROL_CHANGE:
|
||||
self.on_control_change(ch, midi1, midi2)
|
||||
|
||||
def on_note_on(self, channel, pitch, vel):
|
||||
pass # Override to handle Note On messages
|
||||
|
||||
def on_note_off(self, channel, pitch, vel):
|
||||
pass # Override to handle Note On messages
|
||||
|
||||
def on_control_change(self, channel, controller, value):
|
||||
pass # Override to handle Control Change messages
|
||||
|
||||
# Helper functions for sending common MIDI messages
|
||||
|
||||
def note_on(self, channel, pitch, vel=0x40):
|
||||
self.send_event(_CIN_NOTE_ON, _MIDI_NOTE_ON | channel, pitch, vel)
|
||||
|
||||
def note_off(self, channel, pitch, vel=0x40):
|
||||
self.send_event(_CIN_NOTE_OFF, _MIDI_NOTE_OFF | channel, pitch, vel)
|
||||
|
||||
def control_change(self, channel, controller, value):
|
||||
self.send_event(_CIN_CONTROL_CHANGE, _MIDI_CONTROL_CHANGE | channel, controller, value)
|
||||
|
||||
def send_event(self, cin, midi0, midi1=0, midi2=0):
|
||||
# Queue a MIDI Event Packet to send to the host.
|
||||
#
|
||||
# CIN = USB-MIDI Code Index Number, see USB MIDI 1.0 section 4 "USB-MIDI Event Packets"
|
||||
#
|
||||
# Remaining arguments are 0-3 MIDI data bytes.
|
||||
#
|
||||
# Note this function returns when the MIDI Event Packet has been queued,
|
||||
# not when it's been received by the host.
|
||||
#
|
||||
# Returns False if the TX buffer is full and the MIDI Event could not be queued.
|
||||
w = self._tx.pend_write()
|
||||
if len(w) < 4:
|
||||
return False # TX buffer is full. TODO: block here?
|
||||
w[0] = cin # leave cable number as 0?
|
||||
w[1] = midi0
|
||||
w[2] = midi1
|
||||
w[3] = midi2
|
||||
self._tx.finish_write(4)
|
||||
self._tx_xfer()
|
||||
return True
|
||||
|
||||
def _tx_xfer(self):
|
||||
# Keep an active IN transfer to send data to the host, whenever
|
||||
# there is data to send.
|
||||
if self.is_open() and not self.xfer_pending(self.ep_in) and self._tx.readable():
|
||||
self.submit_xfer(self.ep_in, self._tx.pend_read(), self._tx_cb)
|
||||
|
||||
def _tx_cb(self, ep, res, num_bytes):
|
||||
if res == 0:
|
||||
self._tx.finish_read(num_bytes)
|
||||
self._tx_xfer()
|
||||
|
||||
def _rx_xfer(self):
|
||||
# Keep an active OUT transfer to receive MIDI events from the host
|
||||
if self.is_open() and not self.xfer_pending(self.ep_out) and self._rx.writable():
|
||||
self.submit_xfer(self.ep_out, self._rx.pend_write(), self._rx_cb)
|
||||
|
||||
def _rx_cb(self, ep, res, num_bytes):
|
||||
if res == 0:
|
||||
self._rx.finish_write(num_bytes)
|
||||
schedule(self._on_rx, None)
|
||||
self._rx_xfer()
|
||||
|
||||
def on_open(self):
|
||||
super().on_open()
|
||||
# kick off any transfers that may have queued while the device was not open
|
||||
self._tx_xfer()
|
||||
self._rx_xfer()
|
||||
|
||||
def _on_rx(self, _):
|
||||
# Receive MIDI events. Called via micropython.schedule, outside of the USB callback function.
|
||||
m = self._rx.pend_read()
|
||||
i = 0
|
||||
while i <= len(m) - 4:
|
||||
cin = m[i] & 0x0F
|
||||
self.on_midi_event(cin, m[i + 1], m[i + 2], m[i + 3])
|
||||
i += 4
|
||||
self._rx.finish_read(i)
|
||||
|
||||
def desc_cfg(self, desc, itf_num, ep_num, strs):
|
||||
# Start by registering a USB Audio Control interface, that is required to point to the
|
||||
# actual MIDI interface
|
||||
desc.interface(itf_num, 0, _INTERFACE_CLASS_AUDIO, _INTERFACE_SUBCLASS_AUDIO_CONTROL)
|
||||
|
||||
# Append the class-specific AudioControl interface descriptor
|
||||
desc.pack(
|
||||
"<BBBHHBB",
|
||||
9, # bLength
|
||||
0x24, # bDescriptorType CS_INTERFACE
|
||||
0x01, # bDescriptorSubtype MS_HEADER
|
||||
0x0100, # BcdADC
|
||||
0x0009, # wTotalLength
|
||||
0x01, # bInCollection,
|
||||
itf_num + 1, # baInterfaceNr - points to the MIDI Streaming interface
|
||||
)
|
||||
|
||||
# Next add the MIDI Streaming interface descriptor
|
||||
desc.interface(
|
||||
itf_num + 1, 2, _INTERFACE_CLASS_AUDIO, _INTERFACE_SUBCLASS_AUDIO_MIDISTREAMING
|
||||
)
|
||||
|
||||
# Append the class-specific interface descriptors
|
||||
|
||||
# Midi Streaming interface descriptor
|
||||
desc.pack(
|
||||
"<BBBHH",
|
||||
7, # bLength
|
||||
0x24, # bDescriptorType CS_INTERFACE
|
||||
0x01, # bDescriptorSubtype MS_HEADER
|
||||
0x0100, # BcdADC
|
||||
# wTotalLength: of all class-specific descriptors
|
||||
7
|
||||
+ 2
|
||||
* (
|
||||
_JACK_IN_DESC_LEN
|
||||
+ _JACK_OUT_DESC_LEN
|
||||
+ _STD_DESC_AUDIO_ENDPOINT_LEN
|
||||
+ _CLASS_DESC_ENDPOINT_LEN
|
||||
),
|
||||
)
|
||||
|
||||
# The USB MIDI standard 1.0 allows modelling a baffling range of MIDI
|
||||
# devices with different permutations of Jack descriptors, with a lot of
|
||||
# scope for indicating internal connections in the device (as
|
||||
# "virtualised" by the USB MIDI standard). Much of the options don't
|
||||
# really change the USB behaviour but provide metadata to the host.
|
||||
#
|
||||
# As observed elsewhere online, the standard ends up being pretty
|
||||
# complex and unclear in parts, but there is a clear simple example in
|
||||
# an Appendix. So nearly everyone implements the device from the
|
||||
# Appendix as-is, even when it's not a good fit for their application,
|
||||
# and ignores the rest of the standard.
|
||||
#
|
||||
# For now, this is what this class does as well.
|
||||
|
||||
_jack_in_desc(desc, _JACK_TYPE_EMBEDDED, _EMB_IN_JACK_ID)
|
||||
_jack_in_desc(desc, _JACK_TYPE_EXTERNAL, _EXT_IN_JACK_ID)
|
||||
_jack_out_desc(desc, _JACK_TYPE_EMBEDDED, _EMB_OUT_JACK_ID, _EXT_IN_JACK_ID, 1)
|
||||
_jack_out_desc(desc, _JACK_TYPE_EXTERNAL, _EXT_OUT_JACK_ID, _EMB_IN_JACK_ID, 1)
|
||||
|
||||
# One MIDI endpoint in each direction, plus the
|
||||
# associated CS descriptors
|
||||
|
||||
self.ep_out = ep_num
|
||||
self.ep_in = ep_num | _EP_IN_FLAG
|
||||
|
||||
# rx side, USB "in" endpoint and embedded MIDI IN Jacks
|
||||
_audio_endpoint(desc, self.ep_in, _EMB_OUT_JACK_ID)
|
||||
|
||||
# tx side, USB "out" endpoint and embedded MIDI OUT jacks
|
||||
_audio_endpoint(desc, self.ep_out, _EMB_IN_JACK_ID)
|
||||
|
||||
def num_itfs(self):
|
||||
return 2
|
||||
|
||||
def num_eps(self):
|
||||
return 1
|
||||
|
||||
|
||||
def _jack_in_desc(desc, bJackType, bJackID):
|
||||
# Utility function appends a "JACK IN" descriptor with
|
||||
# specified bJackType and bJackID
|
||||
desc.pack(
|
||||
"<BBBBBB",
|
||||
_JACK_IN_DESC_LEN, # bLength
|
||||
0x24, # bDescriptorType CS_INTERFACE
|
||||
0x02, # bDescriptorSubtype MIDI_IN_JACK
|
||||
bJackType,
|
||||
bJackID,
|
||||
0x00, # iJack, no string descriptor support yet
|
||||
)
|
||||
|
||||
|
||||
def _jack_out_desc(desc, bJackType, bJackID, bSourceId, bSourcePin):
|
||||
# Utility function appends a "JACK IN" descriptor with
|
||||
# specified bJackType and bJackID
|
||||
desc.pack(
|
||||
"<BBBBBBBBB",
|
||||
_JACK_OUT_DESC_LEN, # bLength
|
||||
0x24, # bDescriptorType CS_INTERFACE
|
||||
0x03, # bDescriptorSubtype MIDI_OUT_JACK
|
||||
bJackType,
|
||||
bJackID,
|
||||
0x01, # bNrInputPins
|
||||
bSourceId, # baSourceID(1)
|
||||
bSourcePin, # baSourcePin(1)
|
||||
0x00, # iJack, no string descriptor support yet
|
||||
)
|
||||
|
||||
|
||||
def _audio_endpoint(desc, bEndpointAddress, emb_jack_id):
|
||||
# Append a standard USB endpoint descriptor and the USB class endpoint descriptor
|
||||
# for this endpoint.
|
||||
#
|
||||
# Audio Class devices extend the standard endpoint descriptor with two extra bytes,
|
||||
# so we can't easily call desc.endpoint() for the first part.
|
||||
desc.pack(
|
||||
# Standard USB endpoint descriptor (plus audio tweaks)
|
||||
"<BBBBHBBB"
|
||||
# Class endpoint descriptor
|
||||
"BBBBB",
|
||||
_STD_DESC_AUDIO_ENDPOINT_LEN, # wLength
|
||||
_STD_DESC_ENDPOINT_TYPE, # bDescriptorType
|
||||
bEndpointAddress,
|
||||
2, # bmAttributes, bulk
|
||||
64, # wMaxPacketSize
|
||||
0, # bInterval
|
||||
0, # bRefresh (unused)
|
||||
0, # bSynchInterval (unused)
|
||||
_CLASS_DESC_ENDPOINT_LEN, # bLength
|
||||
0x25, # bDescriptorType CS_ENDPOINT
|
||||
0x01, # bDescriptorSubtype MS_GENERAL
|
||||
1, # bNumEmbMIDIJack
|
||||
emb_jack_id, # BaAssocJackID(1)
|
||||
)
|
|
@ -0,0 +1,3 @@
|
|||
metadata(version="0.1.0")
|
||||
require("usb-device-hid")
|
||||
package("usb")
|
|
@ -0,0 +1,100 @@
|
|||
# MicroPython USB Mouse module
|
||||
#
|
||||
# MIT license; Copyright (c) 2023-2024 Angus Gratton
|
||||
from micropython import const
|
||||
import struct
|
||||
import machine
|
||||
|
||||
from usb.device.hid import HIDInterface
|
||||
|
||||
_INTERFACE_PROTOCOL_MOUSE = const(0x02)
|
||||
|
||||
|
||||
class MouseInterface(HIDInterface):
|
||||
# A basic three button USB mouse HID interface
|
||||
def __init__(self, interface_str="MicroPython Mouse"):
|
||||
super().__init__(
|
||||
_MOUSE_REPORT_DESC,
|
||||
protocol=_INTERFACE_PROTOCOL_MOUSE,
|
||||
interface_str=interface_str,
|
||||
)
|
||||
self._l = False # Left button
|
||||
self._m = False # Middle button
|
||||
self._r = False # Right button
|
||||
self._buf = bytearray(3)
|
||||
|
||||
def send_report(self, dx=0, dy=0):
|
||||
b = 0
|
||||
if self._l:
|
||||
b |= 1 << 0
|
||||
if self._r:
|
||||
b |= 1 << 1
|
||||
if self._m:
|
||||
b |= 1 << 2
|
||||
|
||||
# Wait for any pending report to be sent to the host
|
||||
# before updating contents of _buf.
|
||||
#
|
||||
# This loop can be removed if you don't care about possibly missing a
|
||||
# transient report, the final report buffer contents will always be the
|
||||
# last one sent to the host (it just might lose one of the ones in the
|
||||
# middle).
|
||||
while self.busy():
|
||||
machine.idle()
|
||||
|
||||
struct.pack_into("Bbb", self._buf, 0, b, dx, dy)
|
||||
|
||||
return super().send_report(self._buf)
|
||||
|
||||
def click_left(self, down=True):
|
||||
self._l = down
|
||||
return self.send_report()
|
||||
|
||||
def click_middle(self, down=True):
|
||||
self._m = down
|
||||
return self.send_report()
|
||||
|
||||
def click_right(self, down=True):
|
||||
self._r = down
|
||||
return self.send_report()
|
||||
|
||||
def move_by(self, dx, dy):
|
||||
if not -127 <= dx <= 127:
|
||||
raise ValueError("dx")
|
||||
if not -127 <= dy <= 127:
|
||||
raise ValueError("dy")
|
||||
return self.send_report(dx, dy)
|
||||
|
||||
|
||||
# Basic 3-button mouse HID Report Descriptor.
|
||||
# This is based on Appendix E.10 of the HID v1.11 document.
|
||||
# fmt: off
|
||||
_MOUSE_REPORT_DESC = (
|
||||
b'\x05\x01' # Usage Page (Generic Desktop)
|
||||
b'\x09\x02' # Usage (Mouse)
|
||||
b'\xA1\x01' # Collection (Application)
|
||||
b'\x09\x01' # Usage (Pointer)
|
||||
b'\xA1\x00' # Collection (Physical)
|
||||
b'\x05\x09' # Usage Page (Buttons)
|
||||
b'\x19\x01' # Usage Minimum (01),
|
||||
b'\x29\x03' # Usage Maximun (03),
|
||||
b'\x15\x00' # Logical Minimum (0),
|
||||
b'\x25\x01' # Logical Maximum (1),
|
||||
b'\x95\x03' # Report Count (3),
|
||||
b'\x75\x01' # Report Size (1),
|
||||
b'\x81\x02' # Input (Data, Variable, Absolute), ;3 button bits
|
||||
b'\x95\x01' # Report Count (1),
|
||||
b'\x75\x05' # Report Size (5),
|
||||
b'\x81\x01' # Input (Constant), ;5 bit padding
|
||||
b'\x05\x01' # Usage Page (Generic Desktop),
|
||||
b'\x09\x30' # Usage (X),
|
||||
b'\x09\x31' # Usage (Y),
|
||||
b'\x15\x81' # Logical Minimum (-127),
|
||||
b'\x25\x7F' # Logical Maximum (127),
|
||||
b'\x75\x08' # Report Size (8),
|
||||
b'\x95\x02' # Report Count (2),
|
||||
b'\x81\x06' # Input (Data, Variable, Relative), ;2 position bytes (X & Y)
|
||||
b'\xC0' # End Collection
|
||||
b'\xC0' # End Collection
|
||||
)
|
||||
# fmt: on
|
|
@ -0,0 +1,2 @@
|
|||
metadata(version="0.1.0")
|
||||
package("usb")
|
|
@ -0,0 +1,97 @@
|
|||
# Tests for the Buffer class included in usb.device.core
|
||||
#
|
||||
# The easiest way to run this is using unix port. From the parent usb-device
|
||||
# directory, run as:
|
||||
#
|
||||
# $ micropython -m tests.test_core_buffer
|
||||
#
|
||||
import micropython
|
||||
from usb.device import core
|
||||
|
||||
if not hasattr(core.machine, "disable_irq"):
|
||||
# Inject a fake machine module which allows testing on the unix port, and as
|
||||
# a bonus have tests fail if the buffer allocates inside a critical section.
|
||||
class FakeMachine:
|
||||
def disable_irq(self):
|
||||
return micropython.heap_lock()
|
||||
|
||||
def enable_irq(self, was_locked):
|
||||
if not was_locked:
|
||||
micropython.heap_unlock()
|
||||
|
||||
core.machine = FakeMachine()
|
||||
|
||||
|
||||
b = core.Buffer(16)
|
||||
|
||||
# Check buffer is empty
|
||||
assert b.readable() == 0
|
||||
assert b.writable() == 16
|
||||
|
||||
# Single write then read
|
||||
w = b.pend_write()
|
||||
assert len(w) == 16
|
||||
w[:8] = b"12345678"
|
||||
b.finish_write(8)
|
||||
|
||||
# Empty again
|
||||
assert b.readable() == 8
|
||||
assert b.writable() == 8
|
||||
|
||||
r = b.pend_read()
|
||||
assert len(r) == 8
|
||||
assert r == b"12345678"
|
||||
b.finish_read(8)
|
||||
|
||||
# Empty buffer again
|
||||
assert b.readable() == 0
|
||||
assert b.writable() == 16
|
||||
|
||||
# Single write then split reads
|
||||
b.write(b"abcdefghijklmnop")
|
||||
assert b.writable() == 0 # full buffer
|
||||
|
||||
r = b.pend_read()
|
||||
assert r == b"abcdefghijklmnop"
|
||||
b.finish_read(2)
|
||||
|
||||
r = b.pend_read()
|
||||
assert r == b"cdefghijklmnop"
|
||||
b.finish_read(3)
|
||||
|
||||
# write to end of buffer
|
||||
b.write(b"AB")
|
||||
|
||||
r = b.pend_read()
|
||||
assert r == b"fghijklmnopAB"
|
||||
|
||||
# write while a read is pending
|
||||
b.write(b"XY")
|
||||
|
||||
# previous pend_read() memoryview should be the same
|
||||
assert r == b"fghijklmnopAB"
|
||||
|
||||
b.finish_read(4)
|
||||
r = b.pend_read()
|
||||
assert r == b"jklmnopABXY" # four bytes consumed from head, one new byte at tail
|
||||
|
||||
# read while a write is pending
|
||||
w = b.pend_write()
|
||||
assert len(w) == 5
|
||||
r = b.pend_read()
|
||||
assert len(r) == 11
|
||||
b.finish_read(3)
|
||||
w[:2] = b"12"
|
||||
b.finish_write(2)
|
||||
|
||||
# Expected final state of buffer
|
||||
tmp = bytearray(b.readable())
|
||||
assert b.readinto(tmp) == len(tmp)
|
||||
assert tmp == b"mnopABXY12"
|
||||
|
||||
# Now buffer is empty again
|
||||
assert b.readable() == 0
|
||||
assert b.readinto(tmp) == 0
|
||||
assert b.writable() == 16
|
||||
|
||||
print("All Buffer tests passed")
|
|
@ -0,0 +1,2 @@
|
|||
from . import core
|
||||
from .core import get # Singleton _Device getter
|
|
@ -0,0 +1,851 @@
|
|||
# MicroPython Library runtime USB device implementation
|
||||
#
|
||||
# These contain the classes and utilities that are needed to
|
||||
# implement a USB device, not any complete USB drivers.
|
||||
#
|
||||
# MIT license; Copyright (c) 2022-2024 Angus Gratton
|
||||
from micropython import const
|
||||
import machine
|
||||
import struct
|
||||
|
||||
_EP_IN_FLAG = const(1 << 7)
|
||||
|
||||
# USB descriptor types
|
||||
_STD_DESC_DEV_TYPE = const(0x1)
|
||||
_STD_DESC_CONFIG_TYPE = const(0x2)
|
||||
_STD_DESC_STRING_TYPE = const(0x3)
|
||||
_STD_DESC_INTERFACE_TYPE = const(0x4)
|
||||
_STD_DESC_ENDPOINT_TYPE = const(0x5)
|
||||
_STD_DESC_INTERFACE_ASSOC = const(0xB)
|
||||
|
||||
_ITF_ASSOCIATION_DESC_TYPE = const(0xB) # Interface Association descriptor
|
||||
|
||||
# Standard USB descriptor lengths
|
||||
_STD_DESC_CONFIG_LEN = const(9)
|
||||
_STD_DESC_ENDPOINT_LEN = const(7)
|
||||
_STD_DESC_INTERFACE_LEN = const(9)
|
||||
|
||||
_DESC_OFFSET_LEN = const(0)
|
||||
_DESC_OFFSET_TYPE = const(1)
|
||||
|
||||
_DESC_OFFSET_INTERFACE_NUM = const(2) # for _STD_DESC_INTERFACE_TYPE
|
||||
_DESC_OFFSET_ENDPOINT_NUM = const(2) # for _STD_DESC_ENDPOINT_TYPE
|
||||
|
||||
# Standard control request bmRequest fields, can extract by calling split_bmRequestType()
|
||||
_REQ_RECIPIENT_DEVICE = const(0x0)
|
||||
_REQ_RECIPIENT_INTERFACE = const(0x1)
|
||||
_REQ_RECIPIENT_ENDPOINT = const(0x2)
|
||||
_REQ_RECIPIENT_OTHER = const(0x3)
|
||||
|
||||
# Offsets into the standard configuration descriptor, to fixup
|
||||
_OFFS_CONFIG_iConfiguration = const(6)
|
||||
|
||||
_INTERFACE_CLASS_VENDOR = const(0xFF)
|
||||
_INTERFACE_SUBCLASS_NONE = const(0x00)
|
||||
_PROTOCOL_NONE = const(0x00)
|
||||
|
||||
# These need to match the constants in tusb_config.h
|
||||
_USB_STR_MANUF = const(0x01)
|
||||
_USB_STR_PRODUCT = const(0x02)
|
||||
_USB_STR_SERIAL = const(0x03)
|
||||
|
||||
# Error constant to match mperrno.h
|
||||
_MP_EINVAL = const(22)
|
||||
|
||||
_dev = None # Singleton _Device instance
|
||||
|
||||
|
||||
def get():
|
||||
# Getter to access the singleton instance of the
|
||||
# MicroPython _Device object
|
||||
#
|
||||
# (note this isn't the low-level machine.USBDevice object, the low-level object is
|
||||
# get()._usbd.)
|
||||
global _dev
|
||||
if not _dev:
|
||||
_dev = _Device()
|
||||
return _dev
|
||||
|
||||
|
||||
class _Device:
|
||||
# Class that implements the Python parts of the MicroPython USBDevice.
|
||||
#
|
||||
# This class should only be instantiated by the singleton getter
|
||||
# function usb.device.get(), never directly.
|
||||
def __init__(self):
|
||||
self._itfs = {} # Mapping from interface number to interface object, set by init()
|
||||
self._eps = {} # Mapping from endpoint address to interface object, set by _open_cb()
|
||||
self._ep_cbs = {} # Mapping from endpoint address to Optional[xfer callback]
|
||||
self._usbd = machine.USBDevice() # low-level API
|
||||
|
||||
def init(self, *itfs, **kwargs):
|
||||
# Helper function to configure the USB device and activate it in a single call
|
||||
self.active(False)
|
||||
self.config(*itfs, **kwargs)
|
||||
self.active(True)
|
||||
|
||||
def config( # noqa: PLR0913
|
||||
self,
|
||||
*itfs,
|
||||
builtin_driver=False,
|
||||
manufacturer_str=None,
|
||||
product_str=None,
|
||||
serial_str=None,
|
||||
configuration_str=None,
|
||||
id_vendor=None,
|
||||
id_product=None,
|
||||
bcd_device=None,
|
||||
device_class=0,
|
||||
device_subclass=0,
|
||||
device_protocol=0,
|
||||
config_str=None,
|
||||
max_power_ma=None,
|
||||
):
|
||||
# Configure the USB device with a set of interfaces, and optionally reconfiguring the
|
||||
# device and configuration descriptor fields
|
||||
|
||||
_usbd = self._usbd
|
||||
|
||||
if self.active():
|
||||
raise OSError(_MP_EINVAL) # Must set active(False) first
|
||||
|
||||
# Convenience: Allow builtin_driver to be True, False or one of
|
||||
# the machine.USBDevice.BUILTIN_ constants
|
||||
if isinstance(builtin_driver, bool):
|
||||
builtin_driver = _usbd.BUILTIN_DEFAULT if builtin_driver else _usbd.BUILTIN_NONE
|
||||
_usbd.builtin_driver = builtin_driver
|
||||
|
||||
# Putting None for any strings that should fall back to the "built-in" value
|
||||
# Indexes in this list depends on _USB_STR_MANUF, _USB_STR_PRODUCT, _USB_STR_SERIAL
|
||||
strs = [None, manufacturer_str, product_str, serial_str]
|
||||
|
||||
# Build the device descriptor
|
||||
FMT = "<BBHBBBBHHHBBBB"
|
||||
# read the static descriptor fields
|
||||
f = struct.unpack(FMT, builtin_driver.desc_dev)
|
||||
|
||||
def maybe_set(value, idx):
|
||||
# Override a numeric descriptor value or keep builtin value f[idx] if 'value' is None
|
||||
if value is not None:
|
||||
return value
|
||||
return f[idx]
|
||||
|
||||
# Either copy each descriptor field directly from the builtin device descriptor, or 'maybe'
|
||||
# set it to the custom value from the object
|
||||
desc_dev = struct.pack(
|
||||
FMT,
|
||||
f[0], # bLength
|
||||
f[1], # bDescriptorType
|
||||
f[2], # bcdUSB
|
||||
device_class, # bDeviceClass
|
||||
device_subclass, # bDeviceSubClass
|
||||
device_protocol, # bDeviceProtocol
|
||||
f[6], # bMaxPacketSize0, TODO: allow overriding this value?
|
||||
maybe_set(id_vendor, 7), # idVendor
|
||||
maybe_set(id_product, 8), # idProduct
|
||||
maybe_set(bcd_device, 9), # bcdDevice
|
||||
_USB_STR_MANUF, # iManufacturer
|
||||
_USB_STR_PRODUCT, # iProduct
|
||||
_USB_STR_SERIAL, # iSerialNumber
|
||||
1,
|
||||
) # bNumConfigurations
|
||||
|
||||
# Iterate interfaces to build the configuration descriptor
|
||||
|
||||
# Keep track of the interface and endpoint indexes
|
||||
itf_num = builtin_driver.itf_max
|
||||
ep_num = max(builtin_driver.ep_max, 1) # Endpoint 0 always reserved for control
|
||||
while len(strs) < builtin_driver.str_max:
|
||||
strs.append(None) # Reserve other string indexes used by builtin drivers
|
||||
initial_cfg = builtin_driver.desc_cfg or (b"\x00" * _STD_DESC_CONFIG_LEN)
|
||||
|
||||
self._itfs = {}
|
||||
|
||||
# Determine the total length of the configuration descriptor, by making dummy
|
||||
# calls to build the config descriptor
|
||||
desc = Descriptor(None)
|
||||
desc.extend(initial_cfg)
|
||||
for itf in itfs:
|
||||
itf.desc_cfg(desc, 0, 0, [])
|
||||
|
||||
# Allocate the real Descriptor helper to write into it, starting
|
||||
# after the standard configuration descriptor
|
||||
desc = Descriptor(bytearray(desc.o))
|
||||
desc.extend(initial_cfg)
|
||||
for itf in itfs:
|
||||
itf.desc_cfg(desc, itf_num, ep_num, strs)
|
||||
|
||||
for _ in range(itf.num_itfs()):
|
||||
self._itfs[itf_num] = itf # Mapping from interface numbers to interfaces
|
||||
itf_num += 1
|
||||
|
||||
ep_num += itf.num_eps()
|
||||
|
||||
# Go back and update the Standard Configuration Descriptor
|
||||
# header at the start with values based on the complete
|
||||
# descriptor.
|
||||
#
|
||||
# See USB 2.0 specification section 9.6.3 p264 for details.
|
||||
bmAttributes = (
|
||||
(1 << 7) # Reserved
|
||||
| (0 if max_power_ma else (1 << 6)) # Self-Powered
|
||||
# Remote Wakeup not currently supported
|
||||
)
|
||||
|
||||
# Configuration string is optional but supported
|
||||
iConfiguration = 0
|
||||
if configuration_str:
|
||||
iConfiguration = len(strs)
|
||||
strs.append(configuration_str)
|
||||
|
||||
if max_power_ma is not None:
|
||||
# Convert from mA to the units used in the descriptor
|
||||
max_power_ma //= 2
|
||||
else:
|
||||
try:
|
||||
# Default to whatever value the builtin driver reports
|
||||
max_power_ma = _usbd.BUILTIN_DEFAULT.desc_cfg[8]
|
||||
except IndexError:
|
||||
# If no built-in driver, default to 250mA
|
||||
max_power_ma = 125
|
||||
|
||||
desc.pack_into(
|
||||
"<BBHBBBBB",
|
||||
0,
|
||||
_STD_DESC_CONFIG_LEN, # bLength
|
||||
_STD_DESC_CONFIG_TYPE, # bDescriptorType
|
||||
len(desc.b), # wTotalLength
|
||||
itf_num,
|
||||
1, # bConfigurationValue
|
||||
iConfiguration,
|
||||
bmAttributes,
|
||||
max_power_ma,
|
||||
)
|
||||
|
||||
_usbd.config(
|
||||
desc_dev,
|
||||
desc.b,
|
||||
strs,
|
||||
self._open_itf_cb,
|
||||
self._reset_cb,
|
||||
self._control_xfer_cb,
|
||||
self._xfer_cb,
|
||||
)
|
||||
|
||||
def active(self, *optional_value):
|
||||
# Thin wrapper around the USBDevice active() function.
|
||||
#
|
||||
# Note: active only means the USB device is available, not that it has
|
||||
# actually been connected to and configured by a USB host. Use the
|
||||
# Interface.is_open() function to check if the host has configured an
|
||||
# interface of the device.
|
||||
return self._usbd.active(*optional_value)
|
||||
|
||||
def _open_itf_cb(self, desc):
|
||||
# Singleton callback from TinyUSB custom class driver, when USB host does
|
||||
# Set Configuration. Called once per interface or IAD.
|
||||
|
||||
# Note that even if the configuration descriptor contains an IAD, 'desc'
|
||||
# starts from the first interface descriptor in the IAD and not the IAD
|
||||
# descriptor.
|
||||
|
||||
itf_num = desc[_DESC_OFFSET_INTERFACE_NUM]
|
||||
itf = self._itfs[itf_num]
|
||||
|
||||
# Scan the full descriptor:
|
||||
# - Build _eps and _ep_addr from the endpoint descriptors
|
||||
# - Find the highest numbered interface provided to the callback
|
||||
# (which will be the first interface, unless we're scanning
|
||||
# multiple interfaces inside an IAD.)
|
||||
offs = 0
|
||||
max_itf = itf_num
|
||||
while offs < len(desc):
|
||||
dl = desc[offs + _DESC_OFFSET_LEN]
|
||||
dt = desc[offs + _DESC_OFFSET_TYPE]
|
||||
if dt == _STD_DESC_ENDPOINT_TYPE:
|
||||
ep_addr = desc[offs + _DESC_OFFSET_ENDPOINT_NUM]
|
||||
self._eps[ep_addr] = itf
|
||||
self._ep_cbs[ep_addr] = None
|
||||
elif dt == _STD_DESC_INTERFACE_TYPE:
|
||||
max_itf = max(max_itf, desc[offs + _DESC_OFFSET_INTERFACE_NUM])
|
||||
offs += dl
|
||||
|
||||
# If 'desc' is not the inside of an Interface Association Descriptor but
|
||||
# 'itf' object still represents multiple USB interfaces (i.e. MIDI),
|
||||
# defer calling 'itf.on_open()' until this callback fires for the
|
||||
# highest numbered USB interface.
|
||||
#
|
||||
# This means on_open() is only called once, and that it can
|
||||
# safely submit transfers on any of the USB interfaces' endpoints.
|
||||
if self._itfs.get(max_itf + 1, None) != itf:
|
||||
itf.on_open()
|
||||
|
||||
def _reset_cb(self):
|
||||
# Callback when the USB device is reset by the host
|
||||
|
||||
# Allow interfaces to respond to the reset
|
||||
for itf in self._itfs.values():
|
||||
itf.on_reset()
|
||||
|
||||
# Rebuilt when host re-enumerates
|
||||
self._eps = {}
|
||||
self._ep_cbs = {}
|
||||
|
||||
def _submit_xfer(self, ep_addr, data, done_cb=None):
|
||||
# Singleton function to submit a USB transfer (of any type except control).
|
||||
#
|
||||
# Generally, drivers should call Interface.submit_xfer() instead. See
|
||||
# that function for documentation about the possible parameter values.
|
||||
if ep_addr not in self._eps:
|
||||
raise ValueError("ep_addr")
|
||||
if self._ep_cbs[ep_addr]:
|
||||
raise RuntimeError("xfer_pending")
|
||||
|
||||
# USBDevice callback may be called immediately, before Python execution
|
||||
# continues, so set it first.
|
||||
#
|
||||
# To allow xfer_pending checks to work, store True instead of None.
|
||||
self._ep_cbs[ep_addr] = done_cb or True
|
||||
return self._usbd.submit_xfer(ep_addr, data)
|
||||
|
||||
def _xfer_cb(self, ep_addr, result, xferred_bytes):
|
||||
# Singleton callback from TinyUSB custom class driver when a transfer completes.
|
||||
cb = self._ep_cbs.get(ep_addr, None)
|
||||
self._ep_cbs[ep_addr] = None
|
||||
if callable(cb):
|
||||
cb(ep_addr, result, xferred_bytes)
|
||||
|
||||
def _control_xfer_cb(self, stage, request):
|
||||
# Singleton callback from TinyUSB custom class driver when a control
|
||||
# transfer is in progress.
|
||||
#
|
||||
# stage determines appropriate responses (possible values
|
||||
# utils.STAGE_SETUP, utils.STAGE_DATA, utils.STAGE_ACK).
|
||||
#
|
||||
# The TinyUSB class driver framework only calls this function for
|
||||
# particular types of control transfer, other standard control transfers
|
||||
# are handled by TinyUSB itself.
|
||||
wIndex = request[4] + (request[5] << 8)
|
||||
recipient, _, _ = split_bmRequestType(request[0])
|
||||
|
||||
itf = None
|
||||
result = None
|
||||
|
||||
if recipient == _REQ_RECIPIENT_DEVICE:
|
||||
itf = self._itfs.get(wIndex & 0xFFFF, None)
|
||||
if itf:
|
||||
result = itf.on_device_control_xfer(stage, request)
|
||||
elif recipient == _REQ_RECIPIENT_INTERFACE:
|
||||
itf = self._itfs.get(wIndex & 0xFFFF, None)
|
||||
if itf:
|
||||
result = itf.on_interface_control_xfer(stage, request)
|
||||
elif recipient == _REQ_RECIPIENT_ENDPOINT:
|
||||
ep_num = wIndex & 0xFFFF
|
||||
itf = self._eps.get(ep_num, None)
|
||||
if itf:
|
||||
result = itf.on_endpoint_control_xfer(stage, request)
|
||||
|
||||
if not itf:
|
||||
# At time this code was written, only the control transfers shown
|
||||
# above are passed to the class driver callback. See
|
||||
# invoke_class_control() in tinyusb usbd.c
|
||||
raise RuntimeError(f"Unexpected control request type {request[0]:#x}")
|
||||
|
||||
# Expecting any of the following possible replies from
|
||||
# on_NNN_control_xfer():
|
||||
#
|
||||
# True - Continue transfer, no data
|
||||
# False - STALL transfer
|
||||
# Object with buffer interface - submit this data for the control transfer
|
||||
return result
|
||||
|
||||
|
||||
class Interface:
|
||||
# Abstract base class to implement USB Interface (and associated endpoints),
|
||||
# or a collection of USB Interfaces, in Python
|
||||
#
|
||||
# (Despite the name an object of type Interface can represent multiple
|
||||
# associated interfaces, with or without an Interface Association Descriptor
|
||||
# prepended to them. Override num_itfs() if assigning >1 USB interface.)
|
||||
|
||||
def __init__(self):
|
||||
self._open = False
|
||||
|
||||
def desc_cfg(self, desc, itf_num, ep_num, strs):
|
||||
# Function to build configuration descriptor contents for this interface
|
||||
# or group of interfaces. This is called on each interface from
|
||||
# USBDevice.init().
|
||||
#
|
||||
# This function should insert:
|
||||
#
|
||||
# - At least one standard Interface descriptor (can call
|
||||
# - desc.interface()).
|
||||
#
|
||||
# Plus, optionally:
|
||||
#
|
||||
# - One or more endpoint descriptors (can call desc.endpoint()).
|
||||
# - An Interface Association Descriptor, prepended before.
|
||||
# - Other class-specific configuration descriptor data.
|
||||
#
|
||||
# This function is called twice per call to USBDevice.init(). The first
|
||||
# time the values of all arguments are dummies that are used only to
|
||||
# calculate the total length of the descriptor. Therefore, anything this
|
||||
# function does should be idempotent and it should add the same
|
||||
# descriptors each time. If saving interface numbers or endpoint numbers
|
||||
# for later
|
||||
#
|
||||
# Parameters:
|
||||
#
|
||||
# - desc - Descriptor helper to write the configuration descriptor bytes into.
|
||||
# The first time this function is called 'desc' is a dummy object
|
||||
# with no backing buffer (exists to count the number of bytes needed).
|
||||
#
|
||||
# - itf_num - First bNumInterfaces value to assign. The descriptor
|
||||
# should contain the same number of interfaces returned by num_itfs(),
|
||||
# starting from this value.
|
||||
#
|
||||
# - ep_num - Address of the first available endpoint number to use for
|
||||
# endpoint descriptor addresses. Subclasses should save the
|
||||
# endpoint addresses selected, to look up later (although note the first
|
||||
# time this function is called, the values will be dummies.)
|
||||
#
|
||||
# - strs - list of string descriptors for this USB device. This function
|
||||
# can append to this list, and then insert the index of the new string
|
||||
# in the list into the configuration descriptor.
|
||||
raise NotImplementedError
|
||||
|
||||
def num_itfs(self):
|
||||
# Return the number of actual USB Interfaces represented by this object
|
||||
# (as set in desc_cfg().)
|
||||
#
|
||||
# Only needs to be overriden if implementing a Interface class that
|
||||
# represents more than one USB Interface descriptor (i.e. MIDI), or an
|
||||
# Interface Association Descriptor (i.e. USB-CDC).
|
||||
return 1
|
||||
|
||||
def num_eps(self):
|
||||
# Return the number of USB Endpoint numbers represented by this object
|
||||
# (as set in desc_cfg().)
|
||||
#
|
||||
# Note for each count returned by this function, the interface may
|
||||
# choose to have both an IN and OUT endpoint (i.e. IN flag is not
|
||||
# considered a value here.)
|
||||
#
|
||||
# This value can be zero, if the USB Host only communicates with this
|
||||
# interface using control transfers.
|
||||
return 0
|
||||
|
||||
def on_open(self):
|
||||
# Callback called when the USB host accepts the device configuration.
|
||||
#
|
||||
# Override this function to initiate any operations that the USB interface
|
||||
# should do when the USB device is configured to the host.
|
||||
self._open = True
|
||||
|
||||
def on_reset(self):
|
||||
# Callback called on every registered interface when the USB device is
|
||||
# reset by the host. This can happen when the USB device is unplugged,
|
||||
# or if the host triggers a reset for some other reason.
|
||||
#
|
||||
# Override this function to cancel any pending operations specific to
|
||||
# the interface (outstanding USB transfers are already cancelled).
|
||||
#
|
||||
# At this point, no USB functionality is available - on_open() will
|
||||
# be called later if/when the USB host re-enumerates and configures the
|
||||
# interface.
|
||||
self._open = False
|
||||
|
||||
def is_open(self):
|
||||
# Returns True if the interface has been configured by the host and is in
|
||||
# active use.
|
||||
return self._open
|
||||
|
||||
def on_device_control_xfer(self, stage, request):
|
||||
# Control transfer callback. Override to handle a non-standard device
|
||||
# control transfer where bmRequestType Recipient is Device, Type is
|
||||
# utils.REQ_TYPE_CLASS, and the lower byte of wIndex indicates this interface.
|
||||
#
|
||||
# (See USB 2.0 specification 9.4 Standard Device Requests, p250).
|
||||
#
|
||||
# This particular request type seems pretty uncommon for a device class
|
||||
# driver to need to handle, most hosts will not send this so most
|
||||
# implementations won't need to override it.
|
||||
#
|
||||
# Parameters:
|
||||
#
|
||||
# - stage is one of utils.STAGE_SETUP, utils.STAGE_DATA, utils.STAGE_ACK.
|
||||
#
|
||||
# - request is a memoryview into a USB request packet, as per USB 2.0
|
||||
# specification 9.3 USB Device Requests, p250. the memoryview is only
|
||||
# valid while the callback is running.
|
||||
#
|
||||
# The function can call split_bmRequestType(request[0]) to split
|
||||
# bmRequestType into (Recipient, Type, Direction).
|
||||
#
|
||||
# Result, any of:
|
||||
#
|
||||
# - True to continue the request, False to STALL the endpoint.
|
||||
# - Buffer interface object to provide a buffer to the host as part of the
|
||||
# transfer, if applicable.
|
||||
return False
|
||||
|
||||
def on_interface_control_xfer(self, stage, request):
|
||||
# Control transfer callback. Override to handle a device control
|
||||
# transfer where bmRequestType Recipient is Interface, and the lower byte
|
||||
# of wIndex indicates this interface.
|
||||
#
|
||||
# (See USB 2.0 specification 9.4 Standard Device Requests, p250).
|
||||
#
|
||||
# bmRequestType Type field may have different values. It's not necessary
|
||||
# to handle the mandatory Standard requests (bmRequestType Type ==
|
||||
# utils.REQ_TYPE_STANDARD), if the driver returns False in these cases then
|
||||
# TinyUSB will provide the necessary responses.
|
||||
#
|
||||
# See on_device_control_xfer() for a description of the arguments and
|
||||
# possible return values.
|
||||
return False
|
||||
|
||||
def on_endpoint_control_xfer(self, stage, request):
|
||||
# Control transfer callback. Override to handle a device
|
||||
# control transfer where bmRequestType Recipient is Endpoint and
|
||||
# the lower byte of wIndex indicates an endpoint address associated
|
||||
# with this interface.
|
||||
#
|
||||
# bmRequestType Type will generally have any value except
|
||||
# utils.REQ_TYPE_STANDARD, as Standard endpoint requests are handled by
|
||||
# TinyUSB. The exception is the the Standard "Set Feature" request. This
|
||||
# is handled by Tiny USB but also passed through to the driver in case it
|
||||
# needs to change any internal state, but most drivers can ignore and
|
||||
# return False in this case.
|
||||
#
|
||||
# (See USB 2.0 specification 9.4 Standard Device Requests, p250).
|
||||
#
|
||||
# See on_device_control_xfer() for a description of the parameters and
|
||||
# possible return values.
|
||||
return False
|
||||
|
||||
def xfer_pending(self, ep_addr):
|
||||
# Return True if a transfer is already pending on ep_addr.
|
||||
#
|
||||
# Only one transfer can be submitted at a time.
|
||||
return _dev and bool(_dev._ep_cbs[ep_addr])
|
||||
|
||||
def submit_xfer(self, ep_addr, data, done_cb=None):
|
||||
# Submit a USB transfer (of any type except control)
|
||||
#
|
||||
# Parameters:
|
||||
#
|
||||
# - ep_addr. Address of the endpoint to submit the transfer on. Caller is
|
||||
# responsible for ensuring that ep_addr is correct and belongs to this
|
||||
# interface. Only one transfer can be active at a time on each endpoint.
|
||||
#
|
||||
# - data. Buffer containing data to send, or for data to be read into
|
||||
# (depending on endpoint direction).
|
||||
#
|
||||
# - done_cb. Optional callback function for when the transfer
|
||||
# completes. The callback is called with arguments (ep_addr, result,
|
||||
# xferred_bytes) where result is one of xfer_result_t enum (see top of
|
||||
# this file), and xferred_bytes is an integer.
|
||||
#
|
||||
# If the function returns, the transfer is queued.
|
||||
#
|
||||
# The function will raise RuntimeError under the following conditions:
|
||||
#
|
||||
# - The interface is not "open" (i.e. has not been enumerated and configured
|
||||
# by the host yet.)
|
||||
#
|
||||
# - A transfer is already pending on this endpoint (use xfer_pending() to check
|
||||
# before sending if needed.)
|
||||
#
|
||||
# - A DCD error occurred when queueing the transfer on the hardware.
|
||||
#
|
||||
#
|
||||
# Will raise TypeError if 'data' isn't he correct type of buffer for the
|
||||
# endpoint transfer direction.
|
||||
#
|
||||
# Note that done_cb may be called immediately, possibly before this
|
||||
# function has returned to the caller.
|
||||
if not self._open:
|
||||
raise RuntimeError("Not open")
|
||||
_dev._submit_xfer(ep_addr, data, done_cb)
|
||||
|
||||
def stall(self, ep_addr, *args):
|
||||
# Set or get the endpoint STALL state.
|
||||
#
|
||||
# To get endpoint stall stage, call with a single argument.
|
||||
# To set endpoint stall state, call with an additional boolean
|
||||
# argument to set or clear.
|
||||
#
|
||||
# Generally endpoint STALL is handled automatically, but there are some
|
||||
# device classes that need to explicitly stall or unstall an endpoint
|
||||
# under certain conditions.
|
||||
if not self._open or ep_addr not in self._eps:
|
||||
raise RuntimeError
|
||||
_dev._usbd.stall(ep_addr, *args)
|
||||
|
||||
|
||||
class Descriptor:
|
||||
# Wrapper class for writing a descriptor in-place into a provided buffer
|
||||
#
|
||||
# Doesn't resize the buffer.
|
||||
#
|
||||
# Can be initialised with b=None to perform a dummy pass that calculates the
|
||||
# length needed for the buffer.
|
||||
def __init__(self, b):
|
||||
self.b = b
|
||||
self.o = 0 # offset of data written to the buffer
|
||||
|
||||
def pack(self, fmt, *args):
|
||||
# Utility function to pack new data into the descriptor
|
||||
# buffer, starting at the current offset.
|
||||
#
|
||||
# Arguments are the same as struct.pack(), but it fills the
|
||||
# pre-allocated descriptor buffer (growing if needed), instead of
|
||||
# returning anything.
|
||||
self.pack_into(fmt, self.o, *args)
|
||||
|
||||
def pack_into(self, fmt, offs, *args):
|
||||
# Utility function to pack new data into the descriptor at offset 'offs'.
|
||||
#
|
||||
# If the data written is before 'offs' then self.o isn't incremented,
|
||||
# otherwise it's incremented to point at the end of the written data.
|
||||
end = offs + struct.calcsize(fmt)
|
||||
if self.b:
|
||||
struct.pack_into(fmt, self.b, offs, *args)
|
||||
self.o = max(self.o, end)
|
||||
|
||||
def extend(self, a):
|
||||
# Extend the descriptor with some bytes-like data
|
||||
if self.b:
|
||||
self.b[self.o : self.o + len(a)] = a
|
||||
self.o += len(a)
|
||||
|
||||
# TODO: At the moment many of these arguments are named the same as the relevant field
|
||||
# in the spec, as this is easier to understand. Can save some code size by collapsing them
|
||||
# down.
|
||||
|
||||
def interface(
|
||||
self,
|
||||
bInterfaceNumber,
|
||||
bNumEndpoints,
|
||||
bInterfaceClass=_INTERFACE_CLASS_VENDOR,
|
||||
bInterfaceSubClass=_INTERFACE_SUBCLASS_NONE,
|
||||
bInterfaceProtocol=_PROTOCOL_NONE,
|
||||
iInterface=0,
|
||||
):
|
||||
# Utility function to append a standard Interface descriptor, with
|
||||
# the properties specified in the parameter list.
|
||||
#
|
||||
# Defaults for bInterfaceClass, SubClass and Protocol are a "vendor"
|
||||
# device.
|
||||
#
|
||||
# Note that iInterface is a string index number. If set, it should be set
|
||||
# by the caller Interface to the result of self._get_str_index(s),
|
||||
# where 's' is a string found in self.strs.
|
||||
self.pack(
|
||||
"BBBBBBBBB",
|
||||
_STD_DESC_INTERFACE_LEN, # bLength
|
||||
_STD_DESC_INTERFACE_TYPE, # bDescriptorType
|
||||
bInterfaceNumber,
|
||||
0, # bAlternateSetting, not currently supported
|
||||
bNumEndpoints,
|
||||
bInterfaceClass,
|
||||
bInterfaceSubClass,
|
||||
bInterfaceProtocol,
|
||||
iInterface,
|
||||
)
|
||||
|
||||
def endpoint(self, bEndpointAddress, bmAttributes, wMaxPacketSize, bInterval=1):
|
||||
# Utility function to append a standard Endpoint descriptor, with
|
||||
# the properties specified in the parameter list.
|
||||
#
|
||||
# See USB 2.0 specification section 9.6.6 Endpoint p269
|
||||
#
|
||||
# As well as a numeric value, bmAttributes can be a string value to represent
|
||||
# common endpoint types: "control", "bulk", "interrupt".
|
||||
if bmAttributes == "control":
|
||||
bmAttributes = 0
|
||||
elif bmAttributes == "bulk":
|
||||
bmAttributes = 2
|
||||
elif bmAttributes == "interrupt":
|
||||
bmAttributes = 3
|
||||
|
||||
self.pack(
|
||||
"<BBBBHB",
|
||||
_STD_DESC_ENDPOINT_LEN,
|
||||
_STD_DESC_ENDPOINT_TYPE,
|
||||
bEndpointAddress,
|
||||
bmAttributes,
|
||||
wMaxPacketSize,
|
||||
bInterval,
|
||||
)
|
||||
|
||||
def interface_assoc(
|
||||
self,
|
||||
bFirstInterface,
|
||||
bInterfaceCount,
|
||||
bFunctionClass,
|
||||
bFunctionSubClass,
|
||||
bFunctionProtocol=_PROTOCOL_NONE,
|
||||
iFunction=0,
|
||||
):
|
||||
# Utility function to append an Interface Association descriptor,
|
||||
# with the properties specified in the parameter list.
|
||||
#
|
||||
# See USB ECN: Interface Association Descriptor.
|
||||
self.pack(
|
||||
"<BBBBBBBB",
|
||||
8,
|
||||
_ITF_ASSOCIATION_DESC_TYPE,
|
||||
bFirstInterface,
|
||||
bInterfaceCount,
|
||||
bFunctionClass,
|
||||
bFunctionSubClass,
|
||||
bFunctionProtocol,
|
||||
iFunction,
|
||||
)
|
||||
|
||||
|
||||
def split_bmRequestType(bmRequestType):
|
||||
# Utility function to split control transfer field bmRequestType into a tuple of 3 fields:
|
||||
#
|
||||
# Recipient
|
||||
# Type
|
||||
# Data transfer direction
|
||||
#
|
||||
# See USB 2.0 specification section 9.3 USB Device Requests and 9.3.1 bmRequestType, p248.
|
||||
return (
|
||||
bmRequestType & 0x1F,
|
||||
(bmRequestType >> 5) & 0x03,
|
||||
(bmRequestType >> 7) & 0x01,
|
||||
)
|
||||
|
||||
|
||||
class Buffer:
|
||||
# An interrupt-safe producer/consumer buffer that wraps a bytearray object.
|
||||
#
|
||||
# Kind of like a ring buffer, but supports the idea of returning a
|
||||
# memoryview for either read or write of multiple bytes (suitable for
|
||||
# passing to a buffer function without needing to allocate another buffer to
|
||||
# read into.)
|
||||
#
|
||||
# Consumer can call pend_read() to get a memoryview to read from, and then
|
||||
# finish_read(n) when done to indicate it read 'n' bytes from the
|
||||
# memoryview. There is also a readinto() convenience function.
|
||||
#
|
||||
# Producer must call pend_write() to get a memorybuffer to write into, and
|
||||
# then finish_write(n) when done to indicate it wrote 'n' bytes into the
|
||||
# memoryview. There is also a normal write() convenience function.
|
||||
#
|
||||
# - Only one producer and one consumer is supported.
|
||||
#
|
||||
# - Calling pend_read() and pend_write() is effectively idempotent, they can be
|
||||
# called more than once without a corresponding finish_x() call if necessary
|
||||
# (provided only one thread does this, as per the previous point.)
|
||||
#
|
||||
# - Calling finish_write() and finish_read() is hard interrupt safe (does
|
||||
# not allocate). pend_read() and pend_write() each allocate 1 block for
|
||||
# the memoryview that is returned.
|
||||
#
|
||||
# The buffer contents are always laid out as:
|
||||
#
|
||||
# - Slice [:_n] = bytes of valid data waiting to read
|
||||
# - Slice [_n:_w] = unused space
|
||||
# - Slice [_w:] = bytes of pending write buffer waiting to be written
|
||||
#
|
||||
# This buffer should be fast when most reads and writes are balanced and use
|
||||
# the whole buffer. When this doesn't happen, performance degrades to
|
||||
# approximate a Python-based single byte ringbuffer.
|
||||
#
|
||||
def __init__(self, length):
|
||||
self._b = memoryview(bytearray(length))
|
||||
# number of bytes in buffer read to read, starting at index 0. Updated
|
||||
# by both producer & consumer.
|
||||
self._n = 0
|
||||
# start index of a pending write into the buffer, if any. equals
|
||||
# len(self._b) if no write is pending. Updated by producer only.
|
||||
self._w = length
|
||||
|
||||
def writable(self):
|
||||
# Number of writable bytes in the buffer. Assumes no pending write is outstanding.
|
||||
return len(self._b) - self._n
|
||||
|
||||
def readable(self):
|
||||
# Number of readable bytes in the buffer. Assumes no pending read is outstanding.
|
||||
return self._n
|
||||
|
||||
def pend_write(self, wmax=None):
|
||||
# Returns a memoryview that the producer can write bytes into.
|
||||
# start the write at self._n, the end of data waiting to read
|
||||
#
|
||||
# If wmax is set then the memoryview is pre-sliced to be at most
|
||||
# this many bytes long.
|
||||
#
|
||||
# (No critical section needed as self._w is only updated by the producer.)
|
||||
self._w = self._n
|
||||
end = (self._w + wmax) if wmax else len(self._b)
|
||||
return self._b[self._w : end]
|
||||
|
||||
def finish_write(self, nbytes):
|
||||
# Called by the producer to indicate it wrote nbytes into the buffer.
|
||||
ist = machine.disable_irq()
|
||||
try:
|
||||
assert nbytes <= len(self._b) - self._w # can't say we wrote more than was pended
|
||||
if self._n == self._w:
|
||||
# no data was read while the write was happening, so the buffer is already in place
|
||||
# (this is the fast path)
|
||||
self._n += nbytes
|
||||
else:
|
||||
# Slow path: data was read while the write was happening, so
|
||||
# shuffle the newly written bytes back towards index 0 to avoid fragmentation
|
||||
#
|
||||
# As this updates self._n we have to do it in the critical
|
||||
# section, so do it byte by byte to avoid allocating.
|
||||
while nbytes > 0:
|
||||
self._b[self._n] = self._b[self._w]
|
||||
self._n += 1
|
||||
self._w += 1
|
||||
nbytes -= 1
|
||||
|
||||
self._w = len(self._b)
|
||||
finally:
|
||||
machine.enable_irq(ist)
|
||||
|
||||
def write(self, w):
|
||||
# Helper method for the producer to write into the buffer in one call
|
||||
pw = self.pend_write()
|
||||
to_w = min(len(w), len(pw))
|
||||
if to_w:
|
||||
pw[:to_w] = w[:to_w]
|
||||
self.finish_write(to_w)
|
||||
return to_w
|
||||
|
||||
def pend_read(self):
|
||||
# Return a memoryview slice that the consumer can read bytes from
|
||||
return self._b[: self._n]
|
||||
|
||||
def finish_read(self, nbytes):
|
||||
# Called by the consumer to indicate it read nbytes from the buffer.
|
||||
if not nbytes:
|
||||
return
|
||||
ist = machine.disable_irq()
|
||||
try:
|
||||
assert nbytes <= self._n # can't say we read more than was available
|
||||
i = 0
|
||||
self._n -= nbytes
|
||||
while i < self._n:
|
||||
# consumer only read part of the buffer, so shuffle remaining
|
||||
# read data back towards index 0 to avoid fragmentation
|
||||
self._b[i] = self._b[i + nbytes]
|
||||
i += 1
|
||||
finally:
|
||||
machine.enable_irq(ist)
|
||||
|
||||
def readinto(self, b):
|
||||
# Helper method for the consumer to read out of the buffer in one call
|
||||
pr = self.pend_read()
|
||||
to_r = min(len(pr), len(b))
|
||||
if to_r:
|
||||
b[:to_r] = pr[:to_r]
|
||||
self.finish_read(to_r)
|
||||
return to_r
|
|
@ -0,0 +1,3 @@
|
|||
metadata(version="0.1")
|
||||
|
||||
module("typing.py")
|
|
@ -0,0 +1,24 @@
|
|||
# This doesn't quite test everything but just serves to verify that basic syntax works,
|
||||
# which for MicroPython means everything typing-related should be ignored.
|
||||
|
||||
from typing import *
|
||||
|
||||
MyAlias = str
|
||||
Vector = List[float]
|
||||
Nested = Iterable[Tuple[int, ...]]
|
||||
UserId = NewType("UserId", int)
|
||||
T = TypeVar("T", int, float, complex)
|
||||
|
||||
hintedGlobal: Any = None
|
||||
|
||||
|
||||
def func_with_hints(c: int, b: MyAlias, a: Union[int, None], lst: List[float] = [0.0]) -> Any:
|
||||
pass
|
||||
|
||||
|
||||
class ClassWithHints(Generic[T]):
|
||||
|
||||
a: int = 0
|
||||
|
||||
def foo(self, other: int) -> None:
|
||||
pass
|
|
@ -0,0 +1,154 @@
|
|||
def cast(type, val):
|
||||
return val
|
||||
|
||||
|
||||
def get_origin(type):
|
||||
return None
|
||||
|
||||
|
||||
def get_args(type):
|
||||
return ()
|
||||
|
||||
|
||||
def no_type_check(arg):
|
||||
return arg
|
||||
|
||||
|
||||
def overload(func):
|
||||
return None
|
||||
|
||||
|
||||
class _AnyCall:
|
||||
def __init__(*args, **kwargs):
|
||||
pass
|
||||
|
||||
def __call__(*args, **kwargs):
|
||||
pass
|
||||
|
||||
|
||||
_anyCall = _AnyCall()
|
||||
|
||||
|
||||
class _SubscriptableType:
|
||||
def __getitem__(self, arg):
|
||||
return _anyCall
|
||||
|
||||
|
||||
_Subscriptable = _SubscriptableType()
|
||||
|
||||
|
||||
def TypeVar(type, *types):
|
||||
return None
|
||||
|
||||
|
||||
def NewType(name, type):
|
||||
return type
|
||||
|
||||
|
||||
class Any:
|
||||
pass
|
||||
|
||||
|
||||
class BinaryIO:
|
||||
pass
|
||||
|
||||
|
||||
class ClassVar:
|
||||
pass
|
||||
|
||||
|
||||
class Final:
|
||||
pass
|
||||
|
||||
|
||||
class Hashable:
|
||||
pass
|
||||
|
||||
|
||||
class IO:
|
||||
pass
|
||||
|
||||
|
||||
class NoReturn:
|
||||
pass
|
||||
|
||||
|
||||
class Sized:
|
||||
pass
|
||||
|
||||
|
||||
class SupportsInt:
|
||||
pass
|
||||
|
||||
|
||||
class SupportsFloat:
|
||||
pass
|
||||
|
||||
|
||||
class SupportsComplex:
|
||||
pass
|
||||
|
||||
|
||||
class SupportsBytes:
|
||||
pass
|
||||
|
||||
|
||||
class SupportsIndex:
|
||||
pass
|
||||
|
||||
|
||||
class SupportsAbs:
|
||||
pass
|
||||
|
||||
|
||||
class SupportsRound:
|
||||
pass
|
||||
|
||||
|
||||
class TextIO:
|
||||
pass
|
||||
|
||||
|
||||
AnyStr = str
|
||||
Text = str
|
||||
Pattern = str
|
||||
Match = str
|
||||
TypedDict = dict
|
||||
|
||||
AbstractSet = _Subscriptable
|
||||
AsyncContextManager = _Subscriptable
|
||||
AsyncGenerator = _Subscriptable
|
||||
AsyncIterable = _Subscriptable
|
||||
AsyncIterator = _Subscriptable
|
||||
Awaitable = _Subscriptable
|
||||
Callable = _Subscriptable
|
||||
ChainMap = _Subscriptable
|
||||
Collection = _Subscriptable
|
||||
Container = _Subscriptable
|
||||
ContextManager = _Subscriptable
|
||||
Coroutine = _Subscriptable
|
||||
Counter = _Subscriptable
|
||||
DefaultDict = _Subscriptable
|
||||
Deque = _Subscriptable
|
||||
Dict = _Subscriptable
|
||||
FrozenSet = _Subscriptable
|
||||
Generator = _Subscriptable
|
||||
Generic = _Subscriptable
|
||||
Iterable = _Subscriptable
|
||||
Iterator = _Subscriptable
|
||||
List = _Subscriptable
|
||||
Literal = _Subscriptable
|
||||
Mapping = _Subscriptable
|
||||
MutableMapping = _Subscriptable
|
||||
MutableSequence = _Subscriptable
|
||||
MutableSet = _Subscriptable
|
||||
NamedTuple = _Subscriptable
|
||||
Optional = _Subscriptable
|
||||
OrderedDict = _Subscriptable
|
||||
Sequence = _Subscriptable
|
||||
Set = _Subscriptable
|
||||
Tuple = _Subscriptable
|
||||
Type = _Subscriptable
|
||||
Union = _Subscriptable
|
||||
|
||||
TYPE_CHECKING = False
|
Ładowanie…
Reference in New Issue