diff --git a/bench/websockets/socketify_server.py b/bench/websockets/socketify_server.py index d66fdc6..80add8e 100644 --- a/bench/websockets/socketify_server.py +++ b/bench/websockets/socketify_server.py @@ -1,7 +1,7 @@ from socketify import App, AppOptions, OpCode, CompressOptions remaining_clients = 16 -app = App(websocket_factory_max_itens=1_500_000) +app = App(websocket_factory_max_items=1_500_000) def ws_open(ws): diff --git a/src/socketify/socketify.py b/src/socketify/socketify.py index 1a3e90c..8de418e 100644 --- a/src/socketify/socketify.py +++ b/src/socketify/socketify.py @@ -18,6 +18,7 @@ from .status_codes import status_codes from .helpers import static_route from dataclasses import dataclass from .helpers import DecoratorRouter +from typing import Union mimetypes.init() @@ -76,6 +77,27 @@ def uws_websocket_factory_drain_handler(ws, user_data): ) # just log in console the error to call attention +@ffi.callback("void(uws_websocket_t*, void*)") +def uws_websocket_drain_handler_with_extension(ws, user_data): + if user_data != ffi.NULL: + try: + handlers, app = ffi.from_handle(user_data) + ws = WebSocket(ws, app) + # bind methods to websocket + app._ws_extension.set_properties(ws) + # set default value in properties + app._ws_extension.bind_methods(ws) + handler = handlers.drain + if inspect.iscoroutinefunction(handler): + app.run_async(handler(ws)) + else: + handler(ws) + except Exception as err: + logging.error( + "Uncaught Exception: %s" % str(err) + ) # just log in console the error to call attention + + @ffi.callback("void(uws_websocket_t*, void*)") def uws_websocket_drain_handler(ws, user_data): if user_data != ffi.NULL: @@ -216,6 +238,52 @@ def uws_websocket_subscription_handler( ) # just log in console the error to call attention +@ffi.callback("void(uws_websocket_t*, const char *, size_t, int, int, void*)") +def uws_websocket_subscription_handler_with_extension( + ws, + topic_name, + topic_name_length, + new_number_of_subscriber, + old_number_of_subscriber, + user_data, +): + if user_data != ffi.NULL: + try: + handlers, app = ffi.from_handle(user_data) + ws = WebSocket(ws, app) + # bind methods to websocket + app._ws_extension.set_properties(ws) + # set default value in properties + app._ws_extension.bind_meth + handler = handlers.subscription + + if topic_name == ffi.NULL: + topic = None + else: + topic = ffi.unpack(topic_name, topic_name_length).decode("utf-8") + + if inspect.iscoroutinefunction(handler): + app.run_async( + handler( + ws, + topic, + int(new_number_of_subscriber), + int(old_number_of_subscriber), + ) + ) + else: + handler( + ws, + topic, + int(new_number_of_subscriber), + int(old_number_of_subscriber), + ) + except Exception as err: + logging.error( + "Uncaught Exception: %s" % str(err) + ) # just log in console the error to call attention + + @ffi.callback("void(uws_websocket_t*, void*)") def uws_websocket_factory_open_handler(ws, user_data): if user_data != ffi.NULL: @@ -248,6 +316,28 @@ def uws_websocket_factory_open_handler(ws, user_data): ) # just log in console the error to call attention +@ffi.callback("void(uws_websocket_t*, void*)") +def uws_websocket_open_handler_with_extension(ws, user_data): + + if user_data != ffi.NULL: + try: + handlers, app = ffi.from_handle(user_data) + ws = WebSocket(ws, app) + # bind methods to websocket + app._ws_extension.set_properties(ws) + # set default value in properties + app._ws_extension.bind_meth + handler = handlers.open + if inspect.iscoroutinefunction(handler): + app.run_async(handler(ws)) + else: + handler(ws) + except Exception as err: + logging.error( + "Uncaught Exception: %s" % str(err) + ) # just log in console the error to call attention + + @ffi.callback("void(uws_websocket_t*, void*)") def uws_websocket_open_handler(ws, user_data): @@ -307,6 +397,39 @@ def uws_websocket_factory_message_handler(ws, message, length, opcode, user_data ) # just log in console the error to call attention +@ffi.callback("void(uws_websocket_t*, const char*, size_t, uws_opcode_t, void*)") +def uws_websocket_message_handler_with_extension( + ws, message, length, opcode, user_data +): + if user_data != ffi.NULL: + try: + handlers, app = ffi.from_handle(user_data) + ws = WebSocket(ws, app) + # bind methods to websocket + app._ws_extension.set_properties(ws) + # set default value in properties + app._ws_extension.bind_meth + + if message == ffi.NULL: + data = None + else: + data = ffi.unpack(message, length) + opcode = OpCode(opcode) + if opcode == OpCode.TEXT: + data = data.decode("utf-8") + + handler = handlers.message + if inspect.iscoroutinefunction(handler): + app.run_async(handler(ws, data, opcode)) + else: + handler(ws, data, opcode) + + except Exception as err: + logging.error( + "Uncaught Exception: %s" % str(err) + ) # just log in console the error to call attention + + @ffi.callback("void(uws_websocket_t*, const char*, size_t, uws_opcode_t, void*)") def uws_websocket_message_handler(ws, message, length, opcode, user_data): if user_data != ffi.NULL: @@ -372,6 +495,32 @@ def uws_websocket_factory_pong_handler(ws, message, length, user_data): ) # just log in console the error to call attention +@ffi.callback("void(uws_websocket_t*, const char*, size_t, void*)") +def uws_websocket_pong_handler_with_extension(ws, message, length, user_data): + if user_data != ffi.NULL: + try: + handlers, app = ffi.from_handle(user_data) + ws = WebSocket(ws, app) + # bind methods to websocket + app._ws_extension.set_properties(ws) + # set default value in properties + app._ws_extension.bind_meth + if message == ffi.NULL: + data = None + else: + data = ffi.unpack(message, length) + + handler = handlers.pong + if inspect.iscoroutinefunction(handler): + app.run_async(handler(ws, data)) + else: + handler(ws, data) + except Exception as err: + logging.error( + "Uncaught Exception: %s" % str(err) + ) # just log in console the error to call attention + + @ffi.callback("void(uws_websocket_t*, const char*, size_t, void*)") def uws_websocket_pong_handler(ws, message, length, user_data): if user_data != ffi.NULL: @@ -433,6 +582,34 @@ def uws_websocket_factory_ping_handler(ws, message, length, user_data): ) # just log in console the error to call attention +@ffi.callback("void(uws_websocket_t*, const char*, size_t, void*)") +def uws_websocket_ping_handler_with_extension(ws, message, length, user_data): + if user_data != ffi.NULL: + try: + handlers, app = ffi.from_handle(user_data) + ws = WebSocket(ws, app) + # bind methods to websocket + app._ws_extension.set_properties(ws) + # set default value in properties + app._ws_extension.bind_meth + + if message == ffi.NULL: + data = None + else: + data = ffi.unpack(message, length) + + handler = handlers.ping + if inspect.iscoroutinefunction(handler): + app.run_async(handler(ws, data)) + else: + handler(ws, data) + + except Exception as err: + logging.error( + "Uncaught Exception: %s" % str(err) + ) # just log in console the error to call attention + + @ffi.callback("void(uws_websocket_t*, const char*, size_t, void*)") def uws_websocket_ping_handler(ws, message, length, user_data): if user_data != ffi.NULL: @@ -506,6 +683,51 @@ def uws_websocket_factory_close_handler(ws, code, message, length, user_data): ) # just log in console the error to call attention +@ffi.callback("void(uws_websocket_t*, int, const char*, size_t, void*)") +def uws_websocket_close_handler_with_extension(ws, code, message, length, user_data): + if user_data != ffi.NULL: + try: + handlers, app = ffi.from_handle(user_data) + # pass to free data on WebSocket if needed + ws = WebSocket(ws, app) + # bind methods to websocket + app._ws_extension.set_properties(ws) + # set default value in properties + app._ws_extension.bind_meth + + if message == ffi.NULL: + data = None + else: + data = ffi.unpack(message, length) + + handler = handlers.close + + if handler is None: + return + + if inspect.iscoroutinefunction(handler): + + async def wrapper(app, handler, ws, data, code, dispose): + try: + return await handler(ws, code, data) + finally: + key = ws.get_user_data_uuid() + if key is not None: + app._socket_refs.pop(key, None) + + app.run_async(wrapper(app, handler, ws, data, int(code))) + else: + handler(ws, int(code), data) + key = ws.get_user_data_uuid() + if key is not None: + app._socket_refs.pop(key, None) + + except Exception as err: + logging.error( + "Uncaught Exception: %s" % str(err) + ) # just log in console the error to call attention + + @ffi.callback("void(uws_websocket_t*, int, const char*, size_t, void*)") def uws_websocket_close_handler(ws, code, message, length, user_data): if user_data != ffi.NULL: @@ -1506,11 +1728,11 @@ class AppResponse: def cork_send( self, - message, - content_type: str | bytes = b"text/plain", - status: str | bytes | int = b"200 OK", + message: any, + content_type: Union[str, bytes] = b"text/plain", + status: Union[str, bytes, int] = b"200 OK", headers=None, - end_connection=False, + end_connection: bool = False, ): self.cork( lambda res: res.send(message, content_type, status, headers, end_connection) @@ -1520,10 +1742,10 @@ class AppResponse: def send( self, message: any, - content_type: str | bytes = b"text/plain", - status: str | bytes | int = b"200 OK", + content_type: Union[str, bytes] = b"text/plain", + status: Union[str, bytes, int] = b"200 OK", headers=None, - end_connection=False, + end_connection: bool = False, ): if self.aborted: return self @@ -2779,71 +3001,101 @@ class App: native_behavior.upgrade = uws_websocket_upgrade_handler_with_extension else: native_behavior.upgrade = uws_websocket_upgrade_handler + else: native_behavior.upgrade = ffi.NULL if open_handler: handlers.open = open_handler - native_behavior.open = ( - uws_websocket_factory_open_handler - if self._ws_factory - else uws_websocket_open_handler - ) + + if self._factory: + native_behavior.open = uws_websocket_factory_open_handler + elif self._ws_extension and not self._ws_extension.empty: + native_behavior.open = uws_websocket_open_handler_with_extension + else: + native_behavior.open = uws_websocket_open_handler + else: native_behavior.open = ffi.NULL if message_handler: handlers.message = message_handler - native_behavior.message = ( - uws_websocket_factory_message_handler - if self._ws_factory - else uws_websocket_message_handler - ) + + if self._factory: + native_behavior.message = uws_websocket_factory_message_handler + elif self._ws_extension and not self._ws_extension.empty: + native_behavior.message = uws_websocket_message_handler_with_extension + else: + native_behavior.message = uws_websocket_message_handler + else: native_behavior.message = ffi.NULL if drain_handler: handlers.drain = drain_handler - native_behavior.drain = ( - uws_websocket_factory_drain_handler - if self._ws_factory - else uws_websocket_drain_handler - ) - else: + + if self._factory: + native_behavior.drain = uws_websocket_factory_drain_handler + elif self._ws_extension and not self._ws_extension.empty: + native_behavior.drain = uws_websocket_drain_handler_with_extension + else: + native_behavior.drain = uws_websocket_drain_handler + native_behavior.drain = ffi.NULL if ping_handler: handlers.ping = ping_handler - native_behavior.ping = ( - uws_websocket_factory_ping_handler - if self._ws_factory - else uws_websocket_ping_handler - ) + + if self._factory: + native_behavior.ping = uws_websocket_factory_ping_handler + elif self._ws_extension and not self._ws_extension.empty: + native_behavior.ping = uws_websocket_ping_handler_with_extension + else: + native_behavior.ping = uws_websocket_ping_handler + else: native_behavior.ping = ffi.NULL if pong_handler: handlers.pong = pong_handler - native_behavior.pong = ( - uws_websocket_factory_pong_handler - if self._ws_factory - else uws_websocket_pong_handler - ) + + if self._factory: + native_behavior.pong = uws_websocket_factory_pong_handler + elif self._ws_extension and not self._ws_extension.empty: + native_behavior.pong = uws_websocket_pong_handler_with_extension + else: + native_behavior.pong = uws_websocket_pong_handler + else: native_behavior.pong = ffi.NULL if close_handler: handlers.close = close_handler - native_behavior.close = ( - uws_websocket_factory_close_handler - if self._ws_factory - else uws_websocket_close_handler - ) + + if self._factory: + native_behavior.close = uws_websocket_factory_close_handler + elif self._ws_extension and not self._ws_extension.empty: + native_behavior.close = uws_websocket_close_handler_with_extension + else: + native_behavior.close = uws_websocket_close_handler + else: # always keep an close native_behavior.close = uws_websocket_close_handler if subscription_handler: handlers.subscription = subscription_handler + + if self._factory: + native_behavior.subscription = ( + uws_websocket_factory_subscription_handler + ) + elif self._ws_extension and not self._ws_extension.empty: + native_behavior.subscription = ( + uws_websocket_subscription_handler_with_extension + ) + else: + native_behavior.subscription = uws_websocket_subscription_handler + native_behavior.subscription = ( uws_websocket_factory_subscription_handler if self._ws_factory diff --git a/src/socketify/ssgi.py b/src/socketify/ssgi.py index 3787d4b..2372269 100644 --- a/src/socketify/ssgi.py +++ b/src/socketify/ssgi.py @@ -187,8 +187,8 @@ class SSGIWebSocket: self._close_handler = on_close_handler class SSGI: - def __init__(self, app, options=None, request_response_factory_max_items=0, websocket_factory_max_itens=0): - self.server = App(options, request_response_factory_max_items, websocket_factory_max_itens) + def __init__(self, app, options=None, request_response_factory_max_items=0, websocket_factory_max_items=0): + self.server = App(options, request_response_factory_max_items, websocket_factory_max_items) self.SERVER_PORT = None self.SERVER_HOST = '' self.SERVER_SCHEME = 'https' if self.server.options else 'http' diff --git a/src/socketify/uWebSockets b/src/socketify/uWebSockets index a4fd720..9b13a6b 160000 --- a/src/socketify/uWebSockets +++ b/src/socketify/uWebSockets @@ -1 +1 @@ -Subproject commit a4fd720e0c373aa9e8c6703f9368239d1369606f +Subproject commit 9b13a6b02886c792189252e948290d2eea9aeda9 diff --git a/src/tests.py b/src/tests.py index 6eed059..2f8e12c 100644 --- a/src/tests.py +++ b/src/tests.py @@ -1,4 +1,4 @@ -from socketify import App +from socketify import App, CompressOptions, OpCode app = App() @@ -39,6 +39,31 @@ def home(res, req, data=None): res.send({"Hello": "World!"}, headers=(("X-Rate-Limit-Remaining", "10"), (b'Another-Headers', b'Value'))) +def ws_open(ws): + print("A WebSocket got connected!") + ws.send("Hello World!", OpCode.TEXT) + + +def ws_message(ws, message, opcode): + print(message, opcode) + # Ok is false if backpressure was built up, wait for drain + ok = ws.send(message, opcode) + + +app.ws( + "/*", + { + "compression": CompressOptions.SHARED_COMPRESSOR, + "max_payload_length": 16 * 1024 * 1024, + "idle_timeout": 12, + "open": ws_open, + "message": ws_message, + "drain": lambda ws: print( + "WebSocket backpressure: %s", ws.get_buffered_amount() + ), + "close": lambda ws, code, message: print("WebSocket closed"), + }, +) app.listen( 3000, lambda config: print("Listening on port http://localhost:%d now\n" % config.port),