From 83c8cc7258274639de60d63a16292a02e5fb6f23 Mon Sep 17 00:00:00 2001 From: Ciro Date: Thu, 5 Jan 2023 09:01:05 -0300 Subject: [PATCH] update to uWebSockets v20.35.0 --- README.md | 5 +- docs/websockets-backpressure.md | 5 +- examples/broadcast.py | 6 +- examples/chat/index.py | 1 - pyproject.toml | 2 +- setup.py | 2 +- src/socketify/asgi.py | 1 + src/socketify/native.py | 3 + src/socketify/socketify.py | 200 ++++++++++++++++++++++---------- src/socketify/uWebSockets | 2 +- src/socketify/wsgi.py | 2 +- 11 files changed, 151 insertions(+), 78 deletions(-) diff --git a/README.md b/README.md index dc91da1..6faf968 100644 --- a/README.md +++ b/README.md @@ -153,8 +153,9 @@ app.ws("/*", { 'idle_timeout': 12, 'open': ws_open, 'message': ws_message, - 'drain': lambda ws: print('WebSocket backpressure: %i' % ws.get_buffered_amount()), - 'close': lambda ws, code, message: print('WebSocket closed') + 'drain': lambda ws: print(f'WebSocket backpressure: {ws.get_buffered_amount()}'), + 'close': lambda ws, code, message: print('WebSocket closed'), + 'subscription': lambda ws, topic, subscriptions, subscriptions_before: print(f'subscription/unsubscription on topic {topic} {subscriptions} {subscriptions_before}'), }) app.any("/", lambda res,req: res.end("Nothing to see here!'")) app.listen(3000, lambda config: print("Listening on port http://localhost:%d now\n" % (config.port))) diff --git a/docs/websockets-backpressure.md b/docs/websockets-backpressure.md index af7e1d6..e06a49b 100644 --- a/docs/websockets-backpressure.md +++ b/docs/websockets-backpressure.md @@ -13,10 +13,9 @@ app.ws( "idle_timeout": 12, "open": ws_open, "message": ws_message, - "drain": lambda ws: print( - "WebSocket backpressure: %s", ws.get_buffered_amount() - ), + 'drain': lambda ws: print(f'WebSocket backpressure: {ws.get_buffered_amount()}'), "close": lambda ws, code, message: print("WebSocket closed"), + "subscription": lambda ws, topic, subscriptions, subscriptions_before: print(f'subscription/unsubscription on topic {topic} {subscriptions} {subscriptions_before}'), }, ) ``` diff --git a/examples/broadcast.py b/examples/broadcast.py index d04e4fc..3712a52 100644 --- a/examples/broadcast.py +++ b/examples/broadcast.py @@ -8,23 +8,21 @@ def ws_open(ws): def ws_message(ws, message, opcode): - # Ok is false if backpressure was built up, wait for drain - ok = ws.send(message, opcode) # Broadcast this message ws.publish("broadcast", message, opcode) - app = App() app.ws( "/*", { "compression": CompressOptions.SHARED_COMPRESSOR, "max_payload_length": 16 * 1024 * 1024, - "idle_timeout": 12, + "idle_timeout": 60, "open": ws_open, "message": ws_message, # The library guarantees proper unsubscription at close "close": lambda ws, code, message: print("WebSocket closed"), + "subscription": lambda ws, topic, subscriptions, subscriptions_before: print(f'subscription/unsubscription on topic {topic} {subscriptions} {subscriptions_before}'), }, ) app.any("/", lambda res, req: res.end("Nothing to see here!")) diff --git a/examples/chat/index.py b/examples/chat/index.py index 01c3214..20a9160 100644 --- a/examples/chat/index.py +++ b/examples/chat/index.py @@ -110,7 +110,6 @@ def ws_message(ws, message, opcode): history = history[::100] #broadcast - ws.send(message_data, OpCode.TEXT) ws.publish(room, message_data, OpCode.TEXT) except: diff --git a/pyproject.toml b/pyproject.toml index e986ede..92c0010 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "socketify" -version = "0.0.2" +version = "0.0.3" authors = [ { name="Ciro Spaciari", email="ciro.spaciari@gmail.com" }, ] diff --git a/setup.py b/setup.py index c25b5e5..76f3724 100644 --- a/setup.py +++ b/setup.py @@ -58,7 +58,7 @@ with open("README.md", "r", encoding="utf-8") as fh: setuptools.setup( name="socketify", - version="0.0.2", + version="0.0.3", platforms=["any"], author="Ciro Spaciari", author_email="ciro.spaciari@gmail.com", diff --git a/src/socketify/asgi.py b/src/socketify/asgi.py index 1625d7c..750b11f 100644 --- a/src/socketify/asgi.py +++ b/src/socketify/asgi.py @@ -598,6 +598,7 @@ class _ASGI: native_behavior.ping = ffi.NULL native_behavior.pong = ffi.NULL native_behavior.close = ws_close + native_behavior.subscription = ffi.NULL self.asgi_ws_info = lib.socketify_add_asgi_ws_handler( self.server.SSL, self.server.app, native_behavior, ws_upgrade, self._ptr diff --git a/src/socketify/native.py b/src/socketify/native.py index 3c50a8c..6fbbd49 100644 --- a/src/socketify/native.py +++ b/src/socketify/native.py @@ -123,6 +123,8 @@ typedef void (*uws_websocket_message_handler)(uws_websocket_t *ws, const char *m typedef void (*uws_websocket_ping_pong_handler)(uws_websocket_t *ws, const char *message, size_t length, void* user_data); typedef void (*uws_websocket_close_handler)(uws_websocket_t *ws, int code, const char *message, size_t length, void* user_data); typedef void (*uws_websocket_upgrade_handler)(uws_res_t *response, uws_req_t *request, uws_socket_context_t *context, void* user_data); +typedef void (*uws_websocket_subscription_handler)(uws_websocket_t *ws, const char *topic_name, size_t topic_name_length, int new_number_of_subscriber, int old_number_of_subscriber, void* user_data); + typedef struct { uws_compress_options_t compression; @@ -140,6 +142,7 @@ typedef struct uws_websocket_ping_pong_handler ping; uws_websocket_ping_pong_handler pong; uws_websocket_close_handler close; + uws_websocket_subscription_handler subscription; } uws_socket_behavior_t; typedef struct { diff --git a/src/socketify/socketify.py b/src/socketify/socketify.py index 30a0840..688015e 100644 --- a/src/socketify/socketify.py +++ b/src/socketify/socketify.py @@ -79,7 +79,7 @@ def uws_websocket_drain_handler(ws, user_data): if user_data != ffi.NULL: try: handlers, app = ffi.from_handle(user_data) - ws = WebSocket(ws, app.SSL, app.loop) + ws = WebSocket(ws, app) handler = handlers.drain if inspect.iscoroutinefunction(handler): app.run_async(handler(ws)) @@ -90,6 +90,66 @@ def uws_websocket_drain_handler(ws, user_data): "Uncaught Exception: %s" % str(err) ) # just log in console the error to call attention +@ffi.callback("void(uws_websocket_t*, const char *, size_t, int, int, void*)") +def uws_websocket_factory_subscription_handler(ws, topic_name, topic_name_length, new_number_of_subscriber, old_number_of_subscriber, user_data): + if user_data != ffi.NULL: + handlers, app = ffi.from_handle(user_data) + instances = app._ws_factory.get(app, ws) + ws, dispose = instances + try: + + if topic_name == ffi.NULL: + topic = None + else: + topic = ffi.unpack(topic_name, topic_name_length).decode("utf-8") + + + handler = handlers.subscription + if inspect.iscoroutinefunction(handler): + + if dispose: + async def wrapper(app, instances, handler, ws, topic, new_number_of_subscriber, old_number_of_subscriber): + try: + await handler(ws, topic, new_number_of_subscriber, old_number_of_subscriber) + finally: + app._ws_factory.dispose(instances) + + app.run_async(wrapper(app, instances, handler, ws, topic, int(new_number_of_subscriber), int(old_number_of_subscriber))) + else: + app.run_async(handler(ws, topic, int(new_number_of_subscriber), int(old_number_of_subscriber))) + else: + handler(ws, topic, int(new_number_of_subscriber), int(old_number_of_subscriber)) + if dispose: + app._ws_factory.dispose(instances) + except Exception as err: + if dispose: + app._ws_factory.dispose(instances) + logging.error( + "Uncaught Exception: %s" % str(err) + ) # just log in console the error to call attention + +@ffi.callback("void(uws_websocket_t*, const char *, size_t, int, int, void*)") +def uws_websocket_subscription_handler(ws, topic_name, topic_name_length, new_number_of_subscriber, old_number_of_subscriber, user_data): + if user_data != ffi.NULL: + try: + handlers, app = ffi.from_handle(user_data) + ws = WebSocket(ws, app) + handler = handlers.subscription + + if topic_name == ffi.NULL: + topic = None + else: + topic = ffi.unpack(topic_name, topic_name_length).decode("utf-8") + + if inspect.iscoroutinefunction(handler): + app.run_async(handler(ws, topic, int(new_number_of_subscriber), int(old_number_of_subscriber))) + else: + handler(ws, topic, int(new_number_of_subscriber), int(old_number_of_subscriber)) + except Exception as err: + logging.error( + "Uncaught Exception: %s" % str(err) + ) # just log in console the error to call attention + @ffi.callback("void(uws_websocket_t*, void*)") def uws_websocket_factory_open_handler(ws, user_data): @@ -127,7 +187,7 @@ def uws_websocket_open_handler(ws, user_data): if user_data != ffi.NULL: try: handlers, app = ffi.from_handle(user_data) - ws = WebSocket(ws, app.SSL, app.loop) + ws = WebSocket(ws, app) handler = handlers.open if inspect.iscoroutinefunction(handler): app.run_async(handler(ws)) @@ -183,7 +243,7 @@ def uws_websocket_message_handler(ws, message, length, opcode, user_data): if user_data != ffi.NULL: try: handlers, app = ffi.from_handle(user_data) - ws = WebSocket(ws, app.SSL, app.loop) + ws = WebSocket(ws, app) if message == ffi.NULL: data = None @@ -245,7 +305,7 @@ def uws_websocket_pong_handler(ws, message, length, user_data): if user_data != ffi.NULL: try: handlers, app = ffi.from_handle(user_data) - ws = WebSocket(ws, app.SSL, app.loop) + ws = WebSocket(ws, app) if message == ffi.NULL: data = None else: @@ -304,7 +364,7 @@ def uws_websocket_ping_handler(ws, message, length, user_data): if user_data != ffi.NULL: try: handlers, app = ffi.from_handle(user_data) - ws = WebSocket(ws, app.SSL, app.loop) + ws = WebSocket(ws, app) if message == ffi.NULL: data = None @@ -374,7 +434,7 @@ def uws_websocket_close_handler(ws, code, message, length, user_data): try: handlers, app = ffi.from_handle(user_data) # pass to free data on WebSocket if needed - ws = WebSocket(ws, app.SSL, app.loop) + ws = WebSocket(ws, app) if message == ffi.NULL: data = None @@ -492,8 +552,8 @@ def uws_req_for_each_topic_handler(topic, topic_size, user_data): if user_data != ffi.NULL: try: ws = ffi.from_handle(user_data) - header_name = ffi.unpack(topic, topic_size).decode("utf-8") - ws.trigger_for_each_topic_handler(header_name, header_value) + topic = ffi.unpack(topic, topic_size).decode("utf-8") + ws.trigger_for_each_topic_handler(topic) except Exception: # invalid utf-8 return @@ -714,11 +774,10 @@ class SendStatus(IntEnum): class WebSocket: - def __init__(self, websocket, ssl, loop): + def __init__(self, websocket, app): self.ws = websocket - self.SSL = ssl self._ptr = ffi.new_handle(self) - self.loop = loop + self.app = app self._cork_handler = None self._for_each_topic_handler = None self.socket_data_id = None @@ -743,7 +802,7 @@ class WebSocket: try: if self.got_socket_data: return self.socket_data_id - user_data = lib.uws_ws_get_user_data(self.SSL, self.ws) + user_data = lib.uws_ws_get_user_data(self.app.SSL, self.ws) if user_data == ffi.NULL: return None (data, socket_data_id) = ffi.from_handle(user_data) @@ -758,7 +817,7 @@ class WebSocket: try: if self.got_socket_data: return self.socket_data - user_data = lib.uws_ws_get_user_data(self.SSL, self.ws) + user_data = lib.uws_ws_get_user_data(self.app.SSL, self.ws) if user_data == ffi.NULL: return None (data, socket_data_id) = ffi.from_handle(user_data) @@ -770,7 +829,7 @@ class WebSocket: return None def get_buffered_amount(self): - return int(lib.uws_ws_get_buffered_amount(self.SSL, self.ws)) + return int(lib.uws_ws_get_buffered_amount(self.app.SSL, self.ws)) def subscribe(self, topic): try: @@ -781,7 +840,7 @@ class WebSocket: else: return False - return bool(lib.uws_ws_subscribe(self.SSL, self.ws, data, len(data))) + return bool(lib.uws_ws_subscribe(self.app.SSL, self.ws, data, len(data))) except: return False @@ -794,7 +853,7 @@ class WebSocket: else: return False - return bool(lib.uws_ws_unsubscribe(self.SSL, self.ws, data, len(data))) + return bool(lib.uws_ws_unsubscribe(self.app.SSL, self.ws, data, len(data))) except: return False @@ -807,42 +866,45 @@ class WebSocket: else: return False - return bool(lib.uws_ws_is_subscribed(self.SSL, self.ws, data, len(data))) + return bool(lib.uws_ws_is_subscribed(self.app.SSL, self.ws, data, len(data))) except: return False def publish(self, topic, message, opcode=OpCode.BINARY, compress=False): - try: - if isinstance(topic, str): - topic_data = topic.encode("utf-8") - elif isinstance(topic, bytes): - topic_data = topic - else: - return False + # publish in app just send to everyone and default uws_ws_publish ignores the current connection + # so we use the same publish in app to keep the same behavior + return self.app.publish(topic, message, opcode, compress) + # try: + # if isinstance(topic, str): + # topic_data = topic.encode("utf-8") + # elif isinstance(topic, bytes): + # topic_data = topic + # else: + # return False - if isinstance(message, str): - data = message.encode("utf-8") - elif isinstance(message, bytes): - data = message - elif message is None: - data = b"" - else: - data = json.dumps(message).encode("utf-8") + # if isinstance(message, str): + # data = message.encode("utf-8") + # elif isinstance(message, bytes): + # data = message + # elif message is None: + # data = b"" + # else: + # data = json.dumps(message).encode("utf-8") - return bool( - lib.uws_ws_publish_with_options( - self.SSL, - self.ws, - topic_data, - len(topic_data), - data, - len(data), - int(opcode), - bool(compress), - ) - ) - except: - return False + # return bool( + # lib.uws_ws_publish_with_options( + # self.app.SSL, + # self.ws, + # topic_data, + # len(topic_data), + # data, + # len(data), + # int(opcode), + # bool(compress), + # ) + # ) + # except: + # return False def get_topics(self): topics = [] @@ -856,12 +918,12 @@ class WebSocket: def for_each_topic(self, handler): self._for_each_topic_handler = handler lib.uws_ws_iterate_topics( - self.SSL, self.ws, uws_req_for_each_topic_handler, self._ptr + self.app.SSL, self.ws, uws_req_for_each_topic_handler, self._ptr ) def get_remote_address_bytes(self): buffer = ffi.new("char**") - length = lib.uws_ws_get_remote_address(self.SSL, self.ws, buffer) + length = lib.uws_ws_get_remote_address(self.app.SSL, self.ws, buffer) buffer_address = ffi.addressof(buffer, 0)[0] if buffer_address == ffi.NULL: return None @@ -872,7 +934,7 @@ class WebSocket: def get_remote_address(self): buffer = ffi.new("char**") - length = lib.uws_ws_get_remote_address_as_text(self.SSL, self.ws, buffer) + length = lib.uws_ws_get_remote_address_as_text(self.app.SSL, self.ws, buffer) buffer_address = ffi.addressof(buffer, 0)[0] if buffer_address == ffi.NULL: return None @@ -888,13 +950,13 @@ class WebSocket: elif isinstance(message, bytes): data = message elif message is None: - lib.uws_ws_send_fragment(self.SSL, self.ws, b"", 0, compress) + lib.uws_ws_send_fragment(self.app.SSL, self.ws, b"", 0, compress) return self else: data = json.dumps(message).encode("utf-8") return SendStatus( - lib.uws_ws_send_fragment(self.SSL, self.ws, data, len(data), compress) + lib.uws_ws_send_fragment(self.app.SSL, self.ws, data, len(data), compress) ) except: return None @@ -906,14 +968,14 @@ class WebSocket: elif isinstance(message, bytes): data = message elif message is None: - lib.uws_ws_send_last_fragment(self.SSL, self.ws, b"", 0, compress) + lib.uws_ws_send_last_fragment(self.app.SSL, self.ws, b"", 0, compress) return self else: data = json.dumps(message).encode("utf-8") return SendStatus( lib.uws_ws_send_last_fragment( - self.SSL, self.ws, data, len(data), compress + self.app.SSL, self.ws, data, len(data), compress ) ) except: @@ -927,7 +989,7 @@ class WebSocket: data = message elif message is None: lib.uws_ws_send_first_fragment_with_opcode( - self.SSL, self.ws, b"", 0, int(opcode), compress + self.app.SSL, self.ws, b"", 0, int(opcode), compress ) return self else: @@ -935,7 +997,7 @@ class WebSocket: return SendStatus( lib.uws_ws_send_first_fragment_with_opcode( - self.SSL, self.ws, data, len(data), int(opcode), compress + self.app.SSL, self.ws, data, len(data), int(opcode), compress ) ) except: @@ -953,7 +1015,7 @@ class WebSocket: data = message elif message is None: lib.uws_ws_send_with_options( - self.SSL, self.ws, b"", 0, int(opcode), compress, fin + self.app.SSL, self.ws, b"", 0, int(opcode), compress, fin ) return self else: @@ -961,7 +1023,7 @@ class WebSocket: return SendStatus( lib.uws_ws_send_with_options( - self.SSL, self.ws, data, len(data), int(opcode), compress, fin + self.app.SSL, self.ws, data, len(data), int(opcode), compress, fin ) ) except: @@ -980,22 +1042,22 @@ class WebSocket: elif isinstance(message, bytes): data = message elif message is None: - lib.uws_ws_end(self.SSL, self.ws, b"", 0) + lib.uws_ws_end(self.app.SSL, self.ws, b"", 0) return self else: data = json.dumps(message).encode("utf-8") - lib.uws_ws_end(self.SSL, self.ws, code, data, len(data)) + lib.uws_ws_end(self.app.SSL, self.ws, code, data, len(data)) finally: return self def close(self): - lib.uws_ws_close(self.SSL, self.ws) + lib.uws_ws_close(self.app.SSL, self.ws) return self def cork(self, callback): self._cork_handler = callback - lib.uws_ws_cork(self.SSL, self.ws, uws_ws_cork_handler, self._ptr) + lib.uws_ws_cork(self.app.SSL, self.ws, uws_ws_cork_handler, self._ptr) def __del__(self): self.ws = ffi.NULL @@ -1011,17 +1073,18 @@ class WSBehaviorHandlers: self.ping = None self.pong = None self.close = None + self.subscription = None class WebSocketFactory: def __init__(self, app, max_size): self.factory_queue = [] for _ in range(0, max_size): - websocket = WebSocket(None, app.SSL, app.loop) + websocket = WebSocket(None, app) self.factory_queue.append((websocket, True)) def get(self, app, ws): if len(self.factory_queue) == 0: - response = WebSocket(ws, app.SSL, app.loop) + response = WebSocket(ws, app) return response, False instances = self.factory_queue.pop() @@ -2031,7 +2094,7 @@ class App: elif isinstance(topic, bytes): topic_data = topic else: - raise RuntimeError("topic need to be an String or Bytes") + return False if isinstance(message, str): message_data = message.encode("utf-8") @@ -2151,6 +2214,7 @@ class App: ping_handler = None pong_handler = None close_handler = None + subscription_handler = None if behavior is None: raise RuntimeError("behavior must be an dict or WSBehavior") @@ -2174,6 +2238,7 @@ class App: ping_handler = behavior.get("ping", None) pong_handler = behavior.get("pong", None) close_handler = behavior.get("close", None) + subscription_handler = behavior.get("subscription", None) native_behavior.maxPayloadLength = ffi.cast( "unsigned int", @@ -2246,6 +2311,13 @@ class App: else: # always keep an close native_behavior.close = uws_websocket_close_handler + if subscription_handler: + handlers.subscription = subscription_handler + native_behavior.subscription = uws_websocket_factory_subscription_handler if self._ws_factory else uws_websocket_subscription_handler + else: # always keep an close + native_behavior.subscription = ffi.NULL + + user_data = ffi.new_handle((handlers, self)) self.handlers.append(user_data) # Keep alive handlers lib.uws_ws(self.SSL, self.app, path.encode("utf-8"), native_behavior, user_data) diff --git a/src/socketify/uWebSockets b/src/socketify/uWebSockets index 5773238..a4fd720 160000 --- a/src/socketify/uWebSockets +++ b/src/socketify/uWebSockets @@ -1 +1 @@ -Subproject commit 5773238f673348d560e769b10a424c6dbe598730 +Subproject commit a4fd720e0c373aa9e8c6703f9368239d1369606f diff --git a/src/socketify/wsgi.py b/src/socketify/wsgi.py index c2ee8a3..1bcb105 100644 --- a/src/socketify/wsgi.py +++ b/src/socketify/wsgi.py @@ -353,7 +353,7 @@ class _WSGI: native_behavior.ping = ffi.NULL native_behavior.pong = ffi.NULL native_behavior.close = ws_close - + native_behavior.subscription = ffi.NULL self.asgi_ws_info = lib.socketify_add_asgi_ws_handler( self.server.SSL, self.server.app, native_behavior, ws_upgrade, self._ptr