Merge pull request #2 from micropython/master

Update
pull/570/head
Patrick Joy 2024-01-05 22:37:30 +11:00 zatwierdzone przez GitHub
commit 06592f8246
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
69 zmienionych plików z 1893 dodań i 320 usunięć

Wyświetl plik

@ -1,16 +0,0 @@
name: Check code formatting
on: [push, pull_request]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
- name: Install packages
run: source tools/ci.sh && ci_code_formatting_setup
- name: Run code formatting
run: source tools/ci.sh && ci_code_formatting_run
- name: Check code formatting
run: git diff --exit-code

Wyświetl plik

@ -0,0 +1,18 @@
name: Check commit message formatting
on: [push, pull_request]
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: '100'
- uses: actions/setup-python@v4
- name: Check commit message formatting
run: source tools/ci.sh && ci_commit_formatting_run

Wyświetl plik

@ -1,10 +1,11 @@
# https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python # https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python
name: Python code lint with ruff name: Python code lint and formatting with ruff
on: [push, pull_request] on: [push, pull_request]
jobs: jobs:
ruff: ruff:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v4
- run: pip install --user ruff - run: pip install --user ruff==0.1.2
- run: ruff --format=github . - run: ruff check --output-format=github .
- run: ruff format --diff .

Wyświetl plik

@ -1,11 +1,14 @@
repos: repos:
- repo: local - repo: local
hooks: hooks:
- id: codeformat - id: verifygitlog
name: MicroPython codeformat.py for changed files name: MicroPython git commit message format checker
entry: tools/codeformat.py -v -f entry: tools/verifygitlog.py --check-file --ignore-rebase
language: python language: python
verbose: true
stages: [commit-msg]
- repo: https://github.com/charliermarsh/ruff-pre-commit - repo: https://github.com/charliermarsh/ruff-pre-commit
rev: v0.0.280 rev: v0.1.2
hooks: hooks:
- id: ruff - id: ruff
id: ruff-format

Wyświetl plik

@ -21,7 +21,7 @@ To use this library, you need to import the library and then start the REPL task
For example, in main.py: For example, in main.py:
```py ```py
import uasyncio as asyncio import asyncio
import aiorepl import aiorepl
async def demo(): async def demo():

Wyświetl plik

@ -1,10 +1,11 @@
# MIT license; Copyright (c) 2022 Jim Mussared # MIT license; Copyright (c) 2022 Jim Mussared
import micropython import micropython
from micropython import const
import re import re
import sys import sys
import time import time
import uasyncio as asyncio import asyncio
# Import statement (needs to be global, and does not return). # Import statement (needs to be global, and does not return).
_RE_IMPORT = re.compile("^import ([^ ]+)( as ([^ ]+))?") _RE_IMPORT = re.compile("^import ([^ ]+)( as ([^ ]+))?")
@ -18,6 +19,13 @@ _RE_ASSIGN = re.compile("[^=]=[^=]")
_HISTORY_LIMIT = const(5 + 1) _HISTORY_LIMIT = const(5 + 1)
CHAR_CTRL_A = const(1)
CHAR_CTRL_B = const(2)
CHAR_CTRL_C = const(3)
CHAR_CTRL_D = const(4)
CHAR_CTRL_E = const(5)
async def execute(code, g, s): async def execute(code, g, s):
if not code.strip(): if not code.strip():
return return
@ -38,13 +46,11 @@ async def __code():
{} {}
__exec_task = asyncio.create_task(__code()) __exec_task = asyncio.create_task(__code())
""".format( """.format(code)
code
)
async def kbd_intr_task(exec_task, s): async def kbd_intr_task(exec_task, s):
while True: while True:
if ord(await s.read(1)) == 0x03: if ord(await s.read(1)) == CHAR_CTRL_C:
exec_task.cancel() exec_task.cancel()
return return
@ -103,7 +109,9 @@ async def task(g=None, prompt="--> "):
while True: while True:
hist_b = 0 # How far back in the history are we currently. hist_b = 0 # How far back in the history are we currently.
sys.stdout.write(prompt) sys.stdout.write(prompt)
cmd = "" cmd: str = ""
paste = False
curs = 0 # cursor offset from end of cmd buffer
while True: while True:
b = await s.read(1) b = await s.read(1)
pc = c # save previous character pc = c # save previous character
@ -113,11 +121,19 @@ async def task(g=None, prompt="--> "):
if c < 0x20 or c > 0x7E: if c < 0x20 or c > 0x7E:
if c == 0x0A: if c == 0x0A:
# LF # LF
if paste:
sys.stdout.write(b)
cmd += b
continue
# If the previous character was also LF, and was less # If the previous character was also LF, and was less
# than 20 ms ago, this was likely due to CRLF->LFLF # than 20 ms ago, this was likely due to CRLF->LFLF
# conversion, so ignore this linefeed. # conversion, so ignore this linefeed.
if pc == 0x0A and time.ticks_diff(t, pt) < 20: if pc == 0x0A and time.ticks_diff(t, pt) < 20:
continue continue
if curs:
# move cursor to end of the line
sys.stdout.write("\x1B[{}C".format(curs))
curs = 0
sys.stdout.write("\n") sys.stdout.write("\n")
if cmd: if cmd:
# Push current command. # Push current command.
@ -134,31 +150,45 @@ async def task(g=None, prompt="--> "):
elif c == 0x08 or c == 0x7F: elif c == 0x08 or c == 0x7F:
# Backspace. # Backspace.
if cmd: if cmd:
if curs:
cmd = "".join((cmd[: -curs - 1], cmd[-curs:]))
sys.stdout.write(
"\x08\x1B[K"
) # move cursor back, erase to end of line
sys.stdout.write(cmd[-curs:]) # redraw line
sys.stdout.write("\x1B[{}D".format(curs)) # reset cursor location
else:
cmd = cmd[:-1] cmd = cmd[:-1]
sys.stdout.write("\x08 \x08") sys.stdout.write("\x08 \x08")
elif c == 0x02: elif c == CHAR_CTRL_A:
# Ctrl-B await raw_repl(s, g)
break
elif c == CHAR_CTRL_B:
continue continue
elif c == 0x03: elif c == CHAR_CTRL_C:
# Ctrl-C if paste:
if pc == 0x03 and time.ticks_diff(t, pt) < 20: break
# Two very quick Ctrl-C (faster than a human
# typing) likely means mpremote trying to
# escape.
asyncio.new_event_loop()
return
sys.stdout.write("\n") sys.stdout.write("\n")
break break
elif c == 0x04: elif c == CHAR_CTRL_D:
# Ctrl-D if paste:
result = await execute(cmd, g, s)
if result is not None:
sys.stdout.write(repr(result))
sys.stdout.write("\n")
break
sys.stdout.write("\n") sys.stdout.write("\n")
# Shutdown asyncio. # Shutdown asyncio.
asyncio.new_event_loop() asyncio.new_event_loop()
return return
elif c == CHAR_CTRL_E:
sys.stdout.write("paste mode; Ctrl-C to cancel, Ctrl-D to finish\n===\n")
paste = True
elif c == 0x1B: elif c == 0x1B:
# Start of escape sequence. # Start of escape sequence.
key = await s.read(2) key = await s.read(2)
if key in ("[A", "[B"): if key in ("[A", "[B"): # up, down
# Stash the current command. # Stash the current command.
hist[(hist_i - hist_b) % _HISTORY_LIMIT] = cmd hist[(hist_i - hist_b) % _HISTORY_LIMIT] = cmd
# Clear current command. # Clear current command.
@ -174,12 +204,122 @@ async def task(g=None, prompt="--> "):
# Update current command. # Update current command.
cmd = hist[(hist_i - hist_b) % _HISTORY_LIMIT] cmd = hist[(hist_i - hist_b) % _HISTORY_LIMIT]
sys.stdout.write(cmd) sys.stdout.write(cmd)
elif key == "[D": # left
if curs < len(cmd) - 1:
curs += 1
sys.stdout.write("\x1B")
sys.stdout.write(key)
elif key == "[C": # right
if curs:
curs -= 1
sys.stdout.write("\x1B")
sys.stdout.write(key)
elif key == "[H": # home
pcurs = curs
curs = len(cmd)
sys.stdout.write("\x1B[{}D".format(curs - pcurs)) # move cursor left
elif key == "[F": # end
pcurs = curs
curs = 0
sys.stdout.write("\x1B[{}C".format(pcurs)) # move cursor right
else: else:
# sys.stdout.write("\\x") # sys.stdout.write("\\x")
# sys.stdout.write(hex(c)) # sys.stdout.write(hex(c))
pass pass
else:
if curs:
# inserting into middle of line
cmd = "".join((cmd[:-curs], b, cmd[-curs:]))
sys.stdout.write(cmd[-curs - 1 :]) # redraw line to end
sys.stdout.write("\x1B[{}D".format(curs)) # reset cursor location
else: else:
sys.stdout.write(b) sys.stdout.write(b)
cmd += b cmd += b
finally: finally:
micropython.kbd_intr(3) micropython.kbd_intr(3)
async def raw_paste(s, g, window=512):
sys.stdout.write("R\x01") # supported
sys.stdout.write(bytearray([window & 0xFF, window >> 8, 0x01]).decode())
eof = False
idx = 0
buff = bytearray(window)
file = b""
while not eof:
for idx in range(window):
b = await s.read(1)
c = ord(b)
if c == CHAR_CTRL_C or c == CHAR_CTRL_D:
# end of file
sys.stdout.write(chr(CHAR_CTRL_D))
if c == CHAR_CTRL_C:
raise KeyboardInterrupt
file += buff[:idx]
eof = True
break
buff[idx] = c
if not eof:
file += buff
sys.stdout.write("\x01") # indicate window available to host
return file
async def raw_repl(s: asyncio.StreamReader, g: dict):
heading = "raw REPL; CTRL-B to exit\n"
line = ""
sys.stdout.write(heading)
while True:
line = ""
sys.stdout.write(">")
while True:
b = await s.read(1)
c = ord(b)
if c == CHAR_CTRL_A:
rline = line
line = ""
if len(rline) == 2 and ord(rline[0]) == CHAR_CTRL_E:
if rline[1] == "A":
line = await raw_paste(s, g)
break
else:
# reset raw REPL
sys.stdout.write(heading)
sys.stdout.write(">")
continue
elif c == CHAR_CTRL_B:
# exit raw REPL
sys.stdout.write("\n")
return 0
elif c == CHAR_CTRL_C:
# clear line
line = ""
elif c == CHAR_CTRL_D:
# entry finished
# indicate reception of command
sys.stdout.write("OK")
break
else:
# let through any other raw 8-bit value
line += b
if len(line) == 0:
# Normally used to trigger soft-reset but stay in raw mode.
# Fake it for aiorepl / mpremote.
sys.stdout.write("Ignored: soft reboot\n")
sys.stdout.write(heading)
try:
result = exec(line, g)
if result is not None:
sys.stdout.write(repr(result))
sys.stdout.write(chr(CHAR_CTRL_D))
except Exception as ex:
print(line)
sys.stdout.write(chr(CHAR_CTRL_D))
sys.print_exception(ex, sys.stdout)
sys.stdout.write(chr(CHAR_CTRL_D))

Wyświetl plik

@ -1,5 +1,5 @@
metadata( metadata(
version="0.1.1", version="0.2.0",
description="Provides an asynchronous REPL that can run concurrently with an asyncio, also allowing await expressions.", description="Provides an asynchronous REPL that can run concurrently with an asyncio, also allowing await expressions.",
) )

Wyświetl plik

@ -1,4 +1,4 @@
metadata(version="0.3.0") metadata(version="0.4.1")
require("aioble-core") require("aioble-core")

Wyświetl plik

