From eba897420d96650a372b4b65831c24741d667890 Mon Sep 17 00:00:00 2001 From: Andrew Leech Date: Mon, 19 Sep 2022 10:37:36 +1000 Subject: [PATCH] aioble/server.py: Maintain write order for captured characteristics. This replaced the per-characteristic queues with a single shared queue, which means that the characteristics will return from `written()` in the exact order that the original writes arrived, even if the writes are occuring across multiple different characteristics. This work was funded by Planet Innovation. Signed-off-by: Jim Mussared --- micropython/bluetooth/aioble/aioble/server.py | 106 +++++++++++----- .../aioble/multitests/ble_write_order.py | 118 ++++++++++++++++++ .../aioble/multitests/ble_write_order.py.exp | 37 ++++++ 3 files changed, 233 insertions(+), 28 deletions(-) create mode 100644 micropython/bluetooth/aioble/multitests/ble_write_order.py create mode 100644 micropython/bluetooth/aioble/multitests/ble_write_order.py.exp diff --git a/micropython/bluetooth/aioble/aioble/server.py b/micropython/bluetooth/aioble/aioble/server.py index d01d3e4c..76374a36 100644 --- a/micropython/bluetooth/aioble/aioble/server.py +++ b/micropython/bluetooth/aioble/aioble/server.py @@ -60,6 +60,12 @@ def _server_irq(event, data): def _server_shutdown(): global _registered_characteristics _registered_characteristics = {} + if hasattr(BaseCharacteristic, "_capture_task"): + BaseCharacteristic._capture_task.cancel() + del BaseCharacteristic._capture_queue + del BaseCharacteristic._capture_write_event + del BaseCharacteristic._capture_consumed_event + del BaseCharacteristic._capture_task register_irq_handler(_server_irq, _server_shutdown) @@ -97,6 +103,42 @@ class BaseCharacteristic: else: ble.gatts_write(self._value_handle, data, send_update) + # When the a capture-enabled characteristic is created, create the + # necessary events (if not already created). + @staticmethod + def _init_capture(): + if hasattr(BaseCharacteristic, "_capture_queue"): + return + + BaseCharacteristic._capture_queue = deque((), _WRITE_CAPTURE_QUEUE_LIMIT) + BaseCharacteristic._capture_write_event = asyncio.ThreadSafeFlag() + BaseCharacteristic._capture_consumed_event = asyncio.ThreadSafeFlag() + BaseCharacteristic._capture_task = asyncio.create_task( + BaseCharacteristic._run_capture_task() + ) + + # Monitor the shared queue for incoming characteristic writes and forward + # them sequentially to the individual characteristic events. + @staticmethod + async def _run_capture_task(): + write = BaseCharacteristic._capture_write_event + consumed = BaseCharacteristic._capture_consumed_event + q = BaseCharacteristic._capture_queue + + while True: + if len(q): + conn, data, characteristic = q.popleft() + # Let the characteristic waiting in `written()` know that it + # can proceed. + characteristic._write_data = (conn, data) + characteristic._write_event.set() + # Wait for the characteristic to complete `written()` before + # continuing. + await consumed.wait() + + if not len(q): + await write.wait() + # Wait for a write on this characteristic. Returns the connection that did # the write, or a tuple of (connection, value) if capture is enabled for # this characteristics. @@ -105,17 +147,27 @@ class BaseCharacteristic: # Not a writable characteristic. return - # If the queue is empty, then we need to wait. However, if the queue - # has a single item, we also need to do a no-op wait in order to - # clear the event flag (because the queue will become empty and - # therefore the event should be cleared). - if len(self._write_queue) <= 1: - with DeviceTimeout(None, timeout_ms): - await self._write_event.wait() + # If no write has been seen then we need to wait. If the event has + # already been set this will clear the event and continue + # immediately. In regular mode, this is set by the write IRQ + # directly (in _remote_write). In capture mode, this is set when it's + # our turn by _capture_task. + with DeviceTimeout(None, timeout_ms): + await self._write_event.wait() - # Either we started > 1 item, or the wait completed successfully, return - # the front of the queue. - return self._write_queue.popleft() + # Return the write data and clear the stored copy. + # In default usage this will be just the connection handle. + # In capture mode this will be a tuple of (connection_handle, received_data) + data = self._write_data + self._write_data = None + + if self.flags & _FLAG_WRITE_CAPTURE: + # Notify the shared queue monitor that the event has been consumed + # by the caller to `written()` and another characteristic can now + # proceed. + BaseCharacteristic._capture_consumed_event.set() + + return data def on_read(self, connection): return 0 @@ -124,27 +176,20 @@ class BaseCharacteristic: if characteristic := _registered_characteristics.get(value_handle, None): # If we've gone from empty to one item, then wake something # blocking on `await char.written()`. - wake = len(characteristic._write_queue) == 0 conn = DeviceConnection._connected.get(conn_handle, None) - q = characteristic._write_queue if characteristic.flags & _FLAG_WRITE_CAPTURE: - # For capture, we append both the connection and the written - # value to the queue. The deque will enforce the max queue len. + # For capture, we append the connection and the written value + # value to the shared queue along with the matching characteristic object. + # The deque will enforce the max queue len. data = characteristic.read() - q.append((conn, data)) + BaseCharacteristic._capture_queue.append((conn, data, characteristic)) + BaseCharacteristic._capture_write_event.set() else: - # Use the queue as a single slot -- it has max length of 1, - # so if there's an existing item it will be replaced. - q.append(conn) - - if wake: - # Queue is now non-empty. If something is waiting, it will be - # worken. If something isn't waiting right now, then a future - # caller to `await char.written()` will see the queue is - # non-empty, and wait on the event if it's going to empty the - # queue. + # Store the write connection handle to be later used to retrieve the data + # then set event to handle in written() task. + characteristic._write_data = conn characteristic._write_event.set() def _remote_read(conn_handle, value_handle): @@ -178,10 +223,15 @@ class Characteristic(BaseCharacteristic): if capture: # Capture means that we keep track of all writes, and capture # their values (and connection) in a queue. Otherwise we just - # track the most recent connection. + # track the connection of the most recent write. flags |= _FLAG_WRITE_CAPTURE + BaseCharacteristic._init_capture() + + # Set when this characteristic has a value waiting in self._write_data. self._write_event = asyncio.ThreadSafeFlag() - self._write_queue = deque((), _WRITE_CAPTURE_QUEUE_LIMIT if capture else 1) + # The connection of the most recent write, or a tuple of + # (connection, data) if capture is enabled. + self._write_data = None if notify: flags |= _FLAG_NOTIFY if indicate: @@ -263,7 +313,7 @@ class Descriptor(BaseCharacteristic): flags |= _FLAG_DESC_READ if write: self._write_event = asyncio.ThreadSafeFlag() - self._write_queue = deque((), 1) + self._write_data = None flags |= _FLAG_DESC_WRITE self.uuid = uuid diff --git a/micropython/bluetooth/aioble/multitests/ble_write_order.py b/micropython/bluetooth/aioble/multitests/ble_write_order.py new file mode 100644 index 00000000..4b64031e --- /dev/null +++ b/micropython/bluetooth/aioble/multitests/ble_write_order.py @@ -0,0 +1,118 @@ +# Test characteristic write capture preserves order across characteristics. + +import sys + +sys.path.append("") + +from micropython import const +import time, machine + +import uasyncio as asyncio +import aioble +import bluetooth + +TIMEOUT_MS = 5000 + +# Without the write ordering (via the shared queue) in server.py, this test +# passes with delay of 1, fails some at 5, fails more at 50 +DUMMY_DELAY = 50 + +SERVICE_UUID = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A") +CHAR_FIRST_UUID = bluetooth.UUID("00000000-1111-2222-3333-444444444444") +CHAR_SECOND_UUID = bluetooth.UUID("00000000-1111-2222-3333-555555555555") + +# Acting in peripheral role. +async def instance0_task(): + service = aioble.Service(SERVICE_UUID) + characteristic_first = aioble.Characteristic( + service, + CHAR_FIRST_UUID, + write=True, + capture=True, + ) + # Second characteristic enabled write capture. + characteristic_second = aioble.Characteristic( + service, + CHAR_SECOND_UUID, + write=True, + capture=True, + ) + aioble.register_services(service) + + # Register characteristic.written() handlers as asyncio background tasks. + # The order of these is important! + asyncio.create_task(task_written(characteristic_second, "second")) + asyncio.create_task(task_written(characteristic_first, "first")) + + # This dummy task simulates background processing on a real system that + # can block the asyncio loop for brief periods of time + asyncio.create_task(task_dummy()) + + multitest.globals(BDADDR=aioble.config("mac")) + multitest.next() + + # Wait for central to connect to us. + print("advertise") + async with await aioble.advertise( + 20_000, adv_data=b"\x02\x01\x06\x04\xffMPY", timeout_ms=TIMEOUT_MS + ) as connection: + print("connected") + + await connection.disconnected() + + +async def task_written(chr, label): + while True: + await chr.written() + data = chr.read().decode() + print(f"written: {label} {data}") + + +async def task_dummy(): + while True: + time.sleep_ms(DUMMY_DELAY) + await asyncio.sleep_ms(5) + + +def instance0(): + try: + asyncio.run(instance0_task()) + finally: + aioble.stop() + + +# Acting in central role. +async def instance1_task(): + multitest.next() + + # Connect to peripheral and then disconnect. + print("connect") + device = aioble.Device(*BDADDR) + async with await device.connect(timeout_ms=TIMEOUT_MS) as connection: + # Discover characteristics. + service = await connection.service(SERVICE_UUID) + print("service", service.uuid) + characteristic_first = await service.characteristic(CHAR_FIRST_UUID) + characteristic_second = await service.characteristic(CHAR_SECOND_UUID) + print("characteristic", characteristic_first.uuid, characteristic_second.uuid) + + for i in range(5): + print(f"write c{i}") + await characteristic_first.write("c" + str(i), timeout_ms=TIMEOUT_MS) + await characteristic_second.write("c" + str(i), timeout_ms=TIMEOUT_MS) + + await asyncio.sleep_ms(300) + + for i in range(5): + print(f"write r{i}") + await characteristic_second.write("r" + str(i), timeout_ms=TIMEOUT_MS) + await characteristic_first.write("r" + str(i), timeout_ms=TIMEOUT_MS) + + await asyncio.sleep_ms(300) + + +def instance1(): + try: + asyncio.run(instance1_task()) + finally: + aioble.stop() diff --git a/micropython/bluetooth/aioble/multitests/ble_write_order.py.exp b/micropython/bluetooth/aioble/multitests/ble_write_order.py.exp new file mode 100644 index 00000000..516de685 --- /dev/null +++ b/micropython/bluetooth/aioble/multitests/ble_write_order.py.exp @@ -0,0 +1,37 @@ +--- instance0 --- +advertise +connected +written: first c0 +written: second c0 +written: first c1 +written: second c1 +written: first c2 +written: second c2 +written: first c3 +written: second c3 +written: first c4 +written: second c4 +written: second r0 +written: first r0 +written: second r1 +written: first r1 +written: second r2 +written: first r2 +written: second r3 +written: first r3 +written: second r4 +written: first r4 +--- instance1 --- +connect +service UUID('a5a5a5a5-ffff-9999-1111-5a5a5a5a5a5a') +characteristic UUID('00000000-1111-2222-3333-444444444444') UUID('00000000-1111-2222-3333-555555555555') +write c0 +write c1 +write c2 +write c3 +write c4 +write r0 +write r1 +write r2 +write r3 +write r4