diff --git a/extmod/modopenamp.c b/extmod/modopenamp.c index 7a19e55a66..2223397b06 100644 --- a/extmod/modopenamp.c +++ b/extmod/modopenamp.c @@ -33,6 +33,8 @@ #include "py/nlr.h" #include "py/runtime.h" #include "py/mpprint.h" +#include "py/stream.h" +#include "py/mperrno.h" #include "metal/sys.h" #include "metal/alloc.h" @@ -44,6 +46,7 @@ #include "openamp/open_amp.h" #include "openamp/remoteproc.h" #include "openamp/remoteproc_loader.h" +#include "lib/rpmsg/rpmsg_internal.h" #include "modopenamp.h" #if !MICROPY_ENABLE_FINALISER @@ -126,19 +129,52 @@ static MP_DEFINE_CONST_OBJ_TYPE( ); // ###################### RPMsg Endpoint class ###################### +// The number of RPMsg buffers to hold for an Endpoint. Note if this number matches +// the number of buffers per VRING, a full ring buffer/queue will cause the other +// side to block, because all buffers will be held by the application. If it's less +// than the number of buffers in a VRING, messages will be dropped if this side is not +// receiving messages. +#define ENDPOINT_RPMSG_RING_SIZE VRING_NUM_BUFFS typedef struct _endpoint_obj_t { mp_obj_base_t base; mp_obj_t name; mp_obj_t callback; struct rpmsg_endpoint ep; + volatile uint32_t head; + volatile uint32_t tail; + void *rpmsg_buf[ENDPOINT_RPMSG_RING_SIZE]; } endpoint_obj_t; static const mp_obj_type_t endpoint_type; +static int endpoint_rpmsg_enqueue(endpoint_obj_t *ept, void *buf) { + if (((ept->tail + 1) % ENDPOINT_RPMSG_RING_SIZE) != ept->head) { + ept->rpmsg_buf[ept->tail] = buf; + ept->tail = (ept->tail + 1) % ENDPOINT_RPMSG_RING_SIZE; + rpmsg_hold_rx_buffer(&ept->ep, buf); + return 0; + } + return -1; +} + +static size_t endpoint_rpmsg_dequeue(endpoint_obj_t *ept, void *buf, size_t len) { + size_t size = 0; + if (ept->head != ept->tail) { + void *rpmsg = ept->rpmsg_buf[ept->head]; + size = MIN(RPMSG_LOCATE_HDR(rpmsg)->len, len); + memcpy(buf, rpmsg, size); + ept->head = (ept->head + 1) % ENDPOINT_RPMSG_RING_SIZE; + rpmsg_release_rx_buffer(&ept->ep, rpmsg); + } + return size; +} + static int endpoint_recv_callback(struct rpmsg_endpoint *ept, void *data, size_t len, uint32_t src, void *priv) { metal_log(METAL_LOG_DEBUG, "endpoint_recv_callback() message received src: %lu msg len: %d\n", src, len); endpoint_obj_t *self = metal_container_of(ept, endpoint_obj_t, ep); - if (self->callback != mp_const_none) { + if (self->callback == mp_const_none) { + endpoint_rpmsg_enqueue(self, data); + } else { mp_call_function_2(self->callback, mp_obj_new_int(src), mp_obj_new_bytearray_by_ref(len, data)); } return 0; @@ -193,6 +229,40 @@ static mp_obj_t endpoint_send(uint n_args, const mp_obj_t *pos_args, mp_map_t *k } static MP_DEFINE_CONST_FUN_OBJ_KW(endpoint_send_obj, 2, endpoint_send); +static mp_obj_t endpoint_recv(uint n_args, const mp_obj_t *pos_args, mp_map_t *kw_args) { + enum { ARG_timeout }; + static const mp_arg_t allowed_args[] = { + { MP_QSTR_timeout, MP_ARG_INT | MP_ARG_KW_ONLY, {.u_int = 0 } }, + }; + + // Parse args. + mp_arg_val_t args[MP_ARRAY_SIZE(allowed_args)]; + mp_arg_parse_all(n_args - 2, pos_args + 2, kw_args, MP_ARRAY_SIZE(allowed_args), allowed_args, args); + + endpoint_obj_t *self = MP_OBJ_TO_PTR(pos_args[0]); + vstr_t vstr; + size_t len = mp_obj_get_int(pos_args[1]); + vstr_init_len(&vstr, len); + + mp_int_t timeout = args[ARG_timeout].u_int; + for (mp_uint_t start = mp_hal_ticks_ms(); ;) { + vstr.len = endpoint_rpmsg_dequeue(self, vstr.buf, len); + if (vstr.len > 0) { + break; + } + if (timeout == 0) { + break; + } + if (timeout > 0 && (mp_hal_ticks_ms() - start > timeout)) { + mp_raise_msg(&mp_type_OSError, MP_ERROR_TEXT("timeout waiting for message")); + } + MICROPY_EVENT_POLL_HOOK + } + + return mp_obj_new_bytes_from_vstr(&vstr); +} +static MP_DEFINE_CONST_FUN_OBJ_KW(endpoint_recv_obj, 2, endpoint_recv); + static mp_obj_t endpoint_is_ready(mp_obj_t self_in) { endpoint_obj_t *self = MP_OBJ_TO_PTR(self_in); return is_rpmsg_ept_ready(&self->ep) ? mp_const_true : mp_const_false; @@ -209,10 +279,10 @@ static MP_DEFINE_CONST_FUN_OBJ_1(endpoint_deinit_obj, endpoint_deinit); static mp_obj_t endpoint_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *all_args) { enum { ARG_name, ARG_callback, ARG_src, ARG_dest }; static const mp_arg_t allowed_args[] = { - { MP_QSTR_name, MP_ARG_OBJ | MP_ARG_REQUIRED, {.u_rom_obj = MP_ROM_NONE } }, - { MP_QSTR_callback, MP_ARG_OBJ | MP_ARG_REQUIRED, {.u_rom_obj = MP_ROM_NONE } }, - { MP_QSTR_src, MP_ARG_INT | MP_ARG_KW_ONLY, {.u_int = RPMSG_ADDR_ANY } }, - { MP_QSTR_dest, MP_ARG_INT | MP_ARG_KW_ONLY, {.u_int = RPMSG_ADDR_ANY } }, + { MP_QSTR_name, MP_ARG_OBJ | MP_ARG_REQUIRED, {.u_rom_obj = MP_ROM_NONE } }, + { MP_QSTR_callback, MP_ARG_OBJ, {.u_rom_obj = MP_ROM_NONE } }, + { MP_QSTR_src, MP_ARG_INT | MP_ARG_KW_ONLY, {.u_int = RPMSG_ADDR_ANY } }, + { MP_QSTR_dest, MP_ARG_INT | MP_ARG_KW_ONLY, {.u_int = RPMSG_ADDR_ANY } }, }; // Parse args. @@ -222,6 +292,7 @@ static mp_obj_t endpoint_make_new(const mp_obj_type_t *type, size_t n_args, size endpoint_obj_t *self = mp_obj_malloc_with_finaliser(endpoint_obj_t, &endpoint_type); self->name = args[ARG_name].u_obj; self->callback = args[ARG_callback].u_obj; + self->head = self->tail = 0; if (MP_STATE_PORT(virtio_device) == NULL) { openamp_init(); @@ -239,15 +310,66 @@ static const mp_rom_map_elem_t endpoint_locals_dict_table[] = { { MP_ROM_QSTR(MP_QSTR___name__), MP_ROM_QSTR(MP_QSTR_Endpoint) }, { MP_ROM_QSTR(MP_QSTR___del__), MP_ROM_PTR(&endpoint_deinit_obj) }, { MP_ROM_QSTR(MP_QSTR_send), MP_ROM_PTR(&endpoint_send_obj) }, + { MP_ROM_QSTR(MP_QSTR_recv), MP_ROM_PTR(&endpoint_recv_obj) }, { MP_ROM_QSTR(MP_QSTR_is_ready), MP_ROM_PTR(&endpoint_is_ready_obj) }, }; static MP_DEFINE_CONST_DICT(endpoint_locals_dict, endpoint_locals_dict_table); +mp_uint_t endpoint_read(mp_obj_t self_in, void *buf, mp_uint_t size, int *errcode) { + endpoint_obj_t *self = MP_OBJ_TO_PTR(self_in); + if (!is_rpmsg_ept_ready(&self->ep)) { + return MP_STREAM_ERROR; + } + return endpoint_rpmsg_dequeue(self, buf, size); +} + +mp_uint_t endpoint_write(mp_obj_t self_in, const void *buf, mp_uint_t size, int *errcode) { + endpoint_obj_t *self = MP_OBJ_TO_PTR(self_in); + int ret = 0; + if (!is_rpmsg_ept_ready(&self->ep)) { + ret = MP_STREAM_ERROR; + } else { + ret = rpmsg_send(&self->ep, buf, size); + if (ret < 0) { + ret = MP_STREAM_ERROR; + } + } + return ret; +} + +mp_uint_t endpoint_ioctl(mp_obj_t self_in, mp_uint_t request, uintptr_t arg, int *errcode) { + endpoint_obj_t *self = MP_OBJ_TO_PTR(self_in); + mp_uint_t ret = 0; + if (request == MP_STREAM_CLOSE) { + rpmsg_destroy_ept(&self->ep); + } else if (request == MP_STREAM_POLL) { + if ((arg & MP_STREAM_POLL_RD) && self->head != self->tail) { + ret |= MP_STREAM_POLL_RD; + } + + if ((arg & MP_STREAM_POLL_WR) && is_rpmsg_ept_ready(&self->ep)) { + ret |= MP_STREAM_POLL_WR; + } + } else { + *errcode = MP_EINVAL; + ret = MP_STREAM_ERROR; + } + return ret; +} + +static const mp_stream_p_t endpoint_stream_p = { + .read = endpoint_read, + .write = endpoint_write, + .ioctl = endpoint_ioctl, + .is_text = false, +}; + static MP_DEFINE_CONST_OBJ_TYPE( endpoint_type, MP_QSTR_Endpoint, MP_TYPE_FLAG_NONE, make_new, endpoint_make_new, + protocol, &endpoint_stream_p, locals_dict, &endpoint_locals_dict );