@ -70,7 +70,7 @@ Alternatively, install the `aioble` package, which will install everything.
Usage Usage
----- -----
Passive scan for nearby devices for 5 seconds: (Observer) #### Passive scan for nearby devices for 5 seconds: (Observer)
```py ```py
async with aioble.scan(duration_ms=5000) as scanner: async with aioble.scan(duration_ms=5000) as scanner:
@ -87,7 +87,7 @@ async with aioble.scan(duration_ms=5000, interval_us=30000, window_us=30000, act
print(result, result.name(), result.rssi, result.services()) print(result, result.name(), result.rssi, result.services())
``` ```
Connect to a peripheral device: (Central) #### Connect to a peripheral device: (Central)
```py ```py
# Either from scan result # Either from scan result
@ -101,14 +101,14 @@ except asyncio.TimeoutError:
print('Timeout') print('Timeout')
``` ```
Register services and wait for connection: (Peripheral, Server) #### Register services and wait for connection: (Peripheral, Server)
```py ```py
_ENV_SENSE_UUID = bluetooth.UUID(0x181A) _ENV_SENSE_UUID = bluetooth.UUID(0x181A)
_ENV_SENSE_TEMP_UUID = bluetooth.UUID(0x2A6E) _ENV_SENSE_TEMP_UUID = bluetooth.UUID(0x2A6E)
_GENERIC_THERMOMETER = const(768) _GENERIC_THERMOMETER = const(768)
_ADV_INTERVAL_MS = const(250000) _ADV_INTERVAL_US = const(250000)
temp_service = aioble.Service(_ENV_SENSE_UUID) temp_service = aioble.Service(_ENV_SENSE_UUID)
temp_char = aioble.Characteristic(temp_service, _ENV_SENSE_TEMP_UUID, read=True, notify=True) temp_char = aioble.Characteristic(temp_service, _ENV_SENSE_TEMP_UUID, read=True, notify=True)
@ -117,7 +117,7 @@ aioble.register_services(temp_service)
while True: while True:
connection = await aioble.advertise( connection = await aioble.advertise(
_ADV_INTERVAL_MS, _ADV_INTERVAL_US,
name="temp-sense", name="temp-sense",
services=[_ENV_SENSE_UUID], services=[_ENV_SENSE_UUID],
appearance=_GENERIC_THERMOMETER, appearance=_GENERIC_THERMOMETER,
@ -126,30 +126,95 @@ while True:
print("Connection from", device) print("Connection from", device)
``` ```
Update characteristic value: (Server) #### Update characteristic value: (Server)
```py ```py
# Write the local value.
temp_char.write(b'data') temp_char.write(b'data')
temp_char.notify(b'optional data')
await temp_char.indicate(timeout_ms=2000)
``` ```
Query the value of a characteristic: (Client) ```py
# Write the local value and notify/indicate subscribers.
temp_char.write(b'data', send_update=True)
```
#### Send notifications: (Server)
```py
# Notify with the current value.
temp_char.notify(connection)
```
```py
# Notify with a custom value.
temp_char.notify(connection, b'optional data')
```
#### Send indications: (Server)
```py
# Indicate with current value.
await temp_char.indicate(connection, timeout_ms=2000)
```
```py
# Indicate with custom value.
await temp_char.indicate(connection, b'optional data', timeout_ms=2000)
```
This will raise `GattError` if the indication is not acknowledged.
#### Wait for a write from the client: (Server)
```py
# Normal characteristic, returns the connection that did the write.
connection = await char.written(timeout_ms=2000)
```
```py
# Characteristic with capture enabled, also returns the value.
char = Characteristic(..., capture=True)
connection, data = await char.written(timeout_ms=2000)
```
#### Query the value of a characteristic: (Client)
```py ```py
temp_service = await connection.service(_ENV_SENSE_UUID) temp_service = await connection.service(_ENV_SENSE_UUID)
temp_char = await temp_service.characteristic(_ENV_SENSE_TEMP_UUID) temp_char = await temp_service.characteristic(_ENV_SENSE_TEMP_UUID)
data = await temp_char.read(timeout_ms=1000) data = await temp_char.read(timeout_ms=1000)
```
#### Wait for a notification/indication: (Client)
```py
# Notification
data = await temp_char.notified(timeout_ms=1000)
```
```py
# Indication
data = await temp_char.indicated(timeout_ms=1000)
```
#### Subscribe to a characteristic: (Client)
```py
# Subscribe for notification.
await temp_char.subscribe(notify=True) await temp_char.subscribe(notify=True)
while True: while True:
data = await temp_char.notified() data = await temp_char.notified()
``` ```
Open L2CAP channels: (Listener) ```py
# Subscribe for indication.
await temp_char.subscribe(indicate=True)
while True:
data = await temp_char.indicated()
```
#### Open L2CAP channels: (Listener)
```py ```py
channel = await connection.l2cap_accept(_L2CAP_PSN, _L2CAP_MTU) channel = await connection.l2cap_accept(_L2CAP_PSN, _L2CAP_MTU)
@ -158,7 +223,7 @@ n = channel.recvinto(buf)
channel.send(b'response') channel.send(b'response')
``` ```
Open L2CAP channels: (Initiator) #### Open L2CAP channels: (Initiator)
```py ```py
channel = await connection.l2cap_connect(_L2CAP_PSN, _L2CAP_MTU) channel = await connection.l2cap_connect(_L2CAP_PSN, _L2CAP_MTU)

Wyświetl plik

@ -257,7 +257,7 @@ class Characteristic(BaseCharacteristic):
raise ValueError("Not supported") raise ValueError("Not supported")
ble.gatts_notify(connection._conn_handle, self._value_handle, data) ble.gatts_notify(connection._conn_handle, self._value_handle, data)
async def indicate(self, connection, timeout_ms=1000): async def indicate(self, connection, data=None, timeout_ms=1000):
if not (self.flags & _FLAG_INDICATE): if not (self.flags & _FLAG_INDICATE):
raise ValueError("Not supported") raise ValueError("Not supported")
if self._indicate_connection is not None: if self._indicate_connection is not None:
@ -270,7 +270,7 @@ class Characteristic(BaseCharacteristic):
try: try:
with connection.timeout(timeout_ms): with connection.timeout(timeout_ms):
ble.gatts_indicate(connection._conn_handle, self._value_handle) ble.gatts_indicate(connection._conn_handle, self._value_handle, data)
await self._indicate_event.wait() await self._indicate_event.wait()
if self._indicate_status != 0: if self._indicate_status != 0:
raise GattError(self._indicate_status) raise GattError(self._indicate_status)
@ -290,8 +290,8 @@ class Characteristic(BaseCharacteristic):
class BufferedCharacteristic(Characteristic): class BufferedCharacteristic(Characteristic):
def __init__(self, service, uuid, max_len=20, append=False): def __init__(self, *args, max_len=20, append=False, **kwargs):
super().__init__(service, uuid, read=True) super().__init__(*args, **kwargs)
self._max_len = max_len self._max_len = max_len
self._append = append self._append = append

Wyświetl plik

@ -3,7 +3,7 @@
# code. This allows (for development purposes) all the files to live in the # code. This allows (for development purposes) all the files to live in the
# one directory. # one directory.
metadata(version="0.3.1") metadata(version="0.4.1")
# Default installation gives you everything. Install the individual # Default installation gives you everything. Install the individual
# components (or a combination of them) if you want a more minimal install. # components (or a combination of them) if you want a more minimal install.

Wyświetl plik

@ -0,0 +1,139 @@
# Test characteristic read/write/notify from both GATTS and GATTC.
import sys
sys.path.append("")
from micropython import const
import time, machine
import uasyncio as asyncio
import aioble
import bluetooth
TIMEOUT_MS = 5000
SERVICE_UUID = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A")
CHAR1_UUID = bluetooth.UUID("00000000-1111-2222-3333-444444444444")
CHAR2_UUID = bluetooth.UUID("00000000-1111-2222-3333-555555555555")
CHAR3_UUID = bluetooth.UUID("00000000-1111-2222-3333-666666666666")
# Acting in peripheral role.
async def instance0_task():
service = aioble.Service(SERVICE_UUID)
characteristic1 = aioble.BufferedCharacteristic(service, CHAR1_UUID, write=True)
characteristic2 = aioble.BufferedCharacteristic(service, CHAR2_UUID, write=True, max_len=40)
characteristic3 = aioble.BufferedCharacteristic(
service, CHAR3_UUID, write=True, max_len=80, append=True
)
aioble.register_services(service)
multitest.globals(BDADDR=aioble.config("mac"))
multitest.next()
# Wait for central to connect to us.
print("advertise")
connection = await aioble.advertise(
20_000, adv_data=b"\x02\x01\x06\x04\xffMPY", timeout_ms=TIMEOUT_MS
)
print("connected")
# The first will just see the second write (truncated).
await characteristic1.written(timeout_ms=TIMEOUT_MS)
await characteristic1.written(timeout_ms=TIMEOUT_MS)
print("written", characteristic1.read())
# The second will just see the second write (still truncated because MTU
# exchange hasn't happened).
await characteristic2.written(timeout_ms=TIMEOUT_MS)
await characteristic2.written(timeout_ms=TIMEOUT_MS)
print("written", characteristic2.read())
# MTU exchange should happen here.
# The second will now see the full second write.
await characteristic2.written(timeout_ms=TIMEOUT_MS)
await characteristic2.written(timeout_ms=TIMEOUT_MS)
print("written", characteristic2.read())
# The third will see the two full writes concatenated.
await characteristic3.written(timeout_ms=TIMEOUT_MS)
await characteristic3.written(timeout_ms=TIMEOUT_MS)
print("written", characteristic3.read())
# Wait for the central to disconnect.
await connection.disconnected(timeout_ms=TIMEOUT_MS)
print("disconnected")
def instance0():
try:
asyncio.run(instance0_task())
finally:
aioble.stop()
# Acting in central role.
async def instance1_task():
multitest.next()
# Connect to peripheral and then disconnect.
print("connect")
device = aioble.Device(*BDADDR)
connection = await device.connect(timeout_ms=TIMEOUT_MS)
# Discover characteristics.
service = await connection.service(SERVICE_UUID)
print("service", service.uuid)
characteristic1 = await service.characteristic(CHAR1_UUID)
print("characteristic1", characteristic1.uuid)
characteristic2 = await service.characteristic(CHAR2_UUID)
print("characteristic2", characteristic2.uuid)
characteristic3 = await service.characteristic(CHAR3_UUID)
print("characteristic3", characteristic3.uuid)
# Write to each characteristic twice, with a long enough value to trigger
# truncation.
print("write1")
await characteristic1.write(
"central1-aaaaaaaaaaaaaaaaaaaaaaaaaaaaa", response=True, timeout_ms=TIMEOUT_MS
)
await characteristic1.write(
"central1-bbbbbbbbbbbbbbbbbbbbbbbbbbbbb", response=True, timeout_ms=TIMEOUT_MS
)
print("write2a")
await characteristic2.write(
"central2a-aaaaaaaaaaaaaaaaaaaaaaaaaaaa", response=True, timeout_ms=TIMEOUT_MS
)
await characteristic2.write(
"central2a-bbbbbbbbbbbbbbbbbbbbbbbbbbbb", response=True, timeout_ms=TIMEOUT_MS
)
print("exchange mtu")
await connection.exchange_mtu(100)
print("write2b")
await characteristic2.write(
"central2b-aaaaaaaaaaaaaaaaaaaaaaaaaaaa", response=True, timeout_ms=TIMEOUT_MS
)
await characteristic2.write(
"central2b-bbbbbbbbbbbbbbbbbbbbbbbbbbbb", response=True, timeout_ms=TIMEOUT_MS
)
print("write3")
await characteristic3.write(
"central3-aaaaaaaaaaaaaaaaaaaaaaaaaaaaa", response=True, timeout_ms=TIMEOUT_MS
)
await characteristic3.write(
"central3-bbbbbbbbbbbbbbbbbbbbbbbbbbbbb", response=True, timeout_ms=TIMEOUT_MS
)
# Disconnect from peripheral.
print("disconnect")
await connection.disconnect(timeout_ms=TIMEOUT_MS)
print("disconnected")
def instance1():
try:
asyncio.run(instance1_task())
finally:
aioble.stop()

Wyświetl plik

@ -0,0 +1,21 @@
--- instance0 ---
advertise
connected
written b'central1-bbbbbbbbbbb'
written b'central2a-bbbbbbbbbb'
written b'central2b-bbbbbbbbbbbbbbbbbbbbbbbbbbbb'
written b'central3-aaaaaaaaaaaaaaaaaaaaaaaaaaaaacentral3-bbbbbbbbbbbbbbbbbbbbbbbbbbbbb'
disconnected
--- instance1 ---
connect
service UUID('a5a5a5a5-ffff-9999-1111-5a5a5a5a5a5a')
characteristic1 UUID('00000000-1111-2222-3333-444444444444')
characteristic2 UUID('00000000-1111-2222-3333-555555555555')
characteristic3 UUID('00000000-1111-2222-3333-666666666666')
write1
write2a
exchange mtu
write2b
write3
disconnect
disconnected

Wyświetl plik

@ -1,3 +1,3 @@
metadata(description="WM8960 codec.", version="0.1.0") metadata(description="WM8960 codec.", version="0.1.1")
module("wm8960.py", opt=3) module("wm8960.py", opt=3)

Wyświetl plik

@ -683,7 +683,7 @@ class WM8960:
) )
self.regs[_ALC3] = (_ALC_MODE_MASK, mode << _ALC_MODE_SHIFT) self.regs[_ALC3] = (_ALC_MODE_MASK, mode << _ALC_MODE_SHIFT)
try: try:
rate = _alc_sample_rate_table[self.sample_rate] rate = self._alc_sample_rate_table[self.sample_rate]
except: except:
rate = 0 rate = 0
self.regs[_ADDCTL3] = (_DACCTL3_ALCSR_MASK, rate) self.regs[_ADDCTL3] = (_DACCTL3_ALCSR_MASK, rate)

Wyświetl plik

@ -5,7 +5,7 @@ import time, math, framebuf, lcd160cr
def get_lcd(lcd): def get_lcd(lcd):
if type(lcd) is str: if isinstance(lcd, str):
lcd = lcd160cr.LCD160CR(lcd) lcd = lcd160cr.LCD160CR(lcd)
return lcd return lcd

Wyświetl plik

@ -1,8 +1,3 @@
metadata(description="LCD160CR driver.", version="0.1.0") metadata(description="LCD160CR driver.", version="0.1.0")
options.defaults(test=False)
module("lcd160cr.py", opt=3) module("lcd160cr.py", opt=3)
if options.test:
module("lcd160cr_test.py", opt=3)

Wyświetl plik

@ -598,7 +598,7 @@ class BMI270:
def _write_burst(self, reg, data, chunk=16): def _write_burst(self, reg, data, chunk=16):
self._write_reg(_INIT_ADDR_0, 0) self._write_reg(_INIT_ADDR_0, 0)
self._write_reg(_INIT_ADDR_1, 0) self._write_reg(_INIT_ADDR_1, 0)
for i in range(0, len(data) // chunk): for i in range(len(data) // chunk):
offs = i * chunk offs = i * chunk
self._write_reg(reg, data[offs : offs + chunk]) self._write_reg(reg, data[offs : offs + chunk])
init_addr = ((i + 1) * chunk) // 2 init_addr = ((i + 1) * chunk) // 2
@ -606,7 +606,7 @@ class BMI270:
self._write_reg(_INIT_ADDR_1, (init_addr >> 4) & 0xFF) self._write_reg(_INIT_ADDR_1, (init_addr >> 4) & 0xFF)
def _poll_reg(self, reg, mask, retry=10, delay=100): def _poll_reg(self, reg, mask, retry=10, delay=100):
for i in range(0, retry): for i in range(retry):
if self._read_reg(reg) & mask: if self._read_reg(reg) & mask:
return True return True
time.sleep_ms(delay) time.sleep_ms(delay)

Wyświetl plik

@ -165,11 +165,8 @@ class BMM150:
z = (z5 / (z4 * 4)) / 16 z = (z5 / (z4 * 4)) / 16
return z return z
def reset(self):
self._write_reg(_CMD, 0xB6)
def magnet_raw(self): def magnet_raw(self):
for i in range(0, 10): for i in range(10):
self._read_reg_into(_DATA, self.scratch) self._read_reg_into(_DATA, self.scratch)
if self.scratch[3] & 0x1: if self.scratch[3] & 0x1:
return ( return (

Wyświetl plik

@ -46,6 +46,7 @@ while (True):
import array import array
from micropython import const from micropython import const
import time
_CTRL3_C = const(0x12) _CTRL3_C = const(0x12)
_CTRL1_XL = const(0x10) _CTRL1_XL = const(0x10)
@ -196,7 +197,7 @@ class LSM6DSOX:
def reset(self): def reset(self):
self._write_reg(_CTRL3_C, self._read_reg(_CTRL3_C) | 0x1) self._write_reg(_CTRL3_C, self._read_reg(_CTRL3_C) | 0x1)
for i in range(0, 10): for i in range(10):
if (self._read_reg(_CTRL3_C) & 0x01) == 0: if (self._read_reg(_CTRL3_C) & 0x01) == 0:
return return
time.sleep_ms(10) time.sleep_ms(10)

Wyświetl plik

@ -1,2 +1,2 @@
metadata(description="ST LSM6DSOX imu driver.", version="1.0.0") metadata(description="ST LSM6DSOX imu driver.", version="1.0.1")
module("lsm6dsox.py", opt=3) module("lsm6dsox.py", opt=3)

Wyświetl plik

@ -44,6 +44,7 @@ while (True):
time.sleep_ms(100) time.sleep_ms(100)
""" """
import array import array
from micropython import const
_WHO_AM_I = const(0xF) _WHO_AM_I = const(0xF)

Wyświetl plik

@ -38,6 +38,7 @@ while (True):
time.sleep_ms(10) time.sleep_ms(10)
""" """
import machine import machine
from micropython import const
_LPS22_CTRL_REG1 = const(0x10) _LPS22_CTRL_REG1 = const(0x10)
_LPS22_CTRL_REG2 = const(0x11) _LPS22_CTRL_REG2 = const(0x11)

Wyświetl plik

@ -104,7 +104,7 @@ class ESPFlash:
raise Exception("Command ESP_WRITE_REG failed.") raise Exception("Command ESP_WRITE_REG failed.")
def _poll_reg(self, addr, flag, retry=10, delay=0.050): def _poll_reg(self, addr, flag, retry=10, delay=0.050):
for i in range(0, retry): for i in range(retry):
reg = self._read_reg(addr) reg = self._read_reg(addr)
if (reg & flag) == 0: if (reg & flag) == 0:
break break

Wyświetl plik

@ -16,6 +16,7 @@ Currently these radio modem chipsets are supported:
* SX1277 * SX1277
* SX1278 * SX1278
* SX1279 * SX1279
* STM32WL55 "sub-GHz radio" peripheral
Most radio configuration features are supported, as well as transmitting or Most radio configuration features are supported, as well as transmitting or
receiving packets. receiving packets.
@ -37,6 +38,7 @@ modem model that matches your hardware:
- `lora-sx126x` for SX1261 & SX1262 support. - `lora-sx126x` for SX1261 & SX1262 support.
- `lora-sx127x` for SX1276-SX1279 support. - `lora-sx127x` for SX1276-SX1279 support.
- `lora-stm32wl5` for STM32WL55 support.
It's recommended to install only the packages that you need, to save firmware It's recommended to install only the packages that you need, to save firmware
size. size.
@ -113,6 +115,24 @@ example: lower max frequency, lower maximum SF value) is responsibility of the
calling code. When possible please use the correct class anyhow, as per-part calling code. When possible please use the correct class anyhow, as per-part
code may be added in the future. code may be added in the future.
### Creating STM32WL55
```
from lora import WL55SubGhzModem
def get_modem():
# The LoRa configuration will depend on your board and location, see
# below under "Modem Configuration" for some possible examples.
lora_cfg = { 'freq_khz': SEE_BELOW_FOR_CORRECT_VALUE }
return WL55SubGhzModem(lora_cfg)
modem = get_modem()
```
Note: As this is an internal peripheral of the STM32WL55 microcontroller,
support also depends on MicroPython being built for a board based on this
microcontroller.
### Notes about initialisation ### Notes about initialisation
* See below for details about the `lora_cfg` structure that configures the modem's * See below for details about the `lora_cfg` structure that configures the modem's
@ -157,6 +177,15 @@ Here is a full list of parameters that can be passed to both constructors:
| `lora_cfg` | No | If set to an initial LoRa configuration then the modem is set up with this configuration. If not set here, can be set by calling `configure()` later on. | | | `lora_cfg` | No | If set to an initial LoRa configuration then the modem is set up with this configuration. If not set here, can be set by calling `configure()` later on. | |
| `ant`_sw | No | Optional antenna switch object instance, see below for description. | | | `ant`_sw | No | Optional antenna switch object instance, see below for description. | |
#### STM32WL55
| Parameter | Required | Description |
|-------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `lora_cfg` | No | If set to an initial LoRa configuration then the modem is set up with this configuration. If not set here, can be set by calling `configure()` later on. |
| `tcxo_millivolts` | No | Defaults to 1700. The voltage supplied on pin PB0_VDDTCXO. See `dio3_tcxo_millivolts` above for details, this parameter has the same behaviour. |
| ant_sw | No | Defaults to an instance of `lora.NucleoWL55RFConfig` class for the NUCLEO-WL55 development board. Set to `None` to disable any automatic antenna switching. See below for description. |
## Modem Configuration ## Modem Configuration
It is necessary to correctly configure the modem before use. At minimum, the It is necessary to correctly configure the modem before use. At minimum, the
@ -383,10 +412,11 @@ Type: `str`, not case sensitive
Default: RFO_HF or RFO_LF (low power) Default: RFO_HF or RFO_LF (low power)
SX127x modems have multiple antenna pins for different power levels and SX127x modems and STM32WL55 microcontrollers have multiple antenna pins for
frequency ranges. The board/module that the LoRa modem chip is on may have different power levels and frequency ranges. The board/module that the LoRa
particular antenna connections, or even an RF switch that needs to be set via a modem chip is on may have particular antenna connections, or even an RF switch
GPIO to connect an antenna pin to a particular output (see `ant_sw`, below). that needs to be set via a GPIO to connect an antenna pin to a particular output
(see `ant_sw`, below).
The driver must configure the modem to use the correct pin for a particular The driver must configure the modem to use the correct pin for a particular
hardware antenna connection before transmitting. When receiving, the modem hardware antenna connection before transmitting. When receiving, the modem
@ -396,7 +426,7 @@ A common symptom of incorrect `tx_ant` setting is an extremely weak RF signal.
Consult modem datasheet for more details. Consult modem datasheet for more details.
SX127x values: ##### SX127x tx_ant
| Value | RF Transmit Pin | | Value | RF Transmit Pin |
|-----------------|----------------------------------| |-----------------|----------------------------------|
@ -407,23 +437,35 @@ Pin "RFO_HF" is automatically used for frequencies above 862MHz, and is not
supported on SX1278. "RFO_LF" is used for frequencies below 862MHz. Consult supported on SX1278. "RFO_LF" is used for frequencies below 862MHz. Consult
datasheet Table 32 "Frequency Bands" for more details. datasheet Table 32 "Frequency Bands" for more details.
**Important**: If changing `tx_ant` value, configure `output_power` at the same ##### WL55SubGhzModem tx_ant
time or again before transmitting.
| Value | RF Transmit Pin |
|-----------------|-------------------------|
| `"PA_BOOST"` | RFO_HP pin (high power) |
| Any other value | RFO_LP pin (low power) |
**NOTE**: Currently the `PA_BOOST` HP antenna output is lower than it should be
on this board, due to an unknown driver bug.
If setting `tx_ant` value, also set `output_power` at the same time or again
before transmitting.
#### `output_power` - Transmit output power level #### `output_power` - Transmit output power level
Type: `int` Type: `int`
Default: Depends on modem Default: Depends on modem
Nominal TX output power in dBm. The possible range depends on the modem and (for Nominal TX output power in dBm. The possible range depends on the modem and for
SX127x only) the `tx_ant` configuration. some modems the `tx_ant` configuration.
| Modem | `tx_ant` value | Range | "Optimal" | | Modem | `tx_ant` value | Range (dBm) | "Optimal" (dBm) | |
|--------|------------------|-------------------|------------------------| |-----------------|----------------------------|-------------------|------------------------|---|
| SX1261 | N/A | -17 to +15 | +10, +14 or +15 [*][^] | | SX1261 | N/A | -17 to +15 | +10, +14 or +15 [*][^] | |
| SX1262 | N/A | -9 to +22 | +14, +17, +20, +22 [*] | | SX1262 | N/A | -9 to +22 | +14, +17, +20, +22 [*] | |
| SX127x | "PA_BOOST" | +2 to +17, or +20 | Any | | SX127x | "PA_BOOST" | +2 to +17, or +20 | Any | |
| SX127x | RFO_HF or RFO_LF | -4 to +15 | Any | | SX127x | RFO_HF or RFO_LF | -4 to +15 | Any | |
| WL55SubGhzModem | "PA_BOOST" | -9 to +22 | +14, +17, +20, +22 [*] | |
| WL55SubGhzModem | Any other value (not None) | -17 to +14 | +10, +14 or +15 [*][^] | |
Values which are out of range for the modem will be clamped at the Values which are out of range for the modem will be clamped at the
minimum/maximum values shown above. minimum/maximum values shown above.
@ -432,14 +474,14 @@ Actual radiated TX power for RF regulatory purposes depends on the RF hardware,
antenna, and the rest of the modem configuration. It should be measured and antenna, and the rest of the modem configuration. It should be measured and
tuned empirically not determined from this configuration information alone. tuned empirically not determined from this configuration information alone.
[*] For SX1261 and SX1262 the datasheet shows "Optimal" Power Amplifier [*] For some modems the datasheet shows "Optimal" Power Amplifier
configuration values for these output power levels. If setting one of these configuration values for these output power levels. If setting one of these
levels, the optimal settings from the datasheet are applied automatically by the levels, the optimal settings from the datasheet are applied automatically by the
driver. Therefore it is recommended to use one of these power levels if driver. Therefore it is recommended to use one of these power levels if
possible. possible.
[^] For SX1261 +15dBm is only possible with frequency above 400MHz, will be +14dBm [^] In the marked configurations +15dBm is only possible with frequency above
otherwise. 400MHz, will be +14dBm otherwise.
#### `implicit_header` - Implicit/Explicit Header Mode #### `implicit_header` - Implicit/Explicit Header Mode
Type: `bool` Type: `bool`
@ -1028,12 +1070,12 @@ following different approaches:
`poll_send()` now?" check function if there's no easy way to determine `poll_send()` now?" check function if there's no easy way to determine
which interrupt has woken the board up. which interrupt has woken the board up.
* Implement a custom interrupt callback function and call * Implement a custom interrupt callback function and call
`modem.set_irq_callback()` to install it. The function will be called with a `modem.set_irq_callback()` to install it. The function will be called if a
single argument, which is either the `Pin` that triggered a hardware interrupt hardware interrupt occurs, possibly in hard interrupt context. Refer to the
or `None` for a soft interrupt. Refer to the documentation about [writing interrupt documentation about [writing interrupt handlers][isr_rules] for more
handlers](https://docs.micropython.org/en/latest/reference/isr_rules.html) for information. It may also be called if the driver triggers a soft interrupt.
more information. The `lora-async` modem classes install their own callback here, The `lora-async` modem classes install their own callback here, so it's not
so it's not possible to mix this approach with the provided asynchronous API. possible to mix this approach with the provided asynchronous API.
* Call `modem.poll_recv()` or `modem.poll_send()`. This takes more time * Call `modem.poll_recv()` or `modem.poll_send()`. This takes more time
and uses more power as it reads the modem IRQ status directly from the modem and uses more power as it reads the modem IRQ status directly from the modem
via SPI, but it also give the most definite result. via SPI, but it also give the most definite result.
@ -1137,9 +1179,21 @@ The meaning of `tx_arg` depends on the modem:
above), and `False` otherwise. above), and `False` otherwise.
* For SX1262 it is `True` (indicating High Power mode). * For SX1262 it is `True` (indicating High Power mode).
* For SX1261 it is `False` (indicating Low Power mode). * For SX1261 it is `False` (indicating Low Power mode).
* For WL55SubGhzModem it is `True` if the `PA_BOOST` `tx_ant` setting is in use (see above), and `False` otherwise.
This parameter can be ignored if it's already known what modem and antenna is being used. This parameter can be ignored if it's already known what modem and antenna is being used.
### WL55SubGhzModem ant_sw
When instantiating the `WL55SubGhzModem` and `AsyncWL55SubGHzModem` classes, the
default `ant_sw` parameter is not `None`. Instead, the default will instantiate
an object of type `lora.NucleoWL55RFConfig`. This implements the antenna switch
connections for the ST NUCLEO-WL55 development board (as connected to GPIO pins
C4, C5 and C3). See ST document [UM2592][ST-UM2592-p27] (PDF) Figure 18 for details.
When using these modem classes (only), to disable any automatic antenna
switching behaviour it's necessary to explicitly set `ant_sw=None`.
## Troubleshooting ## Troubleshooting
Some common errors and their causes: Some common errors and their causes:
@ -1150,7 +1204,10 @@ The SX1261/2 drivers will raise this exception if the modem's TCXO fails to
provide the necessary clock signal when starting a transmit or receive provide the necessary clock signal when starting a transmit or receive
operation, or moving into "standby" mode. operation, or moving into "standby" mode.
Usually, this means the constructor parameter `dio3_tcxo_millivolts` (see above) Sometimes, this means the constructor parameter `dio3_tcxo_millivolts` (see above)
must be set as the SX126x chip DIO3 output pin is the power source for the TCXO must be set as the SX126x chip DIO3 output pin is the power source for the TCXO
connected to the modem. Often this parameter should be set to `3300` (3.3V) but connected to the modem. Often this parameter should be set to `3300` (3.3V) but
it may be another value, consult the documentation for your LoRa modem module. it may be another value, consult the documentation for your LoRa modem module.
[isr_rules]: https://docs.micropython.org/en/latest/reference/isr_rules.html
[ST-UM2592-p27]: https://www.st.com/resource/en/user_manual/dm00622917-stm32wl-nucleo64-board-mb1389-stmicroelectronics.pdf#page=27

Wyświetl plik

@ -111,9 +111,8 @@ class AsyncModem:
if _DEBUG: if _DEBUG:
print(f"wait complete") print(f"wait complete")
def _callback(self, _): def _callback(self):
# IRQ callback from BaseModem._radio_isr. Hard IRQ context unless _DEBUG # IRQ callback from BaseModem._radio_isr. May be in Hard IRQ context.
# is on.
# #
# Set both RX & TX flag. This isn't necessary for "real" interrupts, but may be necessary # Set both RX & TX flag. This isn't necessary for "real" interrupts, but may be necessary
# to wake both for the case of a "soft" interrupt triggered by sleep() or standby(), where # to wake both for the case of a "soft" interrupt triggered by sleep() or standby(), where

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="0.1.0") metadata(version="0.1.1")
require("lora") require("lora")
package("lora") package("lora")

Wyświetl plik

@ -0,0 +1,136 @@
# MicroPython LoRa STM32WL55 embedded sub-ghz radio driver
# MIT license; Copyright (c) 2022 Angus Gratton
#
# This driver is essentially an embedded SX1262 with a custom internal interface block.
# Requires the stm module in MicroPython to be compiled with STM32WL5 subghz radio support.
#
# LoRa is a registered trademark or service mark of Semtech Corporation or its affiliates.
from machine import Pin, SPI
import stm
from . import sx126x
from micropython import const
_CMD_CLR_ERRORS = const(0x07)
_REG_OCP = const(0x8E7)
# Default antenna switch config is as per Nucleo WL-55 board. See UM2592 Fig 18.
# Possible to work with other antenna switch board configurations by passing
# different ant_sw_class arguments to the modem, any class that creates an object with rx/tx
class NucleoWL55RFConfig:
def __init__(self):
self._FE_CTRL = (Pin(x, mode=Pin.OUT) for x in ("C4", "C5", "C3"))
def _set_fe_ctrl(self, values):
for pin, val in zip(self._FE_CTRL, values):
pin(val)
def rx(self):
self._set_fe_ctrl((1, 0, 1))
def tx(self, hp):
self._set_fe_ctrl((0 if hp else 1, 1, 1))
def idle(self):
pass
class DIO1:
# Dummy DIO1 "Pin" wrapper class to pass to the _SX126x class
def irq(self, handler, _):
stm.subghz_irq(handler)
class _WL55SubGhzModem(sx126x._SX126x):
# Don't construct this directly, construct lora.WL55SubGhzModem or lora.AsyncWL55SubGHzModem
def __init__(
self,
lora_cfg=None,
tcxo_millivolts=1700,
ant_sw=NucleoWL55RFConfig,
):
self._hp = False
if ant_sw == NucleoWL55RFConfig:
# To avoid the default argument being an object instance
ant_sw = NucleoWL55RFConfig()
super().__init__(
# RM0453 7.2.13 says max 16MHz, but this seems more stable
SPI("SUBGHZ", baudrate=8_000_000),
stm.subghz_cs,
stm.subghz_is_busy,
DIO1(),
False, # dio2_rf_sw
tcxo_millivolts, # dio3_tcxo_millivolts
10_000, # dio3_tcxo_start_time_us, first time after POR is quite long
None, # reset
lora_cfg,
ant_sw,
)
def _clear_errors(self):
# A weird difference between STM32WL55 and SX1262, WL55 only takes one
# parameter byte for the Clr_Error() command compared to two on SX1262.
# The bytes are always zero in both cases.
#
# (Not clear if sending two bytes will also work always/sometimes, but
# sending one byte to SX1262 definitely does not work!
self._cmd("BB", _CMD_CLR_ERRORS, 0x00)
def _clear_irq(self, clear_bits=0xFFFF):
super()._clear_irq(clear_bits)
# SUBGHZ Radio IRQ requires manual re-enabling after interrupt
stm.subghz_irq(self._radio_isr)
def _tx_hp(self):
# STM32WL5 supports both High and Low Power antenna pins depending on tx_ant setting
return self._hp
def _get_pa_tx_params(self, output_power, tx_ant):
# Given an output power level in dBm and the tx_ant setting (if any),
# return settings for SetPaConfig and SetTxParams.
#
# ST document RM0453 Set_PaConfig() reference and accompanying Table 35
# show values that are an exact superset of the SX1261 and SX1262
# available values, depending on which antenna pin is to be
# used. Therefore, call either modem's existing _get_pa_tx_params()
# function depending on the current tx_ant setting (default is low
# power).
if tx_ant is not None:
# Note: currently HP antenna power output is less than it should be,
# due to some (unknown) bug.
self._hp = tx_ant == "PA_BOOST"
# Update the OCP register to match the maximum power level
self._reg_write(_REG_OCP, 0x38 if self._hp else 0x18)
if self._hp:
return sx126x._SX1262._get_pa_tx_params(self, output_power, tx_ant)
else:
return sx126x._SX1261._get_pa_tx_params(self, output_power, tx_ant)
# Define the actual modem classes that use the SyncModem & AsyncModem "mixin-like" classes
# to create sync and async variants.
try:
from .sync_modem import SyncModem
class WL55SubGhzModem(_WL55SubGhzModem, SyncModem):
pass
except ImportError:
pass
try:
from .async_modem import AsyncModem
class AsyncWL55SubGhzModem(_WL55SubGhzModem, AsyncModem):
pass
except ImportError:
pass

Wyświetl plik

@ -0,0 +1,3 @@
metadata(version="0.1")
require("lora-sx126x")
package("lora")

Wyświetl plik

@ -99,7 +99,7 @@ _IRQ_DRIVER_RX_MASK = const(_IRQ_RX_DONE | _IRQ_TIMEOUT | _IRQ_CRC_ERR | _IRQ_HE
# In any case, timeouts here are to catch broken/bad hardware or massive driver # In any case, timeouts here are to catch broken/bad hardware or massive driver
# bugs rather than commonplace issues. # bugs rather than commonplace issues.
# #
_CMD_BUSY_TIMEOUT_BASE_US = const(200) _CMD_BUSY_TIMEOUT_BASE_US = const(7000)
# Datasheet says 3.5ms needed to run a full Calibrate command (all blocks), # Datasheet says 3.5ms needed to run a full Calibrate command (all blocks),
# however testing shows it can be as much as as 18ms. # however testing shows it can be as much as as 18ms.
@ -141,12 +141,16 @@ class _SX126x(BaseModem):
self._sleep = True # assume the radio is in sleep mode to start, will wake on _cmd self._sleep = True # assume the radio is in sleep mode to start, will wake on _cmd
self._dio1 = dio1 self._dio1 = dio1
if hasattr(busy, "init"):
busy.init(Pin.IN) busy.init(Pin.IN)
if hasattr(cs, "init"):
cs.init(Pin.OUT, value=1) cs.init(Pin.OUT, value=1)
if dio1: if hasattr(dio1, "init"):
dio1.init(Pin.IN) dio1.init(Pin.IN)
self._busy_timeout = _CMD_BUSY_TIMEOUT_BASE_US self._busy_timeout = _CMD_BUSY_TIMEOUT_BASE_US + (
dio3_tcxo_start_time_us if dio3_tcxo_millivolts else 0
)
self._buf = bytearray(9) # shared buffer for commands self._buf = bytearray(9) # shared buffer for commands
@ -166,7 +170,8 @@ class _SX126x(BaseModem):
reset(1) reset(1)
time.sleep_ms(5) time.sleep_ms(5)
else: else:
self.standby() # Otherwise, at least put the radio to a known state # Otherwise, at least put the radio to a known state
self._cmd("BB", _CMD_SET_STANDBY, 0) # STDBY_RC mode, not ready for TCXO yet
status = self._get_status() status = self._get_status()
if (status[0] != _STATUS_MODE_STANDBY_RC and status[0] != _STATUS_MODE_STANDBY_HSE32) or ( if (status[0] != _STATUS_MODE_STANDBY_RC and status[0] != _STATUS_MODE_STANDBY_HSE32) or (
@ -185,7 +190,6 @@ class _SX126x(BaseModem):
# #
# timeout register is set in units of 15.625us each, use integer math # timeout register is set in units of 15.625us each, use integer math
# to calculate and round up: # to calculate and round up:
self._busy_timeout = (_CMD_BUSY_TIMEOUT_BASE_US + dio3_tcxo_start_time_us) * 2
timeout = (dio3_tcxo_start_time_us * 1000 + 15624) // 15625 timeout = (dio3_tcxo_start_time_us * 1000 + 15624) // 15625
if timeout < 0 or timeout > 1 << 24: if timeout < 0 or timeout > 1 << 24:
raise ValueError("{} out of range".format("dio3_tcxo_start_time_us")) raise ValueError("{} out of range".format("dio3_tcxo_start_time_us"))
@ -231,7 +235,7 @@ class _SX126x(BaseModem):
0x0, # DIO2Mask, not used 0x0, # DIO2Mask, not used
0x0, # DIO3Mask, not used 0x0, # DIO3Mask, not used
) )
dio1.irq(self._radio_isr, trigger=Pin.IRQ_RISING) dio1.irq(self._radio_isr, Pin.IRQ_RISING)
self._clear_irq() self._clear_irq()
@ -382,7 +386,9 @@ class _SX126x(BaseModem):
self._cmd(">BBH", _CMD_WRITE_REGISTER, _REG_LSYNCRH, syncword) self._cmd(">BBH", _CMD_WRITE_REGISTER, _REG_LSYNCRH, syncword)
if "output_power" in lora_cfg: if "output_power" in lora_cfg:
pa_config_args, self._output_power = self._get_pa_tx_params(lora_cfg["output_power"]) pa_config_args, self._output_power = self._get_pa_tx_params(
lora_cfg["output_power"], lora_cfg.get("tx_ant", None)
)
self._cmd("BBBBB", _CMD_SET_PA_CONFIG, *pa_config_args) self._cmd("BBBBB", _CMD_SET_PA_CONFIG, *pa_config_args)
if "pa_ramp_us" in lora_cfg: if "pa_ramp_us" in lora_cfg:
@ -664,7 +670,7 @@ class _SX126x(BaseModem):
while self._busy(): while self._busy():
ticks_diff = time.ticks_diff(time.ticks_us(), start) ticks_diff = time.ticks_diff(time.ticks_us(), start)
if ticks_diff > timeout_us: if ticks_diff > timeout_us:
raise RuntimeError("BUSY timeout") raise RuntimeError("BUSY timeout", timeout_us)
time.sleep_us(1) time.sleep_us(1)
if _DEBUG and ticks_diff > 105: if _DEBUG and ticks_diff > 105:
# By default, debug log any busy time that takes longer than the # By default, debug log any busy time that takes longer than the
@ -760,7 +766,7 @@ class _SX1262(_SX126x):
# SX1262 has High Power only (deviceSel==0) # SX1262 has High Power only (deviceSel==0)
return True return True
def _get_pa_tx_params(self, output_power): def _get_pa_tx_params(self, output_power, tx_ant):
# Given an output power level in dB, return a 2-tuple: # Given an output power level in dB, return a 2-tuple:
# - First item is the 3 arguments for SetPaConfig command # - First item is the 3 arguments for SetPaConfig command
# - Second item is the power level argument value for SetTxParams command. # - Second item is the power level argument value for SetTxParams command.
@ -831,7 +837,7 @@ class _SX1261(_SX126x):
# SX1261 has Low Power only (deviceSel==1) # SX1261 has Low Power only (deviceSel==1)
return False return False
def _get_pa_tx_params(self, output_power): def _get_pa_tx_params(self, output_power, tx_ant):
# Given an output power level in dB, return a 2-tuple: # Given an output power level in dB, return a 2-tuple:
# - First item is the 3 arguments for SetPaConfig command # - First item is the 3 arguments for SetPaConfig command
# - Second item is the power level argument value for SetTxParams command. # - Second item is the power level argument value for SetTxParams command.

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="0.1.0") metadata(version="0.1.1")
require("lora") require("lora")
package("lora") package("lora")

Wyświetl plik

@ -23,7 +23,18 @@ except ImportError as e:
if "no module named 'lora." not in str(e): if "no module named 'lora." not in str(e):
raise raise
try:
from .stm32wl5 import * # noqa: F401
ok = True
except ImportError as e:
if "no module named 'lora." not in str(e):
raise
if not ok: if not ok:
raise ImportError( raise ImportError(
"Incomplete lora installation. Need at least one of lora-sync, lora-async and one of lora-sx126x, lora-sx127x" "Incomplete lora installation. Need at least one of lora-sync, lora-async and one of lora-sx126x, lora-sx127x"
) )
del ok

Wyświetl plik

@ -233,25 +233,16 @@ class BaseModem:
# #
# ISR implementation is relatively simple, just exists to signal an optional # ISR implementation is relatively simple, just exists to signal an optional
# callback, record a timestamp, and wake up the hardware if # callback, record a timestamp, and wake up the hardware if
# needed. ppplication code is expected to call poll_send() or # needed. Application code is expected to call poll_send() or
# poll_recv() as applicable in order to confirm the modem state. # poll_recv() as applicable in order to confirm the modem state.
# #
# This is a MP hard irq in some configurations, meaning no memory allocation is possible. # This is a MP hard irq in some configurations.
# def _radio_isr(self, _):
# 'pin' may also be None if this is a "soft" IRQ triggered after a receive
# timed out during a send (meaning no receive IRQ will fire, but the
# receiver should wake up and move on anyhow.)
def _radio_isr(self, pin):
self._last_irq = time.ticks_ms() self._last_irq = time.ticks_ms()
if self._irq_callback: if self._irq_callback:
self._irq_callback(pin) self._irq_callback()
if _DEBUG: if _DEBUG:
# Note: this may cause a MemoryError and fail if _DEBUG is enabled in this base class print("_radio_isr")
# but disabled in the subclass, meaning this is a hard irq handler
try:
print("_radio_isr pin={}".format(pin))
except MemoryError:
pass
def irq_triggered(self): def irq_triggered(self):
# Returns True if the ISR has executed since the last time a send or a receive # Returns True if the ISR has executed since the last time a send or a receive
@ -264,8 +255,7 @@ class BaseModem:
# This is used by the AsyncModem implementation, but can be called in # This is used by the AsyncModem implementation, but can be called in
# other circumstances to implement custom ISR logic. # other circumstances to implement custom ISR logic.
# #
# Note that callback may be called in hard ISR context, meaning no # Note that callback may be called in hard ISR context.
# memory allocation is possible.
self._irq_callback = callback self._irq_callback = callback
def _get_last_irq(self): def _get_last_irq(self):

Wyświetl plik

@ -1,2 +1,2 @@
metadata(version="0.1.0") metadata(version="0.2.0")
package("lora") package("lora")

Wyświetl plik

@ -1,6 +1,7 @@
# MicroPython package installer # MicroPython package installer
# MIT license; Copyright (c) 2022 Jim Mussared # MIT license; Copyright (c) 2022 Jim Mussared
from micropython import const
import requests import requests
import sys import sys

Wyświetl plik

@ -1,6 +1,7 @@
# This module should be imported from REPL, not run from command line. # This module should be imported from REPL, not run from command line.
import binascii import binascii
import hashlib import hashlib
from micropython import const
import network import network
import os import os
import socket import socket

Wyświetl plik

@ -1,31 +1,16 @@
# #
# uaiohttpclient - fetch URL passed as command line argument. # uaiohttpclient - fetch URL passed as command line argument.
# #
import sys
import uasyncio as asyncio import uasyncio as asyncio
import uaiohttpclient as aiohttp import uaiohttpclient as aiohttp
def print_stream(resp): async def run(url):
print((yield from resp.read())) resp = await aiohttp.request("GET", url)
return
while True:
line = yield from reader.readline()
if not line:
break
print(line.rstrip())
def run(url):
resp = yield from aiohttp.request("GET", url)
print(resp) print(resp)
yield from print_stream(resp) print(await resp.read())
import sys
import logging
logging.basicConfig(level=logging.INFO)
url = sys.argv[1] url = sys.argv[1]
loop = asyncio.get_event_loop() asyncio.run(run(url))
loop.run_until_complete(run(url))
loop.close()

Wyświetl plik

@ -1,4 +1,4 @@
metadata(description="HTTP client module for MicroPython uasyncio module", version="0.5.1") metadata(description="HTTP client module for MicroPython uasyncio module", version="0.5.2")
# Originally written by Paul Sokolovsky. # Originally written by Paul Sokolovsky.

Wyświetl plik

@ -5,8 +5,8 @@ class ClientResponse:
def __init__(self, reader): def __init__(self, reader):
self.content = reader self.content = reader
def read(self, sz=-1): async def read(self, sz=-1):
return (yield from self.content.read(sz)) return await self.content.read(sz)
def __repr__(self): def __repr__(self):
return "<ClientResponse %d %s>" % (self.status, self.headers) return "<ClientResponse %d %s>" % (self.status, self.headers)
@ -17,22 +17,22 @@ class ChunkedClientResponse(ClientResponse):
self.content = reader self.content = reader
self.chunk_size = 0 self.chunk_size = 0
def read(self, sz=4 * 1024 * 1024): async def read(self, sz=4 * 1024 * 1024):
if self.chunk_size == 0: if self.chunk_size == 0:
l = yield from self.content.readline() line = await self.content.readline()
# print("chunk line:", l) # print("chunk line:", l)
l = l.split(b";", 1)[0] line = line.split(b";", 1)[0]
self.chunk_size = int(l, 16) self.chunk_size = int(line, 16)
# print("chunk size:", self.chunk_size) # print("chunk size:", self.chunk_size)
if self.chunk_size == 0: if self.chunk_size == 0:
# End of message # End of message
sep = yield from self.content.read(2) sep = await self.content.read(2)
assert sep == b"\r\n" assert sep == b"\r\n"
return b"" return b""
data = yield from self.content.read(min(sz, self.chunk_size)) data = await self.content.read(min(sz, self.chunk_size))
self.chunk_size -= len(data) self.chunk_size -= len(data)
if self.chunk_size == 0: if self.chunk_size == 0:
sep = yield from self.content.read(2) sep = await self.content.read(2)
assert sep == b"\r\n" assert sep == b"\r\n"
return data return data
@ -40,40 +40,46 @@ class ChunkedClientResponse(ClientResponse):
return "<ChunkedClientResponse %d %s>" % (self.status, self.headers) return "<ChunkedClientResponse %d %s>" % (self.status, self.headers)
def request_raw(method, url): async def request_raw(method, url):
try: try:
proto, dummy, host, path = url.split("/", 3) proto, dummy, host, path = url.split("/", 3)
except ValueError: except ValueError:
proto, dummy, host = url.split("/", 2) proto, dummy, host = url.split("/", 2)
path = "" path = ""
if ":" in host:
host, port = host.split(":")
port = int(port)
else:
port = 80
if proto != "http:": if proto != "http:":
raise ValueError("Unsupported protocol: " + proto) raise ValueError("Unsupported protocol: " + proto)
reader, writer = yield from asyncio.open_connection(host, 80) reader, writer = await asyncio.open_connection(host, port)
# Use protocol 1.0, because 1.1 always allows to use chunked transfer-encoding # Use protocol 1.0, because 1.1 always allows to use chunked
# But explicitly set Connection: close, even though this should be default for 1.0, # transfer-encoding But explicitly set Connection: close, even
# because some servers misbehave w/o it. # though this should be default for 1.0, because some servers
# misbehave w/o it.
query = "%s /%s HTTP/1.0\r\nHost: %s\r\nConnection: close\r\nUser-Agent: compat\r\n\r\n" % ( query = "%s /%s HTTP/1.0\r\nHost: %s\r\nConnection: close\r\nUser-Agent: compat\r\n\r\n" % (
method, method,
path, path,
host, host,
) )
yield from writer.awrite(query.encode("latin-1")) await writer.awrite(query.encode("latin-1"))
# yield from writer.aclose()
return reader return reader
def request(method, url): async def request(method, url):
redir_cnt = 0 redir_cnt = 0
redir_url = None
while redir_cnt < 2: while redir_cnt < 2:
reader = yield from request_raw(method, url) reader = await request_raw(method, url)
headers = [] headers = []
sline = yield from reader.readline() sline = await reader.readline()
sline = sline.split(None, 2) sline = sline.split(None, 2)
status = int(sline[1]) status = int(sline[1])
chunked = False chunked = False
while True: while True:
line = yield from reader.readline() line = await reader.readline()
if not line or line == b"\r\n": if not line or line == b"\r\n":
break break
headers.append(line) headers.append(line)
@ -85,7 +91,7 @@ def request(method, url):
if 301 <= status <= 303: if 301 <= status <= 303:
redir_cnt += 1 redir_cnt += 1
yield from reader.aclose() await reader.aclose()
continue continue
break break

Wyświetl plik

@ -61,9 +61,10 @@ ignore = [
"F401", "F401",
"F403", "F403",
"F405", "F405",
"E501",
"F541", "F541",
"F821",
"F841", "F841",
"ISC001",
"ISC003", # micropython does not support implicit concatenation of f-strings "ISC003", # micropython does not support implicit concatenation of f-strings
"PIE810", # micropython does not support passing tuples to .startswith or .endswith "PIE810", # micropython does not support passing tuples to .startswith or .endswith
"PLC1901", "PLC1901",
@ -75,8 +76,9 @@ ignore = [
"PLW2901", "PLW2901",
"RUF012", "RUF012",
"RUF100", "RUF100",
"W191",
] ]
line-length = 260 line-length = 99
target-version = "py37" target-version = "py37"
[tool.ruff.mccabe] [tool.ruff.mccabe]
@ -91,3 +93,12 @@ max-statements = 166
[tool.ruff.per-file-ignores] [tool.ruff.per-file-ignores]
"micropython/aiorepl/aiorepl.py" = ["PGH001"] "micropython/aiorepl/aiorepl.py" = ["PGH001"]
# manifest.py files are evaluated with some global names pre-defined
"**/manifest.py" = ["F821"]
"ports/**/boards/manifest*.py" = ["F821"]
# ble multitests are evaluated with some names pre-defined
"micropython/bluetooth/aioble/multitests/*" = ["F821"]
[tool.ruff.format]

Wyświetl plik

@ -0,0 +1,32 @@
aiohttp is an HTTP client module for MicroPython asyncio module,
with API mostly compatible with CPython [aiohttp](https://github.com/aio-libs/aiohttp)
module.
> [!NOTE]
> Only client is implemented.
See `examples/client.py`
```py
import aiohttp
import asyncio
async def main():
async with aiohttp.ClientSession() as session:
async with session.get('http://micropython.org') as response:
print("Status:", response.status)
print("Content-Type:", response.headers['Content-Type'])
html = await response.text()
print("Body:", html[:15], "...")
asyncio.run(main())
```
```
$ micropython examples/client.py
Status: 200
Content-Type: text/html; charset=utf-8
Body: <!DOCTYPE html> ...
```

Wyświetl plik

@ -0,0 +1,264 @@
# MicroPython aiohttp library
# MIT license; Copyright (c) 2023 Carlos Gil
import asyncio
import json as _json
from .aiohttp_ws import (
_WSRequestContextManager,
ClientWebSocketResponse,
WebSocketClient,
WSMsgType,
)
HttpVersion10 = "HTTP/1.0"
HttpVersion11 = "HTTP/1.1"
class ClientResponse:
def __init__(self, reader):
self.content = reader
def _decode(self, data):
c_encoding = self.headers.get("Content-Encoding")
if c_encoding in ("gzip", "deflate", "gzip,deflate"):
try:
import deflate, io
if c_encoding == "deflate":
with deflate.DeflateIO(io.BytesIO(data), deflate.ZLIB) as d:
return d.read()
elif c_encoding == "gzip":
with deflate.DeflateIO(io.BytesIO(data), deflate.GZIP, 15) as d:
return d.read()
except ImportError:
print("WARNING: deflate module required")
return data
async def read(self, sz=-1):
return self._decode(await self.content.read(sz))
async def text(self, encoding="utf-8"):
return (await self.read(sz=-1)).decode(encoding)
async def json(self):
return _json.loads(await self.read())
def __repr__(self):
return "<ClientResponse %d %s>" % (self.status, self.headers)
class ChunkedClientResponse(ClientResponse):
def __init__(self, reader):
self.content = reader
self.chunk_size = 0
async def read(self, sz=4 * 1024 * 1024):
if self.chunk_size == 0:
l = await self.content.readline()
l = l.split(b";", 1)[0]
self.chunk_size = int(l, 16)
if self.chunk_size == 0:
# End of message
sep = await self.content.read(2)
assert sep == b"\r\n"
return b""
data = await self.content.read(min(sz, self.chunk_size))
self.chunk_size -= len(data)
if self.chunk_size == 0:
sep = await self.content.read(2)
assert sep == b"\r\n"
return self._decode(data)
def __repr__(self):
return "<ChunkedClientResponse %d %s>" % (self.status, self.headers)
class _RequestContextManager:
def __init__(self, client, request_co):
self.reqco = request_co
self.client = client
async def __aenter__(self):
return await self.reqco
async def __aexit__(self, *args):
await self.client._reader.aclose()
return await asyncio.sleep(0)
class ClientSession:
def __init__(self, base_url="", headers={}, version=HttpVersion10):
self._reader = None
self._base_url = base_url
self._base_headers = {"Connection": "close", "User-Agent": "compat"}
self._base_headers.update(**headers)
self._http_version = version
async def __aenter__(self):
return self
async def __aexit__(self, *args):
return await asyncio.sleep(0)
# TODO: Implement timeouts
async def _request(self, method, url, data=None, json=None, ssl=None, params=None, headers={}):
redir_cnt = 0
redir_url = None
while redir_cnt < 2:
reader = await self.request_raw(method, url, data, json, ssl, params, headers)
_headers = []
sline = await reader.readline()
sline = sline.split(None, 2)
status = int(sline[1])
chunked = False
while True:
line = await reader.readline()
if not line or line == b"\r\n":
break
_headers.append(line)
if line.startswith(b"Transfer-Encoding:"):
if b"chunked" in line:
chunked = True
elif line.startswith(b"Location:"):
url = line.rstrip().split(None, 1)[1].decode("latin-1")
if 301 <= status <= 303:
redir_cnt += 1
await reader.aclose()
continue
break
if chunked:
resp = ChunkedClientResponse(reader)
else:
resp = ClientResponse(reader)
resp.status = status
resp.headers = _headers
resp.url = url
if params:
resp.url += "?" + "&".join(f"{k}={params[k]}" for k in sorted(params))
try:
resp.headers = {
val.split(":", 1)[0]: val.split(":", 1)[-1].strip()
for val in [hed.decode().strip() for hed in _headers]
}
except Exception:
pass
self._reader = reader
return resp
async def request_raw(
self,
method,
url,
data=None,
json=None,
ssl=None,
params=None,
headers={},
is_handshake=False,
version=None,
):
if json and isinstance(json, dict):
data = _json.dumps(json)
if data is not None and method == "GET":
method = "POST"
if params:
url += "?" + "&".join(f"{k}={params[k]}" for k in sorted(params))
try:
proto, dummy, host, path = url.split("/", 3)
except ValueError:
proto, dummy, host = url.split("/", 2)
path = ""
if proto == "http:":
port = 80
elif proto == "https:":
port = 443
if ssl is None:
ssl = True
else:
raise ValueError("Unsupported protocol: " + proto)
if ":" in host:
host, port = host.split(":", 1)
port = int(port)
reader, writer = await asyncio.open_connection(host, port, ssl=ssl)
# Use protocol 1.0, because 1.1 always allows to use chunked transfer-encoding
# But explicitly set Connection: close, even though this should be default for 1.0,
# because some servers misbehave w/o it.
if version is None:
version = self._http_version
if "Host" not in headers:
headers.update(Host=host)
if not data:
query = "%s /%s %s\r\n%s\r\n" % (
method,
path,
version,
"\r\n".join(f"{k}: {v}" for k, v in headers.items()) + "\r\n" if headers else "",
)
else:
headers.update(**{"Content-Length": len(str(data))})
if json:
headers.update(**{"Content-Type": "application/json"})
query = """%s /%s %s\r\n%s\r\n%s\r\n\r\n""" % (
method,
path,
version,
"\r\n".join(f"{k}: {v}" for k, v in headers.items()) + "\r\n",
data,
)
if not is_handshake:
await writer.awrite(query.encode("latin-1"))
return reader
else:
await writer.awrite(query.encode())
return reader, writer
def request(self, method, url, data=None, json=None, ssl=None, params=None, headers={}):
return _RequestContextManager(
self,
self._request(
method,
self._base_url + url,
data=data,
json=json,
ssl=ssl,
params=params,
headers=dict(**self._base_headers, **headers),
),
)
def get(self, url, **kwargs):
return self.request("GET", url, **kwargs)
def post(self, url, **kwargs):
return self.request("POST", url, **kwargs)
def put(self, url, **kwargs):
return self.request("PUT", url, **kwargs)
def patch(self, url, **kwargs):
return self.request("PATCH", url, **kwargs)
def delete(self, url, **kwargs):
return self.request("DELETE", url, **kwargs)
def head(self, url, **kwargs):
return self.request("HEAD", url, **kwargs)
def options(self, url, **kwargs):
return self.request("OPTIONS", url, **kwargs)
def ws_connect(self, url, ssl=None):
return _WSRequestContextManager(self, self._ws_connect(url, ssl=ssl))
async def _ws_connect(self, url, ssl=None):
ws_client = WebSocketClient(None)
await ws_client.connect(url, ssl=ssl, handshake_request=self.request_raw)
self._reader = ws_client.reader
return ClientWebSocketResponse(ws_client)

Wyświetl plik

@ -0,0 +1,269 @@
# MicroPython aiohttp library
# MIT license; Copyright (c) 2023 Carlos Gil
# adapted from https://github.com/danni/uwebsockets
# and https://github.com/miguelgrinberg/microdot/blob/main/src/microdot_asyncio_websocket.py
import asyncio
import random
import json as _json
import binascii
import re
import struct
from collections import namedtuple
URL_RE = re.compile(r"(wss|ws)://([A-Za-z0-9-\.]+)(?:\:([0-9]+))?(/.+)?")
URI = namedtuple("URI", ("protocol", "hostname", "port", "path")) # noqa: PYI024
def urlparse(uri):
"""Parse ws:// URLs"""
match = URL_RE.match(uri)
if match:
protocol = match.group(1)
host = match.group(2)
port = match.group(3)
path = match.group(4)
if protocol == "wss":
if port is None:
port = 443
elif protocol == "ws":
if port is None:
port = 80
else:
raise ValueError("Scheme {} is invalid".format(protocol))
return URI(protocol, host, int(port), path)
class WebSocketMessage:
def __init__(self, opcode, data):
self.type = opcode
self.data = data
class WSMsgType:
TEXT = 1
BINARY = 2
ERROR = 258
class WebSocketClient:
CONT = 0
TEXT = 1
BINARY = 2
CLOSE = 8
PING = 9
PONG = 10
def __init__(self, params):
self.params = params
self.closed = False
self.reader = None
self.writer = None
async def connect(self, uri, ssl=None, handshake_request=None):
uri = urlparse(uri)
assert uri
if uri.protocol == "wss":
if not ssl:
ssl = True
await self.handshake(uri, ssl, handshake_request)
@classmethod
def _parse_frame_header(cls, header):
byte1, byte2 = struct.unpack("!BB", header)
# Byte 1: FIN(1) _(1) _(1) _(1) OPCODE(4)
fin = bool(byte1 & 0x80)
opcode = byte1 & 0x0F
# Byte 2: MASK(1) LENGTH(7)
mask = bool(byte2 & (1 << 7))
length = byte2 & 0x7F
return fin, opcode, mask, length
def _process_websocket_frame(self, opcode, payload):
if opcode == self.TEXT:
payload = payload.decode()
elif opcode == self.BINARY:
pass
elif opcode == self.CLOSE:
# raise OSError(32, "Websocket connection closed")
return opcode, payload
elif opcode == self.PING:
return self.PONG, payload
elif opcode == self.PONG: # pragma: no branch
return None, None
return None, payload
@classmethod
def _encode_websocket_frame(cls, opcode, payload):
if opcode == cls.TEXT:
payload = payload.encode()
length = len(payload)
fin = mask = True
# Frame header
# Byte 1: FIN(1) _(1) _(1) _(1) OPCODE(4)
byte1 = 0x80 if fin else 0
byte1 |= opcode
# Byte 2: MASK(1) LENGTH(7)
byte2 = 0x80 if mask else 0
if length < 126: # 126 is magic value to use 2-byte length header
byte2 |= length
frame = struct.pack("!BB", byte1, byte2)
elif length < (1 << 16): # Length fits in 2-bytes
byte2 |= 126 # Magic code
frame = struct.pack("!BBH", byte1, byte2, length)
elif length < (1 << 64):
byte2 |= 127 # Magic code
frame = struct.pack("!BBQ", byte1, byte2, length)
else:
raise ValueError
# Mask is 4 bytes
mask_bits = struct.pack("!I", random.getrandbits(32))
frame += mask_bits
payload = bytes(b ^ mask_bits[i % 4] for i, b in enumerate(payload))
return frame + payload
async def handshake(self, uri, ssl, req):
headers = {}
_http_proto = "http" if uri.protocol != "wss" else "https"
url = f"{_http_proto}://{uri.hostname}:{uri.port}{uri.path or '/'}"
key = binascii.b2a_base64(bytes(random.getrandbits(8) for _ in range(16)))[:-1]
headers["Host"] = f"{uri.hostname}:{uri.port}"
headers["Connection"] = "Upgrade"
headers["Upgrade"] = "websocket"
headers["Sec-WebSocket-Key"] = key
headers["Sec-WebSocket-Version"] = "13"
headers["Origin"] = f"{_http_proto}://{uri.hostname}:{uri.port}"
self.reader, self.writer = await req(
"GET",
url,
ssl=ssl,
headers=headers,
is_handshake=True,
version="HTTP/1.1",
)
header = await self.reader.readline()
header = header[:-2]
assert header.startswith(b"HTTP/1.1 101 "), header
while header:
header = await self.reader.readline()
header = header[:-2]
async def receive(self):
while True:
opcode, payload = await self._read_frame()
send_opcode, data = self._process_websocket_frame(opcode, payload)
if send_opcode: # pragma: no cover
await self.send(data, send_opcode)
if opcode == self.CLOSE:
self.closed = True
return opcode, data
elif data: # pragma: no branch
return opcode, data
async def send(self, data, opcode=None):
frame = self._encode_websocket_frame(
opcode or (self.TEXT if isinstance(data, str) else self.BINARY), data
)
self.writer.write(frame)
await self.writer.drain()
async def close(self):
if not self.closed: # pragma: no cover
self.closed = True
await self.send(b"", self.CLOSE)
async def _read_frame(self):
header = await self.reader.read(2)
if len(header) != 2: # pragma: no cover
# raise OSError(32, "Websocket connection closed")
opcode = self.CLOSE
payload = b""
return opcode, payload
fin, opcode, has_mask, length = self._parse_frame_header(header)
if length == 126: # Magic number, length header is 2 bytes
(length,) = struct.unpack("!H", await self.reader.read(2))
elif length == 127: # Magic number, length header is 8 bytes
(length,) = struct.unpack("!Q", await self.reader.read(8))
if has_mask: # pragma: no cover
mask = await self.reader.read(4)
payload = await self.reader.read(length)
if has_mask: # pragma: no cover
payload = bytes(x ^ mask[i % 4] for i, x in enumerate(payload))
return opcode, payload
class ClientWebSocketResponse:
def __init__(self, wsclient):
self.ws = wsclient
def __aiter__(self):
return self
async def __anext__(self):
msg = WebSocketMessage(*await self.ws.receive())
# print(msg.data, msg.type) # DEBUG
if (not msg.data and msg.type == self.ws.CLOSE) or self.ws.closed:
raise StopAsyncIteration
return msg
async def close(self):
await self.ws.close()
async def send_str(self, data):
if not isinstance(data, str):
raise TypeError("data argument must be str (%r)" % type(data))
await self.ws.send(data)
async def send_bytes(self, data):
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError("data argument must be byte-ish (%r)" % type(data))
await self.ws.send(data)
async def send_json(self, data):
await self.send_str(_json.dumps(data))
async def receive_str(self):
msg = WebSocketMessage(*await self.ws.receive())
if msg.type != self.ws.TEXT:
raise TypeError(f"Received message {msg.type}:{msg.data!r} is not str")
return msg.data
async def receive_bytes(self):
msg = WebSocketMessage(*await self.ws.receive())
if msg.type != self.ws.BINARY:
raise TypeError(f"Received message {msg.type}:{msg.data!r} is not bytes")
return msg.data
async def receive_json(self):
data = await self.receive_str()
return _json.loads(data)
class _WSRequestContextManager:
def __init__(self, client, request_co):
self.reqco = request_co
self.client = client
async def __aenter__(self):
return await self.reqco
async def __aexit__(self, *args):
await self.client._reader.aclose()
return await asyncio.sleep(0)

Wyświetl plik

@ -0,0 +1,18 @@
import sys
sys.path.insert(0, ".")
import aiohttp
import asyncio
async def main():
async with aiohttp.ClientSession() as session:
async with session.get("http://micropython.org") as response:
print("Status:", response.status)
print("Content-Type:", response.headers["Content-Type"])
html = await response.text()
print("Body:", html[:15], "...")
asyncio.run(main())

Wyświetl plik

@ -0,0 +1,20 @@
import sys
sys.path.insert(0, ".")
import aiohttp
import asyncio
headers = {"Accept-Encoding": "gzip,deflate"}
async def main():
async with aiohttp.ClientSession(headers=headers, version=aiohttp.HttpVersion11) as session:
async with session.get("http://micropython.org") as response:
print("Status:", response.status)
print("Content-Type:", response.headers["Content-Type"])
print(response.headers)
html = await response.text()
print(html)
asyncio.run(main())

Wyświetl plik

@ -0,0 +1,29 @@
import sys
sys.path.insert(0, ".")
import aiohttp
import asyncio
URL = sys.argv.pop()
if not URL.startswith("http"):
URL = "http://micropython.org"
print(URL)
async def fetch(client):
async with client.get(URL) as resp:
assert resp.status == 200
return await resp.text()
async def main():
async with aiohttp.ClientSession() as client:
html = await fetch(client)
print(html)
if __name__ == "__main__":
asyncio.run(main())

Wyświetl plik

@ -0,0 +1,18 @@
import sys
sys.path.insert(0, ".")
import aiohttp
import asyncio
headers = {"Authorization": "Basic bG9naW46cGFzcw=="}
async def main():
async with aiohttp.ClientSession(headers=headers) as session:
async with session.get("http://httpbin.org/headers") as r:
json_body = await r.json()
print(json_body)
asyncio.run(main())

Wyświetl plik

@ -0,0 +1,25 @@
import sys
sys.path.insert(0, ".")
import aiohttp
import asyncio
async def main():
async with aiohttp.ClientSession("http://httpbin.org") as session:
async with session.get("/get") as resp:
assert resp.status == 200
rget = await resp.text()
print(f"GET: {rget}")
async with session.post("/post", json={"foo": "bar"}) as resp:
assert resp.status == 200
rpost = await resp.text()
print(f"POST: {rpost}")
async with session.put("/put", data=b"data") as resp:
assert resp.status == 200
rput = await resp.json()
print("PUT: ", rput)
if __name__ == "__main__":
asyncio.run(main())

Wyświetl plik

@ -0,0 +1,20 @@
import sys
sys.path.insert(0, ".")
import aiohttp
import asyncio
params = {"key1": "value1", "key2": "value2"}
async def main():
async with aiohttp.ClientSession() as session:
async with session.get("http://httpbin.org/get", params=params) as response:
expect = "http://httpbin.org/get?key1=value1&key2=value2"
assert str(response.url) == expect, f"{response.url} != {expect}"
html = await response.text()
print(html)
asyncio.run(main())

Wyświetl plik

@ -0,0 +1,44 @@
import sys
sys.path.insert(0, ".")
import aiohttp
import asyncio
try:
URL = sys.argv[1] # expects a websocket echo server
except Exception:
URL = "ws://echo.websocket.events"
sslctx = False
if URL.startswith("wss:"):
try:
import ssl
sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
sslctx.verify_mode = ssl.CERT_NONE
except Exception:
pass
async def ws_test_echo(session):
async with session.ws_connect(URL, ssl=sslctx) as ws:
await ws.send_str("hello world!\r\n")
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print(msg.data)
if "close" in msg.data:
break
await ws.send_str("close\r\n")
await ws.close()
async def main():
async with aiohttp.ClientSession() as session:
await ws_test_echo(session)
if __name__ == "__main__":
asyncio.run(main())

Wyświetl plik

@ -0,0 +1,53 @@
import sys
sys.path.insert(0, ".")
import aiohttp
import asyncio
try:
URL = sys.argv[1] # expects a websocket echo server
READ_BANNER = False
except Exception:
URL = "ws://echo.websocket.events"
READ_BANNER = True
sslctx = False
if URL.startswith("wss:"):
try:
import ssl
sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
sslctx.verify_mode = ssl.CERT_NONE
except Exception:
pass
async def ws_test_echo(session):
async with session.ws_connect(URL, ssl=sslctx) as ws:
if READ_BANNER:
print(await ws.receive_str())
try:
while True:
await ws.send_str(f"{input('>>> ')}\r\n")
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print(msg.data, end="")
break
except KeyboardInterrupt:
pass
finally:
await ws.close()
async def main():
async with aiohttp.ClientSession() as session:
await ws_test_echo(session)
if __name__ == "__main__":
asyncio.run(main())

Wyświetl plik

@ -0,0 +1,7 @@
metadata(
description="HTTP client module for MicroPython asyncio module",
version="0.0.1",
pypi="aiohttp",
)
package("aiohttp")

Wyświetl plik

@ -160,7 +160,7 @@ def decode_simple_value(decoder):
def decode_float16(decoder): def decode_float16(decoder):
payload = decoder.read(2) payload = decoder.read(2)
return unpack_float16(payload) raise NotImplementedError # no float16 unpack function
def decode_float32(decoder): def decode_float32(decoder):
@ -185,7 +185,7 @@ special_decoders = {
20: lambda self: False, 20: lambda self: False,
21: lambda self: True, 21: lambda self: True,
22: lambda self: None, 22: lambda self: None,
23: lambda self: undefined, # 23 is undefined
24: decode_simple_value, 24: decode_simple_value,
25: decode_float16, 25: decode_float16,
26: decode_float32, 26: decode_float32,
@ -210,8 +210,9 @@ class CBORDecoder(object):
data = self.fp.read(amount) data = self.fp.read(amount)
if len(data) < amount: if len(data) < amount:
raise CBORDecodeError( raise CBORDecodeError(
"premature end of stream (expected to read {} bytes, got {} " "premature end of stream (expected to read {} bytes, got {} instead)".format(
"instead)".format(amount, len(data)) amount, len(data)
)
) )
return data return data

Wyświetl plik

@ -380,9 +380,11 @@ def client(host, udp=False, reverse=False, bandwidth=10 * 1024 * 1024):
ticks_us_end = param["time"] * 1000000 ticks_us_end = param["time"] * 1000000
poll = select.poll() poll = select.poll()
poll.register(s_ctrl, select.POLLIN) poll.register(s_ctrl, select.POLLIN)
buf = None
s_data = None s_data = None
start = None start = None
udp_packet_id = 0 udp_packet_id = 0
udp_last_send = None
while True: while True:
for pollable in poll.poll(stats.max_dt_ms()): for pollable in poll.poll(stats.max_dt_ms()):
if pollable_is_sock(pollable, s_data): if pollable_is_sock(pollable, s_data):

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="0.8.0", pypi="requests") metadata(version="0.8.1", pypi="requests")
package("requests") package("requests")

Wyświetl plik

@ -45,7 +45,7 @@ def request(
parse_headers=True, parse_headers=True,
): ):
redirect = None # redirection url, None means no redirection redirect = None # redirection url, None means no redirection
chunked_data = data and getattr(data, "__iter__", None) and not getattr(data, "__len__", None) chunked_data = data and getattr(data, "__next__", None) and not getattr(data, "__len__", None)
if auth is not None: if auth is not None:
import ubinascii import ubinascii

Wyświetl plik

@ -5,3 +5,4 @@ absolute_import = True
with_statement = True with_statement = True
print_function = True print_function = True
unicode_literals = True unicode_literals = True
annotations = True

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="0.0.3") metadata(version="0.1.0")
module("__future__.py") module("__future__.py")

