pull/84/head v0.0.4
Ciro 2023-01-08 16:29:26 -03:00
rodzic 4f585bd84b
commit 981f65c8d7
5 zmienionych plików z 320 dodań i 43 usunięć

Wyświetl plik

@ -1,7 +1,7 @@
from socketify import App, AppOptions, OpCode, CompressOptions
remaining_clients = 16
app = App(websocket_factory_max_itens=1_500_000)
app = App(websocket_factory_max_items=1_500_000)
def ws_open(ws):

Wyświetl plik

@ -18,6 +18,7 @@ from .status_codes import status_codes
from .helpers import static_route
from dataclasses import dataclass
from .helpers import DecoratorRouter
from typing import Union
mimetypes.init()
@ -76,6 +77,27 @@ def uws_websocket_factory_drain_handler(ws, user_data):
) # just log in console the error to call attention
@ffi.callback("void(uws_websocket_t*, void*)")
def uws_websocket_drain_handler_with_extension(ws, user_data):
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
ws = WebSocket(ws, app)
# bind methods to websocket
app._ws_extension.set_properties(ws)
# set default value in properties
app._ws_extension.bind_methods(ws)
handler = handlers.drain
if inspect.iscoroutinefunction(handler):
app.run_async(handler(ws))
else:
handler(ws)
except Exception as err:
logging.error(
"Uncaught Exception: %s" % str(err)
) # just log in console the error to call attention
@ffi.callback("void(uws_websocket_t*, void*)")
def uws_websocket_drain_handler(ws, user_data):
if user_data != ffi.NULL:
@ -216,6 +238,52 @@ def uws_websocket_subscription_handler(
) # just log in console the error to call attention
@ffi.callback("void(uws_websocket_t*, const char *, size_t, int, int, void*)")
def uws_websocket_subscription_handler_with_extension(
ws,
topic_name,
topic_name_length,
new_number_of_subscriber,
old_number_of_subscriber,
user_data,
):
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
ws = WebSocket(ws, app)
# bind methods to websocket
app._ws_extension.set_properties(ws)
# set default value in properties
app._ws_extension.bind_meth
handler = handlers.subscription
if topic_name == ffi.NULL:
topic = None
else:
topic = ffi.unpack(topic_name, topic_name_length).decode("utf-8")
if inspect.iscoroutinefunction(handler):
app.run_async(
handler(
ws,
topic,
int(new_number_of_subscriber),
int(old_number_of_subscriber),
)
)
else:
handler(
ws,
topic,
int(new_number_of_subscriber),
int(old_number_of_subscriber),
)
except Exception as err:
logging.error(
"Uncaught Exception: %s" % str(err)
) # just log in console the error to call attention
@ffi.callback("void(uws_websocket_t*, void*)")
def uws_websocket_factory_open_handler(ws, user_data):
if user_data != ffi.NULL:
@ -248,6 +316,28 @@ def uws_websocket_factory_open_handler(ws, user_data):
) # just log in console the error to call attention
@ffi.callback("void(uws_websocket_t*, void*)")
def uws_websocket_open_handler_with_extension(ws, user_data):
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
ws = WebSocket(ws, app)
# bind methods to websocket
app._ws_extension.set_properties(ws)
# set default value in properties
app._ws_extension.bind_meth
handler = handlers.open
if inspect.iscoroutinefunction(handler):
app.run_async(handler(ws))
else:
handler(ws)
except Exception as err:
logging.error(
"Uncaught Exception: %s" % str(err)
) # just log in console the error to call attention
@ffi.callback("void(uws_websocket_t*, void*)")
def uws_websocket_open_handler(ws, user_data):
@ -307,6 +397,39 @@ def uws_websocket_factory_message_handler(ws, message, length, opcode, user_data
) # just log in console the error to call attention
@ffi.callback("void(uws_websocket_t*, const char*, size_t, uws_opcode_t, void*)")
def uws_websocket_message_handler_with_extension(
ws, message, length, opcode, user_data
):
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
ws = WebSocket(ws, app)
# bind methods to websocket
app._ws_extension.set_properties(ws)
# set default value in properties
app._ws_extension.bind_meth
if message == ffi.NULL:
data = None
else:
data = ffi.unpack(message, length)
opcode = OpCode(opcode)
if opcode == OpCode.TEXT:
data = data.decode("utf-8")
handler = handlers.message
if inspect.iscoroutinefunction(handler):
app.run_async(handler(ws, data, opcode))
else:
handler(ws, data, opcode)
except Exception as err:
logging.error(
"Uncaught Exception: %s" % str(err)
) # just log in console the error to call attention
@ffi.callback("void(uws_websocket_t*, const char*, size_t, uws_opcode_t, void*)")
def uws_websocket_message_handler(ws, message, length, opcode, user_data):
if user_data != ffi.NULL:
@ -372,6 +495,32 @@ def uws_websocket_factory_pong_handler(ws, message, length, user_data):
) # just log in console the error to call attention
@ffi.callback("void(uws_websocket_t*, const char*, size_t, void*)")
def uws_websocket_pong_handler_with_extension(ws, message, length, user_data):
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
ws = WebSocket(ws, app)
# bind methods to websocket
app._ws_extension.set_properties(ws)
# set default value in properties
app._ws_extension.bind_meth
if message == ffi.NULL:
data = None
else:
data = ffi.unpack(message, length)
handler = handlers.pong
if inspect.iscoroutinefunction(handler):
app.run_async(handler(ws, data))
else:
handler(ws, data)
except Exception as err:
logging.error(
"Uncaught Exception: %s" % str(err)
) # just log in console the error to call attention
@ffi.callback("void(uws_websocket_t*, const char*, size_t, void*)")
def uws_websocket_pong_handler(ws, message, length, user_data):
if user_data != ffi.NULL:
@ -433,6 +582,34 @@ def uws_websocket_factory_ping_handler(ws, message, length, user_data):
) # just log in console the error to call attention
@ffi.callback("void(uws_websocket_t*, const char*, size_t, void*)")
def uws_websocket_ping_handler_with_extension(ws, message, length, user_data):
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
ws = WebSocket(ws, app)
# bind methods to websocket
app._ws_extension.set_properties(ws)
# set default value in properties
app._ws_extension.bind_meth
if message == ffi.NULL:
data = None
else:
data = ffi.unpack(message, length)
handler = handlers.ping
if inspect.iscoroutinefunction(handler):
app.run_async(handler(ws, data))
else:
handler(ws, data)
except Exception as err:
logging.error(
"Uncaught Exception: %s" % str(err)
) # just log in console the error to call attention
@ffi.callback("void(uws_websocket_t*, const char*, size_t, void*)")
def uws_websocket_ping_handler(ws, message, length, user_data):
if user_data != ffi.NULL:
@ -506,6 +683,51 @@ def uws_websocket_factory_close_handler(ws, code, message, length, user_data):
) # just log in console the error to call attention
@ffi.callback("void(uws_websocket_t*, int, const char*, size_t, void*)")
def uws_websocket_close_handler_with_extension(ws, code, message, length, user_data):
if user_data != ffi.NULL:
try:
handlers, app = ffi.from_handle(user_data)
# pass to free data on WebSocket if needed
ws = WebSocket(ws, app)
# bind methods to websocket
app._ws_extension.set_properties(ws)
# set default value in properties
app._ws_extension.bind_meth
if message == ffi.NULL:
data = None
else:
data = ffi.unpack(message, length)
handler = handlers.close
if handler is None:
return
if inspect.iscoroutinefunction(handler):
async def wrapper(app, handler, ws, data, code, dispose):
try:
return await handler(ws, code, data)
finally:
key = ws.get_user_data_uuid()
if key is not None:
app._socket_refs.pop(key, None)
app.run_async(wrapper(app, handler, ws, data, int(code)))
else:
handler(ws, int(code), data)
key = ws.get_user_data_uuid()
if key is not None:
app._socket_refs.pop(key, None)
except Exception as err:
logging.error(
"Uncaught Exception: %s" % str(err)
) # just log in console the error to call attention
@ffi.callback("void(uws_websocket_t*, int, const char*, size_t, void*)")
def uws_websocket_close_handler(ws, code, message, length, user_data):
if user_data != ffi.NULL:
@ -1506,11 +1728,11 @@ class AppResponse:
def cork_send(
self,
message,
content_type: str | bytes = b"text/plain",
status: str | bytes | int = b"200 OK",
message: any,
content_type: Union[str, bytes] = b"text/plain",
status: Union[str, bytes, int] = b"200 OK",
headers=None,
end_connection=False,
end_connection: bool = False,
):
self.cork(
lambda res: res.send(message, content_type, status, headers, end_connection)
@ -1520,10 +1742,10 @@ class AppResponse:
def send(
self,
message: any,
content_type: str | bytes = b"text/plain",
status: str | bytes | int = b"200 OK",
content_type: Union[str, bytes] = b"text/plain",
status: Union[str, bytes, int] = b"200 OK",
headers=None,
end_connection=False,
end_connection: bool = False,
):
if self.aborted:
return self
@ -2779,71 +3001,101 @@ class App:
native_behavior.upgrade = uws_websocket_upgrade_handler_with_extension
else:
native_behavior.upgrade = uws_websocket_upgrade_handler
else:
native_behavior.upgrade = ffi.NULL
if open_handler:
handlers.open = open_handler
native_behavior.open = (
uws_websocket_factory_open_handler
if self._ws_factory
else uws_websocket_open_handler
)
if self._factory:
native_behavior.open = uws_websocket_factory_open_handler
elif self._ws_extension and not self._ws_extension.empty:
native_behavior.open = uws_websocket_open_handler_with_extension
else:
native_behavior.open = uws_websocket_open_handler
else:
native_behavior.open = ffi.NULL
if message_handler:
handlers.message = message_handler
native_behavior.message = (
uws_websocket_factory_message_handler
if self._ws_factory
else uws_websocket_message_handler
)
if self._factory:
native_behavior.message = uws_websocket_factory_message_handler
elif self._ws_extension and not self._ws_extension.empty:
native_behavior.message = uws_websocket_message_handler_with_extension
else:
native_behavior.message = uws_websocket_message_handler
else:
native_behavior.message = ffi.NULL
if drain_handler:
handlers.drain = drain_handler
native_behavior.drain = (
uws_websocket_factory_drain_handler
if self._ws_factory
else uws_websocket_drain_handler
)
else:
if self._factory:
native_behavior.drain = uws_websocket_factory_drain_handler
elif self._ws_extension and not self._ws_extension.empty:
native_behavior.drain = uws_websocket_drain_handler_with_extension
else:
native_behavior.drain = uws_websocket_drain_handler
native_behavior.drain = ffi.NULL
if ping_handler:
handlers.ping = ping_handler
native_behavior.ping = (
uws_websocket_factory_ping_handler
if self._ws_factory
else uws_websocket_ping_handler
)
if self._factory:
native_behavior.ping = uws_websocket_factory_ping_handler
elif self._ws_extension and not self._ws_extension.empty:
native_behavior.ping = uws_websocket_ping_handler_with_extension
else:
native_behavior.ping = uws_websocket_ping_handler
else:
native_behavior.ping = ffi.NULL
if pong_handler:
handlers.pong = pong_handler
native_behavior.pong = (
uws_websocket_factory_pong_handler
if self._ws_factory
else uws_websocket_pong_handler
)
if self._factory:
native_behavior.pong = uws_websocket_factory_pong_handler
elif self._ws_extension and not self._ws_extension.empty:
native_behavior.pong = uws_websocket_pong_handler_with_extension
else:
native_behavior.pong = uws_websocket_pong_handler
else:
native_behavior.pong = ffi.NULL
if close_handler:
handlers.close = close_handler
native_behavior.close = (
uws_websocket_factory_close_handler
if self._ws_factory
else uws_websocket_close_handler
)
if self._factory:
native_behavior.close = uws_websocket_factory_close_handler
elif self._ws_extension and not self._ws_extension.empty:
native_behavior.close = uws_websocket_close_handler_with_extension
else:
native_behavior.close = uws_websocket_close_handler
else: # always keep an close
native_behavior.close = uws_websocket_close_handler
if subscription_handler:
handlers.subscription = subscription_handler
if self._factory:
native_behavior.subscription = (
uws_websocket_factory_subscription_handler
)
elif self._ws_extension and not self._ws_extension.empty:
native_behavior.subscription = (
uws_websocket_subscription_handler_with_extension
)
else:
native_behavior.subscription = uws_websocket_subscription_handler
native_behavior.subscription = (
uws_websocket_factory_subscription_handler
if self._ws_factory

Wyświetl plik

@ -187,8 +187,8 @@ class SSGIWebSocket:
self._close_handler = on_close_handler
class SSGI:
def __init__(self, app, options=None, request_response_factory_max_items=0, websocket_factory_max_itens=0):
self.server = App(options, request_response_factory_max_items, websocket_factory_max_itens)
def __init__(self, app, options=None, request_response_factory_max_items=0, websocket_factory_max_items=0):
self.server = App(options, request_response_factory_max_items, websocket_factory_max_items)
self.SERVER_PORT = None
self.SERVER_HOST = ''
self.SERVER_SCHEME = 'https' if self.server.options else 'http'

@ -1 +1 @@
Subproject commit a4fd720e0c373aa9e8c6703f9368239d1369606f
Subproject commit 9b13a6b02886c792189252e948290d2eea9aeda9

Wyświetl plik

@ -1,4 +1,4 @@
from socketify import App
from socketify import App, CompressOptions, OpCode
app = App()
@ -39,6 +39,31 @@ def home(res, req, data=None):
res.send({"Hello": "World!"}, headers=(("X-Rate-Limit-Remaining", "10"), (b'Another-Headers', b'Value')))
def ws_open(ws):
print("A WebSocket got connected!")
ws.send("Hello World!", OpCode.TEXT)
def ws_message(ws, message, opcode):
print(message, opcode)
# Ok is false if backpressure was built up, wait for drain
ok = ws.send(message, opcode)
app.ws(
"/*",
{
"compression": CompressOptions.SHARED_COMPRESSOR,
"max_payload_length": 16 * 1024 * 1024,
"idle_timeout": 12,
"open": ws_open,
"message": ws_message,
"drain": lambda ws: print(
"WebSocket backpressure: %s", ws.get_buffered_amount()
),
"close": lambda ws, code, message: print("WebSocket closed"),
},
)
app.listen(
3000,
lambda config: print("Listening on port http://localhost:%d now\n" % config.port),