HLammers 2025-07-05 17:38:13 +02:00 zatwierdzone przez GitHub
commit 8a924e51ab
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: B5690EEEBB952194
1 zmienionych plików z 24 dodań i 15 usunięć

Wyświetl plik

@ -3,10 +3,11 @@
# These contain the classes and utilities that are needed to
# implement a USB device, not any complete USB drivers.
#
# MIT license; Copyright (c) 2022-2024 Angus Gratton
# MIT license; Copyright (c) 2022-2024 Angus Gratton, 2025 Harm Lammers
from micropython import const
import machine
import struct
import time
try:
from _thread import get_ident
@ -108,7 +109,6 @@ class _Device:
device_class=0,
device_subclass=0,
device_protocol=0,
config_str=None,
max_power_ma=None,
remote_wakeup=False,
):
@ -166,7 +166,11 @@ class _Device:
# Keep track of the interface and endpoint indexes
itf_num = builtin_driver.itf_max
ep_num = max(builtin_driver.ep_max, 1) # Endpoint 0 always reserved for control
while len(strs) < builtin_driver.str_max:
while (
len(strs) < builtin_driver.str_max - 1
): # This is possibly unnecessary or wrong because
# https://docs.micropython.org/en/latest/library/machine.USBDevice.html
# states all string values except index 0 should be plain ASCII
strs.append(None) # Reserve other string indexes used by builtin drivers
initial_cfg = builtin_driver.desc_cfg or (b"\x00" * _STD_DESC_CONFIG_LEN)
@ -204,10 +208,11 @@ class _Device:
)
# Configuration string is optional but supported
iConfiguration = 0
if configuration_str:
iConfiguration = len(strs)
strs.append(configuration_str)
else:
iConfiguration = 0
if max_power_ma is not None:
# Convert from mA to the units used in the descriptor
@ -665,6 +670,7 @@ class Descriptor:
bInterfaceSubClass=_INTERFACE_SUBCLASS_NONE,
bInterfaceProtocol=_PROTOCOL_NONE,
iInterface=0,
bAlternateSetting=0,
):
# Utility function to append a standard Interface descriptor, with
# the properties specified in the parameter list.
@ -680,7 +686,7 @@ class Descriptor:
_STD_DESC_INTERFACE_LEN, # bLength
_STD_DESC_INTERFACE_TYPE, # bDescriptorType
bInterfaceNumber,
0, # bAlternateSetting, not currently supported
bAlternateSetting,
bNumEndpoints,
bInterfaceClass,
bInterfaceSubClass,
@ -791,17 +797,18 @@ class Buffer:
# approximate a Python-based single byte ringbuffer.
#
def __init__(self, length):
self._l = length
self._b = memoryview(bytearray(length))
# number of bytes in buffer read to read, starting at index 0. Updated
# number of bytes in buffer ready to read, starting at index 0. Updated
# by both producer & consumer.
self._n = 0
# start index of a pending write into the buffer, if any. equals
# start index of a pending write into the buffer, if any. Equals
# len(self._b) if no write is pending. Updated by producer only.
self._w = length
def writable(self):
# Number of writable bytes in the buffer. Assumes no pending write is outstanding.
return len(self._b) - self._n
return self._l - self._n
def readable(self):
# Number of readable bytes in the buffer. Assumes no pending read is outstanding.
@ -815,15 +822,15 @@ class Buffer:
# this many bytes long.
#
# (No critical section needed as self._w is only updated by the producer.)
self._w = self._n
end = (self._w + wmax) if wmax else len(self._b)
return self._b[self._w : end]
self._w = (_w := self._n)
end = (_w + wmax) if wmax else self._l
return self._b[_w:end]
def finish_write(self, nbytes):
# Called by the producer to indicate it wrote nbytes into the buffer.
ist = machine.disable_irq()
try:
assert nbytes <= len(self._b) - self._w # can't say we wrote more than was pended
assert nbytes <= self._l - self._w # can't say we wrote more than was pended
if self._n == self._w:
# no data was read while the write was happening, so the buffer is already in place
# (this is the fast path)
@ -834,13 +841,14 @@ class Buffer:
#
# As this updates self._n we have to do it in the critical
# section, so do it byte by byte to avoid allocating.
_b = self._b
while nbytes > 0:
self._b[self._n] = self._b[self._w]
_b[self._n] = _b[self._w]
self._n += 1
self._w += 1
nbytes -= 1
self._w = len(self._b)
self._w = self._l
finally:
machine.enable_irq(ist)
@ -866,10 +874,11 @@ class Buffer:
assert nbytes <= self._n # can't say we read more than was available
i = 0
self._n -= nbytes
_b = self._b
while i < self._n:
# consumer only read part of the buffer, so shuffle remaining
# read data back towards index 0 to avoid fragmentation
self._b[i] = self._b[i + nbytes]
_b[i] = _b[i + nbytes]
i += 1
finally:
machine.enable_irq(ist)