Wyświetl plik

@ -52,6 +52,25 @@ def _bytes_from_decode_data(s):
raise TypeError("argument should be bytes or ASCII string, not %s" % s.__class__.__name__) raise TypeError("argument should be bytes or ASCII string, not %s" % s.__class__.__name__)
def _maketrans(f, t):
"""Re-implement bytes.maketrans() as there is no such function in micropython"""
if len(f) != len(t):
raise ValueError("maketrans arguments must have same length")
translation_table = dict(zip(f, t))
return translation_table
def _translate(input_bytes, trans_table):
"""Re-implement bytes.translate() as there is no such function in micropython"""
result = bytearray()
for byte in input_bytes:
translated_byte = trans_table.get(byte, byte)
result.append(translated_byte)
return bytes(result)
# Base64 encoding/decoding uses binascii # Base64 encoding/decoding uses binascii
@ -73,7 +92,7 @@ def b64encode(s, altchars=None):
if not isinstance(altchars, bytes_types): if not isinstance(altchars, bytes_types):
raise TypeError("expected bytes, not %s" % altchars.__class__.__name__) raise TypeError("expected bytes, not %s" % altchars.__class__.__name__)
assert len(altchars) == 2, repr(altchars) assert len(altchars) == 2, repr(altchars)
return encoded.translate(bytes.maketrans(b"+/", altchars)) encoded = _translate(encoded, _maketrans(b"+/", altchars))
return encoded return encoded
@ -95,7 +114,7 @@ def b64decode(s, altchars=None, validate=False):
if altchars is not None: if altchars is not None:
altchars = _bytes_from_decode_data(altchars) altchars = _bytes_from_decode_data(altchars)
assert len(altchars) == 2, repr(altchars) assert len(altchars) == 2, repr(altchars)
s = s.translate(bytes.maketrans(altchars, b"+/")) s = _translate(s, _maketrans(altchars, b"+/"))
if validate and not re.match(b"^[A-Za-z0-9+/]*=*$", s): if validate and not re.match(b"^[A-Za-z0-9+/]*=*$", s):
raise binascii.Error("Non-base64 digit found") raise binascii.Error("Non-base64 digit found")
return binascii.a2b_base64(s) return binascii.a2b_base64(s)
@ -120,8 +139,8 @@ def standard_b64decode(s):
return b64decode(s) return b64decode(s)
# _urlsafe_encode_translation = bytes.maketrans(b'+/', b'-_') # _urlsafe_encode_translation = _maketrans(b'+/', b'-_')
# _urlsafe_decode_translation = bytes.maketrans(b'-_', b'+/') # _urlsafe_decode_translation = _maketrans(b'-_', b'+/')
def urlsafe_b64encode(s): def urlsafe_b64encode(s):
@ -132,7 +151,7 @@ def urlsafe_b64encode(s):
'/'. '/'.
""" """
# return b64encode(s).translate(_urlsafe_encode_translation) # return b64encode(s).translate(_urlsafe_encode_translation)
raise NotImplementedError() return b64encode(s, b"-_").rstrip(b"\n")
def urlsafe_b64decode(s): def urlsafe_b64decode(s):
@ -266,7 +285,7 @@ def b32decode(s, casefold=False, map01=None):
if map01 is not None: if map01 is not None:
map01 = _bytes_from_decode_data(map01) map01 = _bytes_from_decode_data(map01)
assert len(map01) == 1, repr(map01) assert len(map01) == 1, repr(map01)
s = s.translate(bytes.maketrans(b"01", b"O" + map01)) s = _translate(s, _maketrans(b"01", b"O" + map01))
if casefold: if casefold:
s = s.upper() s = s.upper()
# Strip off pad characters from the right. We need to count the pad # Strip off pad characters from the right. We need to count the pad

