update to uWebSockets v20.35.0

pull/75/head
Ciro 2023-01-05 09:01:05 -03:00
rodzic 719fecb7b4
commit 83c8cc7258
11 zmienionych plików z 151 dodań i 78 usunięć

Wyświetl plik

@ -153,8 +153,9 @@ app.ws("/*", {
'idle_timeout': 12,
'open': ws_open,
'message': ws_message,
'drain': lambda ws: print('WebSocket backpressure: %i' % ws.get_buffered_amount()),
'close': lambda ws, code, message: print('WebSocket closed')
'drain': lambda ws: print(f'WebSocket backpressure: {ws.get_buffered_amount()}'),
'close': lambda ws, code, message: print('WebSocket closed'),
'subscription': lambda ws, topic, subscriptions, subscriptions_before: print(f'subscription/unsubscription on topic {topic} {subscriptions} {subscriptions_before}'),
})
app.any("/", lambda res,req: res.end("Nothing to see here!'"))
app.listen(3000, lambda config: print("Listening on port http://localhost:%d now\n" % (config.port)))

Wyświetl plik

@ -13,10 +13,9 @@ app.ws(
"idle_timeout": 12,
"open": ws_open,
"message": ws_message,
"drain": lambda ws: print(
"WebSocket backpressure: %s", ws.get_buffered_amount()
),
'drain': lambda ws: print(f'WebSocket backpressure: {ws.get_buffered_amount()}'),
"close": lambda ws, code, message: print("WebSocket closed"),
"subscription": lambda ws, topic, subscriptions, subscriptions_before: print(f'subscription/unsubscription on topic {topic} {subscriptions} {subscriptions_before}'),
},
)
```

Wyświetl plik

@ -8,23 +8,21 @@ def ws_open(ws):
def ws_message(ws, message, opcode):
# Ok is false if backpressure was built up, wait for drain
ok = ws.send(message, opcode)
# Broadcast this message
ws.publish("broadcast", message, opcode)
app = App()
app.ws(
"/*",
{
"compression": CompressOptions.SHARED_COMPRESSOR,
"max_payload_length": 16 * 1024 * 1024,
"idle_timeout": 12,
"idle_timeout": 60,
"open": ws_open,
"message": ws_message,
# The library guarantees proper unsubscription at close
"close": lambda ws, code, message: print("WebSocket closed"),
"subscription": lambda ws, topic, subscriptions, subscriptions_before: print(f'subscription/unsubscription on topic {topic} {subscriptions} {subscriptions_before}'),
},
)
app.any("/", lambda res, req: res.end("Nothing to see here!"))

Wyświetl plik

@ -110,7 +110,6 @@ def ws_message(ws, message, opcode):
history = history[::100]
#broadcast
ws.send(message_data, OpCode.TEXT)
ws.publish(room, message_data, OpCode.TEXT)
except:

Wyświetl plik

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "socketify"
version = "0.0.2"
version = "0.0.3"
authors = [
{ name="Ciro Spaciari", email="ciro.spaciari@gmail.com" },
]

Wyświetl plik

@ -58,7 +58,7 @@ with open("README.md", "r", encoding="utf-8") as fh:
setuptools.setup(
name="socketify",
version="0.0.2",
version="0.0.3",
platforms=["any"],
author="Ciro Spaciari",
author_email="ciro.spaciari@gmail.com",

Wyświetl plik

@ -598,6 +598,7 @@ class _ASGI:
native_behavior.ping = ffi.NULL
native_behavior.pong = ffi.NULL
native_behavior.close = ws_close
native_behavior.subscription = ffi.NULL
self.asgi_ws_info = lib.socketify_add_asgi_ws_handler(
self.server.SSL, self.server.app, native_behavior, ws_upgrade, self._ptr

Wyświetl plik

@ -123,6 +123,8 @@ typedef void (*uws_websocket_message_handler)(uws_websocket_t *ws, const char *m
typedef void (*uws_websocket_ping_pong_handler)(uws_websocket_t *ws, const char *message, size_t length, void* user_data);
typedef void (*uws_websocket_close_handler)(uws_websocket_t *ws, int code, const char *message, size_t length, void* user_data);
typedef void (*uws_websocket_upgrade_handler)(uws_res_t *response, uws_req_t *request, uws_socket_context_t *context, void* user_data);
typedef void (*uws_websocket_subscription_handler)(uws_websocket_t *ws, const char *topic_name, size_t topic_name_length, int new_number_of_subscriber, int old_number_of_subscriber, void* user_data);
typedef struct
{
uws_compress_options_t compression;
@ -140,6 +142,7 @@ typedef struct
uws_websocket_ping_pong_handler ping;
uws_websocket_ping_pong_handler pong;
uws_websocket_close_handler close;
uws_websocket_subscription_handler subscription;
} uws_socket_behavior_t;
typedef struct {

Wyświetl plik

@ -79,7 +79,7 @@ def uws_websocket_drain_handler(ws, user_data):
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
ws = WebSocket(ws, app.SSL, app.loop)
ws = WebSocket(ws, app)
handler = handlers.drain
if inspect.iscoroutinefunction(handler):
app.run_async(handler(ws))
@ -90,6 +90,66 @@ def uws_websocket_drain_handler(ws, user_data):
"Uncaught Exception: %s" % str(err)
) # just log in console the error to call attention
@ffi.callback("void(uws_websocket_t*, const char *, size_t, int, int, void*)")
def uws_websocket_factory_subscription_handler(ws, topic_name, topic_name_length, new_number_of_subscriber, old_number_of_subscriber, user_data):
if user_data != ffi.NULL:
handlers, app = ffi.from_handle(user_data)
instances = app._ws_factory.get(app, ws)
ws, dispose = instances
try:
if topic_name == ffi.NULL:
topic = None
else:
topic = ffi.unpack(topic_name, topic_name_length).decode("utf-8")
handler = handlers.subscription
if inspect.iscoroutinefunction(handler):
if dispose:
async def wrapper(app, instances, handler, ws, topic, new_number_of_subscriber, old_number_of_subscriber):
try:
await handler(ws, topic, new_number_of_subscriber, old_number_of_subscriber)
finally:
app._ws_factory.dispose(instances)
app.run_async(wrapper(app, instances, handler, ws, topic, int(new_number_of_subscriber), int(old_number_of_subscriber)))
else:
app.run_async(handler(ws, topic, int(new_number_of_subscriber), int(old_number_of_subscriber)))
else:
handler(ws, topic, int(new_number_of_subscriber), int(old_number_of_subscriber))
if dispose:
app._ws_factory.dispose(instances)
except Exception as err:
if dispose:
app._ws_factory.dispose(instances)
logging.error(
"Uncaught Exception: %s" % str(err)
) # just log in console the error to call attention
@ffi.callback("void(uws_websocket_t*, const char *, size_t, int, int, void*)")
def uws_websocket_subscription_handler(ws, topic_name, topic_name_length, new_number_of_subscriber, old_number_of_subscriber, user_data):
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
ws = WebSocket(ws, app)
handler = handlers.subscription
if topic_name == ffi.NULL:
topic = None
else:
topic = ffi.unpack(topic_name, topic_name_length).decode("utf-8")
if inspect.iscoroutinefunction(handler):
app.run_async(handler(ws, topic, int(new_number_of_subscriber), int(old_number_of_subscriber)))
else:
handler(ws, topic, int(new_number_of_subscriber), int(old_number_of_subscriber))
except Exception as err:
logging.error(
"Uncaught Exception: %s" % str(err)
) # just log in console the error to call attention
@ffi.callback("void(uws_websocket_t*, void*)")
def uws_websocket_factory_open_handler(ws, user_data):
@ -127,7 +187,7 @@ def uws_websocket_open_handler(ws, user_data):
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
ws = WebSocket(ws, app.SSL, app.loop)
ws = WebSocket(ws, app)
handler = handlers.open
if inspect.iscoroutinefunction(handler):
app.run_async(handler(ws))
@ -183,7 +243,7 @@ def uws_websocket_message_handler(ws, message, length, opcode, user_data):
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
ws = WebSocket(ws, app.SSL, app.loop)
ws = WebSocket(ws, app)
if message == ffi.NULL:
data = None
@ -245,7 +305,7 @@ def uws_websocket_pong_handler(ws, message, length, user_data):
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
ws = WebSocket(ws, app.SSL, app.loop)
ws = WebSocket(ws, app)
if message == ffi.NULL:
data = None
else:
@ -304,7 +364,7 @@ def uws_websocket_ping_handler(ws, message, length, user_data):
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
ws = WebSocket(ws, app.SSL, app.loop)
ws = WebSocket(ws, app)
if message == ffi.NULL:
data = None
@ -374,7 +434,7 @@ def uws_websocket_close_handler(ws, code, message, length, user_data):
try:
handlers, app = ffi.from_handle(user_data)
# pass to free data on WebSocket if needed
ws = WebSocket(ws, app.SSL, app.loop)
ws = WebSocket(ws, app)
if message == ffi.NULL:
data = None
@ -492,8 +552,8 @@ def uws_req_for_each_topic_handler(topic, topic_size, user_data):
if user_data != ffi.NULL:
try:
ws = ffi.from_handle(user_data)
header_name = ffi.unpack(topic, topic_size).decode("utf-8")
ws.trigger_for_each_topic_handler(header_name, header_value)
topic = ffi.unpack(topic, topic_size).decode("utf-8")
ws.trigger_for_each_topic_handler(topic)
except Exception: # invalid utf-8
return
@ -714,11 +774,10 @@ class SendStatus(IntEnum):
class WebSocket:
def __init__(self, websocket, ssl, loop):
def __init__(self, websocket, app):
self.ws = websocket
self.SSL = ssl
self._ptr = ffi.new_handle(self)
self.loop = loop
self.app = app
self._cork_handler = None
self._for_each_topic_handler = None
self.socket_data_id = None
@ -743,7 +802,7 @@ class WebSocket:
try:
if self.got_socket_data:
return self.socket_data_id
user_data = lib.uws_ws_get_user_data(self.SSL, self.ws)
user_data = lib.uws_ws_get_user_data(self.app.SSL, self.ws)
if user_data == ffi.NULL:
return None
(data, socket_data_id) = ffi.from_handle(user_data)
@ -758,7 +817,7 @@ class WebSocket:
try:
if self.got_socket_data:
return self.socket_data
user_data = lib.uws_ws_get_user_data(self.SSL, self.ws)
user_data = lib.uws_ws_get_user_data(self.app.SSL, self.ws)
if user_data == ffi.NULL:
return None
(data, socket_data_id) = ffi.from_handle(user_data)
@ -770,7 +829,7 @@ class WebSocket:
return None
def get_buffered_amount(self):
return int(lib.uws_ws_get_buffered_amount(self.SSL, self.ws))
return int(lib.uws_ws_get_buffered_amount(self.app.SSL, self.ws))
def subscribe(self, topic):
try:
@ -781,7 +840,7 @@ class WebSocket:
else:
return False
return bool(lib.uws_ws_subscribe(self.SSL, self.ws, data, len(data)))
return bool(lib.uws_ws_subscribe(self.app.SSL, self.ws, data, len(data)))
except:
return False
@ -794,7 +853,7 @@ class WebSocket:
else:
return False
return bool(lib.uws_ws_unsubscribe(self.SSL, self.ws, data, len(data)))
return bool(lib.uws_ws_unsubscribe(self.app.SSL, self.ws, data, len(data)))
except:
return False
@ -807,42 +866,45 @@ class WebSocket:
else:
return False
return bool(lib.uws_ws_is_subscribed(self.SSL, self.ws, data, len(data)))
return bool(lib.uws_ws_is_subscribed(self.app.SSL, self.ws, data, len(data)))
except:
return False
def publish(self, topic, message, opcode=OpCode.BINARY, compress=False):
try:
if isinstance(topic, str):
topic_data = topic.encode("utf-8")
elif isinstance(topic, bytes):
topic_data = topic
else:
return False
# publish in app just send to everyone and default uws_ws_publish ignores the current connection
# so we use the same publish in app to keep the same behavior
return self.app.publish(topic, message, opcode, compress)
# try:
# if isinstance(topic, str):
# topic_data = topic.encode("utf-8")
# elif isinstance(topic, bytes):
# topic_data = topic
# else:
# return False
if isinstance(message, str):
data = message.encode("utf-8")
elif isinstance(message, bytes):
data = message
elif message is None:
data = b""
else:
data = json.dumps(message).encode("utf-8")
# if isinstance(message, str):
# data = message.encode("utf-8")
# elif isinstance(message, bytes):
# data = message
# elif message is None:
# data = b""
# else:
# data = json.dumps(message).encode("utf-8")
return bool(
lib.uws_ws_publish_with_options(
self.SSL,
self.ws,
topic_data,
len(topic_data),
data,
len(data),
int(opcode),
bool(compress),
)
)
except:
return False
# return bool(
# lib.uws_ws_publish_with_options(
# self.app.SSL,
# self.ws,
# topic_data,
# len(topic_data),
# data,
# len(data),
# int(opcode),
# bool(compress),
# )
# )
# except:
# return False
def get_topics(self):
topics = []
@ -856,12 +918,12 @@ class WebSocket:
def for_each_topic(self, handler):
self._for_each_topic_handler = handler
lib.uws_ws_iterate_topics(
self.SSL, self.ws, uws_req_for_each_topic_handler, self._ptr
self.app.SSL, self.ws, uws_req_for_each_topic_handler, self._ptr
)
def get_remote_address_bytes(self):
buffer = ffi.new("char**")
length = lib.uws_ws_get_remote_address(self.SSL, self.ws, buffer)
length = lib.uws_ws_get_remote_address(self.app.SSL, self.ws, buffer)
buffer_address = ffi.addressof(buffer, 0)[0]
if buffer_address == ffi.NULL:
return None
@ -872,7 +934,7 @@ class WebSocket:
def get_remote_address(self):
buffer = ffi.new("char**")
length = lib.uws_ws_get_remote_address_as_text(self.SSL, self.ws, buffer)
length = lib.uws_ws_get_remote_address_as_text(self.app.SSL, self.ws, buffer)
buffer_address = ffi.addressof(buffer, 0)[0]
if buffer_address == ffi.NULL:
return None
@ -888,13 +950,13 @@ class WebSocket:
elif isinstance(message, bytes):
data = message
elif message is None:
lib.uws_ws_send_fragment(self.SSL, self.ws, b"", 0, compress)
lib.uws_ws_send_fragment(self.app.SSL, self.ws, b"", 0, compress)
return self
else:
data = json.dumps(message).encode("utf-8")
return SendStatus(
lib.uws_ws_send_fragment(self.SSL, self.ws, data, len(data), compress)
lib.uws_ws_send_fragment(self.app.SSL, self.ws, data, len(data), compress)
)
except:
return None
@ -906,14 +968,14 @@ class WebSocket:
elif isinstance(message, bytes):
data = message
elif message is None:
lib.uws_ws_send_last_fragment(self.SSL, self.ws, b"", 0, compress)
lib.uws_ws_send_last_fragment(self.app.SSL, self.ws, b"", 0, compress)
return self
else:
data = json.dumps(message).encode("utf-8")
return SendStatus(
lib.uws_ws_send_last_fragment(
self.SSL, self.ws, data, len(data), compress
self.app.SSL, self.ws, data, len(data), compress
)
)
except:
@ -927,7 +989,7 @@ class WebSocket:
data = message
elif message is None:
lib.uws_ws_send_first_fragment_with_opcode(
self.SSL, self.ws, b"", 0, int(opcode), compress
self.app.SSL, self.ws, b"", 0, int(opcode), compress
)
return self
else:
@ -935,7 +997,7 @@ class WebSocket:
return SendStatus(
lib.uws_ws_send_first_fragment_with_opcode(
self.SSL, self.ws, data, len(data), int(opcode), compress
self.app.SSL, self.ws, data, len(data), int(opcode), compress
)
)
except:
@ -953,7 +1015,7 @@ class WebSocket:
data = message
elif message is None:
lib.uws_ws_send_with_options(
self.SSL, self.ws, b"", 0, int(opcode), compress, fin
self.app.SSL, self.ws, b"", 0, int(opcode), compress, fin
)
return self
else:
@ -961,7 +1023,7 @@ class WebSocket:
return SendStatus(
lib.uws_ws_send_with_options(
self.SSL, self.ws, data, len(data), int(opcode), compress, fin
self.app.SSL, self.ws, data, len(data), int(opcode), compress, fin
)
)
except:
@ -980,22 +1042,22 @@ class WebSocket:
elif isinstance(message, bytes):
data = message
elif message is None:
lib.uws_ws_end(self.SSL, self.ws, b"", 0)
lib.uws_ws_end(self.app.SSL, self.ws, b"", 0)
return self
else:
data = json.dumps(message).encode("utf-8")
lib.uws_ws_end(self.SSL, self.ws, code, data, len(data))
lib.uws_ws_end(self.app.SSL, self.ws, code, data, len(data))
finally:
return self
def close(self):
lib.uws_ws_close(self.SSL, self.ws)
lib.uws_ws_close(self.app.SSL, self.ws)
return self
def cork(self, callback):
self._cork_handler = callback
lib.uws_ws_cork(self.SSL, self.ws, uws_ws_cork_handler, self._ptr)
lib.uws_ws_cork(self.app.SSL, self.ws, uws_ws_cork_handler, self._ptr)
def __del__(self):
self.ws = ffi.NULL
@ -1011,17 +1073,18 @@ class WSBehaviorHandlers:
self.ping = None
self.pong = None
self.close = None
self.subscription = None
class WebSocketFactory:
def __init__(self, app, max_size):
self.factory_queue = []
for _ in range(0, max_size):
websocket = WebSocket(None, app.SSL, app.loop)
websocket = WebSocket(None, app)
self.factory_queue.append((websocket, True))
def get(self, app, ws):
if len(self.factory_queue) == 0:
response = WebSocket(ws, app.SSL, app.loop)
response = WebSocket(ws, app)
return response, False
instances = self.factory_queue.pop()
@ -2031,7 +2094,7 @@ class App:
elif isinstance(topic, bytes):
topic_data = topic
else:
raise RuntimeError("topic need to be an String or Bytes")
return False
if isinstance(message, str):
message_data = message.encode("utf-8")
@ -2151,6 +2214,7 @@ class App:
ping_handler = None
pong_handler = None
close_handler = None
subscription_handler = None
if behavior is None:
raise RuntimeError("behavior must be an dict or WSBehavior")
@ -2174,6 +2238,7 @@ class App:
ping_handler = behavior.get("ping", None)
pong_handler = behavior.get("pong", None)
close_handler = behavior.get("close", None)
subscription_handler = behavior.get("subscription", None)
native_behavior.maxPayloadLength = ffi.cast(
"unsigned int",
@ -2246,6 +2311,13 @@ class App:
else: # always keep an close
native_behavior.close = uws_websocket_close_handler
if subscription_handler:
handlers.subscription = subscription_handler
native_behavior.subscription = uws_websocket_factory_subscription_handler if self._ws_factory else uws_websocket_subscription_handler
else: # always keep an close
native_behavior.subscription = ffi.NULL
user_data = ffi.new_handle((handlers, self))
self.handlers.append(user_data) # Keep alive handlers
lib.uws_ws(self.SSL, self.app, path.encode("utf-8"), native_behavior, user_data)

@ -1 +1 @@
Subproject commit 5773238f673348d560e769b10a424c6dbe598730
Subproject commit a4fd720e0c373aa9e8c6703f9368239d1369606f

Wyświetl plik

@ -353,7 +353,7 @@ class _WSGI:
native_behavior.ping = ffi.NULL
native_behavior.pong = ffi.NULL
native_behavior.close = ws_close
native_behavior.subscription = ffi.NULL
self.asgi_ws_info = lib.socketify_add_asgi_ws_handler(
self.server.SSL, self.server.app, native_behavior, ws_upgrade, self._ptr