Remove undocumented and outdated uasyncio directory.

main
Peter Hinch 2020-12-27 17:21:48 +00:00
rodzic 56f775241a
commit 163a2f264c
48 zmienionych plików z 0 dodań i 4542 usunięć

Wyświetl plik

@ -1,161 +0,0 @@
# 1. Changes to usayncio
This archive contains suggestions for changes to new `uasyncio`.
## 1.1 Changes implemented
1. Implement as a Python package.
2. Implement synchronisation primitives as package modules to conserve RAM.
3. `Primitive` class has methods common to most synchronisation primitives.
Avoids the need for primitives to access the task queue directly.
4. Add `.priority` method to `Stream` class. Enables I/O to be handled at high
priority on a per-device basis.
5. Rename task queue class `TQueue` to avoid name clash with Queue primitive.
### Minor changes
1. Move `StreamReader` and `StreamWriter` assignments out of legacy section of
code: these classes exist in `asyncio` 3.8.
2. `.CreateTask` produces an assertion fail if called with a generator function.
Avoids obscure traceback if someone omits the parens.
3. Add machine readable version info. Useful in testing.
## 1.2 Suggested changes
I haven't implemented these.
1. Make `Event.set` capable of being triggered from an ISR.
2. Implement `wait_for_ms` as per V2.
# 2. CPython-compatible synchronisation primitives
These aim to work efficiently with the new version. All are separate modules to
conserve RAM. Items 1-4 subclass `uasyncio.Primitive`.
1. `Event`: Moved to separate module.
2. `Lock`: Kevin Köck's solution.
3. `Queue`: Paul's solution adapted for efficiency.
4. `Semaphore`: Also implements `BoundedSemaphore`.
5. `Condition`.
# 3. Other primitives
Included as examples of user-contributed primitives - see final section.
1. `Message`: Awaitable `Event` subclass with a data payload.
2. `Barrier`: Multiple tasks wait until all are either waiting on a Barrier
instance or have triggered the instance without waiting. Similar to `gather`
without the controlling coro: a barrier is shared between peers and may be
used in loops.
# 4. Test scripts
Hopefully these are self-documenting on import.
1. `prim_test.py` Tests for synchronisation primitives. Runs on MicroPython and
CPython V3.5-3.8. Demonstrates that MicroPython primitives behave similarly to
native CPython ones.
2. `test_fast_scheduling.py` Demonstrates difference between normal and priority
I/O scheduling. Runs on Pyboard.
3. `ms_timer.py` and `ms_timer_test.py` A practical use of priority scheduling to
implement a timer with higher precision than `asyncio.sleep_ms`. Runs on Pyboard.
4. `test_can.py` Demonstrates differences in behaviour between CPython 3.8 and
MicroPython. See code comments.
# 5. CPython compatibility of user primitives
`Message` is common to CPython and MicroPython.
There are two implementations of `Barrier` with the same functionality: a CPython
version and a MicroPython version with specific optimisations. The `Barrier` class
is loosely based on
[a Microsoft concept](https://docs.microsoft.com/en-us/windows/win32/sync/synchronization-barriers).
## 5.1 Directory structure of primitives
MicroPython optimised primitives are in `uasyncio/`. Primitives compatible with
`asyncio` are in `primitives/`.
# 6. Future uasyncio implementations
If part of `uasyncio` is to be implemented in C, it would be good if the following
capabilities were retained:
1. The ability to subclass the `asyncio` compatible primitives.
2. The ability to subclass `uasyncio.Primitive` (or provide other access to that
functionality).
3. A means of replacing the timebase by one based on the RTC for low power
applications.
4. A means of creating awaitable classes (e.g. `__iter__`).
# 7. Revisiting topics discussed via email
I am revising my tutorial to promote Python 3.8 syntax and to verify that code
samples run under MicroPython and CPython 3.8. I'm removing references to event
loop methods except for one minor section. This describes how to code for
compatibility with CPython versions 3.5-3.7.
Here are my observations on issues previously discussed.
## 7.1 Awaitable classes
I now have portable code which produces no syntax errors under CPython 3.8. It
is arguably hacky but a similar hack is required for V2. Nobody has complained.
```python
up = False # Running under MicroPython?
try:
import uasyncio as asyncio
up = True # Or can use sys.implementation.name
except ImportError:
import asyncio
async def times_two(n): # Coro to await
await asyncio.sleep(1)
return 2 * n
class Foo():
def __await__(self):
res = 1
for n in range(5):
print('__await__ called')
if up: # MicroPython
res = yield from times_two(res)
else: # CPython
res = yield from times_two(res).__await__()
return res
__iter__ = __await__ # MicroPython compatibility
async def bar():
foo = Foo() # foo is awaitable
print('waiting for foo')
res = await foo # Retrieve value
print('done', res)
asyncio.run(bar())
```
## 7.2 run_forever() behaviour
In an email I commented that the following code sample never terminates under
CPython 3.8, whereas under MicroPython it does:
```python
try:
import asyncio
except ImportError:
import uasyncio as asyncio
async def test():
print("test")
for _ in range(2):
await asyncio.sleep(0)
print('test2')
await asyncio.sleep(0.5)
print('Done')
loop=asyncio.get_event_loop()
loop.create_task(test())
loop.run_forever()
# asyncio.run(test())
```
While the observation is true, using the preferred (commented out) syntax it
terminates in CPython 3.8 and in MicroPython. My view is that it's not worth
fixing.

Wyświetl plik

@ -1,378 +0,0 @@
/*
* This file is part of the MicroPython project, http://micropython.org/
*
* The MIT License (MIT)
*
* Copyright (c) 2014 Damien P. George
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "py/mpconfig.h"
#if MICROPY_PY_USELECT
#include <stdio.h>
#include "py/runtime.h"
#include "py/obj.h"
#include "py/objlist.h"
#include "py/stream.h"
#include "py/mperrno.h"
#include "py/mphal.h"
// Flags for poll()
#define FLAG_ONESHOT (1)
/// \module select - Provides select function to wait for events on a stream
///
/// This module provides the select function.
typedef struct _poll_obj_t {
mp_obj_t obj;
mp_uint_t (*ioctl)(mp_obj_t obj, mp_uint_t request, mp_uint_t arg, int *errcode);
mp_uint_t flags;
mp_uint_t flags_ret;
} poll_obj_t;
STATIC void poll_map_add(mp_map_t *poll_map, const mp_obj_t *obj, mp_uint_t obj_len, mp_uint_t flags, bool or_flags) {
for (mp_uint_t i = 0; i < obj_len; i++) {
mp_map_elem_t *elem = mp_map_lookup(poll_map, mp_obj_id(obj[i]), MP_MAP_LOOKUP_ADD_IF_NOT_FOUND);
if (elem->value == NULL) {
// object not found; get its ioctl and add it to the poll list
const mp_stream_p_t *stream_p = mp_get_stream_raise(obj[i], MP_STREAM_OP_IOCTL);
poll_obj_t *poll_obj = m_new_obj(poll_obj_t);
poll_obj->obj = obj[i];
poll_obj->ioctl = stream_p->ioctl;
poll_obj->flags = flags;
poll_obj->flags_ret = 0;
elem->value = poll_obj;
} else {
// object exists; update its flags
if (or_flags) {
((poll_obj_t*)elem->value)->flags |= flags;
} else {
((poll_obj_t*)elem->value)->flags = flags;
}
}
}
}
// poll each object in the map
STATIC mp_uint_t poll_map_poll(mp_map_t *poll_map, mp_uint_t *rwx_num) {
mp_uint_t n_ready = 0;
for (mp_uint_t i = 0; i < poll_map->alloc; ++i) {
if (!MP_MAP_SLOT_IS_FILLED(poll_map, i)) {
continue;
}
poll_obj_t *poll_obj = (poll_obj_t*)poll_map->table[i].value;
int errcode;
mp_int_t ret = poll_obj->ioctl(poll_obj->obj, MP_STREAM_POLL, poll_obj->flags, &errcode);
poll_obj->flags_ret = ret;
if (ret == -1) {
// error doing ioctl
mp_raise_OSError(errcode);
}
if (ret != 0) {
// object is ready
n_ready += 1;
if (rwx_num != NULL) {
if (ret & MP_STREAM_POLL_RD) {
rwx_num[0] += 1;
}
if (ret & MP_STREAM_POLL_WR) {
rwx_num[1] += 1;
}
if ((ret & ~(MP_STREAM_POLL_RD | MP_STREAM_POLL_WR)) != 0) {
rwx_num[2] += 1;
}
}
}
}
return n_ready;
}
/// \function select(rlist, wlist, xlist[, timeout])
STATIC mp_obj_t select_select(uint n_args, const mp_obj_t *args) {
// get array data from tuple/list arguments
size_t rwx_len[3];
mp_obj_t *r_array, *w_array, *x_array;
mp_obj_get_array(args[0], &rwx_len[0], &r_array);
mp_obj_get_array(args[1], &rwx_len[1], &w_array);
mp_obj_get_array(args[2], &rwx_len[2], &x_array);
// get timeout
mp_uint_t timeout = -1;
if (n_args == 4) {
if (args[3] != mp_const_none) {
#if MICROPY_PY_BUILTINS_FLOAT
float timeout_f = mp_obj_get_float(args[3]);
if (timeout_f >= 0) {
timeout = (mp_uint_t)(timeout_f * 1000);
}
#else
timeout = mp_obj_get_int(args[3]) * 1000;
#endif
}
}
// merge separate lists and get the ioctl function for each object
mp_map_t poll_map;
mp_map_init(&poll_map, rwx_len[0] + rwx_len[1] + rwx_len[2]);
poll_map_add(&poll_map, r_array, rwx_len[0], MP_STREAM_POLL_RD, true);
poll_map_add(&poll_map, w_array, rwx_len[1], MP_STREAM_POLL_WR, true);
poll_map_add(&poll_map, x_array, rwx_len[2], MP_STREAM_POLL_ERR | MP_STREAM_POLL_HUP, true);
mp_uint_t start_tick = mp_hal_ticks_ms();
rwx_len[0] = rwx_len[1] = rwx_len[2] = 0;
for (;;) {
// poll the objects
mp_uint_t n_ready = poll_map_poll(&poll_map, rwx_len);
if (n_ready > 0 || (timeout != -1 && mp_hal_ticks_ms() - start_tick >= timeout)) {
// one or more objects are ready, or we had a timeout
mp_obj_t list_array[3];
list_array[0] = mp_obj_new_list(rwx_len[0], NULL);
list_array[1] = mp_obj_new_list(rwx_len[1], NULL);
list_array[2] = mp_obj_new_list(rwx_len[2], NULL);
rwx_len[0] = rwx_len[1] = rwx_len[2] = 0;
for (mp_uint_t i = 0; i < poll_map.alloc; ++i) {
if (!MP_MAP_SLOT_IS_FILLED(&poll_map, i)) {
continue;
}
poll_obj_t *poll_obj = (poll_obj_t*)poll_map.table[i].value;
if (poll_obj->flags_ret & MP_STREAM_POLL_RD) {
((mp_obj_list_t*)list_array[0])->items[rwx_len[0]++] = poll_obj->obj;
}
if (poll_obj->flags_ret & MP_STREAM_POLL_WR) {
((mp_obj_list_t*)list_array[1])->items[rwx_len[1]++] = poll_obj->obj;
}
if ((poll_obj->flags_ret & ~(MP_STREAM_POLL_RD | MP_STREAM_POLL_WR)) != 0) {
((mp_obj_list_t*)list_array[2])->items[rwx_len[2]++] = poll_obj->obj;
}
}
mp_map_deinit(&poll_map);
return mp_obj_new_tuple(3, list_array);
}
MICROPY_EVENT_POLL_HOOK
}
}
MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(mp_select_select_obj, 3, 4, select_select);
/// \class Poll - poll class
typedef struct _mp_obj_poll_t {
mp_obj_base_t base;
mp_map_t poll_map;
short iter_cnt;
short iter_idx;
int flags;
// callee-owned tuple
mp_obj_t ret_tuple;
} mp_obj_poll_t;
/// \method register(obj[, eventmask])
STATIC mp_obj_t poll_register(uint n_args, const mp_obj_t *args) {
mp_obj_poll_t *self = args[0];
mp_uint_t flags;
if (n_args == 3) {
flags = mp_obj_get_int(args[2]);
} else {
flags = MP_STREAM_POLL_RD | MP_STREAM_POLL_WR;
}
poll_map_add(&self->poll_map, &args[1], 1, flags, false);
return mp_const_none;
}
MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(poll_register_obj, 2, 3, poll_register);
/// \method unregister(obj)
STATIC mp_obj_t poll_unregister(mp_obj_t self_in, mp_obj_t obj_in) {
mp_obj_poll_t *self = self_in;
mp_map_lookup(&self->poll_map, mp_obj_id(obj_in), MP_MAP_LOOKUP_REMOVE_IF_FOUND);
// TODO raise KeyError if obj didn't exist in map
return mp_const_none;
}
MP_DEFINE_CONST_FUN_OBJ_2(poll_unregister_obj, poll_unregister);
/// \method modify(obj, eventmask)
STATIC mp_obj_t poll_modify(mp_obj_t self_in, mp_obj_t obj_in, mp_obj_t eventmask_in) {
mp_obj_poll_t *self = self_in;
mp_map_elem_t *elem = mp_map_lookup(&self->poll_map, mp_obj_id(obj_in), MP_MAP_LOOKUP);
if (elem == NULL) {
mp_raise_OSError(MP_ENOENT);
}
((poll_obj_t*)elem->value)->flags = mp_obj_get_int(eventmask_in);
return mp_const_none;
}
MP_DEFINE_CONST_FUN_OBJ_3(poll_modify_obj, poll_modify);
STATIC mp_uint_t poll_poll_internal(uint n_args, const mp_obj_t *args) {
mp_obj_poll_t *self = args[0];
// work out timeout (its given already in ms)
mp_uint_t timeout = -1;
int flags = 0;
if (n_args >= 2) {
if (args[1] != mp_const_none) {
mp_int_t timeout_i = mp_obj_get_int(args[1]);
if (timeout_i >= 0) {
timeout = timeout_i;
}
}
if (n_args >= 3) {
flags = mp_obj_get_int(args[2]);
}
}
self->flags = flags;
mp_uint_t start_tick = mp_hal_ticks_ms();
mp_uint_t n_ready;
for (;;) {
// poll the objects
n_ready = poll_map_poll(&self->poll_map, NULL);
if (n_ready > 0 || (timeout != -1 && mp_hal_ticks_ms() - start_tick >= timeout)) {
break;
}
MICROPY_EVENT_POLL_HOOK
}
return n_ready;
}
STATIC mp_obj_t poll_poll(uint n_args, const mp_obj_t *args) {
mp_obj_poll_t *self = args[0];
mp_uint_t n_ready = poll_poll_internal(n_args, args);
// one or more objects are ready, or we had a timeout
mp_obj_list_t *ret_list = mp_obj_new_list(n_ready, NULL);
n_ready = 0;
for (mp_uint_t i = 0; i < self->poll_map.alloc; ++i) {
if (!MP_MAP_SLOT_IS_FILLED(&self->poll_map, i)) {
continue;
}
poll_obj_t *poll_obj = (poll_obj_t*)self->poll_map.table[i].value;
if (poll_obj->flags_ret != 0) {
mp_obj_t tuple[2] = {poll_obj->obj, MP_OBJ_NEW_SMALL_INT(poll_obj->flags_ret)};
ret_list->items[n_ready++] = mp_obj_new_tuple(2, tuple);
if (self->flags & FLAG_ONESHOT) {
// Don't poll next time, until new event flags will be set explicitly
poll_obj->flags &= ~(poll_obj->flags_ret & (MP_STREAM_POLL_RD | MP_STREAM_POLL_WR));
}
}
}
return ret_list;
}
MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(poll_poll_obj, 1, 3, poll_poll);
STATIC mp_obj_t poll_ipoll(size_t n_args, const mp_obj_t *args) {
mp_obj_poll_t *self = MP_OBJ_TO_PTR(args[0]);
if (self->ret_tuple == MP_OBJ_NULL) {
self->ret_tuple = mp_obj_new_tuple(2, NULL);
}
int n_ready = poll_poll_internal(n_args, args);
self->iter_cnt = n_ready;
self->iter_idx = 0;
return args[0];
}
MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(poll_ipoll_obj, 1, 3, poll_ipoll);
STATIC mp_obj_t poll_iternext(mp_obj_t self_in) {
mp_obj_poll_t *self = MP_OBJ_TO_PTR(self_in);
if (self->iter_cnt == 0) {
return MP_OBJ_STOP_ITERATION;
}
self->iter_cnt--;
for (mp_uint_t i = self->iter_idx; i < self->poll_map.alloc; ++i) {
self->iter_idx++;
if (!MP_MAP_SLOT_IS_FILLED(&self->poll_map, i)) {
continue;
}
poll_obj_t *poll_obj = (poll_obj_t*)self->poll_map.table[i].value;
if (poll_obj->flags_ret != 0) {
mp_obj_tuple_t *t = MP_OBJ_TO_PTR(self->ret_tuple);
t->items[0] = poll_obj->obj;
t->items[1] = MP_OBJ_NEW_SMALL_INT(poll_obj->flags_ret);
if (self->flags & FLAG_ONESHOT) {
// Don't poll next time, until new event flags will be set explicitly
poll_obj->flags &= ~(poll_obj->flags_ret & (MP_STREAM_POLL_RD | MP_STREAM_POLL_WR));
}
return MP_OBJ_FROM_PTR(t);
}
}
assert(!"inconsistent number of poll active entries");
self->iter_cnt = 0;
return MP_OBJ_STOP_ITERATION;
}
STATIC const mp_rom_map_elem_t poll_locals_dict_table[] = {
{ MP_ROM_QSTR(MP_QSTR_register), MP_ROM_PTR(&poll_register_obj) },
{ MP_ROM_QSTR(MP_QSTR_unregister), MP_ROM_PTR(&poll_unregister_obj) },
{ MP_ROM_QSTR(MP_QSTR_modify), MP_ROM_PTR(&poll_modify_obj) },
{ MP_ROM_QSTR(MP_QSTR_poll), MP_ROM_PTR(&poll_poll_obj) },
{ MP_ROM_QSTR(MP_QSTR_ipoll), MP_ROM_PTR(&poll_ipoll_obj) },
};
STATIC MP_DEFINE_CONST_DICT(poll_locals_dict, poll_locals_dict_table);
STATIC const mp_obj_type_t mp_type_poll = {
{ &mp_type_type },
.name = MP_QSTR_poll,
.getiter = mp_identity_getiter,
.iternext = poll_iternext,
.locals_dict = (void*)&poll_locals_dict,
};
/// \function poll()
STATIC mp_obj_t select_poll(void) {
mp_obj_poll_t *poll = m_new_obj(mp_obj_poll_t);
poll->base.type = &mp_type_poll;
mp_map_init(&poll->poll_map, 0);
poll->iter_cnt = 0;
poll->ret_tuple = MP_OBJ_NULL;
return poll;
}
MP_DEFINE_CONST_FUN_OBJ_0(mp_select_poll_obj, select_poll);
STATIC const mp_rom_map_elem_t mp_module_select_globals_table[] = {
{ MP_ROM_QSTR(MP_QSTR___name__), MP_ROM_QSTR(MP_QSTR_uselect) },
{ MP_ROM_QSTR(MP_QSTR_select), MP_ROM_PTR(&mp_select_select_obj) },
{ MP_ROM_QSTR(MP_QSTR_poll), MP_ROM_PTR(&mp_select_poll_obj) },
{ MP_ROM_QSTR(MP_QSTR_POLLIN), MP_ROM_INT(MP_STREAM_POLL_RD) },
{ MP_ROM_QSTR(MP_QSTR_POLLOUT), MP_ROM_INT(MP_STREAM_POLL_WR) },
{ MP_ROM_QSTR(MP_QSTR_POLLERR), MP_ROM_INT(MP_STREAM_POLL_ERR) },
{ MP_ROM_QSTR(MP_QSTR_POLLHUP), MP_ROM_INT(MP_STREAM_POLL_HUP) },
};
STATIC MP_DEFINE_CONST_DICT(mp_module_select_globals, mp_module_select_globals_table);
const mp_obj_module_t mp_module_uselect = {
.base = { &mp_type_module },
.globals = (mp_obj_dict_t*)&mp_module_select_globals,
};
#endif // MICROPY_PY_USELECT

Wyświetl plik

@ -1,34 +0,0 @@
# ms_timer.py A relatively high precision delay class for the fast_io version
# of uasyncio
import uasyncio as asyncio
import utime
import io
MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)
class MillisecTimer(io.IOBase):
def __init__(self, fast=True):
self.end = 0
self.sreader = asyncio.StreamReader(self)
self.sreader.priority(fast)
def __iter__(self):
await self.sreader.readline()
def __call__(self, ms):
self.end = utime.ticks_add(utime.ticks_ms(), ms)
return self
def readline(self):
return b'\n'
def ioctl(self, req, arg):
ret = MP_STREAM_ERROR
if req == MP_STREAM_POLL:
ret = 0
if arg & MP_STREAM_POLL_RD:
if utime.ticks_diff(utime.ticks_ms(), self.end) >= 0:
ret |= MP_STREAM_POLL_RD
return ret

Wyświetl plik

@ -1,42 +0,0 @@
# ms_timer_test.py Test/demo program for MillisecTimer. Adapted for new uasyncio.
import uasyncio as asyncio
import utime
import ms_timer
async def timer_test(n, fast):
timer = ms_timer.MillisecTimer(fast)
while True:
t = utime.ticks_ms()
await timer(30)
print('Task {} time {}ms'.format(n, utime.ticks_diff(utime.ticks_ms(), t)))
await asyncio.sleep(0.5 + n/5)
async def foo():
while True:
await asyncio.sleep(0)
utime.sleep_ms(10) # Emulate slow processing
def main(fast=True):
for _ in range(10):
asyncio.create_task(foo())
for n in range(3):
asyncio.create_task(timer_test(n, fast))
await asyncio.sleep(10)
def test(fast=True):
asyncio.run(main(fast))
s = '''This test creates ten tasks each of which blocks for 10ms.
It also creates three tasks each of which runs a MillisecTimer for 30ms,
timing the period which elapses while it runs. With fast I/O scheduling
the elapsed time is ~30ms as expected. With normal scheduling it is
about 130ms because of competetion from the blocking coros.
Run test() to test fast I/O, test(False) to test normal I/O.
Test prints the task number followed by the actual elapsed time in ms.
Test runs for 10s.'''
print(s)

Wyświetl plik

@ -1,38 +0,0 @@
Context: uasyncio StreamReader and StreamWriter objects hang indefinitely under fault conditions.
Under fault conditions uasyncio expects to receive POLLERR or POLLHUP conditions from the poll instance. In my testing this never occurs.
Testing was of client connections. Socket type was SOCK_STREAM. Two fault conditions were tested:
1. Server outage.
2. Socket closed by another coroutine.
Testing was performed with a server running under the Unix build. Clients were tested on:
1. Unix build (on same machine as server).
2. ESP8266.
3. ESP32.
Results were as follows. Numbers represent the event no. received from the poll instance. "No trigger" means that the poll instance produced no response after the fault. On all platforms where the client was reading, a server outage produced a POLLIN (1) response. On all but ESP32 this repeated indefinitely causing the client endlessly to read empty bytes objects.
Numbers are base 10. Mode refers to the client mode. Expected refers to uasyncio.
| Mode | Platform | Outage | Closure | Expected |
|:-----:|:--------:|:-------:|:----------:|:--------:|
| Read | Unix | 1 | 32 | 9 or 17 |
| Read | ESP8266 | 1 | No trigger | 9 or 17 |
| Read | ESP32 | 1 (once)| No trigger | 9 or 17 |
| Write | Unix | OSError | 32 | 12 or 20 |
| Write | ESP8266 | OSError | No trigger | 12 or 20 |
| Write | ESP832 | OSError | No trigger | 12 or 20 |
1 == POLLIN
4 == POLLOUT
9 == (POLLIN & POLLERR)
17 == (POLLIN & POLLHUP)
12 == (POLLOUT & POLLERR)
20 == (POLLOUT & POLLHUP)
32 == I have no idea.
Test scripts may be found here:
[Server - can run in read or write mode](https://github.com/peterhinch/micropython-samples/blob/master/uasyncio_iostream/poll/server.py)
[Read client](https://github.com/peterhinch/micropython-samples/blob/master/uasyncio_iostream/poll/client_r.py)
[Write client](https://github.com/peterhinch/micropython-samples/blob/master/uasyncio_iostream/poll/client_w.py)

Wyświetl plik

@ -1,55 +0,0 @@
# client_r.py Test poll object's response to two fault conditions under Unix and ESP8266
import usocket as socket
import uasyncio as asyncio
import uselect as select
server_addr = socket.getaddrinfo('192.168.0.35', 8123)[0][-1]
s = socket.socket()
s.connect(server_addr) # Expect OSError if server down
poller = select.poll()
poller.register(s, select.POLLIN)
s.setblocking(False)
success = False
async def run():
global success
ok = True
try:
while ok:
res = poller.ipoll(10)
for sock, ev in res:
if ev & select.POLLIN:
r = sock.readline()
print(ev, r)
# A server outage prints 1, b'' forever on ESP8266 or Unix.
# If killer closes socket on ESP8266 ev is always 1,
# on Unix get ev == 32
# Never see 9 or 17 (base 10) which are the error responses expected by uasyncio
# (POLLIN & POLLERR or POLLIN & POLLHUP)
else: # The only way I can make it work (on Unix) is to quit on 32
print('Terminating event:', ev) # What is 32??
ok = False
break
await asyncio.sleep(0)
except OSError:
print('Got OSError') # Never happens
success = True # Detected socket closure or error by OSError or event
async def killer():
await asyncio.sleep(5)
print('closing socket')
s.close()
for n in range(3, -1, -1):
print('Shutdown in {}s'.format(n)) # Leave time for response from run()
await asyncio.sleep(1)
if success:
print('Success: detected error/socket closure.')
else:
print('Failed to detect error/socket closure.')
loop = asyncio.get_event_loop()
loop.create_task(run())
try:
loop.run_until_complete(killer())
finally:
s.close()

Wyświetl plik

@ -1,55 +0,0 @@
# client_w.py Test poll object's response to two fault conditions under Unix and ESP8266
import usocket as socket
import uasyncio as asyncio
import uselect as select
server_addr = socket.getaddrinfo('192.168.0.35', 8123)[0][-1]
s = socket.socket()
s.connect(server_addr) # Expect OSError if server down
poller = select.poll()
poller.register(s, select.POLLOUT)
s.setblocking(False)
success = False
async def run():
global success
ok = True
try:
while ok:
res = poller.ipoll(10)
for sock, ev in res:
if ev & select.POLLOUT:
r = sock.send(b'0123456789\n')
print(ev, r)
# On ESP8266 if another task closes the socket the poll object
# never triggers. uasyncio expects it to trigger with POLLHUP or
# (POLLOUT & POLLERR or POLLOUT & POLLHUP)
# If server fails gets OSError on both platforms.
else: # But on Unix socket closure produces ev == 32
print('Terminating event:', ev) # What is 32??
ok = False
break
await asyncio.sleep(1)
await asyncio.sleep(0)
except OSError:
print('Got OSError') # Happens on ESP8266 if server fails
success = True # Detected socket closure or error by OSError or event
async def killer():
await asyncio.sleep(5)
print('closing socket')
s.close()
for n in range(3, -1, -1):
print('Shutdown in {}s'.format(n)) # Leave time for response from run()
await asyncio.sleep(1)
if success:
print('Success: detected error/socket closure.')
else:
print('Failed to detect error/socket closure.')
loop = asyncio.get_event_loop()
loop.create_task(run())
try:
loop.run_until_complete(killer())
finally:
s.close()

Wyświetl plik

@ -1,44 +0,0 @@
# Minimal stream based socket seerver. To test client exception handling. Can
# only handle a single client but will cope with client failure and reconnect.
# Run under MicroPython Unix build.
import usocket as socket
import utime
addr = socket.getaddrinfo('0.0.0.0', 8123)[0][-1]
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(addr)
s.listen(1)
def run(write=True):
try:
while True:
print('Awaiting connection.')
try:
conn, addr = s.accept()
except OSError as er:
print('Connect fail:', er.args[0])
conn.close()
continue
print('Got connection from', addr)
try:
while True:
if write:
conn.send(b'0123456789\n') # OSError on fail
utime.sleep(1)
else:
line = conn.readline()
if line == b'':
print('Connection fail')
break
else:
print(line)
except OSError:
conn.close()
finally:
conn.close()
s.close()
print('run() to send lines of 11 bytes on port 8123,')
print('run(False) to read lines')

Wyświetl plik

@ -1,454 +0,0 @@
# prim_test.py Test/demo of the 'micro' synchronisation primitives
# for the new uasyncio
# The MIT License (MIT)
#
# Copyright (c) 2017-2019 Peter Hinch
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
try:
import asyncio
except ImportError:
# Specific imports under MicroPython to conserve RAM
import uasyncio as asyncio
import uasyncio.lock
import uasyncio.event
import uasyncio.semaphore
import uasyncio.condition
import uasyncio.queue
from uasyncio.barrier import Barrier # MicroPython optimised
else:
from primitives.barrier import Barrier # CPython generic
from primitives.message import Message # Portable
def print_tests():
st = '''Available functions:
print_tests() Print this list.
ack_test() Test event acknowledge and Message class.
message_test() Test Message class.
event_test() Test Event and Lock objects.
barrier_test() Test the Barrier class.
semaphore_test(bounded=False) Test Semaphore or BoundedSemaphore.
condition_test() Test the Condition class.
queue_test() Test the Queue class
Recommended to issue ctrl-D after running each test.
'''
print('\x1b[32m')
print(st)
print('\x1b[39m')
print_tests()
def printexp(exp, runtime=0):
print('Expected output:')
print('\x1b[32m')
print(exp)
print('\x1b[39m')
if runtime:
print('Running (runtime = {}s):'.format(runtime))
else:
print('Running (runtime < 1s):')
# ************ Test Message class ************
# Demo use of acknowledge event
async def event_wait(message, ack_event, n):
await message
print('Eventwait {} got message with value {}'.format(n, message.value()))
ack_event.set()
async def run_ack():
message = Message()
ack1 = asyncio.Event()
ack2 = asyncio.Event()
count = 0
while True:
asyncio.create_task(event_wait(message, ack1, 1))
asyncio.create_task(event_wait(message, ack2, 2))
message.set(count)
count += 1
print('message was set')
await ack1.wait()
ack1.clear()
print('Cleared ack1')
await ack2.wait()
ack2.clear()
print('Cleared ack2')
message.clear()
print('Cleared message')
await asyncio.sleep(1)
async def ack_coro(delay):
asyncio.create_task(run_ack())
await asyncio.sleep(delay)
print("I've seen attack ships burn on the shoulder of Orion...")
print("Time to die...")
def ack_test():
printexp('''Running (runtime = 10s):
message was set
Eventwait 1 got message with value 0
Eventwait 2 got message with value 0
Cleared ack1
Cleared ack2
Cleared message
message was set
Eventwait 1 got message with value 1
Eventwait 2 got message with value 1
Cleared ack1
Cleared ack2
Cleared message
message was set
... text omitted ...
Eventwait 1 got message with value 9
Eventwait 2 got message with value 9
Cleared ack1
Cleared ack2
Cleared message
I've seen attack ships burn on the shoulder of Orion...
Time to die...
''', 10)
asyncio.get_event_loop().run_until_complete(ack_coro(10))
# ************ Test Message class ************
async def wait_message(message):
print('Waiting for message')
msg = await message
message.clear()
print('Got message {}'.format(msg))
async def run_message_test():
message = Message()
asyncio.create_task(wait_message(message))
await asyncio.sleep(1)
message.set('Hello world')
await asyncio.sleep(1)
def message_test():
printexp('''Running (runtime = 2s):
Waiting for message
Got message Hello world
''', 2)
asyncio.get_event_loop().run_until_complete(run_message_test())
# ************ Test Lock and Event classes ************
async def run_lock(n, lock):
print('run_lock {} waiting for lock'.format(n))
await lock.acquire()
print('run_lock {} acquired lock'.format(n))
await asyncio.sleep(1) # Delay to demo other coros waiting for lock
lock.release()
print('run_lock {} released lock'.format(n))
async def eventset(event):
print('Waiting 5 secs before setting event')
await asyncio.sleep(5)
event.set()
print('event was set')
async def eventwait(event):
print('waiting for event')
await event.wait()
print('got event')
event.clear()
async def run_event_test():
print('Test Lock class')
lock = asyncio.Lock()
asyncio.create_task(run_lock(1, lock))
asyncio.create_task(run_lock(2, lock))
asyncio.create_task(run_lock(3, lock))
print('Test Event class')
event = asyncio.Event()
asyncio.create_task(eventset(event))
await eventwait(event) # run_event_test runs fast until this point
print('Event status {}'.format('Incorrect' if event.is_set() else 'OK'))
print('Tasks complete')
def event_test():
printexp('''Test Lock class
Test Event class
waiting for event
run_lock 1 waiting for lock
run_lock 1 acquired lock
run_lock 2 waiting for lock
run_lock 3 waiting for lock
Waiting 5 secs before setting event
run_lock 1 released lock
run_lock 2 acquired lock
run_lock 2 released lock
run_lock 3 acquired lock
run_lock 3 released lock
event was set
got event
Event status OK
Tasks complete
''', 5)
asyncio.get_event_loop().run_until_complete(run_event_test())
# ************ Barrier test ************
async def main(duration):
barrier = Barrier(3, callback, ('Synch',))
for _ in range(3):
asyncio.create_task(report(barrier))
await asyncio.sleep(duration)
def callback(text):
print(text)
async def report(barrier):
for i in range(5):
print('{} '.format(i), end='')
await barrier
def barrier_test():
printexp('''0 0 0 Synch
1 1 1 Synch
2 2 2 Synch
3 3 3 Synch
4 4 4 Synch
''')
asyncio.get_event_loop().run_until_complete(main(2))
# ************ Semaphore test ************
async def run_sema(n, sema, barrier):
print('run_sema {} trying to access semaphore'.format(n))
async with sema:
print('run_sema {} acquired semaphore'.format(n))
# Delay demonstrates other coros waiting for semaphore
await asyncio.sleep(1 + n/10) # n/10 ensures deterministic printout
print('run_sema {} has released semaphore'.format(n))
barrier.trigger()
async def run_sema_test(bounded):
num_coros = 5
barrier = Barrier(num_coros + 1)
if bounded:
semaphore = asyncio.BoundedSemaphore(3)
else:
semaphore = asyncio.Semaphore(3)
for n in range(num_coros):
asyncio.create_task(run_sema(n, semaphore, barrier))
await barrier # Quit when all coros complete
try:
semaphore.release()
except ValueError:
print('Bounded semaphore exception test OK')
def semaphore_test(bounded=False):
if bounded:
exp = '''run_sema 0 trying to access semaphore
run_sema 0 acquired semaphore
run_sema 1 trying to access semaphore
run_sema 1 acquired semaphore
run_sema 2 trying to access semaphore
run_sema 2 acquired semaphore
run_sema 3 trying to access semaphore
run_sema 4 trying to access semaphore
run_sema 0 has released semaphore
run_sema 4 acquired semaphore
run_sema 1 has released semaphore
run_sema 3 acquired semaphore
run_sema 2 has released semaphore
run_sema 4 has released semaphore
run_sema 3 has released semaphore
Bounded semaphore exception test OK
Exact sequence of acquisition may vary when 3 and 4 compete for semaphore.'''
else:
exp = '''run_sema 0 trying to access semaphore
run_sema 0 acquired semaphore
run_sema 1 trying to access semaphore
run_sema 1 acquired semaphore
run_sema 2 trying to access semaphore
run_sema 2 acquired semaphore
run_sema 3 trying to access semaphore
run_sema 4 trying to access semaphore
run_sema 0 has released semaphore
run_sema 3 acquired semaphore
run_sema 1 has released semaphore
run_sema 4 acquired semaphore
run_sema 2 has released semaphore
run_sema 3 has released semaphore
run_sema 4 has released semaphore
Exact sequence of acquisition may vary when 3 and 4 compete for semaphore.'''
printexp(exp, 3)
asyncio.get_event_loop().run_until_complete(run_sema_test(bounded))
# ************ Condition test ************
tim = 0
async def cond01():
while True:
await asyncio.sleep(2)
with await cond:
cond.notify(2) # Notify 2 tasks
async def cond03(): # Maintain a count of seconds
global tim
await asyncio.sleep(0.5)
while True:
await asyncio.sleep(1)
tim += 1
async def cond01_new(cond):
while True:
await asyncio.sleep(2)
async with cond:
cond.notify(2) # Notify 2 tasks
async def cond03_new(): # Maintain a count of seconds
global tim
await asyncio.sleep(0.5)
while True:
await asyncio.sleep(1)
tim += 1
async def cond02(n, cond, barrier):
async with cond:
print('cond02', n, 'Awaiting notification.')
await cond.wait()
print('cond02', n, 'triggered. tim =', tim)
barrier.trigger()
def predicate():
return tim >= 8 # 12
async def cond04(n, cond, barrier):
async with cond:
print('cond04', n, 'Awaiting notification and predicate.')
await cond.wait_for(predicate)
print('cond04', n, 'triggered. tim =', tim)
barrier.trigger()
async def cond_go():
cond = asyncio.Condition()
ntasks = 7
barrier = Barrier(ntasks + 1)
t1 = asyncio.create_task(cond01_new(cond))
t3 = asyncio.create_task(cond03_new())
for n in range(ntasks):
asyncio.create_task(cond02(n, cond, barrier))
await barrier # All instances of cond02 have completed
# Test wait_for
barrier = Barrier(2)
asyncio.create_task(cond04(99, cond, barrier))
await barrier
# cancel continuously running coros.
t1.cancel()
t3.cancel()
await asyncio.sleep(0)
print('Done.')
def condition_test():
printexp('''cond02 0 Awaiting notification.
cond02 1 Awaiting notification.
cond02 2 Awaiting notification.
cond02 3 Awaiting notification.
cond02 4 Awaiting notification.
cond02 5 Awaiting notification.
cond02 6 Awaiting notification.
cond02 5 triggered. tim = 1
cond02 6 triggered. tim = 1
cond02 3 triggered. tim = 3
cond02 4 triggered. tim = 3
cond02 1 triggered. tim = 5
cond02 2 triggered. tim = 5
cond02 0 triggered. tim = 7
cond04 99 Awaiting notification and predicate.
cond04 99 triggered. tim = 9
Done.
''', 13)
asyncio.get_event_loop().run_until_complete(cond_go())
# ************ Queue test ************
async def fillq(myq):
for x in range(8):
print('Waiting to put item {} on queue'.format(x))
await myq.put(x)
async def mtq(myq):
await asyncio.sleep(1) # let q fill
while myq.qsize():
res = await myq.get()
print('Retrieved {} from queue'.format(res))
await asyncio.sleep(0.2)
async def queue_go():
myq = asyncio.Queue(5)
asyncio.create_task(fillq(myq))
await mtq(myq)
t = asyncio.create_task(fillq(myq))
await asyncio.sleep(1)
print('Queue filled. Cancelling fill task. Queue should be full.')
t.cancel()
await mtq(myq)
t = asyncio.create_task(myq.get())
await asyncio.sleep(1)
print('Cancelling attempt to get from empty queue.')
t.cancel()
print('Queue size:', myq.qsize())
def queue_test():
printexp('''Running (runtime = 7s):
Waiting to put item 0 on queue
Waiting to put item 1 on queue
Waiting to put item 2 on queue
Waiting to put item 3 on queue
Waiting to put item 4 on queue
Waiting to put item 5 on queue
Retrieved 0 from queue
Waiting to put item 6 on queue
Retrieved 1 from queue
Waiting to put item 7 on queue
Retrieved 2 from queue
Retrieved 3 from queue
Retrieved 4 from queue
Retrieved 5 from queue
Retrieved 6 from queue
Retrieved 7 from queue
Waiting to put item 0 on queue
Waiting to put item 1 on queue
Waiting to put item 2 on queue
Waiting to put item 3 on queue
Waiting to put item 4 on queue
Waiting to put item 5 on queue
Queue filled. Cancelling fill task. Queue should be full.
Retrieved 0 from queue
Retrieved 1 from queue
Retrieved 2 from queue
Retrieved 3 from queue
Retrieved 4 from queue
Cancelling attempt to get from empty queue.
Queue size: 0
''', 7)
asyncio.get_event_loop().run_until_complete(queue_go())

Wyświetl plik

@ -1,73 +0,0 @@
# Generic Barrier class: runs under CPython 3.8
# A Barrier synchronises N coros. In normal use each issues await barrier.
# Execution pauses until all other participant coros are waiting on it.
# At that point the callback is executed. Then the barrier is 'opened' and
# execution of all participants resumes.
# .trigger enables a coro to signal it has passed the barrier without waiting.
import asyncio
# Ignore "coroutine '_g' was never awaited" warning.
async def _g():
pass
type_coro = type(_g())
# If a callback is passed, run it and return.
# If a coro is passed initiate it and return.
# coros are passed by name i.e. not using function call syntax.
def launch(func, tup_args):
res = func(*tup_args)
if isinstance(res, type_coro):
asyncio.create_task(res)
class Barrier():
def __init__(self, participants, func=None, args=()):
self._participants = participants
self._func = func
self._args = args
self._reset(True)
def __await__(self):
self._update()
if self._at_limit(): # All other threads are also at limit
if self._func is not None:
launch(self._func, self._args)
self._reset(not self._down) # Toggle direction to release others
return
direction = self._down
while True: # Wait until last waiting thread changes the direction
if direction != self._down:
return
yield
def trigger(self):
self._update()
if self._at_limit(): # All other threads are also at limit
if self._func is not None:
launch(self._func, self._args)
self._reset(not self._down) # Toggle direction to release others
def _reset(self, down):
self._down = down
self._count = self._participants if down else 0
def busy(self):
if self._down:
done = self._count == self._participants
else:
done = self._count == 0
return not done
def _at_limit(self): # Has count reached up or down limit?
limit = 0 if self._down else self._participants
return self._count == limit
def _update(self):
self._count += -1 if self._down else 1
if self._count < 0 or self._count > self._participants:
raise ValueError('Too many tasks accessing Barrier')

Wyświetl plik

@ -1,34 +0,0 @@
# message.py
# A coro waiting on a message issues msg = await message_instance
# A coro rasing the message issues message_instance.set(msg)
# When all waiting coros have run
# message_instance.clear() should be issued
try:
import asyncio
except ImportError:
import uasyncio as asyncio
class Message(asyncio.Event):
def __init__(self):
super().__init__()
self._data = None
def clear(self):
super().clear()
def __await__(self):
yield from self.wait().__await__() # CPython
return self._data
def __iter__(self):
yield from self.wait() # MicroPython
return self._data
def set(self, data=None):
super().set()
self._data = data
def value(self):
return self._data

Wyświetl plik

@ -1,62 +0,0 @@
# Shows that MicroPython seems to cancel a task earlier than CPython
# Also demonstrates that CPython cancels tasks when run() terminates.
try:
import asyncio
except ImportError:
import uasyncio as asyncio
async def foo(n):
try:
while True:
await asyncio.sleep(0)
print(n)
except asyncio.CancelledError:
print('Task {} canned.'.format(n))
raise
async def main(n):
tasks = []
for n in range(3):
tasks.append(asyncio.create_task(foo(n)))
for _ in range(n):
await asyncio.sleep(0)
print('Cancelling task 1')
tasks[1].cancel()
for _ in range(3):
await asyncio.sleep(0)
asyncio.run(main(n=3))
# CPython 3.8
#>>> import test_can
#0
#1
#2
#Cancelling task 1
#0
#Task 1 canned.
#2
#0
#2
#0
#2
#0
#2
#Task 0 canned.
#Task 2 canned.
#>>>
# MicroPython
#>>> import test_can
#0
#1
#2
#Cancelling task 1
#Task 1 canned.
#0
#2
#0
#2
#0
#2
#>>>

Wyświetl plik

@ -1,86 +0,0 @@
# test_fast_scheduling.py Test fast_io PR
# https://github.com/micropython/micropython-lib/pull/287
# Test is designed to quantify the difference between fast and normal I/O without
# recourse to electronic testgear. Run on Pyboard.
# The MyIO class supports .readline() which updates a call counter and clears the
# .ready_rd flag. This is set by a timer (emulating the arrival of data from
# some hardware device).
# The .dummy method emulates a relatively slow user coro which yields with a zero
# delay; the test runs 10 instances of this. Each instance updates a common call
# counter.
# The receiver coro awaits .readline continuously. With normal scheduling each is
# scheduled after the ten .dummy instances have run. With fast scheduling and a
# timer period <= 10ms readline and dummy alternate. If timer period is increased
# readline is sheduled progressively less frequently.
import io
import pyb
import utime
import uasyncio as asyncio
import micropython
micropython.alloc_emergency_exception_buf(100)
MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL_WR = const(4)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)
class MyIO(io.IOBase):
def __init__(self):
self.read_count = 0
self.dummy_count = 0
self.ready_rd = False
pyb.Timer(4, freq = 100, callback = self.do_input)
# Read callback: emulate asynchronous input from hardware.
def do_input(self, t):
self.ready_rd = True # Data is ready to read
def ioctl(self, req, arg):
ret = MP_STREAM_ERROR
if req == MP_STREAM_POLL:
ret = 0
if arg & MP_STREAM_POLL_RD:
if self.ready_rd:
ret |= MP_STREAM_POLL_RD
return ret
def readline(self):
self.read_count += 1
self.ready_rd = False
return b'a\n'
async def dummy(self):
while True:
await asyncio.sleep(0)
self.dummy_count += 1
utime.sleep_ms(10) # Emulate time consuming user code
async def killer(self):
print('Test runs for 5s')
await asyncio.sleep(5)
print('I/O count {} Dummy count {}'.format(self.read_count, self.dummy_count))
async def receiver(myior, fast):
sreader = asyncio.StreamReader(myior)
if fast:
sreader.priority()
while True:
await sreader.readline()
def test(fast=False):
loop = asyncio.get_event_loop()
myior = MyIO()
loop.create_task(receiver(myior, fast))
for _ in range(10):
loop.create_task(myior.dummy())
loop.run_until_complete(myior.killer())
print('Test case of I/O competing with zero delay tasks.')
print('fast False, uasyncio V2: approx I/O count 25, dummy count 510.')
print('fast False, new uasyncio: I/O count 46, dummy 509.')
print('fast True, new uasyncio: approx I/O count 509, dummy count 509.')
print('Run test() to check I/O performance at normal priority.')
print('Run test(True) to check I/O performance at high priority.')

Wyświetl plik

@ -1,28 +0,0 @@
# Tests for uasyncio iostream read/write changes
These tests perform concurrent input and output and use timers to
emulate read/write hardware.
iotest1.py Device can perform unbuffered writes only.
iotest2.py Device performs buffered writes and unbuffered reads.
iotest4.py Run test(False) for unbuffered writes and buffered reads.
iotest5.py Unbuffered read and write.
Obsolete test:
iotest3.py Demonstrated workround for failing concurrent I/O using separate
input and output objects.
Other tests:
iotest.py Measure timing of I/O scheduling with a scope.
auart.py Run a loopback test on a physical UART.
auart_hd.py Simulate a pair of devices running a half-duplex protocol over a
pair of UARTs.
# Note on I/O scheduling
Examination of the code, along with running iotest.py, demonstrates that I/O
polling does not take place as frequently as it could. In the presence of coros
which issue `yield asyncio.sleep(0)` polling occurs when all such coros have
run.
An option is to modify `core.py` to schedule I/O after each coro has run - the
drawback being a reduced rate at which these round-robin tasks are scheduled.

Wyświetl plik

@ -1,191 +0,0 @@
# aswitch.py Switch and pushbutton classes for asyncio
# Delay_ms A retriggerable delay class. Can schedule a coro on timeout.
# Switch Simple debounced switch class for normally open grounded switch.
# Pushbutton extend the above to support logical state, long press and
# double-click events
# Tested on Pyboard but should run on other microcontroller platforms
# running MicroPython and uasyncio.
# The MIT License (MIT)
#
# Copyright (c) 2017 Peter Hinch
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
try:
import asyncio_priority as asyncio
except ImportError:
import uasyncio as asyncio
import utime as time
from asyn import launch
# launch: run a callback or initiate a coroutine depending on which is passed.
class Delay_ms(object):
def __init__(self, func=None, args=(), can_alloc=True, duration=1000):
self.func = func
self.args = args
self.can_alloc = can_alloc
self.duration = duration # Default duration
self.tstop = None # Not running
self.loop = asyncio.get_event_loop()
if not can_alloc:
self.loop.create_task(self._run())
async def _run(self):
while True:
if self.tstop is None: # Not running
await asyncio.sleep_ms(0)
else:
await self.killer()
def stop(self):
self.tstop = None
def trigger(self, duration=0): # Update end time
if duration <= 0:
duration = self.duration
if self.can_alloc and self.tstop is None: # No killer task is running
self.tstop = time.ticks_add(time.ticks_ms(), duration)
# Start a task which stops the delay after its period has elapsed
self.loop.create_task(self.killer())
self.tstop = time.ticks_add(time.ticks_ms(), duration)
def running(self):
return self.tstop is not None
async def killer(self):
twait = time.ticks_diff(self.tstop, time.ticks_ms())
while twait > 0: # Must loop here: might be retriggered
await asyncio.sleep_ms(twait)
if self.tstop is None:
break # Return if stop() called during wait
twait = time.ticks_diff(self.tstop, time.ticks_ms())
if self.tstop is not None and self.func is not None:
launch(self.func, self.args) # Timed out: execute callback
self.tstop = None # Not running
class Switch(object):
debounce_ms = 50
def __init__(self, pin):
self.pin = pin # Should be initialised for input with pullup
self._open_func = False
self._close_func = False
self.switchstate = self.pin.value() # Get initial state
loop = asyncio.get_event_loop()
loop.create_task(self.switchcheck()) # Thread runs forever
def open_func(self, func, args=()):
self._open_func = func
self._open_args = args
def close_func(self, func, args=()):
self._close_func = func
self._close_args = args
# Return current state of switch (0 = pressed)
def __call__(self):
return self.switchstate
async def switchcheck(self):
loop = asyncio.get_event_loop()
while True:
state = self.pin.value()
if state != self.switchstate:
# State has changed: act on it now.
self.switchstate = state
if state == 0 and self._close_func:
launch(self._close_func, self._close_args)
elif state == 1 and self._open_func:
launch(self._open_func, self._open_args)
# Ignore further state changes until switch has settled
await asyncio.sleep_ms(Switch.debounce_ms)
class Pushbutton(object):
debounce_ms = 50
long_press_ms = 1000
double_click_ms = 400
def __init__(self, pin):
self.pin = pin # Initialise for input
self._true_func = False
self._false_func = False
self._double_func = False
self._long_func = False
self.sense = pin.value() # Convert from electrical to logical value
self.buttonstate = self.rawstate() # Initial state
loop = asyncio.get_event_loop()
loop.create_task(self.buttoncheck()) # Thread runs forever
def press_func(self, func, args=()):
self._true_func = func
self._true_args = args
def release_func(self, func, args=()):
self._false_func = func
self._false_args = args
def double_func(self, func, args=()):
self._double_func = func
self._double_args = args
def long_func(self, func, args=()):
self._long_func = func
self._long_args = args
# Current non-debounced logical button state: True == pressed
def rawstate(self):
return bool(self.pin.value() ^ self.sense)
# Current debounced state of button (True == pressed)
def __call__(self):
return self.buttonstate
async def buttoncheck(self):
loop = asyncio.get_event_loop()
if self._long_func:
longdelay = Delay_ms(self._long_func, self._long_args)
if self._double_func:
doubledelay = Delay_ms()
while True:
state = self.rawstate()
# State has changed: act on it now.
if state != self.buttonstate:
self.buttonstate = state
if state:
# Button is pressed
if self._long_func and not longdelay.running():
# Start long press delay
longdelay.trigger(Pushbutton.long_press_ms)
if self._double_func:
if doubledelay.running():
launch(self._double_func, self._double_args)
else:
# First click: start doubleclick timer
doubledelay.trigger(Pushbutton.double_click_ms)
if self._true_func:
launch(self._true_func, self._true_args)
else:
# Button release
if self._long_func and longdelay.running():
# Avoid interpreting a second click as a long push
longdelay.stop()
if self._false_func:
launch(self._false_func, self._false_args)
# Ignore state changes until switch has settled
await asyncio.sleep_ms(Pushbutton.debounce_ms)

Wyświetl plik

@ -1,25 +0,0 @@
# Test of uasyncio stream I/O using UART
# Author: Peter Hinch
# Copyright Peter Hinch 2017 Released under the MIT license
# Link X1 and X2 to test.
import uasyncio as asyncio
from pyb import UART
uart = UART(4, 9600)
async def sender():
swriter = asyncio.StreamWriter(uart, {})
while True:
await swriter.awrite('Hello uart\n')
await asyncio.sleep(2)
async def receiver():
sreader = asyncio.StreamReader(uart)
while True:
res = await sreader.readline()
print('Recieved', res)
loop = asyncio.get_event_loop()
loop.create_task(sender())
loop.create_task(receiver())
loop.run_forever()

Wyświetl plik

@ -1,106 +0,0 @@
# auart_hd.py
# Author: Peter Hinch
# Copyright Peter Hinch 2018 Released under the MIT license
# Demo of running a half-duplex protocol to a device. The device never sends
# unsolicited messages. An example is a communications device which responds
# to AT commands.
# The master sends a message to the device, which may respond with one or more
# lines of data. The master assumes that the device has sent all its data when
# a timeout has elapsed.
# In this test a physical device is emulated by the DEVICE class
# To test link X1-X4 and X2-X3
from pyb import UART
import uasyncio as asyncio
import aswitch
# Dummy device waits for any incoming line and responds with 4 lines at 1 second
# intervals.
class DEVICE():
def __init__(self, uart_no = 4):
self.uart = UART(uart_no, 9600)
self.loop = asyncio.get_event_loop()
self.swriter = asyncio.StreamWriter(self.uart, {})
self.sreader = asyncio.StreamReader(self.uart)
loop = asyncio.get_event_loop()
loop.create_task(self._run())
async def _run(self):
responses = ['Line 1', 'Line 2', 'Line 3', 'Goodbye']
while True:
res = await self.sreader.readline()
for response in responses:
await self.swriter.awrite("{}\r\n".format(response))
# Demo the fact that the master tolerates slow response.
await asyncio.sleep_ms(300)
# The master's send_command() method sends a command and waits for a number of
# lines from the device. The end of the process is signified by a timeout, when
# a list of lines is returned. This allows line-by-line processing.
# A special test mode demonstrates the behaviour with a non-responding device. If
# None is passed, no commend is sent. The master waits for a response which never
# arrives and returns an empty list.
class MASTER():
def __init__(self, uart_no = 2, timeout=4000):
self.uart = UART(uart_no, 9600)
self.timeout = timeout
self.loop = asyncio.get_event_loop()
self.swriter = asyncio.StreamWriter(self.uart, {})
self.sreader = asyncio.StreamReader(self.uart)
self.delay = aswitch.Delay_ms()
self.response = []
loop = asyncio.get_event_loop()
loop.create_task(self._recv())
async def _recv(self):
while True:
res = await self.sreader.readline()
self.response.append(res) # Append to list of lines
self.delay.trigger(self.timeout) # Got something, retrigger timer
async def send_command(self, command):
self.response = [] # Discard any pending messages
if command is None:
print('Timeout test.')
else:
await self.swriter.awrite("{}\r\n".format(command))
print('Command sent:', command)
self.delay.trigger(self.timeout) # Re-initialise timer
while self.delay.running():
await asyncio.sleep(1) # Wait for 4s after last msg received
return self.response
async def test():
print('This test takes 10s to complete.')
for cmd in ['Run', None]:
print()
res = await master.send_command(cmd)
# can use b''.join(res) if a single string is required.
if res:
print('Result is:')
for line in res:
print(line.decode('UTF8'), end='')
else:
print('Timed out waiting for result.')
loop = asyncio.get_event_loop()
master = MASTER()
device = DEVICE()
loop.run_until_complete(test())
# Expected output
# >>> import auart_hd
# This test takes 10s to complete.
#
# Command sent: Run
# Result is:
# Line 1
# Line 2
# Line 3
# Goodbye
#
# Timeout test.
# Timed out waiting for result.
# >>>

Wyświetl plik

@ -1,69 +0,0 @@
# iotest.py Test PR #3836 timing using GPIO pins.
import io, pyb
import uasyncio as asyncio
import micropython
micropython.alloc_emergency_exception_buf(100)
MP_STREAM_POLL = const(3)
MP_STREAM_POLL_RD = const(1)
y1 = pyb.Pin('Y1', pyb.Pin.OUT)
class MyIO(io.IOBase):
def __init__(self):
self.ready = False
self.count = 0
tim = pyb.Timer(4)
tim.init(freq=1) # 1Hz - 1 simulated input line per sec.
tim.callback(self.setready)
def ioctl(self, req, arg):
if req == MP_STREAM_POLL and (arg & MP_STREAM_POLL_RD):
state = pyb.disable_irq()
r = self.ready
self.ready = False
pyb.enable_irq(state)
return r
return 0
def readline(self): # Y1 goes low when I/O is serviced
y1.value(0)
return '{}\n'.format(self.count)
def setready(self, t): # Y1 goes high when I/O becomes ready
self.count += 1
y1.value(1)
self.ready = True
myio = MyIO()
async def foo(p): # Toggle a pin when scheduled
print('start foo', p)
pin = pyb.Pin(p, pyb.Pin.OUT)
while True:
pin.value(1)
await asyncio.sleep(0)
pin.value(0)
await asyncio.sleep(0)
async def receiver():
last = None
nmissed = 0
sreader = asyncio.StreamReader(myio)
while True:
res = await sreader.readline()
print('Recieved {} Missed {}'.format(res, nmissed))
ires = int(res)
if last is not None:
if last != ires -1:
print('Missed {}'.format(ires - 1))
nmissed += 1
last = ires
loop = asyncio.get_event_loop()
loop.create_task(receiver())
loop.create_task(foo('Y2'))
loop.create_task(foo('Y3'))
loop.run_forever()

Wyświetl plik

@ -1,92 +0,0 @@
# iotest1.py Test PR #3836. User class write() performs unbuffered writing.
import io, pyb
import uasyncio as asyncio
import micropython
micropython.alloc_emergency_exception_buf(100)
MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL_WR = const(4)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)
def printbuf(this_io):
print(bytes(this_io.wbuf[:this_io.wprint_len]).decode(), end='')
class MyIO(io.IOBase):
def __init__(self):
self.ready_rd = False
self.ready_wr = False
self.wbuf = bytearray(100) # Write buffer
self.wprint_len = 0
self.widx = 0
self.wch = b''
self.rbuf = b'ready\n' # Read buffer
pyb.Timer(4, freq = 1, callback = self.do_input)
pyb.Timer(5, freq = 10, callback = self.do_output)
# Read callback: emulate asynchronous input from hardware.
# Typically would put bytes into a ring buffer and set .ready_rd.
def do_input(self, t):
self.ready_rd = True # Data is ready to read
# Write timer callback. Emulate hardware: if there's data in the buffer
# write some or all of it
def do_output(self, t):
if self.wch:
self.wbuf[self.widx] = self.wch
self.widx += 1
if self.wch == ord('\n'):
self.wprint_len = self.widx # Save for schedule
micropython.schedule(printbuf, self)
self.widx = 0
self.wch = b''
def ioctl(self, req, arg): # see ports/stm32/uart.c
ret = MP_STREAM_ERROR
if req == MP_STREAM_POLL:
ret = 0
if arg & MP_STREAM_POLL_RD:
if self.ready_rd:
ret |= MP_STREAM_POLL_RD
if arg & MP_STREAM_POLL_WR:
if not self.wch:
ret |= MP_STREAM_POLL_WR # Ready if no char pending
return ret
def readline(self):
self.ready_rd = False
return self.rbuf
def write(self, buf, off, sz):
self.wch = buf[off] # A real driver would trigger hardware to write a char
return 1 # No. of bytes written. uasyncio waits on ioctl write ready
myio = MyIO()
async def receiver():
sreader = asyncio.StreamReader(myio)
while True:
res = await sreader.readline()
print('Received', res)
async def sender():
swriter = asyncio.StreamWriter(myio, {})
await asyncio.sleep(5)
count = 0
while True:
count += 1
tosend = 'Wrote Hello MyIO {}\n'.format(count)
await swriter.awrite(tosend.encode('UTF8'))
# Once this has occurred reading stops. ioctl keeps being called with arg == 0
# which normally occurs once only after a read
# IOWriteDone is never yielded: is this right?
await asyncio.sleep(2)
loop = asyncio.get_event_loop()
loop.create_task(receiver())
loop.create_task(sender())
loop.run_forever()

Wyświetl plik

@ -1,91 +0,0 @@
# iotest2.py Test PR #3836. User class write() performs buffered writing.
# Reading is unbuffered.
import io, pyb
import uasyncio as asyncio
import micropython
micropython.alloc_emergency_exception_buf(100)
MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL_WR = const(4)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)
def printbuf(this_io):
print(bytes(this_io.wbuf[:this_io.wprint_len]).decode(), end='')
this_io.wbuf = b''
class MyIO(io.IOBase):
def __init__(self):
self.ready_rd = False
self.ready_wr = False
self.wbuf = b''
self.wprint_len = 0
self.ridx = 0
self.rbuf = b'ready\n' # Read buffer
pyb.Timer(4, freq = 1, callback = self.do_input)
pyb.Timer(5, freq = 10, callback = self.do_output)
# Read callback: emulate asynchronous input from hardware.
# Typically would put bytes into a ring buffer and set .ready_rd.
def do_input(self, t):
self.ready_rd = True # Data is ready to read
# Write timer callback. Emulate hardware: if there's data in the buffer
# write some or all of it
def do_output(self, t):
if self.wbuf:
self.wprint_len = self.wbuf.find(b'\n') + 1
micropython.schedule(printbuf, self)
def ioctl(self, req, arg): # see ports/stm32/uart.c
# print('ioctl', req, arg)
ret = MP_STREAM_ERROR
if req == MP_STREAM_POLL:
ret = 0
if arg & MP_STREAM_POLL_RD:
if self.ready_rd:
ret |= MP_STREAM_POLL_RD
if arg & MP_STREAM_POLL_WR:
if not self.wch:
ret |= MP_STREAM_POLL_WR # Ready if no char pending
return ret
# Test of device that produces one character at a time
def readline(self):
self.ready_rd = False
ch = self.rbuf[self.ridx]
if ch == ord('\n'):
self.ridx = 0
else:
self.ridx += 1
return chr(ch)
def write(self, buf, off, sz):
self.wbuf = buf[:]
return sz # No. of bytes written. uasyncio waits on ioctl write ready
myio = MyIO()
async def receiver():
sreader = asyncio.StreamReader(myio)
while True:
res = await sreader.readline()
print('Received', res)
async def sender():
swriter = asyncio.StreamWriter(myio, {})
await asyncio.sleep(5)
count = 0
while True:
count += 1
tosend = 'Wrote Hello MyIO {}\n'.format(count)
await swriter.awrite(tosend.encode('UTF8'))
await asyncio.sleep(2)
loop = asyncio.get_event_loop()
loop.create_task(receiver())
loop.create_task(sender())
loop.run_forever()

Wyświetl plik

@ -1,110 +0,0 @@
# iotest3.py Test PR #3836. User class write() performs unbuffered writing.
# This test was to demonstrate the workround to the original issue by having
# separate read and write classes.
# With modified moduselect.c and uasyncio.__init__.py the test is probably
# irrelevant.
import io, pyb
import uasyncio as asyncio
import micropython
micropython.alloc_emergency_exception_buf(100)
MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL_WR = const(4)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)
class MyIOR(io.IOBase):
def __init__(self):
self.ready_rd = False
self.rbuf = b'ready\n' # Read buffer
pyb.Timer(4, freq = 1, callback = self.do_input)
# Read callback: emulate asynchronous input from hardware.
# Typically would put bytes into a ring buffer and set .ready_rd.
def do_input(self, t):
self.ready_rd = True # Data is ready to read
def ioctl(self, req, arg): # see ports/stm32/uart.c
ret = MP_STREAM_ERROR
if req == MP_STREAM_POLL:
ret = 0
if not arg:
print('ioctl arg 0')
if arg & MP_STREAM_POLL_RD:
if self.ready_rd:
ret |= MP_STREAM_POLL_RD
return ret
def readline(self):
self.ready_rd = False
return self.rbuf
# MyIOW emulates a write-only device which can only handle one character at a
# time. The write() method is called by uasyncio. A real driver would cause the
# hardware to write a character. By setting .wch it causes the ioctl to report
# a not ready status.
# Some time later an asynchronous event occurs, indicating that the hardware
# has written a character and is ready for another. In this demo this is done
# by the timer callback do_output(), which clears .wch so that ioctl returns
# a ready status. For the demo it stores the characters in .wbuf for printing.
def printbuf(this_io):
print(bytes(this_io.wbuf[:this_io.wprint_len]).decode(), end='')
class MyIOW(io.IOBase):
def __init__(self):
self.wbuf = bytearray(20) # Buffer for printing
self.wprint_len = 0
self.widx = 0
self.wch = b''
wtim = pyb.Timer(5, freq = 10, callback = self.do_output)
# Write timer callback. Emulate hardware: if there's data in the buffer
# write some or all of it
def do_output(self, t):
if self.wch:
self.wbuf[self.widx] = self.wch
self.widx += 1
if self.wch == ord('\n'):
self.wprint_len = self.widx # Save for schedule
micropython.schedule(printbuf, self)
self.widx = 0
self.wch = b''
def ioctl(self, req, arg): # see ports/stm32/uart.c
ret = MP_STREAM_ERROR
if req == MP_STREAM_POLL:
ret = 0
if arg & MP_STREAM_POLL_WR:
if not self.wch:
ret |= MP_STREAM_POLL_WR # Ready if no char pending
return ret
def write(self, buf, off, sz):
self.wch = buf[off] # A real driver would trigger hardware to write a char
return 1 # No. of bytes written. uasyncio waits on ioctl write ready
myior = MyIOR()
myiow = MyIOW()
async def receiver():
sreader = asyncio.StreamReader(myior)
while True:
res = await sreader.readline()
print('Received', res)
async def sender():
swriter = asyncio.StreamWriter(myiow, {})
count = 0
while True:
count += 1
tosend = 'Wrote Hello MyIO {}\n'.format(count)
await swriter.awrite(tosend.encode('UTF8'))
await asyncio.sleep(2)
loop = asyncio.get_event_loop()
loop.create_task(receiver())
loop.create_task(sender())
loop.run_forever()

Wyświetl plik

@ -1,107 +0,0 @@
# iotest4.py Test PR #3836.
# User class write() performs unbuffered writing.
# For simplicity this uses buffered read: unbuffered is tested by iotest2.py.
# This test was to demonstrate the original issue.
# With modified moduselect.c and uasyncio.__init__.py the test now passes.
# iotest4.test() uses separate read and write objects.
# iotest4.test(False) uses a common object (failed without the mod).
import io, pyb
import uasyncio as asyncio
import micropython
micropython.alloc_emergency_exception_buf(100)
MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL_WR = const(4)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)
def printbuf(this_io):
print(bytes(this_io.wbuf[:this_io.wprint_len]).decode(), end='')
class MyIO(io.IOBase):
def __init__(self, read=False, write=False):
self.ready_rd = False # Read and write not ready
self.wch = b''
if read:
self.rbuf = b'ready\n' # Read buffer
pyb.Timer(4, freq = 1, callback = self.do_input)
if write:
self.wbuf = bytearray(100) # Write buffer
self.wprint_len = 0
self.widx = 0
pyb.Timer(5, freq = 10, callback = self.do_output)
# Read callback: emulate asynchronous input from hardware.
# Typically would put bytes into a ring buffer and set .ready_rd.
def do_input(self, t):
self.ready_rd = True # Data is ready to read
# Write timer callback. Emulate hardware: if there's data in the buffer
# write some or all of it
def do_output(self, t):
if self.wch:
self.wbuf[self.widx] = self.wch
self.widx += 1
if self.wch == ord('\n'):
self.wprint_len = self.widx # Save for schedule
micropython.schedule(printbuf, self)
self.widx = 0
self.wch = b''
def ioctl(self, req, arg): # see ports/stm32/uart.c
ret = MP_STREAM_ERROR
if req == MP_STREAM_POLL:
ret = 0
if arg & MP_STREAM_POLL_RD:
if self.ready_rd:
ret |= MP_STREAM_POLL_RD
if arg & MP_STREAM_POLL_WR:
if not self.wch:
ret |= MP_STREAM_POLL_WR # Ready if no char pending
return ret
# Emulate a device with buffered read. Return the buffer, falsify read ready
# Read timer sets ready.
def readline(self):
self.ready_rd = False
return self.rbuf
# Emulate unbuffered hardware which writes one character: uasyncio waits
# until hardware is ready for the next. Hardware ready is emulated by write
# timer callback.
def write(self, buf, off=0, sz=0):
self.wch = buf[off] # Hardware starts to write a char
return 1 # 1 byte written. uasyncio waits on ioctl write ready
async def receiver(myior):
sreader = asyncio.StreamReader(myior)
while True:
res = await sreader.readline()
print('Received', res)
async def sender(myiow):
swriter = asyncio.StreamWriter(myiow, {})
await asyncio.sleep(5)
count = 0
while True:
count += 1
tosend = 'Wrote Hello MyIO {}\n'.format(count)
await swriter.awrite(tosend.encode('UTF8'))
await asyncio.sleep(2)
def test(good=True):
if good:
myior = MyIO(read=True)
myiow = MyIO(write=True)
else:
myior = MyIO(read=True, write=True)
myiow = myior
loop = asyncio.get_event_loop()
loop.create_task(receiver(myior))
loop.create_task(sender(myiow))
loop.run_forever()

Wyświetl plik

@ -1,105 +0,0 @@
# iotest5.py Test PR #3836.
# User class write() performs unbuffered writing.
# Read is also unbuffered.
# This test was to demonstrate the original issue.
# With modified moduselect.c and uasyncio.__init__.py the test now passes.
# iotest4.test() uses separate read and write objects.
# iotest4.test(False) uses a common object (failed without the mod).
import io, pyb
import uasyncio as asyncio
import micropython
micropython.alloc_emergency_exception_buf(100)
MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL_WR = const(4)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)
def printbuf(this_io):
print(bytes(this_io.wbuf[:this_io.wprint_len]).decode(), end='')
class MyIO(io.IOBase):
def __init__(self, read=False, write=False):
self.ready_rd = False # Read and write not ready
self.rbuf = b'ready\n' # Read buffer
self.ridx = 0
pyb.Timer(4, freq = 5, callback = self.do_input)
self.wch = b''
self.wbuf = bytearray(100) # Write buffer
self.wprint_len = 0
self.widx = 0
pyb.Timer(5, freq = 10, callback = self.do_output)
# Read callback: emulate asynchronous input from hardware.
# Typically would put bytes into a ring buffer and set .ready_rd.
def do_input(self, t):
self.ready_rd = True # Data is ready to read
# Write timer callback. Emulate hardware: if there's data in the buffer
# write some or all of it
def do_output(self, t):
if self.wch:
self.wbuf[self.widx] = self.wch
self.widx += 1
if self.wch == ord('\n'):
self.wprint_len = self.widx # Save for schedule
micropython.schedule(printbuf, self)
self.widx = 0
self.wch = b''
def ioctl(self, req, arg): # see ports/stm32/uart.c
ret = MP_STREAM_ERROR
if req == MP_STREAM_POLL:
ret = 0
if arg & MP_STREAM_POLL_RD:
if self.ready_rd:
ret |= MP_STREAM_POLL_RD
if arg & MP_STREAM_POLL_WR:
if not self.wch:
ret |= MP_STREAM_POLL_WR # Ready if no char pending
return ret
# Test of device that produces one character at a time
def readline(self):
self.ready_rd = False # Cleared by timer cb do_input
ch = self.rbuf[self.ridx]
if ch == ord('\n'):
self.ridx = 0
else:
self.ridx += 1
return chr(ch)
# Emulate unbuffered hardware which writes one character: uasyncio waits
# until hardware is ready for the next. Hardware ready is emulated by write
# timer callback.
def write(self, buf, off, sz):
self.wch = buf[off] # Hardware starts to write a char
return 1 # 1 byte written. uasyncio waits on ioctl write ready
async def receiver(myior):
sreader = asyncio.StreamReader(myior)
while True:
res = await sreader.readline()
print('Received', res)
async def sender(myiow):
swriter = asyncio.StreamWriter(myiow, {})
await asyncio.sleep(5)
count = 0
while True:
count += 1
tosend = 'Wrote Hello MyIO {}\n'.format(count)
await swriter.awrite(tosend.encode('UTF8'))
await asyncio.sleep(2)
myior = MyIO()
myiow = myior
loop = asyncio.get_event_loop()
loop.create_task(receiver(myior))
loop.create_task(sender(myiow))
loop.run_forever()

Wyświetl plik

@ -1,82 +0,0 @@
# iotest6.py Test fast_io PR
# https://github.com/micropython/micropython-lib/pull/287
# Test is designed to quantify the difference between fast and normal I/O without
# recourse to electronic testgear.
# The MyIO class supports .readline() which updates a call counter and clears the
# .ready_rd flag. This is set by a timer (emulating the arrival of data from
# some hardware device).
# The .dummy method emulates a relatively slow user coro which yields with a zero
# delay; the test runs 10 instances of this. Each instance updates a common call
# counter.
# The receiver coro awaits .readline continuously. With normal scheduling each is
# scheduled after the ten .dummy instances have run. With fast scheduling and a
# timer period <= 10ms readline and dummy alternate. If timer period is increased
# readline is sheduled progressively less frequently.
import io
import pyb
import utime
import uasyncio as asyncio
import micropython
micropython.alloc_emergency_exception_buf(100)
MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL_WR = const(4)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)
class MyIO(io.IOBase):
def __init__(self, read=False, write=False):
self.read_count = 0
self.dummy_count = 0
self.ready_rd = False
pyb.Timer(4, freq = 100, callback = self.do_input)
# Read callback: emulate asynchronous input from hardware.
def do_input(self, t):
self.ready_rd = True # Data is ready to read
def ioctl(self, req, arg):
ret = MP_STREAM_ERROR
if req == MP_STREAM_POLL:
ret = 0
if arg & MP_STREAM_POLL_RD:
if self.ready_rd:
ret |= MP_STREAM_POLL_RD
return ret
def readline(self):
self.read_count += 1
self.ready_rd = False
return b'a\n'
async def dummy(self):
while True:
await asyncio.sleep(0)
self.dummy_count += 1
utime.sleep_ms(10) # Emulate time consuming user code
async def killer(self):
print('Test runs for 5s')
await asyncio.sleep(5)
print('I/O count {} Dummy count {}'.format(self.read_count, self.dummy_count))
async def receiver(myior):
sreader = asyncio.StreamReader(myior)
while True:
await sreader.readline()
def test(fast_io=False):
loop = asyncio.get_event_loop(ioq_len = 6 if fast_io else 0)
myior = MyIO()
loop.create_task(receiver(myior))
for _ in range(10):
loop.create_task(myior.dummy())
loop.run_until_complete(myior.killer())
print('Test case of I/O competing with zero delay tasks.')
print('fast_io False: approx I/O count 25, dummy count 510.')
print('fast_io True: approx I/O count 510, dummy count 510.')
print('Run test() to check normal I/O, test(True) for fast I/O')

Wyświetl plik

@ -1,72 +0,0 @@
# iotest7.py Test fast_io PR
# https://github.com/micropython/micropython-lib/pull/287
# Test the case where runq is empty
# The MyIO class supports .readline() which updates a call counter and clears the
# .ready_rd flag. This is set by a timer (emulating the arrival of data from
# some hardware device).
import io
import pyb
import utime
import uasyncio as asyncio
import micropython
micropython.alloc_emergency_exception_buf(100)
MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL_WR = const(4)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)
class MyIO(io.IOBase):
def __init__(self, read=False, write=False):
self.read_count = 0
self.dummy_count = 0
self.ready_rd = False
pyb.Timer(4, freq = 100, callback = self.do_input)
# Read callback: emulate asynchronous input from hardware.
def do_input(self, t):
self.ready_rd = True # Data is ready to read
def ioctl(self, req, arg):
ret = MP_STREAM_ERROR
if req == MP_STREAM_POLL:
ret = 0
if arg & MP_STREAM_POLL_RD:
if self.ready_rd:
ret |= MP_STREAM_POLL_RD
return ret
def readline(self):
self.read_count += 1
self.ready_rd = False
return b'a\n'
async def dummy(self):
while True:
await asyncio.sleep_ms(50)
self.dummy_count += 1
async def killer(self):
print('Test runs for 5s')
await asyncio.sleep(5)
print('I/O count {} Dummy count {}'.format(self.read_count, self.dummy_count))
async def receiver(myior):
sreader = asyncio.StreamReader(myior)
while True:
await sreader.readline()
def test(fast_io=False):
loop = asyncio.get_event_loop(ioq_len = 6 if fast_io else 0)
myior = MyIO()
loop.create_task(receiver(myior))
loop.create_task(myior.dummy())
loop.run_until_complete(myior.killer())
print('Test case of empty runq: the fast_io option has no effect.')
print('I/O and dummy run at expected rates (around 500 and 99 counts respectively.')
print('Run test() to check normal I/O, test(True) for fast I/O')

Wyświetl plik

@ -1,34 +0,0 @@
# ms_timer.py A relatively high precision delay class for the fast_io version
# of uasyncio
import uasyncio as asyncio
import utime
import io
MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)
class MillisecTimer(io.IOBase):
def __init__(self, fast=True):
self.end = 0
self.sreader = asyncio.StreamReader(self)
self.sreader.priority(fast)
def __iter__(self):
await self.sreader.readline()
def __call__(self, ms):
self.end = utime.ticks_add(utime.ticks_ms(), ms)
return self
def readline(self):
return b'\n'
def ioctl(self, req, arg):
ret = MP_STREAM_ERROR
if req == MP_STREAM_POLL:
ret = 0
if arg & MP_STREAM_POLL_RD:
if utime.ticks_diff(utime.ticks_ms(), self.end) >= 0:
ret |= MP_STREAM_POLL_RD
return ret

Wyświetl plik

@ -1,42 +0,0 @@
# ms_timer_test.py Test/demo program for MillisecTimer. Adapted for new uasyncio.
import uasyncio as asyncio
import utime
import ms_timer
async def timer_test(n, fast):
timer = ms_timer.MillisecTimer(fast)
while True:
t = utime.ticks_ms()
await timer(30)
print('Task {} time {}ms'.format(n, utime.ticks_diff(utime.ticks_ms(), t)))
await asyncio.sleep(0.5 + n/5)
async def foo():
while True:
await asyncio.sleep(0)
utime.sleep_ms(10) # Emulate slow processing
def main(fast=True):
for _ in range(10):
asyncio.create_task(foo())
for n in range(3):
asyncio.create_task(timer_test(n, fast))
await asyncio.sleep(10)
def test(fast=True):
asyncio.run(main(fast))
s = '''This test creates ten tasks each of which blocks for 10ms.
It also creates three tasks each of which runs a MillisecTimer for 30ms,
timing the period which elapses while it runs. With fast I/O scheduling
the elapsed time is ~30ms as expected. With normal scheduling it is
about 130ms because of competetion from the blocking coros.
Run test() to test fast I/O, test(False) to test normal I/O.
Test prints the task number followed by the actual elapsed time in ms.
Test runs for 10s.'''
print(s)

Wyświetl plik

@ -1,485 +0,0 @@
"""
MicroPython uasyncio module
MIT license; Copyright (c) 2019 Damien P. George
"""
from time import ticks_ms as ticks, ticks_diff, ticks_add
import sys, select
type_genf = type((lambda: (yield))) # Type of a generator function upy iss #3241
################################################################################
# Primitive class embodies methods common to most synchronisation primitives
class Primitive:
def __init__(self):
self.waiting = TQueue() # Linked list of Tasks waiting on completion
def run_next(self):
awt = self.waiting.next
if awt: # Schedule next task waiting on primitive
_queue.push_head(self.waiting.pop_head())
return awt
def run_all(self):
while self.waiting.next: # Schedule all tasks waiting on primitive
_queue.push_head(self.waiting.pop_head())
def save_current(self): # Postpone currently running task
self.waiting.push_head(cur_task)
# Set calling task's data to this event that it waits on, to double-link it
cur_task.data = self
################################################################################
# Task Queue class renamed to avoid conflict with Queue class
class TQueue:
def __init__(self):
self.next = None
self.last = None
def push_sorted(self, v, data):
v.data = data
if ticks_diff(data, ticks()) <= 0:
cur = self.last
if cur and ticks_diff(data, cur.data) >= 0:
# Optimisation: can start looking from self.last to insert this item
while cur.next and ticks_diff(data, cur.next.data) >= 0:
cur = cur.next
v.next = cur.next
cur.next = v
self.last = cur
return
cur = self
while cur.next and (not isinstance(cur.next.data, int) or ticks_diff(data, cur.next.data) >= 0):
cur = cur.next
v.next = cur.next
cur.next = v
if cur is not self:
self.last = cur
def push_head(self, v):
self.push_sorted(v, ticks())
def push_priority(self, v):
v.data = ticks()
v.next = self.next
self.next = v
def push_error(self, v, err):
# Push directly to head (but should probably still consider fairness)
v.data = err
v.next = self.next
self.next = v
def pop_head(self):
v = self.next
self.next = v.next
if self.last is v:
self.last = v.next
return v
def remove(self, v):
cur = self
while cur.next:
if cur.next is v:
cur.next = v.next
break
cur = cur.next
if self.last is v:
self.last = v.next
################################################################################
# Fundamental classes
class CancelledError(BaseException):
pass
class TimeoutError(Exception):
pass
# Task class representing a coroutine, can be waited on and cancelled
class Task:
def __init__(self, coro):
self.coro = coro # Coroutine of this Task
self.next = None # For linked list
self.data = None # General data for linked list
def __iter__(self):
if not hasattr(self, 'waiting'):
# Lazily allocated head of linked list of Tasks waiting on completion of this task
self.waiting = TQueue()
return self
def send(self, v):
if not self.coro:
# Task finished, raise return value to caller so it can continue
raise self.data
else:
# Put calling task on waiting queue
self.waiting.push_head(cur_task)
# Set calling task's data to this task that it waits on, to double-link it
cur_task.data = self
def cancel(self):
if self is cur_task:
raise RuntimeError('cannot cancel self')
# If Task waits on another task then forward the cancel to the one it's waiting on
while isinstance(self.data, Task):
self = self.data
# Reschedule Task as a cancelled task
if hasattr(self.data, 'waiting'):
self.data.waiting.remove(self)
else:
_queue.remove(self)
_queue.push_error(self, CancelledError)
return True
# Create and schedule a new task from a coroutine
def create_task(coro):
assert not isinstance(coro, type_genf), 'Coroutine arg expected.' # upy issue #3241
t = Task(coro)
_queue.push_head(t)
return t
# "Yield" once, then raise StopIteration
class SingletonGenerator:
def __init__(self):
self.state = 0
self.exc = StopIteration()
def __iter__(self):
return self
def __next__(self):
if self.state:
self.state = 0
return None
else:
self.exc.__traceback__ = None
raise self.exc
# Pause task execution for the given time (integer in milliseconds, uPy extension)
# Use a SingletonGenerator to do it without allocating on the heap
def sleep_ms(t, sgen=SingletonGenerator()):
_queue.push_sorted(cur_task, ticks_add(ticks(), t))
sgen.state = 1
return sgen
# Pause task execution for the given time (in seconds)
def sleep(t):
return sleep_ms(int(t * 1000))
################################################################################
# Helper functions
def _promote_to_task(aw):
return aw if isinstance(aw, Task) else create_task(aw)
def run(coro):
return run_until_complete(create_task(coro))
async def wait_for(aw, timeout):
aw = _promote_to_task(aw)
if timeout is None:
return await aw
def cancel(aw, timeout):
await sleep(timeout)
aw.cancel()
cancel_task = create_task(cancel(aw, timeout))
try:
ret = await aw
except CancelledError:
# Ignore CancelledError from aw, it's probably due to timeout
pass
finally:
_queue.remove(cancel_task)
if cancel_task.coro is None:
# Cancel task ran to completion, ie there was a timeout
raise TimeoutError
return ret
async def gather(*aws, return_exceptions=False):
ts = [_promote_to_task(aw) for aw in aws]
for i in range(len(ts)):
try:
# TODO handle cancel of gather itself
#if ts[i].coro:
# iter(ts[i]).waiting.push_head(cur_task)
# try:
# yield
# except CancelledError as er:
# # cancel all waiting tasks
# raise er
ts[i] = await ts[i]
except Exception as er:
if return_exceptions:
ts[i] = er
else:
raise er
return ts
################################################################################
# General streams
# Queue and poller for stream IO
class IOQueue:
def __init__(self):
self.poller = select.poll()
self.map = {}
self.fast = set()
def _queue(self, s, idx):
if id(s) not in self.map:
entry = [None, None, s]
entry[idx] = cur_task
self.map[id(s)] = entry
self.poller.register(s, select.POLLIN if idx == 0 else select.POLLOUT)
else:
sm = self.map[id(s)]
assert sm[idx] is None
assert sm[1 - idx] is not None
sm[idx] = cur_task
self.poller.modify(s, select.POLLIN | select.POLLOUT)
def _dequeue(self, s):
del self.map[id(s)]
self.poller.unregister(s)
def queue_read(self, s):
self._queue(s, 0)
def queue_write(self, s):
self._queue(s, 1)
def priority(self, sid, v):
self.fast.add(sid) if v else self.fast.discard(sid)
def remove(self, task):
while True:
del_s = None
for k in self.map: # Iterate without allocating on the heap
q0, q1, s = self.map[k]
if q0 is task or q1 is task:
del_s = s
break
if del_s is not None:
self._dequeue(s)
else:
break
def wait_io_event(self, dt):
for s, ev in self.poller.ipoll(dt):
sid = id(s)
sm = self.map[sid]
err = ev & ~(select.POLLIN | select.POLLOUT)
fast = sid in self.fast
#print('poll', s, sm, ev, err)
if ev & select.POLLIN or (err and sm[0] is not None):
if fast:
_queue.push_priority(sm[0])
else:
_queue.push_head(sm[0])
sm[0] = None
if ev & select.POLLOUT or (err and sm[1] is not None):
if fast:
_queue.push_priority(sm[1])
else:
_queue.push_head(sm[1])
sm[1] = None
if sm[0] is None and sm[1] is None:
self._dequeue(s)
elif sm[0] is None:
self.poller.modify(s, select.POLLOUT)
else:
self.poller.modify(s, select.POLLIN)
class Stream:
def __init__(self, s, e={}):
self.s = s
self.e = e
self.out_buf = b''
def get_extra_info(self, v):
return self.e[v]
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
await self.close()
def close(self):
pass
async def wait_closed(self):
# TODO yield?
self.s.close()
def priority(self, v=True):
_io_queue.priority(id(self.s), v)
async def read(self, n):
yield _io_queue.queue_read(self.s)
return self.s.read(n)
async def readline(self):
l = b''
while True:
yield _io_queue.queue_read(self.s)
l2 = self.s.readline() # may do multiple reads but won't block
l += l2
if not l2 or l[-1] == 10: # \n (check l in case l2 is str)
return l
def write(self, buf):
self.out_buf += buf
async def drain(self):
mv = memoryview(self.out_buf)
off = 0
while off < len(mv):
yield _io_queue.queue_write(self.s)
ret = self.s.write(mv[off:])
if ret is not None:
off += ret
self.out_buf = b''
################################################################################
# Socket streams
# Create a TCP stream connection to a remove host
async def open_connection(host, port):
try:
import usocket as socket
except ImportError:
import socket
ai = socket.getaddrinfo(host, port)[0] # TODO this is blocking!
s = socket.socket()
s.setblocking(False)
ss = Stream(s)
try:
s.connect(ai[-1])
except OSError as er:
if er.args[0] != 115: # EINPROGRESS
raise er
yield _io_queue.queue_write(s)
return ss, ss
# Class representing a TCP stream server, can be closed and used in "async with"
class Server:
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
self.close()
await self.wait_closed()
def close(self):
self.task.cancel()
async def wait_closed(self):
await self.task
async def _serve(self, cb, host, port, backlog):
try:
import usocket as socket
except ImportError:
import socket
ai = socket.getaddrinfo(host, port)[0] # TODO this is blocking!
s = socket.socket()
s.setblocking(False)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(ai[-1])
s.listen(backlog)
self.task = cur_task
# Accept incoming connections
while True:
try:
yield _io_queue.queue_read(s)
except CancelledError:
# Shutdown server
s.close()
return
s2, addr = s.accept()
s2.setblocking(False)
s2s = Stream(s2, {'peername': addr})
create_task(cb(s2s, s2s))
# Helper function to start a TCP stream server, running as a new task
# TODO could use an accept-callback on socket read activity instead of creating a task
async def start_server(cb, host, port, backlog=5):
s = Server()
create_task(s._serve(cb, host, port, backlog))
return s
################################################################################
# Main run loop
# Queue of Task instances
_queue = TQueue()
# Task queue and poller for stream IO
_io_queue = IOQueue()
# Keep scheduling tasks until there are none left to schedule
def run_until_complete(main_task=None):
global cur_task
excs_all = (CancelledError, Exception) # To prevent heap allocation in loop
excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop
while True:
# Wait until the head of _queue is ready to run
dt = 1
while dt > 0:
dt = -1
if _queue.next:
# A task waiting on _queue
if isinstance(_queue.next.data, int):
# "data" is time to schedule task at
dt = max(0, ticks_diff(_queue.next.data, ticks()))
else:
# "data" is an exception to throw into the task
dt = 0
elif not _io_queue.map:
# No tasks can be woken so finished running
return
#print('(poll {})'.format(dt), len(_io_queue.map))
_io_queue.wait_io_event(dt)
# Get next task to run and continue it
t = _queue.pop_head()
cur_task = t
try:
# Continue running the coroutine, it's responsible for rescheduling itself
if isinstance(t.data, int):
t.coro.send(None)
else:
t.coro.throw(t.data)
except excs_all as er:
# This task is done, schedule any tasks waiting on it
if t is main_task:
if isinstance(er, StopIteration):
return er.value
raise er
t.data = er # save return value of coro to pass up to caller
waiting = False
if hasattr(t, 'waiting'):
while t.waiting.next:
_queue.push_head(t.waiting.pop_head())
waiting = True
t.waiting = None # Free waiting queue head
_io_queue.remove(t) # Remove task from the IO queue (if it's on it)
t.coro = None # Indicate task is done
# Print out exception for detached tasks
if not waiting and not isinstance(er, excs_stop):
print('task raised exception:', t.coro)
sys.print_exception(er)
StreamReader = Stream
StreamWriter = Stream # CPython 3.8 compatibility
################################################################################
# Legacy uasyncio compatibility
async def stream_awrite(self, buf, off=0, sz=-1):
if off != 0 or sz != -1:
buf = memoryview(buf)
if sz == -1:
sz = len(buf)
buf = buf[off:off + sz]
self.write(buf)
await self.drain()
Stream.aclose = Stream.wait_closed
Stream.awrite = stream_awrite
Stream.awritestr = stream_awrite # TODO explicitly convert to bytes?
class Loop:
def create_task(self, coro):
return create_task(coro)
def run_forever(self):
run_until_complete()
# TODO should keep running until .stop() is called, even if there're no tasks left
def run_until_complete(self, aw):
return run_until_complete(_promote_to_task(aw))
def close(self):
pass
def get_event_loop(runq_len=0, waitq_len=0):
return Loop()
version = (3, 0, 1)

Wyświetl plik

@ -1,75 +0,0 @@
# barrier.py MicroPython optimised version
# A Barrier synchronises N coros. In normal use each issues await barrier.
# Execution pauses until all other participant coros are waiting on it.
# At that point the callback is executed. Then the barrier is 'opened' and
# execution of all participants resumes.
# .trigger enables a coro to signal it has passed the barrier without waiting.
try:
import asyncio
raise RuntimeError('This version of barrier is MicroPython specific')
except ImportError:
import uasyncio
async def _g():
pass
type_coro = type(_g())
# If a callback is passed, run it and return.
# If a coro is passed initiate it and return.
# coros are passed by name i.e. not using function call syntax.
def launch(func, tup_args):
res = func(*tup_args)
if isinstance(res, type_coro):
uasyncio.create_task(res)
class Barrier(uasyncio.Primitive):
def __init__(self, participants, func=None, args=()):
super().__init__()
self._participants = participants
self._func = func
self._args = args
self._reset(True)
def trigger(self):
self._update()
if self._at_limit(): # All other coros are also at limit
if self._func is not None:
launch(self._func, self._args)
self._reset(not self._down) # Toggle direction and release others
self.run_all()
def __iter__(self): # MicroPython
self._update()
if self._at_limit(): # All other coros are also at limit
if self._func is not None:
launch(self._func, self._args)
self._reset(not self._down) # Toggle direction and release others
self.run_all()
return
direction = self._down
# Other tasks have not reached barrier, put the calling task on the barrier's waiting queue
self.save_current()
yield
def _reset(self, down):
self._down = down
self._count = self._participants if down else 0
def busy(self):
if self._down:
done = self._count == self._participants
else:
done = self._count == 0
return not done
def _at_limit(self): # Has count reached up or down limit?
limit = 0 if self._down else self._participants
return self._count == limit
def _update(self):
self._count += -1 if self._down else 1
if self._count < 0 or self._count > self._participants:
raise ValueError('Too many tasks accessing Barrier')

Wyświetl plik

@ -1,60 +0,0 @@
import uasyncio
import uasyncio.lock
import uasyncio.event
class Condition:
def __init__(self, lock=None):
self.lock = uasyncio.Lock() if lock is None else lock
self.events = []
async def acquire(self):
await self.lock.acquire()
# enable this syntax:
# with await condition [as cond]:
#def __iter__(self):
#await self.lock.acquire()
#return self
async def __aenter__(self):
await self.lock.acquire()
return self
async def __aexit__(self, *_):
self.lock.release()
def locked(self):
return self.lock.locked()
def release(self):
self.lock.release() # Will raise RuntimeError if not locked
def notify(self, n=1): # Caller controls lock
if not self.lock.locked():
raise RuntimeError('Condition notify with lock not acquired.')
for _ in range(min(n, len(self.events))):
ev = self.events.pop()
ev.set()
def notify_all(self):
self.notify(len(self.events))
async def wait(self):
if not self.lock.locked():
raise RuntimeError('Condition wait with lock not acquired.')
ev = uasyncio.Event()
self.events.append(ev)
self.lock.release()
await ev.wait()
await self.lock.acquire()
assert ev not in self.events, 'condition wait assertion fail'
return True # CPython compatibility
async def wait_for(self, predicate):
result = predicate()
while not result:
await self.wait()
result = predicate()
return result
uasyncio.Condition = Condition

Wyświetl plik

@ -1,22 +0,0 @@
import uasyncio
# Event class for primitive events that can be waited on, set, and cleared
class Event(uasyncio.Primitive):
def __init__(self):
super().__init__()
self.state = False
def set(self): # Event becomes set, schedule any tasks waiting on it
self.run_all()
self.state = True
def clear(self):
self.state = False
def is_set(self):
return self.state # CPython compatibility
async def wait(self):
if not self.state:
# Event not set, put the calling task on the event's waiting queue
self.save_current()
yield
return True
uasyncio.Event = Event

Wyświetl plik

@ -1,55 +0,0 @@
import uasyncio
################################################################################
# Lock (optional component)
# Lock class for primitive mutex capability
import uasyncio
class Lock(uasyncio.Primitive):
def __init__(self):
super().__init__()
self._locked = False
self._awt = None # task that is going to acquire the lock. Needed to prevent race
# condition between pushing the next waiting task and the task actually acquiring
# the lock because during that time another newly started task could acquire the
# lock out-of-order instead of being pushed to the waiting list.
# Also needed to not release another waiting Task if multiple Tasks are cancelled.
async def acquire(self):
if self._locked or self._awt:
# Lock set or just released but has tasks waiting on it,
# put the calling task on the Lock's waiting queue and yield
self.save_current()
try:
yield
except uasyncio.CancelledError:
if self._awt is uasyncio.cur_task:
# Task that was going to acquire got cancelled after being scheduled.
# Schedule next waiting task
self._locked = True
self.release()
raise
self._locked = True
return True
async def __aenter__(self):
await self.acquire()
return self
def locked(self):
return self._locked
def release(self):
if not self._locked:
raise RuntimeError("Lock is not acquired.")
self._locked = False
# Lock becomes available. If task(s) are waiting on it save task which will
self._awt = self.run_next() # get lock and schedule that task
async def __aexit__(self, *args):
return self.release()
uasyncio.Lock = Lock

Wyświetl plik

@ -1,82 +0,0 @@
# queue.py: adapted from uasyncio V2
from ucollections import deque
import uasyncio
# Exception raised by get_nowait().
class QueueEmpty(Exception):
pass
# Exception raised by put_nowait().
class QueueFull(Exception):
pass
# A queue, useful for coordinating producer and consumer coroutines.
# If maxsize is less than or equal to zero, the queue size is infinite. If it
# is an integer greater than 0, then "await put()" will block when the
# queue reaches maxsize, until an item is removed by get().
# Unlike the standard library Queue, you can reliably know this Queue's size
# with qsize(), since your single-threaded uasyncio application won't be
# interrupted between calling qsize() and doing an operation on the Queue.
class Queue(uasyncio.Primitive):
def __init__(self, maxsize=0):
super().__init__()
self.maxsize = maxsize
self._queue = deque((), maxsize)
def _get(self):
return self._queue.popleft()
async def get(self): # Usage: item = await queue.get()
if not self._queue:
# Queue is empty, put the calling Task on the waiting queue
self.save_current()
yield
self.run_next() # Task(s) waiting to put on queue, schedule first Task
return self._get()
def get_nowait(self): # Remove and return an item from the queue.
# Return an item if one is immediately available, else raise QueueEmpty.
if not self._queue:
raise QueueEmpty()
return self._get()
def _put(self, val):
self._queue.append(val)
async def put(self, val): # Usage: await queue.put(item)
if self.qsize() >= self.maxsize and self.maxsize:
# Queue full, put the calling Task on the waiting queue
self.save_current()
yield
self.run_next() # Task(s) waiting to get from queue, schedule first Task
self._put(val)
def put_nowait(self, val): # Put an item into the queue without blocking.
if self.qsize() >= self.maxsize and self.maxsize:
raise QueueFull()
self._put(val)
def qsize(self): # Number of items in the queue.
return len(self._queue)
def empty(self): # Return True if the queue is empty, False otherwise.
return not self._queue
def full(self): # Return True if there are maxsize items in the queue.
# Note: if the Queue was initialized with maxsize=0 (the default),
# then full() is never True.
if self.maxsize <= 0:
return False
else:
return self.qsize() >= self.maxsize
# Name collision fixed
uasyncio.Queue = Queue

Wyświetl plik

@ -1,41 +0,0 @@
# semaphore.py
import uasyncio
class Semaphore((uasyncio.Primitive)):
def __init__(self, value=1):
super().__init__()
self._count = value
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, exc_type, exc, tb):
self.release()
async def acquire(self):
if self._count == 0:
# Semaphore unavailable, put the calling Task on the waiting queue
self.save_current()
yield
self._count -= 1
def release(self):
self._count += 1
self.run_next() # Task(s) waiting on semaphore, schedule first Task
class BoundedSemaphore(Semaphore):
def __init__(self, value=1):
super().__init__(value)
self._initial_value = value
def release(self):
if self._count < self._initial_value:
super().release()
else:
raise ValueError('Semaphore released more than acquired')
uasyncio.Semaphore = Semaphore
uasyncio.BoundedSemaphore = BoundedSemaphore

Wyświetl plik

@ -1,19 +0,0 @@
try:
import uasyncio as asyncio
except ImportError:
import asyncio
async def _g():
pass
type_coro = type(_g())
# If a callback is passed, run it and return.
# If a coro is passed initiate it and return.
# coros are passed by name i.e. not using function call syntax.
def launch(func, tup_args):
res = func(*tup_args)
if isinstance(res, type_coro):
loop = asyncio.get_event_loop()
loop.create_task(res)

Wyświetl plik

@ -1,68 +0,0 @@
try:
import uasyncio as asyncio
except ImportError:
import asyncio
from . import launch
# A Barrier synchronises N coros. Each issues await barrier.
# Execution pauses until all other participant coros are waiting on it.
# At that point the callback is executed. Then the barrier is 'opened' and
# execution of all participants resumes.
# The nowait arg is to support task cancellation. It enables usage where one or
# more coros can register that they have reached the barrier without waiting
# for it. Any coros waiting normally on the barrier will pause until all
# non-waiting coros have passed the barrier and all waiting ones have reached
# it. The use of nowait promotes efficiency by enabling tasks which have been
# cancelled to leave the task queue as soon as possible.
class Barrier():
def __init__(self, participants, func=None, args=()):
self._participants = participants
self._func = func
self._args = args
self._reset(True)
def __await__(self):
self._update()
if self._at_limit(): # All other threads are also at limit
if self._func is not None:
launch(self._func, self._args)
self._reset(not self._down) # Toggle direction to release others
return
direction = self._down
while True: # Wait until last waiting thread changes the direction
if direction != self._down:
return
await asyncio.sleep_ms(0)
__iter__ = __await__
def trigger(self):
self._update()
if self._at_limit(): # All other threads are also at limit
if self._func is not None:
launch(self._func, self._args)
self._reset(not self._down) # Toggle direction to release others
def _reset(self, down):
self._down = down
self._count = self._participants if down else 0
def busy(self):
if self._down:
done = self._count == self._participants
else:
done = self._count == 0
return not done
def _at_limit(self): # Has count reached up or down limit?
limit = 0 if self._down else self._participants
return self._count == limit
def _update(self):
self._count += -1 if self._down else 1
if self._count < 0 or self._count > self._participants:
raise ValueError('Too many tasks accessing Barrier')

Wyświetl plik

@ -1,63 +0,0 @@
try:
import uasyncio as asyncio
except ImportError:
import asyncio
# Condition class
# from primitives.condition import Condition
class Condition():
def __init__(self, lock=None):
self.lock = asyncio.Lock() if lock is None else lock
self.events = []
async def acquire(self):
await self.lock.acquire()
# enable this syntax:
# with await condition [as cond]:
def __await__(self):
await self.lock.acquire()
return self
__iter__ = __await__
def __enter__(self):
return self
def __exit__(self, *_):
self.lock.release()
def locked(self):
return self.lock.locked()
def release(self):
self.lock.release() # Will raise RuntimeError if not locked
def notify(self, n=1): # Caller controls lock
if not self.lock.locked():
raise RuntimeError('Condition notify with lock not acquired.')
for _ in range(min(n, len(self.events))):
ev = self.events.pop()
ev.set()
def notify_all(self):
self.notify(len(self.events))
async def wait(self):
if not self.lock.locked():
raise RuntimeError('Condition wait with lock not acquired.')
ev = asyncio.Event()
self.events.append(ev)
self.lock.release()
await ev.wait()
await self.lock.acquire()
assert ev not in self.events, 'condition wait assertion fail'
return True # CPython compatibility
async def wait_for(self, predicate):
result = predicate()
while not result:
await self.wait()
result = predicate()
return result

Wyświetl plik

@ -1,60 +0,0 @@
import uasyncio as asyncio
import utime as time
from . import launch
# Usage:
# from primitives.delay_ms import Delay_ms
class Delay_ms:
verbose = False
def __init__(self, func=None, args=(), can_alloc=True, duration=1000):
self.func = func
self.args = args
self.can_alloc = can_alloc
self.duration = duration # Default duration
self._tstop = None # Killer not running
self._running = False # Timer not running
if not can_alloc:
asyncio.create_task(self._run())
async def _run(self):
while True:
if not self._running: # timer not running
await asyncio.sleep_ms(0)
else:
await self._killer()
def stop(self):
self._running = False
# If uasyncio is ever fixed we should cancel .killer
def trigger(self, duration=0): # Update end time
self._running = True
if duration <= 0:
duration = self.duration
tn = time.ticks_add(time.ticks_ms(), duration) # new end time
self.verbose and self._tstop is not None and self._tstop > tn \
and print("Warning: can't reduce Delay_ms time.")
# Start killer if can allocate and killer is not running
sk = self.can_alloc and self._tstop is None
# The following indicates ._killer is running: it will be
# started either here or in ._run
self._tstop = tn
if sk: # ._killer stops the delay when its period has elapsed
asyncio.create_task(self._killer())
def running(self):
return self._running
__call__ = running
async def _killer(self):
twait = time.ticks_diff(self._tstop, time.ticks_ms())
while twait > 0: # Must loop here: might be retriggered
await asyncio.sleep_ms(twait)
if self._tstop is None:
break # Return if stop() called during wait
twait = time.ticks_diff(self._tstop, time.ticks_ms())
if self._running and self.func is not None:
launch(self.func, self.args) # Timed out: execute callback
self._tstop = None # killer not running
self._running = False # timer is stopped

Wyświetl plik

@ -1,64 +0,0 @@
try:
import uasyncio as asyncio
except ImportError:
import asyncio
# Usage:
# from primitives.message import Message
# A coro waiting on a message issues await message
# A coro rasing the message issues message.set(payload)
# When all waiting coros have run
# message.clear() should be issued
# This more efficient version is commented out because Event.set is not ISR
# friendly. TODO If it gets fixed, reinstate this (tested) version.
#class Message(asyncio.Event):
#def __init__(self, _=0):
#self._data = None
#super().__init__()
#def clear(self):
#self._data = None
#super().clear()
#def __await__(self):
#await super().wait()
#__iter__ = __await__
#def set(self, data=None):
#self._data = data
#super().set()
#def value(self):
#return self._data
# Has an ISR-friendly .set()
class Message():
def __init__(self, delay_ms=0):
self.delay_ms = delay_ms
self.clear()
def clear(self):
self._flag = False
self._data = None
async def wait(self): # CPython comptaibility
while not self._flag:
await asyncio.sleep_ms(self.delay_ms)
def __await__(self):
while not self._flag:
await asyncio.sleep_ms(self.delay_ms)
__iter__ = __await__
def is_set(self):
return self._flag
def set(self, data=None):
self._flag = True
self._data = data
def value(self):
return self._data

Wyświetl plik

@ -1,97 +0,0 @@
import uasyncio as asyncio
import utime as time
from . import launch
from primitives.delay_ms import Delay_ms
# An alternative Pushbutton solution with lower RAM use is available here
# https://github.com/kevinkk525/pysmartnode/blob/dev/pysmartnode/utils/abutton.py
class Pushbutton:
debounce_ms = 50
long_press_ms = 1000
double_click_ms = 400
def __init__(self, pin, suppress=False):
self.pin = pin # Initialise for input
self._supp = suppress
self._dblpend = False # Doubleclick waiting for 2nd click
self._dblran = False # Doubleclick executed user function
self._tf = False
self._ff = False
self._df = False
self._lf = False
self._ld = False # Delay_ms instance for long press
self._dd = False # Ditto for doubleclick
self.sense = pin.value() # Convert from electrical to logical value
self.state = self.rawstate() # Initial state
asyncio.create_task(self.buttoncheck()) # Thread runs forever
def press_func(self, func, args=()):
self._tf = func
self._ta = args
def release_func(self, func, args=()):
self._ff = func
self._fa = args
def double_func(self, func, args=()):
self._df = func
self._da = args
def long_func(self, func, args=()):
self._lf = func
self._la = args
# Current non-debounced logical button state: True == pressed
def rawstate(self):
return bool(self.pin.value() ^ self.sense)
# Current debounced state of button (True == pressed)
def __call__(self):
return self.state
def _ddto(self): # Doubleclick timeout: no doubleclick occurred
self._dblpend = False
if self._supp and not self.state:
if not self._ld or (self._ld and not self._ld()):
launch(self._ff, self._fa)
async def buttoncheck(self):
if self._lf: # Instantiate timers if funcs exist
self._ld = Delay_ms(self._lf, self._la)
if self._df:
self._dd = Delay_ms(self._ddto)
while True:
state = self.rawstate()
# State has changed: act on it now.
if state != self.state:
self.state = state
if state: # Button pressed: launch pressed func
if self._tf:
launch(self._tf, self._ta)
if self._lf: # There's a long func: start long press delay
self._ld.trigger(Pushbutton.long_press_ms)
if self._df:
if self._dd(): # Second click: timer running
self._dd.stop()
self._dblpend = False
self._dblran = True # Prevent suppressed launch on release
launch(self._df, self._da)
else:
# First click: start doubleclick timer
self._dd.trigger(Pushbutton.double_click_ms)
self._dblpend = True # Prevent suppressed launch on release
else: # Button release. Is there a release func?
if self._ff:
if self._supp:
d = self._ld
# If long delay exists, is running and doubleclick status is OK
if not self._dblpend and not self._dblran:
if (d and d()) or not d:
launch(self._ff, self._fa)
else:
launch(self._ff, self._fa)
if self._ld:
self._ld.stop() # Avoid interpreting a second click as a long push
self._dblran = False
# Ignore state changes until switch has settled
await asyncio.sleep_ms(Pushbutton.debounce_ms)

Wyświetl plik

@ -1,66 +0,0 @@
# queue.py: adapted from uasyncio V2
# Code is based on Paul Sokolovsky's work.
# This is a temporary solution until uasyncio V3 gets an efficient official version
import uasyncio as asyncio
# Exception raised by get_nowait().
class QueueEmpty(Exception):
pass
# Exception raised by put_nowait().
class QueueFull(Exception):
pass
class Queue:
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._queue = []
def _get(self):
return self._queue.pop(0)
async def get(self): # Usage: item = await queue.get()
while self.empty():
# Queue is empty, put the calling Task on the waiting queue
await asyncio.sleep_ms(0)
return self._get()
def get_nowait(self): # Remove and return an item from the queue.
# Return an item if one is immediately available, else raise QueueEmpty.
if self.empty():
raise QueueEmpty()
return self._get()
def _put(self, val):
self._queue.append(val)
async def put(self, val): # Usage: await queue.put(item)
while self.qsize() >= self.maxsize and self.maxsize:
# Queue full
await asyncio.sleep_ms(0)
# Task(s) waiting to get from queue, schedule first Task
self._put(val)
def put_nowait(self, val): # Put an item into the queue without blocking.
if self.qsize() >= self.maxsize and self.maxsize:
raise QueueFull()
self._put(val)
def qsize(self): # Number of items in the queue.
return len(self._queue)
def empty(self): # Return True if the queue is empty, False otherwise.
return len(self._queue) == 0
def full(self): # Return True if there are maxsize items in the queue.
# Note: if the Queue was initialized with maxsize=0 (the default),
# then full() is never True.
if self.maxsize <= 0:
return False
else:
return self.qsize() >= self.maxsize

Wyświetl plik

@ -1,37 +0,0 @@
try:
import uasyncio as asyncio
except ImportError:
import asyncio
# A Semaphore is typically used to limit the number of coros running a
# particular piece of code at once. The number is defined in the constructor.
class Semaphore():
def __init__(self, value=1):
self._count = value
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, *args):
self.release()
await asyncio.sleep(0)
async def acquire(self):
while self._count == 0:
await asyncio.sleep_ms(0)
self._count -= 1
def release(self):
self._count += 1
class BoundedSemaphore(Semaphore):
def __init__(self, value=1):
super().__init__(value)
self._initial_value = value
def release(self):
if self._count < self._initial_value:
self._count += 1
else:
raise ValueError('Semaphore released more than acquired')

Wyświetl plik

@ -1,37 +0,0 @@
import uasyncio as asyncio
import utime as time
from . import launch
class Switch:
debounce_ms = 50
def __init__(self, pin):
self.pin = pin # Should be initialised for input with pullup
self._open_func = False
self._close_func = False
self.switchstate = self.pin.value() # Get initial state
asyncio.create_task(self.switchcheck()) # Thread runs forever
def open_func(self, func, args=()):
self._open_func = func
self._open_args = args
def close_func(self, func, args=()):
self._close_func = func
self._close_args = args
# Return current state of switch (0 = pressed)
def __call__(self):
return self.switchstate
async def switchcheck(self):
while True:
state = self.pin.value()
if state != self.switchstate:
# State has changed: act on it now.
self.switchstate = state
if state == 0 and self._close_func:
launch(self._close_func, self._close_args)
elif state == 1 and self._open_func:
launch(self._open_func, self._open_args)
# Ignore further state changes until switch has settled
await asyncio.sleep_ms(Switch.debounce_ms)

Wyświetl plik

@ -1,404 +0,0 @@
# asyntest.py Test/demo of the 'micro' Event, Barrier and Semaphore classes
# Test/demo of official asyncio library and official Lock class
# The MIT License (MIT)
#
# Copyright (c) 2017-2018 Peter Hinch
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# CPython 3.5 compatibility
# (ignore RuntimeWarning: coroutine '_g' was never awaited)
# To run:
# from primitives.tests.asyntest import test
try:
import uasyncio as asyncio
except ImportError:
import asyncio
from primitives.message import Message
from primitives.barrier import Barrier
from primitives.semaphore import Semaphore, BoundedSemaphore
from primitives.condition import Condition
def print_tests():
st = '''Available functions:
test(0) Print this list.
test(1) Test message acknowledge.
test(2) Test Messge and Lock objects.
test(3) Test the Barrier class.
test(4) Test Semaphore
test(5) Test BoundedSemaphore.
test(6) Test the Condition class.
test(7) Test the Queue class.
Recommended to issue ctrl-D after running each test.
'''
print('\x1b[32m')
print(st)
print('\x1b[39m')
print_tests()
def printexp(exp, runtime=0):
print('Expected output:')
print('\x1b[32m')
print(exp)
print('\x1b[39m')
if runtime:
print('Running (runtime = {}s):'.format(runtime))
else:
print('Running (runtime < 1s):')
# ************ Test Message class ************
# Demo use of acknowledge message
async def message_wait(message, ack_message, n):
await message
print('message_wait {} got message with value {}'.format(n, message.value()))
ack_message.set()
async def run_ack():
message = Message()
ack1 = Message()
ack2 = Message()
count = 0
while True:
asyncio.create_task(message_wait(message, ack1, 1))
asyncio.create_task(message_wait(message, ack2, 2))
message.set(count)
count += 1
print('message was set')
await ack1
ack1.clear()
print('Cleared ack1')
await ack2
ack2.clear()
print('Cleared ack2')
message.clear()
print('Cleared message')
await asyncio.sleep(1)
async def ack_coro(delay):
print('Started ack coro with delay', delay)
await asyncio.sleep(delay)
print("I've seen attack ships burn on the shoulder of Orion...")
print("Time to die...")
def ack_test():
printexp('''message was set
message_wait 1 got message with value 0
message_wait 2 got message with value 0
Cleared ack1
Cleared ack2
Cleared message
message was set
message_wait 1 got message with value 1
message_wait 2 got message with value 1
... text omitted ...
message_wait 1 got message with value 5
message_wait 2 got message with value 5
Cleared ack1
Cleared ack2
Cleared message
I've seen attack ships burn on the shoulder of Orion...
Time to die...
''', 10)
asyncio.create_task(run_ack())
asyncio.run(ack_coro(6))
# ************ Test Lock and Message classes ************
async def run_lock(n, lock):
print('run_lock {} waiting for lock'.format(n))
await lock.acquire()
print('run_lock {} acquired lock'.format(n))
await asyncio.sleep(1) # Delay to demo other coros waiting for lock
lock.release()
print('run_lock {} released lock'.format(n))
async def messageset(message):
print('Waiting 5 secs before setting message')
await asyncio.sleep(5)
message.set()
print('message was set')
async def messagewait(message):
print('waiting for message')
await message
print('got message')
message.clear()
async def run_message_test():
print('Test Lock class')
lock = asyncio.Lock()
asyncio.create_task(run_lock(1, lock))
asyncio.create_task(run_lock(2, lock))
asyncio.create_task(run_lock(3, lock))
print('Test Message class')
message = Message()
asyncio.create_task(messageset(message))
await messagewait(message) # run_message_test runs fast until this point
print('Message status {}'.format('Incorrect' if message.is_set() else 'OK'))
print('Tasks complete')
def msg_test():
printexp('''Test Lock class
Test Message class
waiting for message
run_lock 1 waiting for lock
run_lock 1 acquired lock
run_lock 2 waiting for lock
run_lock 3 waiting for lock
Waiting 5 secs before setting message
run_lock 1 released lock
run_lock 2 acquired lock
run_lock 2 released lock
run_lock 3 acquired lock
run_lock 3 released lock
message was set
got message
Message status OK
Tasks complete
''', 5)
asyncio.run(run_message_test())
# ************ Barrier test ************
async def killer(duration):
await asyncio.sleep(duration)
def callback(text):
print(text)
async def report(barrier):
for i in range(5):
print('{} '.format(i), end='')
await barrier
def barrier_test():
printexp('''0 0 0 Synch
1 1 1 Synch
2 2 2 Synch
3 3 3 Synch
4 4 4 Synch
''')
barrier = Barrier(3, callback, ('Synch',))
for _ in range(3):
asyncio.create_task(report(barrier))
asyncio.run(killer(2))
# ************ Semaphore test ************
async def run_sema(n, sema, barrier):
print('run_sema {} trying to access semaphore'.format(n))
async with sema:
print('run_sema {} acquired semaphore'.format(n))
# Delay demonstrates other coros waiting for semaphore
await asyncio.sleep(1 + n/10) # n/10 ensures deterministic printout
print('run_sema {} has released semaphore'.format(n))
barrier.trigger()
async def run_sema_test(bounded):
num_coros = 5
barrier = Barrier(num_coros + 1)
if bounded:
semaphore = BoundedSemaphore(3)
else:
semaphore = Semaphore(3)
for n in range(num_coros):
asyncio.create_task(run_sema(n, semaphore, barrier))
await barrier # Quit when all coros complete
try:
semaphore.release()
except ValueError:
print('Bounded semaphore exception test OK')
def semaphore_test(bounded=False):
if bounded:
exp = '''run_sema 0 trying to access semaphore
run_sema 0 acquired semaphore
run_sema 1 trying to access semaphore
run_sema 1 acquired semaphore
run_sema 2 trying to access semaphore
run_sema 2 acquired semaphore
run_sema 3 trying to access semaphore
run_sema 4 trying to access semaphore
run_sema 0 has released semaphore
run_sema 4 acquired semaphore
run_sema 1 has released semaphore
run_sema 3 acquired semaphore
run_sema 2 has released semaphore
run_sema 4 has released semaphore
run_sema 3 has released semaphore
Bounded semaphore exception test OK
Exact sequence of acquisition may vary when 3 and 4 compete for semaphore.'''
else:
exp = '''run_sema 0 trying to access semaphore
run_sema 0 acquired semaphore
run_sema 1 trying to access semaphore
run_sema 1 acquired semaphore
run_sema 2 trying to access semaphore
run_sema 2 acquired semaphore
run_sema 3 trying to access semaphore
run_sema 4 trying to access semaphore
run_sema 0 has released semaphore
run_sema 3 acquired semaphore
run_sema 1 has released semaphore
run_sema 4 acquired semaphore
run_sema 2 has released semaphore
run_sema 3 has released semaphore
run_sema 4 has released semaphore
Exact sequence of acquisition may vary when 3 and 4 compete for semaphore.'''
printexp(exp, 3)
asyncio.run(run_sema_test(bounded))
# ************ Condition test ************
cond = Condition()
tim = 0
async def cond01():
while True:
await asyncio.sleep(2)
with await cond:
cond.notify(2) # Notify 2 tasks
async def cond03(): # Maintain a count of seconds
global tim
await asyncio.sleep(0.5)
while True:
await asyncio.sleep(1)
tim += 1
async def cond02(n, barrier):
with await cond:
print('cond02', n, 'Awaiting notification.')
await cond.wait()
print('cond02', n, 'triggered. tim =', tim)
barrier.trigger()
def predicate():
return tim >= 8 # 12
async def cond04(n, barrier):
with await cond:
print('cond04', n, 'Awaiting notification and predicate.')
await cond.wait_for(predicate)
print('cond04', n, 'triggered. tim =', tim)
barrier.trigger()
async def cond_go():
ntasks = 7
barrier = Barrier(ntasks + 1)
t1 = asyncio.create_task(cond01())
t3 = asyncio.create_task(cond03())
for n in range(ntasks):
asyncio.create_task(cond02(n, barrier))
await barrier # All instances of cond02 have completed
# Test wait_for
barrier = Barrier(2)
asyncio.create_task(cond04(99, barrier))
await barrier
# cancel continuously running coros.
t1.cancel()
t3.cancel()
await asyncio.sleep_ms(0)
print('Done.')
def condition_test():
printexp('''cond02 0 Awaiting notification.
cond02 1 Awaiting notification.
cond02 2 Awaiting notification.
cond02 3 Awaiting notification.
cond02 4 Awaiting notification.
cond02 5 Awaiting notification.
cond02 6 Awaiting notification.
cond02 5 triggered. tim = 1
cond02 6 triggered. tim = 1
cond02 3 triggered. tim = 3
cond02 4 triggered. tim = 3
cond02 1 triggered. tim = 5
cond02 2 triggered. tim = 5
cond02 0 triggered. tim = 7
cond04 99 Awaiting notification and predicate.
cond04 99 triggered. tim = 9
Done.
''', 13)
asyncio.run(cond_go())
# ************ Queue test ************
from primitives.queue import Queue
q = Queue()
async def slow_process():
await asyncio.sleep(2)
return 42
async def bar():
print('Waiting for slow process.')
result = await slow_process()
print('Putting result onto queue')
await q.put(result) # Put result on q
async def foo():
print("Running foo()")
result = await q.get()
print('Result was {}'.format(result))
async def queue_go(delay):
asyncio.create_task(foo())
asyncio.create_task(bar())
await asyncio.sleep(delay)
print("I've seen starships burn off the shoulder of Orion...")
print("Time to die...")
def queue_test():
printexp('''Running (runtime = 3s):
Running foo()
Waiting for slow process.
Putting result onto queue
I've seen starships burn off the shoulder of Orion...
Time to die...
''', 3)
asyncio.run(queue_go(3))
def test(n):
if n == 0:
print_tests() # Print this list.
elif n == 1:
ack_test() # Test message acknowledge.
elif n == 2:
msg_test() # Test Messge and Lock objects.
elif n == 3:
barrier_test() # Test the Barrier class.
elif n == 4:
semaphore_test(False) # Test Semaphore
elif n == 5:
semaphore_test(True) # Test BoundedSemaphore.
elif n == 6:
condition_test() # Test the Condition class.
elif n == 7:
queue_test() # Test the Queue class.

Wyświetl plik

@ -1,137 +0,0 @@
# Test/demo programs for Switch and Pushbutton classes
# Tested on Pyboard but should run on other microcontroller platforms
# running MicroPython with uasyncio library.
# Author: Peter Hinch.
# Copyright Peter Hinch 2017-2020 Released under the MIT license.
# To run:
# from primitives.tests.switches import *
# test_sw() # For example
from machine import Pin
from pyb import LED
from primitives.switch import Switch
from primitives.pushbutton import Pushbutton
import uasyncio as asyncio
helptext = '''
Test using switch or pushbutton between X1 and gnd.
Ground pin X2 to terminate test.
Soft reset (ctrl-D) after each test.
'''
tests = '''
Available tests:
test_sw Switch test
test_swcb Switch with callback
test_btn Pushutton launching coros
test_btncb Pushbutton launching callbacks
'''
print(tests)
# Pulse an LED (coroutine)
async def pulse(led, ms):
led.on()
await asyncio.sleep_ms(ms)
led.off()
# Toggle an LED (callback)
def toggle(led):
led.toggle()
# Quit test by connecting X2 to ground
async def killer():
pin = Pin('X2', Pin.IN, Pin.PULL_UP)
while pin.value():
await asyncio.sleep_ms(50)
# Test for the Switch class passing coros
def test_sw():
s = '''
close pulses green
open pulses red
'''
print('Test of switch scheduling coroutines.')
print(helptext)
print(s)
pin = Pin('X1', Pin.IN, Pin.PULL_UP)
red = LED(1)
green = LED(2)
sw = Switch(pin)
# Register coros to launch on contact close and open
sw.close_func(pulse, (green, 1000))
sw.open_func(pulse, (red, 1000))
loop = asyncio.get_event_loop()
loop.run_until_complete(killer())
# Test for the switch class with a callback
def test_swcb():
s = '''
close toggles red
open toggles green
'''
print('Test of switch executing callbacks.')
print(helptext)
print(s)
pin = Pin('X1', Pin.IN, Pin.PULL_UP)
red = LED(1)
green = LED(2)
sw = Switch(pin)
# Register a coro to launch on contact close
sw.close_func(toggle, (red,))
sw.open_func(toggle, (green,))
loop = asyncio.get_event_loop()
loop.run_until_complete(killer())
# Test for the Pushbutton class (coroutines)
# Pass True to test suppress
def test_btn(suppress=False, lf=True, df=True):
s = '''
press pulses red
release pulses green
double click pulses yellow
long press pulses blue
'''
print('Test of pushbutton scheduling coroutines.')
print(helptext)
print(s)
pin = Pin('X1', Pin.IN, Pin.PULL_UP)
red = LED(1)
green = LED(2)
yellow = LED(3)
blue = LED(4)
pb = Pushbutton(pin, suppress)
pb.press_func(pulse, (red, 1000))
pb.release_func(pulse, (green, 1000))
if df:
print('Doubleclick enabled')
pb.double_func(pulse, (yellow, 1000))
if lf:
print('Long press enabled')
pb.long_func(pulse, (blue, 1000))
loop = asyncio.get_event_loop()
loop.run_until_complete(killer())
# Test for the Pushbutton class (callbacks)
def test_btncb():
s = '''
press toggles red
release toggles green
double click toggles yellow
long press toggles blue
'''
print('Test of pushbutton executing callbacks.')
print(helptext)
print(s)
pin = Pin('X1', Pin.IN, Pin.PULL_UP)
red = LED(1)
green = LED(2)
yellow = LED(3)
blue = LED(4)
pb = Pushbutton(pin)
pb.press_func(toggle, (red,))
pb.release_func(toggle, (green,))
pb.double_func(toggle, (yellow,))
pb.long_func(toggle, (blue,))
loop = asyncio.get_event_loop()
loop.run_until_complete(killer())