Wyświetl plik

@ -1,4 +1,4 @@
metadata(version="3.3.4") metadata(version="3.3.5")
require("binascii") require("binascii")
require("struct") require("struct")

Wyświetl plik

@ -1,4 +1,4 @@
metadata(version="0.1.4") metadata(version="0.2.0")
# Originally written by Paul Sokolovsky. # Originally written by Paul Sokolovsky.

Wyświetl plik

@ -66,6 +66,13 @@ def isdir(path):
return False return False
def isfile(path):
try:
return bool(os.stat(path)[0] & 0x8000)
except OSError:
return False
def expanduser(s): def expanduser(s):
if s == "~" or s.startswith("~/"): if s == "~" or s.startswith("~/"):
h = os.getenv("HOME") h = os.getenv("HOME")

Wyświetl plik

@ -20,3 +20,7 @@ assert not exists(dir + "/test_path.py--")
assert isdir(dir + "/os") assert isdir(dir + "/os")
assert not isdir(dir + "/os--") assert not isdir(dir + "/os--")
assert not isdir(dir + "/test_path.py") assert not isdir(dir + "/test_path.py")
assert not isfile(dir + "/os")
assert isfile(dir + "/test_path.py")
assert not isfile(dir + "/test_path.py--")

Wyświetl plik

