extmod/modopenamp: Implement streaming protocol for endpoints.

This patch implements a streaming protocol for Endpoints, to allow them to
be used with select/poll or asyncio. It also implements `Endpoint.recv()`
function, whose behavior matches that of UDP sockets' recv(). Calling
`Endpoint.recv()` returns a single message, truncated to fit the requested
number of bytes. If two messages are pending on the endpoint, the code must
do two separate reads to get them both. Received RPMsg messages are saved
by holding the buffers in the endpoint receive callback, and releasing them
when they are no longer needed (i.e., when `Endpoint.recv` is called). This
avoids the copying buffers twice, saves memory, but also allows the host to
block the remote if it's not receiving messages (assuming the queue size is
the same as the number of buffers in the VRING). Note that the queuing of
RPMsg messages is only enabled if `callback=None` (i.e., if the application
is not processing messages asynchronously).

Signed-off-by: iabdalkader <i.abdalkader@gmail.com>
pull/14181/head
iabdalkader 2024-03-18 09:50:58 +01:00
rodzic 87d821ab49
commit 065ef5ef0e
1 zmienionych plików z 127 dodań i 5 usunięć

Wyświetl plik

@ -33,6 +33,8 @@
#include "py/nlr.h"
#include "py/runtime.h"
#include "py/mpprint.h"
#include "py/stream.h"
#include "py/mperrno.h"
#include "metal/sys.h"
#include "metal/alloc.h"
@ -44,6 +46,7 @@
#include "openamp/open_amp.h"
#include "openamp/remoteproc.h"
#include "openamp/remoteproc_loader.h"
#include "lib/rpmsg/rpmsg_internal.h"
#include "modopenamp.h"
#if !MICROPY_ENABLE_FINALISER
@ -126,19 +129,52 @@ static MP_DEFINE_CONST_OBJ_TYPE(
);
// ###################### RPMsg Endpoint class ######################
// The number of RPMsg buffers to hold for an Endpoint. Note if this number matches
// the number of buffers per VRING, a full ring buffer/queue will cause the other
// side to block, because all buffers will be held by the application. If it's less
// than the number of buffers in a VRING, messages will be dropped if this side is not
// receiving messages.
#define ENDPOINT_RPMSG_RING_SIZE VRING_NUM_BUFFS
typedef struct _endpoint_obj_t {
mp_obj_base_t base;
mp_obj_t name;
mp_obj_t callback;
struct rpmsg_endpoint ep;
volatile uint32_t head;
volatile uint32_t tail;
void *rpmsg_buf[ENDPOINT_RPMSG_RING_SIZE];
} endpoint_obj_t;
static const mp_obj_type_t endpoint_type;
static int endpoint_rpmsg_enqueue(endpoint_obj_t *ept, void *buf) {
if (((ept->tail + 1) % ENDPOINT_RPMSG_RING_SIZE) != ept->head) {
ept->rpmsg_buf[ept->tail] = buf;
ept->tail = (ept->tail + 1) % ENDPOINT_RPMSG_RING_SIZE;
rpmsg_hold_rx_buffer(&ept->ep, buf);
return 0;
}
return -1;
}
static size_t endpoint_rpmsg_dequeue(endpoint_obj_t *ept, void *buf, size_t len) {
size_t size = 0;
if (ept->head != ept->tail) {
void *rpmsg = ept->rpmsg_buf[ept->head];
size = MIN(RPMSG_LOCATE_HDR(rpmsg)->len, len);
memcpy(buf, rpmsg, size);
ept->head = (ept->head + 1) % ENDPOINT_RPMSG_RING_SIZE;
rpmsg_release_rx_buffer(&ept->ep, rpmsg);
}
return size;
}
static int endpoint_recv_callback(struct rpmsg_endpoint *ept, void *data, size_t len, uint32_t src, void *priv) {
metal_log(METAL_LOG_DEBUG, "endpoint_recv_callback() message received src: %lu msg len: %d\n", src, len);
endpoint_obj_t *self = metal_container_of(ept, endpoint_obj_t, ep);
if (self->callback != mp_const_none) {
if (self->callback == mp_const_none) {
endpoint_rpmsg_enqueue(self, data);
} else {
mp_call_function_2(self->callback, mp_obj_new_int(src), mp_obj_new_bytearray_by_ref(len, data));
}
return 0;
@ -193,6 +229,40 @@ static mp_obj_t endpoint_send(uint n_args, const mp_obj_t *pos_args, mp_map_t *k
}
static MP_DEFINE_CONST_FUN_OBJ_KW(endpoint_send_obj, 2, endpoint_send);
static mp_obj_t endpoint_recv(uint n_args, const mp_obj_t *pos_args, mp_map_t *kw_args) {
enum { ARG_timeout };
static const mp_arg_t allowed_args[] = {
{ MP_QSTR_timeout, MP_ARG_INT | MP_ARG_KW_ONLY, {.u_int = 0 } },
};
// Parse args.
mp_arg_val_t args[MP_ARRAY_SIZE(allowed_args)];
mp_arg_parse_all(n_args - 2, pos_args + 2, kw_args, MP_ARRAY_SIZE(allowed_args), allowed_args, args);
endpoint_obj_t *self = MP_OBJ_TO_PTR(pos_args[0]);
vstr_t vstr;
size_t len = mp_obj_get_int(pos_args[1]);
vstr_init_len(&vstr, len);
mp_int_t timeout = args[ARG_timeout].u_int;
for (mp_uint_t start = mp_hal_ticks_ms(); ;) {
vstr.len = endpoint_rpmsg_dequeue(self, vstr.buf, len);
if (vstr.len > 0) {
break;
}
if (timeout == 0) {
break;
}
if (timeout > 0 && (mp_hal_ticks_ms() - start > timeout)) {
mp_raise_msg(&mp_type_OSError, MP_ERROR_TEXT("timeout waiting for message"));
}
MICROPY_EVENT_POLL_HOOK
}
return mp_obj_new_bytes_from_vstr(&vstr);
}
static MP_DEFINE_CONST_FUN_OBJ_KW(endpoint_recv_obj, 2, endpoint_recv);
static mp_obj_t endpoint_is_ready(mp_obj_t self_in) {
endpoint_obj_t *self = MP_OBJ_TO_PTR(self_in);
return is_rpmsg_ept_ready(&self->ep) ? mp_const_true : mp_const_false;
@ -209,10 +279,10 @@ static MP_DEFINE_CONST_FUN_OBJ_1(endpoint_deinit_obj, endpoint_deinit);
static mp_obj_t endpoint_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *all_args) {
enum { ARG_name, ARG_callback, ARG_src, ARG_dest };
static const mp_arg_t allowed_args[] = {
{ MP_QSTR_name, MP_ARG_OBJ | MP_ARG_REQUIRED, {.u_rom_obj = MP_ROM_NONE } },
{ MP_QSTR_callback, MP_ARG_OBJ | MP_ARG_REQUIRED, {.u_rom_obj = MP_ROM_NONE } },
{ MP_QSTR_src, MP_ARG_INT | MP_ARG_KW_ONLY, {.u_int = RPMSG_ADDR_ANY } },
{ MP_QSTR_dest, MP_ARG_INT | MP_ARG_KW_ONLY, {.u_int = RPMSG_ADDR_ANY } },
{ MP_QSTR_name, MP_ARG_OBJ | MP_ARG_REQUIRED, {.u_rom_obj = MP_ROM_NONE } },
{ MP_QSTR_callback, MP_ARG_OBJ, {.u_rom_obj = MP_ROM_NONE } },
{ MP_QSTR_src, MP_ARG_INT | MP_ARG_KW_ONLY, {.u_int = RPMSG_ADDR_ANY } },
{ MP_QSTR_dest, MP_ARG_INT | MP_ARG_KW_ONLY, {.u_int = RPMSG_ADDR_ANY } },
};
// Parse args.
@ -222,6 +292,7 @@ static mp_obj_t endpoint_make_new(const mp_obj_type_t *type, size_t n_args, size
endpoint_obj_t *self = mp_obj_malloc_with_finaliser(endpoint_obj_t, &endpoint_type);
self->name = args[ARG_name].u_obj;
self->callback = args[ARG_callback].u_obj;
self->head = self->tail = 0;
if (MP_STATE_PORT(virtio_device) == NULL) {
openamp_init();
@ -239,15 +310,66 @@ static const mp_rom_map_elem_t endpoint_locals_dict_table[] = {
{ MP_ROM_QSTR(MP_QSTR___name__), MP_ROM_QSTR(MP_QSTR_Endpoint) },
{ MP_ROM_QSTR(MP_QSTR___del__), MP_ROM_PTR(&endpoint_deinit_obj) },
{ MP_ROM_QSTR(MP_QSTR_send), MP_ROM_PTR(&endpoint_send_obj) },
{ MP_ROM_QSTR(MP_QSTR_recv), MP_ROM_PTR(&endpoint_recv_obj) },
{ MP_ROM_QSTR(MP_QSTR_is_ready), MP_ROM_PTR(&endpoint_is_ready_obj) },
};
static MP_DEFINE_CONST_DICT(endpoint_locals_dict, endpoint_locals_dict_table);
mp_uint_t endpoint_read(mp_obj_t self_in, void *buf, mp_uint_t size, int *errcode) {
endpoint_obj_t *self = MP_OBJ_TO_PTR(self_in);
if (!is_rpmsg_ept_ready(&self->ep)) {
return MP_STREAM_ERROR;
}
return endpoint_rpmsg_dequeue(self, buf, size);
}
mp_uint_t endpoint_write(mp_obj_t self_in, const void *buf, mp_uint_t size, int *errcode) {
endpoint_obj_t *self = MP_OBJ_TO_PTR(self_in);
int ret = 0;
if (!is_rpmsg_ept_ready(&self->ep)) {
ret = MP_STREAM_ERROR;
} else {
ret = rpmsg_send(&self->ep, buf, size);
if (ret < 0) {
ret = MP_STREAM_ERROR;
}
}
return ret;
}
mp_uint_t endpoint_ioctl(mp_obj_t self_in, mp_uint_t request, uintptr_t arg, int *errcode) {
endpoint_obj_t *self = MP_OBJ_TO_PTR(self_in);
mp_uint_t ret = 0;
if (request == MP_STREAM_CLOSE) {
rpmsg_destroy_ept(&self->ep);
} else if (request == MP_STREAM_POLL) {
if ((arg & MP_STREAM_POLL_RD) && self->head != self->tail) {
ret |= MP_STREAM_POLL_RD;
}
if ((arg & MP_STREAM_POLL_WR) && is_rpmsg_ept_ready(&self->ep)) {
ret |= MP_STREAM_POLL_WR;
}
} else {
*errcode = MP_EINVAL;
ret = MP_STREAM_ERROR;
}
return ret;
}
static const mp_stream_p_t endpoint_stream_p = {
.read = endpoint_read,
.write = endpoint_write,
.ioctl = endpoint_ioctl,
.is_text = false,
};
static MP_DEFINE_CONST_OBJ_TYPE(
endpoint_type,
MP_QSTR_Endpoint,
MP_TYPE_FLAG_NONE,
make_new, endpoint_make_new,
protocol, &endpoint_stream_p,
locals_dict, &endpoint_locals_dict
);