@ -0,0 +1,45 @@
# time
This library _extends_ the built-in [MicroPython `time`
module](https://docs.micropython.org/en/latest/library/time.html#module-time) to
include
[`time.strftime()`](https://docs.python.org/3/library/datetime.html#strftime-and-strptime-behavior).
`strftime()` is omitted from the built-in `time` module to conserve space.
## Installation
Use `mip` via `mpremote`:
```bash
> mpremote mip install time
```
See [Package management](https://docs.micropython.org/en/latest/reference/packages.html) for more details on using `mip` and `mpremote`.
## Common uses
`strftime()` is used when using a loggging [Formatter
Object](https://docs.python.org/3/library/logging.html#formatter-objects) that
employs
[`asctime`](https://docs.python.org/3/library/logging.html#formatter-objects).
For example:
```python
logging.Formatter('%(asctime)s | %(name)s | %(levelname)s - %(message)s')
```
The expected output might look like:
```text
Tue Feb 17 09:42:58 2009 | MAIN | INFO - test
```
But if this `time` extension library isn't installed, `asctime` will always be
`None`:
```text
None | MAIN | INFO - test
```

Wyświetl plik

@ -1,18 +1,15 @@
#!/bin/bash #!/bin/bash
######################################################################################## ########################################################################################
# code formatting # commit formatting
function ci_code_formatting_setup { function ci_commit_formatting_run {
sudo apt-add-repository --yes --update ppa:pybricks/ppa git remote add upstream https://github.com/micropython/micropython-lib.git
sudo apt-get install uncrustify git fetch --depth=100 upstream master
pip3 install black # If the common ancestor commit hasn't been found, fetch more.
uncrustify --version git merge-base upstream/master HEAD || git fetch --unshallow upstream master
black --version # For a PR, upstream/master..HEAD ends with a merge commit into master, exclude that one.
} tools/verifygitlog.py -v upstream/master..HEAD --no-merges
function ci_code_formatting_run {
tools/codeformat.py -v
} }
######################################################################################## ########################################################################################

Wyświetl plik

@ -25,87 +25,19 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE. # THE SOFTWARE.
# This is based on tools/codeformat.py from the main micropython/micropython # This is just a wrapper around running ruff format, so that code formatting can be
# repository but without support for .c/.h files. # invoked in the same way as in the main repo.
import argparse
import glob
import itertools
import os import os
import re
import subprocess import subprocess
# Relative to top-level repo dir.
PATHS = [
"**/*.py",
]
EXCLUSIONS = []
# Path to repo top-level dir. # Path to repo top-level dir.
TOP = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) TOP = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
PY_EXTS = (".py",)
def list_files(paths, exclusions=None, prefix=""):
files = set()
for pattern in paths:
files.update(glob.glob(os.path.join(prefix, pattern), recursive=True))
for pattern in exclusions or []:
files.difference_update(glob.fnmatch.filter(files, os.path.join(prefix, pattern)))
return sorted(files)
def main(): def main():
cmd_parser = argparse.ArgumentParser(description="Auto-format Python files.") command = ["ruff", "format", "."]
cmd_parser.add_argument("-v", action="store_true", help="Enable verbose output") subprocess.check_call(command, cwd=TOP)
cmd_parser.add_argument(
"-f",
action="store_true",
help="Filter files provided on the command line against the default list of files to check.",
)
cmd_parser.add_argument("files", nargs="*", help="Run on specific globs")
args = cmd_parser.parse_args()
# Expand the globs passed on the command line, or use the default globs above.
files = []
if args.files:
files = list_files(args.files)
if args.f:
# Filter against the default list of files. This is a little fiddly
# because we need to apply both the inclusion globs given in PATHS
# as well as the EXCLUSIONS, and use absolute paths
files = {os.path.abspath(f) for f in files}
all_files = set(list_files(PATHS, EXCLUSIONS, TOP))
if args.v: # In verbose mode, log any files we're skipping
for f in files - all_files:
print("Not checking: {}".format(f))
files = list(files & all_files)
else:
files = list_files(PATHS, EXCLUSIONS, TOP)
# Extract files matching a specific language.
def lang_files(exts):
for file in files:
if os.path.splitext(file)[1].lower() in exts:
yield file
# Run tool on N files at a time (to avoid making the command line too long).
def batch(cmd, files, N=200):
while True:
file_args = list(itertools.islice(files, N))
if not file_args:
break
subprocess.check_call(cmd + file_args)
# Format Python files with black.
command = ["black", "--fast", "--line-length=99"]
if args.v:
command.append("-v")
else:
command.append("-q")
batch(command, lang_files(PY_EXTS))
if __name__ == "__main__": if __name__ == "__main__":

Wyświetl plik

@ -185,9 +185,7 @@ urls = {{ Homepage = "https://github.com/micropython/micropython-lib" }}
""" """
[tool.hatch.build] [tool.hatch.build]
packages = ["{}"] packages = ["{}"]
""".format( """.format(top_level_package),
top_level_package
),
file=toml_file, file=toml_file,
) )

Wyświetl plik

@ -0,0 +1,173 @@
#!/usr/bin/env python3
# This is an exact duplicate of verifygitlog.py from the main repo.
import re
import subprocess
import sys
verbosity = 0 # Show what's going on, 0 1 or 2.
suggestions = 1 # Set to 0 to not include lengthy suggestions in error messages.
ignore_prefixes = []
def verbose(*args):
if verbosity:
print(*args)
def very_verbose(*args):
if verbosity > 1:
print(*args)
class ErrorCollection:
# Track errors and warnings as the program runs
def __init__(self):
self.has_errors = False
self.has_warnings = False
self.prefix = ""
def error(self, text):
print("error: {}{}".format(self.prefix, text))
self.has_errors = True
def warning(self, text):
print("warning: {}{}".format(self.prefix, text))
self.has_warnings = True
def git_log(pretty_format, *args):
# Delete pretty argument from user args so it doesn't interfere with what we do.
args = ["git", "log"] + [arg for arg in args if "--pretty" not in args]
args.append("--pretty=format:" + pretty_format)
very_verbose("git_log", *args)
# Generator yielding each output line.
for line in subprocess.Popen(args, stdout=subprocess.PIPE).stdout:
yield line.decode().rstrip("\r\n")
def diagnose_subject_line(subject_line, subject_line_format, err):
err.error("Subject line: " + subject_line)
if not subject_line.endswith("."):
err.error('* must end with "."')
if not re.match(r"^[^!]+: ", subject_line):
err.error('* must start with "path: "')
if re.match(r"^[^!]+: *$", subject_line):
err.error("* must contain a subject after the path.")
m = re.match(r"^[^!]+: ([a-z][^ ]*)", subject_line)
if m:
err.error('* first word of subject ("{}") must be capitalised.'.format(m.group(1)))
if re.match(r"^[^!]+: [^ ]+$", subject_line):
err.error("* subject must contain more than one word.")
err.error("* must match: " + repr(subject_line_format))
err.error('* Example: "py/runtime: Add support for foo to bar."')
def verify(sha, err):
verbose("verify", sha)
err.prefix = "commit " + sha + ": "
# Author and committer email.
for line in git_log("%ae%n%ce", sha, "-n1"):
very_verbose("email", line)
if "noreply" in line:
err.error("Unwanted email address: " + line)
# Message body.
raw_body = list(git_log("%B", sha, "-n1"))
verify_message_body(raw_body, err)
def verify_message_body(raw_body, err):
if not raw_body:
err.error("Message is empty")
return
# Subject line.
subject_line = raw_body[0]
for prefix in ignore_prefixes:
if subject_line.startswith(prefix):
verbose("Skipping ignored commit message")
return
very_verbose("subject_line", subject_line)
subject_line_format = r"^[^!]+: [A-Z]+.+ .+\.$"
if not re.match(subject_line_format, subject_line):
diagnose_subject_line(subject_line, subject_line_format, err)
if len(subject_line) >= 73:
err.error("Subject line must be 72 or fewer characters: " + subject_line)
# Second one divides subject and body.
if len(raw_body) > 1 and raw_body[1]:
err.error("Second message line must be empty: " + raw_body[1])
# Message body lines.
for line in raw_body[2:]:
# Long lines with URLs are exempt from the line length rule.
if len(line) >= 76 and "://" not in line:
err.error("Message lines should be 75 or less characters: " + line)
if not raw_body[-1].startswith("Signed-off-by: ") or "@" not in raw_body[-1]:
err.error('Message must be signed-off. Use "git commit -s".')
def run(args):
verbose("run", *args)
err = ErrorCollection()
if "--check-file" in args:
filename = args[-1]
verbose("checking commit message from", filename)
with open(args[-1]) as f:
# Remove comment lines as well as any empty lines at the end.
lines = [line.rstrip("\r\n") for line in f if not line.startswith("#")]
while not lines[-1]:
lines.pop()
verify_message_body(lines, err)
else: # Normal operation, pass arguments to git log
for sha in git_log("%h", *args):
verify(sha, err)
if err.has_errors or err.has_warnings:
if suggestions:
print("See https://github.com/micropython/micropython/blob/master/CODECONVENTIONS.md")
else:
print("ok")
if err.has_errors:
sys.exit(1)
def show_help():
print("usage: verifygitlog.py [-v -n -h --check-file] ...")
print("-v : increase verbosity, can be specified multiple times")
print("-n : do not print multi-line suggestions")
print("-h : print this help message and exit")
print(
"--check-file : Pass a single argument which is a file containing a candidate commit message"
)
print(
"--ignore-rebase : Skip checking commits with git rebase autosquash prefixes or WIP as a prefix"
)
print("... : arguments passed to git log to retrieve commits to verify")
print(" see https://www.git-scm.com/docs/git-log")
print(" passing no arguments at all will verify all commits")
print("examples:")
print("verifygitlog.py -n10 # Check last 10 commits")
print("verifygitlog.py -v master..HEAD # Check commits since master")
if __name__ == "__main__":
args = sys.argv[1:]
verbosity = args.count("-v")
suggestions = args.count("-n") == 0
if "--ignore-rebase" in args:
args.remove("--ignore-rebase")
ignore_prefixes = ["squash!", "fixup!", "amend!", "WIP"]
if "-h" in args:
show_help()
else:
args = [arg for arg in args if arg not in ["-v", "-n", "-h"]]
run(args)

Wyświetl plik

@ -1,36 +1,55 @@
import sys import sys
import ffilib import ffilib
import array import array
import uctypes
pcre2 = ffilib.open("libpcre2-8")
# pcre2_code *pcre2_compile(PCRE2_SPTR pattern, PCRE2_SIZE length,
# uint32_t options, int *errorcode, PCRE2_SIZE *erroroffset,
# pcre2_compile_context *ccontext);
pcre2_compile = pcre2.func("p", "pcre2_compile_8", "siippp")
# int pcre2_match(const pcre2_code *code, PCRE2_SPTR subject,
# PCRE2_SIZE length, PCRE2_SIZE startoffset, uint32_t options,
# pcre2_match_data *match_data, pcre2_match_context *mcontext);
pcre2_match = pcre2.func("i", "pcre2_match_8", "Psiiipp")
# int pcre2_pattern_info(const pcre2_code *code, uint32_t what,
# void *where);
pcre2_pattern_info = pcre2.func("i", "pcre2_pattern_info_8", "Pip")
# PCRE2_SIZE *pcre2_get_ovector_pointer(pcre2_match_data *match_data);
pcre2_get_ovector_pointer = pcre2.func("p", "pcre2_get_ovector_pointer_8", "p")
# pcre2_match_data *pcre2_match_data_create_from_pattern(const pcre2_code *code,
# pcre2_general_context *gcontext);
pcre2_match_data_create_from_pattern = pcre2.func(
"p", "pcre2_match_data_create_from_pattern_8", "Pp"
)
# PCRE2_SIZE that is of type size_t.
# Use ULONG as type to support both 32bit and 64bit.
PCRE2_SIZE_SIZE = uctypes.sizeof({"field": 0 | uctypes.ULONG})
PCRE2_SIZE_TYPE = "L"
# Real value in pcre2.h is 0xFFFFFFFF for 32bit and
# 0x0xFFFFFFFFFFFFFFFF for 64bit that is equivalent
# to -1
PCRE2_ZERO_TERMINATED = -1
pcre = ffilib.open("libpcre") IGNORECASE = I = 0x8
MULTILINE = M = 0x400
# pcre *pcre_compile(const char *pattern, int options, DOTALL = S = 0x20
# const char **errptr, int *erroffset, VERBOSE = X = 0x80
# const unsigned char *tableptr); PCRE2_ANCHORED = 0x80000000
pcre_compile = pcre.func("p", "pcre_compile", "sipps")
# int pcre_exec(const pcre *code, const pcre_extra *extra,
# const char *subject, int length, int startoffset,
# int options, int *ovector, int ovecsize);
pcre_exec = pcre.func("i", "pcre_exec", "PPsiiipi")
# int pcre_fullinfo(const pcre *code, const pcre_extra *extra,
# int what, void *where);
pcre_fullinfo = pcre.func("i", "pcre_fullinfo", "PPip")
IGNORECASE = I = 1
MULTILINE = M = 2
DOTALL = S = 4
VERBOSE = X = 8
PCRE_ANCHORED = 0x10
# TODO. Note that Python3 has unicode by default # TODO. Note that Python3 has unicode by default
ASCII = A = 0 ASCII = A = 0
UNICODE = U = 0 UNICODE = U = 0
PCRE_INFO_CAPTURECOUNT = 2 PCRE2_INFO_CAPTURECOUNT = 0x4
class PCREMatch: class PCREMatch:
@ -67,19 +86,23 @@ class PCREPattern:
def search(self, s, pos=0, endpos=-1, _flags=0): def search(self, s, pos=0, endpos=-1, _flags=0):
assert endpos == -1, "pos: %d, endpos: %d" % (pos, endpos) assert endpos == -1, "pos: %d, endpos: %d" % (pos, endpos)
buf = array.array("i", [0]) buf = array.array("i", [0])
pcre_fullinfo(self.obj, None, PCRE_INFO_CAPTURECOUNT, buf) pcre2_pattern_info(self.obj, PCRE2_INFO_CAPTURECOUNT, buf)
cap_count = buf[0] cap_count = buf[0]
ov = array.array("i", [0, 0, 0] * (cap_count + 1)) match_data = pcre2_match_data_create_from_pattern(self.obj, None)
num = pcre_exec(self.obj, None, s, len(s), pos, _flags, ov, len(ov)) num = pcre2_match(self.obj, s, len(s), pos, _flags, match_data, None)
if num == -1: if num == -1:
# No match # No match
return None return None
ov_ptr = pcre2_get_ovector_pointer(match_data)
# pcre2_get_ovector_pointer return PCRE2_SIZE
ov_buf = uctypes.bytearray_at(ov_ptr, PCRE2_SIZE_SIZE * (cap_count + 1) * 2)
ov = array.array(PCRE2_SIZE_TYPE, ov_buf)
# We don't care how many matching subexpressions we got, we # We don't care how many matching subexpressions we got, we
# care only about total # of capturing ones (including empty) # care only about total # of capturing ones (including empty)
return PCREMatch(s, cap_count + 1, ov) return PCREMatch(s, cap_count + 1, ov)
def match(self, s, pos=0, endpos=-1): def match(self, s, pos=0, endpos=-1):
return self.search(s, pos, endpos, PCRE_ANCHORED) return self.search(s, pos, endpos, PCRE2_ANCHORED)
def sub(self, repl, s, count=0): def sub(self, repl, s, count=0):
if not callable(repl): if not callable(repl):
@ -141,9 +164,9 @@ class PCREPattern:
def compile(pattern, flags=0): def compile(pattern, flags=0):
errptr = bytes(4) errcode = bytes(4)
erroffset = bytes(4) erroffset = bytes(4)
regex = pcre_compile(pattern, flags, errptr, erroffset, None) regex = pcre2_compile(pattern, PCRE2_ZERO_TERMINATED, flags, errcode, erroffset, None)
assert regex assert regex
return PCREPattern(regex) return PCREPattern(regex)
@ -154,7 +177,7 @@ def search(pattern, string, flags=0):
def match(pattern, string, flags=0): def match(pattern, string, flags=0):
r = compile(pattern, flags | PCRE_ANCHORED) r = compile(pattern, flags | PCRE2_ANCHORED)
return r.search(string) return r.search(string)