diff --git a/README.md b/README.md index 2cc5fa7..5ff7676 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,8 @@ app.run() git clone https://github.com/cirospaciari/socketify.py.git cd ./socketify.py git submodule update --init --recursive --remote +#you can use make linux, make macos or call Make.bat from Visual Studio Development Prompt to build +cd ./src/socketify/native/ && make linux && cd ../../../ #install local pip pypy3 -m pip install . #install in editable mode diff --git a/src/socketify/__init__.py b/src/socketify/__init__.py index a1368c6..31b9daa 100644 --- a/src/socketify/__init__.py +++ b/src/socketify/__init__.py @@ -1,2 +1,2 @@ -from .socketify import App, AppOptions, AppListenOptions +from .socketify import App, AppOptions, AppListenOptions, OpCode, SendStatus from .helpers import sendfile \ No newline at end of file diff --git a/src/socketify/socketify.py b/src/socketify/socketify.py index 96839ba..a3db01e 100644 --- a/src/socketify/socketify.py +++ b/src/socketify/socketify.py @@ -1,5 +1,6 @@ import cffi from datetime import datetime +from enum import IntEnum from http import cookies import inspect import json @@ -135,12 +136,12 @@ typedef struct uws_req_s uws_req_t; typedef struct uws_res_s uws_res_t; typedef struct uws_socket_context_s uws_socket_context_t; typedef struct uws_websocket_s uws_websocket_t; -typedef void (*uws_websocket_handler)(uws_websocket_t *ws); -typedef void (*uws_websocket_message_handler)(uws_websocket_t *ws, const char *message, size_t length, uws_opcode_t opcode); -typedef void (*uws_websocket_ping_pong_handler)(uws_websocket_t *ws, const char *message, size_t length); -typedef void (*uws_websocket_close_handler)(uws_websocket_t *ws, int code, const char *message, size_t length); -typedef void (*uws_websocket_upgrade_handler)(uws_res_t *response, uws_req_t *request, uws_socket_context_t *context); +typedef void (*uws_websocket_handler)(uws_websocket_t *ws, void* user_data); +typedef void (*uws_websocket_message_handler)(uws_websocket_t *ws, const char *message, size_t length, uws_opcode_t opcode, void* user_data); +typedef void (*uws_websocket_ping_pong_handler)(uws_websocket_t *ws, const char *message, size_t length, void* user_data); +typedef void (*uws_websocket_close_handler)(uws_websocket_t *ws, int code, const char *message, size_t length, void* user_data); +typedef void (*uws_websocket_upgrade_handler)(uws_res_t *response, uws_req_t *request, uws_socket_context_t *context, void* user_data); typedef struct { uws_compress_options_t compression; @@ -168,7 +169,7 @@ typedef struct { typedef void (*uws_listen_handler)(struct us_listen_socket_t *listen_socket, uws_app_listen_config_t config, void *user_data); typedef void (*uws_method_handler)(uws_res_t *response, uws_req_t *request, void *user_data); typedef void (*uws_filter_handler)(uws_res_t *response, int, void *user_data); -typedef void (*uws_missing_server_handler)(const char *hostname, void *user_data); +typedef void (*uws_missing_server_handler)(const char *hostname, size_t hostname_length, void *user_data); typedef void (*uws_get_headers_server_handler)(const char *header_name, size_t header_name_size, const char *header_value, size_t header_value_size, void *user_data); @@ -193,14 +194,13 @@ bool uws_constructor_failed(int ssl, uws_app_t *app); unsigned int uws_num_subscribers(int ssl, uws_app_t *app, const char *topic, size_t topic_length); bool uws_publish(int ssl, uws_app_t *app, const char *topic, size_t topic_length, const char *message, size_t message_length, uws_opcode_t opcode, bool compress); void *uws_get_native_handle(int ssl, uws_app_t *app); -void uws_remove_server_name(int ssl, uws_app_t *app, const char *hostname_pattern); -void uws_add_server_name(int ssl, uws_app_t *app, const char *hostname_pattern); -void uws_add_server_name_with_options(int ssl, uws_app_t *app, const char *hostname_pattern, struct us_socket_context_options_t options); +void uws_remove_server_name(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length); +void uws_add_server_name(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length); +void uws_add_server_name_with_options(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length, struct us_socket_context_options_t options); void uws_missing_server_name(int ssl, uws_app_t *app, uws_missing_server_handler handler, void *user_data); void uws_filter(int ssl, uws_app_t *app, uws_filter_handler handler, void *user_data); - void uws_res_end(int ssl, uws_res_t *res, const char *data, size_t length, bool close_connection); void uws_res_pause(int ssl, uws_res_t *res); void uws_res_resume(int ssl, uws_res_t *res); @@ -213,6 +213,7 @@ void uws_res_write_header_int(int ssl, uws_res_t *res, const char *key, size_t k void uws_res_end_without_body(int ssl, uws_res_t *res, bool close_connection); bool uws_res_write(int ssl, uws_res_t *res, const char *data, size_t length); uintmax_t uws_res_get_write_offset(int ssl, uws_res_t *res); +void *uws_res_get_native_handle(int ssl, uws_res_t *res); bool uws_res_has_responded(int ssl, uws_res_t *res); void uws_res_on_writable(int ssl, uws_res_t *res, bool (*handler)(uws_res_t *res, uintmax_t, void *opcional_data), void *user_data); void uws_res_on_aborted(int ssl, uws_res_t *res, void (*handler)(uws_res_t *res, void *opcional_data), void *opcional_data); @@ -233,6 +234,18 @@ size_t uws_req_get_parameter(uws_req_t *res, unsigned short index, const char ** size_t uws_req_get_full_url(uws_req_t *res, const char **dest); void uws_req_for_each_header(uws_req_t *res, uws_get_headers_server_handler handler, void *user_data); +void uws_ws(int ssl, uws_app_t *app, const char *pattern, uws_socket_behavior_t behavior, void* user_data); +void *uws_ws_get_user_data(int ssl, uws_websocket_t *ws); +void uws_ws_close(int ssl, uws_websocket_t *ws); +uws_sendstatus_t uws_ws_send(int ssl, uws_websocket_t *ws, const char *message, size_t length, uws_opcode_t opcode); +uws_sendstatus_t uws_ws_send_with_options(int ssl, uws_websocket_t *ws, const char *message, size_t length, uws_opcode_t opcode, bool compress, bool fin); +uws_sendstatus_t uws_ws_send_fragment(int ssl, uws_websocket_t *ws, const char *message, size_t length, bool compress); +uws_sendstatus_t uws_ws_send_first_fragment(int ssl, uws_websocket_t *ws, const char *message, size_t length, bool compress); +uws_sendstatus_t uws_ws_send_first_fragment_with_opcode(int ssl, uws_websocket_t *ws, const char *message, size_t length, uws_opcode_t opcode, bool compress); +uws_sendstatus_t uws_ws_send_last_fragment(int ssl, uws_websocket_t *ws, const char *message, size_t length, bool compress); +void uws_ws_end(int ssl, uws_websocket_t *ws, int code, const char *message, size_t length); +void uws_ws_cork(int ssl, uws_websocket_t *ws, void (*handler)(void *user_data), void *user_data); + """) library_extension = "dll" if platform.system().lower() == "windows" else "so" @@ -241,6 +254,181 @@ library_path = os.path.join(os.path.dirname(__file__), "libsocketify_%s_%s.%s" % lib = ffi.dlopen(library_path) + +@ffi.callback("void(const char *, size_t, void *)") +def uws_missing_server_name(hostname, hostname_length, user_data): + if not user_data == ffi.NULL: + app = ffi.from_handle(user_data) + try: + if hostname == ffi.NULL: + data = None + else: + data = ffi.unpack(hostname, hostname_length).decode('utf-8') + + handler = app._missing_server_handler + if inspect.iscoroutinefunction(handler): + response.grab_aborted_handler() + response.run_async(handler(data)) + else: + handler(data) + except Exception as err: + print("Uncaught Exception: %s" % str(err)) #just log in console the error to call attention + +@ffi.callback("void(uws_websocket_t *, void *)") +def uws_websocket_drain_handler(ws, user_data): + if not user_data == ffi.NULL: + (handlers, app) = ffi.from_handle(user_data) + ws = WebSocket(ws, app.SSL, app.loop) + try: + handler = handlers.drain + if inspect.iscoroutinefunction(handler): + response.grab_aborted_handler() + response.run_async(handler(ws)) + else: + handler(ws) + except Exception as err: + print("Uncaught Exception: %s" % str(err)) #just log in console the error to call attention + +@ffi.callback("void(uws_websocket_t *, void *)") +def uws_websocket_open_handler(ws, user_data): + if not user_data == ffi.NULL: + (handlers, app) = ffi.from_handle(user_data) + ws = WebSocket(ws, app.SSL, app.loop) + try: + handler = handlers.open + if inspect.iscoroutinefunction(handler): + response.grab_aborted_handler() + response.run_async(handler(ws)) + else: + handler(ws) + except Exception as err: + print("Uncaught Exception: %s" % str(err)) #just log in console the error to call attention + +@ffi.callback("void(uws_websocket_t*, const char *,size_t, uws_opcode_t, void*)") +def uws_websocket_message_handler(ws, message, length, opcode, user_data): + if not user_data == ffi.NULL: + (handlers, app) = ffi.from_handle(user_data) + ws = WebSocket(ws, app.SSL, app.loop) + + try: + if message == ffi.NULL: + data = None + else: + data = ffi.unpack(message, length) + opcode = OpCode(opcode) + if opcode == OpCode.TEXT: + data = data.decode('utf-8') + + handler = handlers.message + if inspect.iscoroutinefunction(handler): + response.grab_aborted_handler() + response.run_async(handler(ws, data, opcode)) + else: + handler(ws, data, opcode) + + except Exception as err: + print("Uncaught Exception: %s" % str(err)) #just log in console the error to call attention + +@ffi.callback("void(uws_websocket_t*, const char*, size_t, void*)") +def uws_websocket_pong_handler(ws, message, length, user_data): + if not user_data == ffi.NULL: + (handlers, app) = ffi.from_handle(user_data) + ws = WebSocket(ws, app.SSL, app.loop) + + try: + if message == ffi.NULL: + data = None + else: + data = ffi.unpack(message, length) + + handler = handlers.pong + if inspect.iscoroutinefunction(handler): + response.grab_aborted_handler() + response.run_async(handler(ws, data)) + else: + handler(ws, data) + + except Exception as err: + print("Uncaught Exception: %s" % str(err)) #just log in console the error to call attention + +@ffi.callback("void(uws_websocket_t*, const char*, size_t, void*)") +def uws_websocket_ping_handler(ws, message,length, user_data): + if not user_data == ffi.NULL: + (handlers, app) = ffi.from_handle(user_data) + ws = WebSocket(ws, app.SSL, app.loop) + + try: + if message == ffi.NULL: + data = None + else: + data = ffi.unpack(message, length) + + handler = handlers.ping + if inspect.iscoroutinefunction(handler): + response.grab_aborted_handler() + response.run_async(handler(ws, data)) + else: + handler(ws, data) + + except Exception as err: + print("Uncaught Exception: %s" % str(err)) #just log in console the error to call attention + + +@ffi.callback("void(uws_websocket_t*, int, const char*, size_t, void*)") +def uws_websocket_close_handler(ws, code, message, length, user_data): + if not user_data == ffi.NULL: + (handlers, app) = ffi.from_handle(user_data) + #pass to free data on WebSocket if needed + ws = WebSocket(ws, app.SSL, app.loop, True) + + try: + if message == ffi.NULL: + data = None + else: + data = ffi.unpack(message, length) + + handler = handlers.close + + if handler is None: + return + if inspect.iscoroutinefunction(handler): + response.grab_aborted_handler() + response.run_async(handler(ws, int(code), data)) + else: + handler(ws, int(code), data) + + except Exception as err: + print("Uncaught Exception: %s" % str(err)) #just log in console the error to call attention + + +@ffi.callback("void(uws_res_t*, uws_req_t*, uws_socket_context_t*, void*)") +def uws_websocket_upgrade_handler(res, req, context, user_data): + if not user_data == ffi.NULL: + (handlers, app) = ffi.from_handle(user_data) + response = AppResponse(res, app.loop, app.SSL) + request = AppRequest(req) + try: + handler = handlers.upgrade + if inspect.iscoroutinefunction(handler): + response.grab_aborted_handler() + response.run_async(handler(response, request, context)) + else: + handler(response, request, context) + + except Exception as err: + print("Uncaught Exception: %s" % str(err)) #just log in console the error to call attention + + +@ffi.callback("void(const char *topic, size_t length, void *user_data)") +def uws_req_for_each_topic_handler(topic, topic_size, user_data): + if not user_data == ffi.NULL: + ws = ffi.from_handle(user_data) + try: + header_name = ffi.unpack(topic, topic_size).decode("utf-8") + ws.trigger_for_each_topic_handler(header_name, header_value) + except Exception: #invalid utf-8 + return + @ffi.callback("void(const char *, size_t, const char *, size_t, void *)") def uws_req_for_each_header_handler(header_name, header_name_size, header_value, header_value_size, user_data): if not user_data == ffi.NULL: @@ -259,7 +447,7 @@ def uws_req_for_each_header_handler(header_name, header_name_size, header_value, def uws_generic_method_handler(res, req, user_data): if not user_data == ffi.NULL: (handler, app) = ffi.from_handle(user_data) - response = AppResponse(res, app.loop, False) + response = AppResponse(res, app.loop, app.SSL) request = AppRequest(req) try: if inspect.iscoroutinefunction(handler): @@ -271,21 +459,6 @@ def uws_generic_method_handler(res, req, user_data): response.grab_aborted_handler() app.trigger_error(err, response, request) -@ffi.callback("void(uws_res_t *, uws_req_t *, void *)") -def uws_generic_ssl_method_handler(res, req, user_data): - if not user_data == ffi.NULL: - (handler, app) = ffi.from_handle(user_data) - response = AppResponse(res, app.loop, True) - request = AppRequest(req) - try: - if inspect.iscoroutinefunction(handler): - response.grab_aborted_handler() - response.run_async(handler(response, request)) - else: - handler(response, request) - except Exception as err: - response.grab_aborted_handler() - app.trigger_error(err, response, request) @ffi.callback("void(struct us_listen_socket_t *, uws_app_listen_config_t, void *)") def uws_generic_listen_handler(listen_socket, config, user_data): @@ -337,9 +510,303 @@ def uws_generic_cork_handler(res, user_data): response._cork_handler(response) except Exception as err: print("Error on cork handler %s" % str(err)) - # response.grab_aborted_handler() - # app.trigger_error(err, response, request) + +@ffi.callback("void(void*)") +def uws_ws_cork_handler(user_data): + if not user_data == ffi.NULL: + ws = ffi.from_handle(user_data) + try: + if inspect.iscoroutinefunction(ws._cork_handler): + raise RuntimeError("Calls inside cork must be sync") + ws._cork_handler(ws) + except Exception as err: + print("Error on cork handler %s" % str(err)) + + + +class OpCode(IntEnum): + CONTINUATION = 0 + TEXT = 1 + BINARY = 2 + CLOSE = 8 + PING = 9 + PONG = 10 + +class SendStatus(IntEnum): + BACKPRESSURE = 0 + SUCCESS = 1 + DROPPED = 2 + +#dict to keep socket data alive until closed if needed +SocketRefs = {} + +class WebSocket: + def __init__(self, websocket, ssl, loop, free_socket_data=False): + self.ws = websocket + self.SSL = ssl + self._ptr = ffi.new_handle(self) + self.loop = loop + self._cork_handler = None + self._for_each_topic_handler = None + self.free_socket_data = free_socket_data + self.socket_data_id = None + self.socket_data = None + self.got_socket_data = False + + + def trigger_for_each_topic_handler(self, topic): + if hasattr(self, "_for_each_topic_handler") and hasattr(self._for_each_topic_handler, '__call__'): + try: + if inspect.iscoroutinefunction(self._for_each_topic_handler): + raise RuntimeError("WebSocket.for_each_topic_handler must be synchronous") + self._for_each_topic_handler(topic) + except Exception as err: + print("Error on for each topic handler %s" % str(err)) + + #uuid for socket data, used to free data after socket closes + def get_user_data_uuid(self): + if self.got_socket_data: + return self.socket_data_id + user_data = lib.uws_ws_get_user_data(self.SSL, self._ptr) + if user_data == ffi.NULL: + return None + (data, socket_data_id) = ffi.from_handle(user_data) + self.socket_data_id = socket_data_id + self.socket_data = data + self.got_socket_data = True + return socket_data_id + + def get_user_data(self): + if self.got_socket_data: + return self.socket_data + user_data = lib.uws_ws_get_user_data(self.SSL, self._ptr) + if user_data == ffi.NULL: + return None + (data, socket_data_id) = ffi.from_handle(user_data) + self.socket_data_id = socket_data_id + self.socket_data = data + self.got_socket_data = True + return data + + def get_buffered_amount(self): + return int(lib.uws_ws_get_buffered_amount(self.SSL, self._ptr)) + + def subscribe(self, topic): + try: + if isinstance(topic, str): + data = topic.encode("utf-8") + elif isinstance(topic, bytes): + data = topic + else: + return False + + return bool(lib.uws_ws_subscribe(self.SSL, self.ws, data, len(data))) + except: + return False + + def unsubscribe(self, topic): + try: + if isinstance(topic, str): + data = topic.encode("utf-8") + elif isinstance(topic, bytes): + data = topic + else: + return False + + return bool(lib.uws_ws_unsubscribe(self.SSL, self.ws, data, len(data))) + except: + return False + + def is_subscribed(self, topic): + try: + if isinstance(topic, str): + data = topic.encode("utf-8") + elif isinstance(topic, bytes): + data = topic + else: + return False + + return bool(lib.uws_ws_is_subscribed(self.SSL, self.ws, data, len(data))) + except: + return False + + def publish(self, topic, message, opcode=OpCode.BINARY, compress=False): + try: + if isinstance(topic, str): + topic_data = topic.encode("utf-8") + elif isinstance(topic, bytes): + topic_data = topic + else: + return False + + if isinstance(message, str): + data = message.encode("utf-8") + elif isinstance(message, bytes): + data = message + elif message == None: + data = b'' + else: + data = json.dumps(message).encode("utf-8") + + return bool(lib.uws_ws_publish_with_options(self.SSL, self.ws, topic_data, len(topic_data),data, len(data), int(opcode), bool(compress))) + except: + return False + + def get_topics(self): + topics = [] + def copy_topics(topic): + topics.append(value) + + self.for_each_topic(copy_topics) + return topics + + def for_each_topic(self, handler): + self._for_each_topic_handler = handler + lib.uws_ws_iterate_topics(self.SSL, self.ws, uws_req_for_each_topic_handler, self._ptr) + + def get_remote_address_bytes(self): + buffer = ffi.new("char**") + length = lib.uws_ws_get_remote_address(self.SSL, self.ws, buffer) + buffer_address = ffi.addressof(buffer, 0)[0] + if buffer_address == ffi.NULL: + return None + try: + return ffi.unpack(buffer_address, length) + except Exception: #invalid + return None + + def get_remote_address(self): + buffer = ffi.new("char**") + length = lib.uws_ws_get_remote_address_as_text(self.SSL, self.ws, buffer) + buffer_address = ffi.addressof(buffer, 0)[0] + if buffer_address == ffi.NULL: + return None + try: + return ffi.unpack(buffer_address, length).decode("utf-8") + except Exception: #invalid utf-8 + return None + + def send_fragment(self, message, compress=False): + try: + if isinstance(message, str): + data = message.encode("utf-8") + elif isinstance(message, bytes): + data = message + elif message == None: + lib.uws_ws_send_fragment(self.SSL, self.ws, b'', 0, compress) + return self + else: + data = json.dumps(message).encode("utf-8") + + return SendStatus(lib.uws_ws_send_fragment(self.SSL, self.ws, data, len(data),compress)) + except: + return None + + def send_last_fragment(self, message, compress=False): + try: + if isinstance(message, str): + data = message.encode("utf-8") + elif isinstance(message, bytes): + data = message + elif message == None: + lib.uws_ws_send_last_fragment(self.SSL, self.ws, b'', 0, compress) + return self + else: + data = json.dumps(message).encode("utf-8") + + return SendStatus(lib.uws_ws_send_last_fragment(self.SSL, self.ws, data, len(data),compress)) + except: + return None + + def send_first_fragment(self, message, opcode=OpCode.BINARY, compress=False): + try: + if isinstance(message, str): + data = message.encode("utf-8") + elif isinstance(message, bytes): + data = message + elif message == None: + lib.uws_ws_send_first_fragment_with_opcode(self.SSL, self.ws, b'', 0, int(opcode), compress) + return self + else: + data = json.dumps(message).encode("utf-8") + + return SendStatus(lib.uws_ws_send_first_fragment_with_opcode(self.SSL, self.ws, data, len(data), int(opcode), compress)) + except: + return None + + def cork_send(self, message, opcode=OpCode.BINARY, compress=False, fin=True): + self.cork(lambda ws: ws.send(message, opcode, compress, fin)) + return self + + def send(self, message, opcode=OpCode.BINARY, compress=False, fin=True): + try: + if isinstance(message, str): + data = message.encode("utf-8") + elif isinstance(message, bytes): + data = message + elif message == None: + lib.uws_ws_send_with_options(self.SSL, self.ws, b'', 0, int(opcode), compress, fin) + return self + else: + data = json.dumps(message).encode("utf-8") + + return SendStatus(lib.uws_ws_send_with_options(self.SSL, self.ws, data, len(data), int(opcode), compress, fin)) + except: + return None + + def cork_end(self, code=0, message=None): + self.cork(lambda ws: ws.end(message, code, message)) + return self + + def end(self, code=0, message=None): + try: + if not isinstance(code, int): + raise RuntimeError("code must be an int") + if isinstance(message, str): + data = message.encode("utf-8") + elif isinstance(message, bytes): + data = message + elif message == None: + lib.uws_ws_end(self.SSL, self.ws, b'', 0) + return self + else: + data = json.dumps(message).encode("utf-8") + + lib.uws_ws_end(self.SSL, self.ws, code, data, len(data)) + finally: + return self + def close(self): + lib.uws_ws_close(self.SSL, self._ptr) + return self + + def cork(self, callback): + self._cork_handler = callback + if is_python: #call is enqueued to garantee corking works properly in python3 + self.loop.enqueue(lambda instance: lib.uws_ws_cork(instance.SSL, instance.ws, uws_ws_cork_handler, instance._ptr), self) + else: #just add to uvloop in next tick to garantee corking works properly in pypy3 + self.loop.set_timeout(0, lambda instance: lib.uws_ws_cork(instance.SSL, instance.ws, uws_ws_cork_handler, instance._ptr), self) + + def __del__(self): + #free SocketRefs when if needed + if self.free_socket_data: + key = self.get_user_data_uuid() + if not key is None: + SocketRefs.pop(key, None) + + self.ws = ffi.NULL + self._ptr = ffi.NULL + +class WSBehaviorHandlers: + def __init__(self): + self.upgrade = None + self.open = None + self.message = None + self.drain = None + self.ping = None + self.pong = None + self.close = None + class AppRequest: def __init__(self, request): self.req = request @@ -477,9 +944,9 @@ class AppRequest: self._ptr = ffi.NULL class AppResponse: - def __init__(self, response, loop, is_ssl): + def __init__(self, response, loop, ssl): self.res = response - self.SSL = ffi.cast("int", 1 if is_ssl else 0) + self.SSL = ssl self.aborted = False self.loop = loop self._aborted_handler = None @@ -804,6 +1271,42 @@ class AppResponse: lib.uws_res_on_data(self.SSL, self.res, uws_generic_on_data_handler, self._ptr) return self + def upgrade(self, sec_web_socket_key, sec_web_socket_protocol, sec_web_socket_extensions, socket_context, user_data=None): + if self.aborted: + return False + + + if isinstance(sec_web_socket_key, str): + sec_web_socket_key_data = sec_web_socket_key.encode('utf-8') + elif isinstance(sec_web_socket_key, bytes): + sec_web_socket_key_data = sec_web_socket_key + else: + raise RuntimeError("sec_web_socket_key need to be an String or Bytes") + + if isinstance(sec_web_socket_protocol, str): + sec_web_socket_protocol_data = sec_web_socket_protocol.encode('utf-8') + elif isinstance(sec_web_socket_protocol, bytes): + sec_web_socket_protocol_data = sec_web_socket_protocol + else: + raise RuntimeError("sec_web_socket_protocol need to be an String or Bytes") + + if isinstance(sec_web_socket_extensions, str): + sec_web_socket_extensions_data = sec_web_socket_extensions.encode('utf-8') + elif isinstance(sec_web_socket_extensions, bytes): + sec_web_socket_extensions_data = sec_web_socket_extensions + else: + raise RuntimeError("sec_web_socket_protocol need to be an String or Bytes") + + user_data_ptr = ffi.NULL + if not user_data is None: + _id = uuid.uuid4() + user_data_ptr = (ffi.new_handle(user_data), _id) + #keep alive data + SocketRefs[_id] = user_data_ptr + + lib.uws_res_upgrade(self.SSL, self.res, user_data_ptr, sec_web_socket_key_data, len(sec_web_socket_key_data),sec_web_socket_protocol_data, len(sec_web_socket_protocol_data),sec_web_socket_extensions_data, len(sec_web_socket_extensions_data), socket_context) + + def on_writable(self, handler): if not self.aborted: if hasattr(handler, '__call__'): @@ -812,13 +1315,12 @@ class AppResponse: lib.uws_res_on_writable(self.SSL, self.res, uws_generic_on_writable_handler, self._ptr) return self + def get_native_handle(self): + return lib.uws_res_get_native_handle(self.SSL, self.res) + def __del__(self): self.res = ffi.NULL self._ptr = ffi.NULL -# void uws_res_on_data(int ssl, uws_res_t *res, void (*handler)(uws_res_t *res, const char *chunk, size_t chunk_length, bool is_end, void *opcional_data), void *opcional_data); -# void uws_res_upgrade(int ssl, uws_res_t *res, void *data, const char *sec_web_socket_key, size_t sec_web_socket_key_length, const char *sec_web_socket_protocol, size_t sec_web_socket_protocol_length, const char *sec_web_socket_extensions, size_t sec_web_socket_extensions_length, uws_socket_context_t *ws); -# void uws_res_on_writable(int ssl, uws_res_t *res, bool (*handler)(uws_res_t *res, uintmax_t, void *opcional_data), void *user_data); - class App: def __init__(self, options=None): @@ -853,66 +1355,251 @@ class App: self.handlers = [] self.error_handler = None + self._missing_server_handler = None - def static(self, route, directory): + + + def static(self, route, dire1ctory): static_route(self, route, directory) return self def get(self, path, handler): user_data = ffi.new_handle((handler, self)) self.handlers.append(user_data) #Keep alive handler - lib.uws_app_get(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data) + lib.uws_app_get(self.SSL, self.app, path.encode("utf-8"), uws_generic_method_handler, user_data) return self def post(self, path, handler): user_data = ffi.new_handle((handler, self)) self.handlers.append(user_data) #Keep alive handler - lib.uws_app_post(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data) + lib.uws_app_post(self.SSL, self.app, path.encode("utf-8"), uws_generic_method_handler, user_data) return self def options(self, path, handler): user_data = ffi.new_handle((handler, self)) self.handlers.append(user_data) #Keep alive handler - lib.uws_app_options(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data) + lib.uws_app_options(self.SSL, self.app, path.encode("utf-8"), uws_generic_method_handler, user_data) return self def delete(self, path, handler): user_data = ffi.new_handle((handler, self)) self.handlers.append(user_data) #Keep alive handler - lib.uws_app_delete(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data) + lib.uws_app_delete(self.SSL, self.app, path.encode("utf-8"), uws_generic_method_handler, user_data) return self def patch(self, path, handler): user_data = ffi.new_handle((handler, self)) self.handlers.append(user_data) #Keep alive handler - lib.uws_app_patch(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data) + lib.uws_app_patch(self.SSL, self.app, path.encode("utf-8"), uws_generic_method_handler, user_data) return self def put(self, path, handler): user_data = ffi.new_handle((handler, self)) self.handlers.append(user_data) #Keep alive handler - lib.uws_app_put(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data) + lib.uws_app_put(self.SSL, self.app, path.encode("utf-8"), uws_generic_method_handler, user_data) return self def head(self, path, handler): user_data = ffi.new_handle((handler, self)) self.handlers.append(user_data) #Keep alive handler - lib.uws_app_head(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data) + lib.uws_app_head(self.SSL, self.app, path.encode("utf-8"), uws_generic_method_handler, user_data) return self def connect(self, path, handler): user_data = ffi.new_handle((handler, self)) self.handlers.append(user_data) #Keep alive handler - lib.uws_app_connect(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data) + lib.uws_app_connect(self.SSL, self.app, path.encode("utf-8"), uws_generic_method_handler, user_data) return self def trace(self, path, handler): user_data = ffi.new_handle((handler, self)) self.handlers.append(user_data) #Keep alive handler - lib.uws_app_trace(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data) + lib.uws_app_trace(self.SSL, self.app, path.encode("utf-8"), uws_generic_method_handler, user_data) return self def any(self, path, handler): user_data = ffi.new_handle((handler, self)) self.handlers.append(user_data) #Keep alive handler - lib.uws_app_any(self.SSL, self.app, path.encode("utf-8"), uws_generic_ssl_method_handler if self.is_ssl else uws_generic_method_handler, user_data) + lib.uws_app_any(self.SSL, self.app, path.encode("utf-8"), uws_generic_method_handler, user_data) return self - def listen(self, port_or_options, handler=None): + def get_native_handle(self): + return lib.uws_get_native_handle(self.SSL, self.app) + + def num_subscribers(self, topic): + if isinstance(topic, str): + topic_data = topic.encode('utf-8') + elif isinstance(topic, bytes): + topic_data = topic + else: + raise RuntimeError("topic need to be an String or Bytes") + return int(lib.uws_num_subscribers(self.SSL, self.app, topic_data, len(topic_data))) + + def publish(self, topic, message, opcode=OpCode.BINARY, compress=False): + + if isinstance(topic, str): + topic_data = topic.encode('utf-8') + elif isinstance(topic, bytes): + topic_data = topic + else: + raise RuntimeError("topic need to be an String or Bytes") + + if isinstance(message, str): + message_data = message.encode('utf-8') + elif isinstance(message, bytes): + message_data = message + elif message == None: + data = b'' + else: + data = json.dumps(message).encode("utf-8") + + return bool(lib.uws_publish(self.SSL, self.app, topic_data, len(topic_data), message_data, len(message_data), int(opcode), bool(compress))) + + def remove_server_name(self, hostname): + if isinstance(hostname, str): + hostname_data = hostname.encode('utf-8') + elif isinstance(hostname, bytes): + hostname_data = hostname + else: + raise RuntimeError("hostname need to be an String or Bytes") + + lib.uws_remove_server_name(self.SSL, self.app, hostname_data, len(hostname_data)) + return self + + def add_server_name(self, hostname, options=None): + if isinstance(hostname, str): + hostname_data = hostname.encode('utf-8') + elif isinstance(hostname, bytes): + hostname_data = hostname + else: + raise RuntimeError("hostname need to be an String or Bytes") + + if options is None: + lib.uws_add_server_name(self.SSL, self.app, hostname_data, len(hostname_data)) + else: + socket_options_ptr = ffi.new("struct us_socket_context_options_t *") + socket_options = socket_options_ptr[0] + socket_options.key_file_name = ffi.NULL if options.key_file_name == None else ffi.new("char[]", options.key_file_name.encode("utf-8")) + socket_options.key_file_name = ffi.NULL if options.key_file_name == None else ffi.new("char[]", options.key_file_name.encode("utf-8")) + socket_options.cert_file_name = ffi.NULL if options.cert_file_name == None else ffi.new("char[]", options.cert_file_name.encode("utf-8")) + socket_options.passphrase = ffi.NULL if options.passphrase == None else ffi.new("char[]", options.passphrase.encode("utf-8")) + socket_options.dh_params_file_name = ffi.NULL if options.dh_params_file_name == None else ffi.new("char[]", options.dh_params_file_name.encode("utf-8")) + socket_options.ca_file_name = ffi.NULL if options.ca_file_name == None else ffi.new("char[]", options.ca_file_name.encode("utf-8")) + socket_options.ssl_ciphers = ffi.NULL if options.ssl_ciphers == None else ffi.new("char[]", options.ssl_ciphers.encode("utf-8")) + socket_options.ssl_prefer_low_memory_usage = ffi.cast("int", options.ssl_prefer_low_memory_usage) + lib.uws_add_server_name_with_options(self.SSL, self.app, hostname_data, len(hostname_data), socket_options) + return self + + def missing_server_name(self, handler): + self._missing_server_handler = handler + lib.uws_missing_server_name(self.SSL, self.app, uws_missing_server_name, self._ptr) + + def ws(self, path, behavior): + native_options = ffi.new("uws_socket_behavior_t *") + native_behavior = native_options[0] + + max_payload_length = None + idle_timeout = None + max_backpressure = None + close_on_backpressure_limit = None + reset_idle_timeout_on_send = None + send_pings_automatically = None + max_lifetime = None + compression = None + upgrade_handler = None + open_handler = None + message_handler = None + drain_handler = None + ping_handler = None + pong_handler = None + close_handler = None + + if behavior is None: + raise RuntimeError("behavior must be an dict or WSBehavior") + elif isinstance(behavior, dict): + max_payload_length = behavior.get("max_payload_length", 16 * 1024) + idle_timeout = behavior.get("idle_timeout", 60 * 2) + max_backpressure = behavior.get("max_backpressure", 64 * 1024) + close_on_backpressure_limit = behavior.get("close_on_backpressure_limit", False) + reset_idle_timeout_on_send = behavior.get("reset_idle_timeout_on_send", False) + send_pings_automatically = behavior.get("send_pings_automatically", False) + max_lifetime = behavior.get("max_lifetime", 0) + compression = behavior.get("compression", 0) + upgrade_handler = behavior.get("upgrade", None) + open_handler = behavior.get("open", None) + message_handler = behavior.get("message", None) + drain_handler = behavior.get("drain", None) + ping_handler = behavior.get("ping", None) + pong_handler = behavior.get("pong", None) + close_handler = behavior.get("close", None) + + + + native_behavior.maxPayloadLength = ffi.cast("unsigned int", max_payload_length if isinstance(max_payload_length, int) else 16 * 1024) + native_behavior.idleTimeout = ffi.cast("unsigned short", idle_timeout if isinstance(idle_timeout, int) else 16 * 1024) + native_behavior.maxBackpressure = ffi.cast("unsigned int", max_backpressure if isinstance(max_backpressure, int) else 64 * 1024) + native_behavior.compression = ffi.cast("uws_compress_options_t", compression if isinstance(compression, int) else 0) + native_behavior.maxLifetime = ffi.cast("unsigned short", max_lifetime if isinstance(max_lifetime, int) else 0) + native_behavior.closeOnBackpressureLimit = ffi.cast("int", 1 if close_on_backpressure_limit else 0) + native_behavior.resetIdleTimeoutOnSend = ffi.cast("int", 1 if reset_idle_timeout_on_send else 0) + native_behavior.sendPingsAutomatically = ffi.cast("int", 1 if send_pings_automatically else 0) + + handlers = WSBehaviorHandlers() + if upgrade_handler: + handlers.upgrade = upgrade_handler + native_behavior.upgrade = uws_websocket_upgrade_handler + else: + native_behavior.upgrade = ffi.NULL + + if open_handler: + handlers.open = open_handler + native_behavior.open = uws_websocket_open_handler + else: + native_behavior.open = ffi.NULL + + if message_handler: + handlers.message = message_handler + native_behavior.message = uws_websocket_message_handler + else: + native_behavior.message = ffi.NULL + + if drain_handler: + handlers.drain = drain_handler + native_behavior.drain = uws_websocket_drain_handler + else: + native_behavior.drain = ffi.NULL + + if ping_handler: + handlers.ping = ping_handler + native_behavior.ping = uws_websocket_ping_handler + else: + native_behavior.ping = ffi.NULL + + if pong_handler: + handlers.pong = pong_handler + native_behavior.pong = uws_websocket_pong_handler + else: + native_behavior.pong = ffi.NULL + + if close_handler: + handlers.close = close_handler + native_behavior.close = uws_websocket_close_handler + else: #always keep an close + native_behavior.close = uws_websocket_close_handler + + user_data = ffi.new_handle((handlers, self)) + self.handlers.append(user_data) #Keep alive handlers + lib.uws_ws(self.SSL, self.app, path.encode("utf-8"), native_behavior, user_data) + return self + + def listen(self, port_or_options=None, handler=None): self._listen_handler = handler - if isinstance(port_or_options, int): + if port_or_options is None: + lib.uws_app_listen(self.SSL, self.app, ffi.cast("int", 0), uws_generic_listen_handler, self._ptr) + elif isinstance(port_or_options, int): lib.uws_app_listen(self.SSL, self.app, ffi.cast("int", port_or_options), uws_generic_listen_handler, self._ptr) + elif isinstance(port_or_options, dict): + native_options = ffi.new("uws_app_listen_config_t *") + options = native_options[0] + port = port_or_options.get("port", 0) + options = port_or_options.get("options", 0) + host = port_or_options.get("host", "0.0.0.0") + options.port = ffi.cast("int", port, 0) if isinstance(port, int) else ffi.cast("int", 0) + options.host = ffi.new("char[]", host.encode("utf-8")) if isinstance(host, str) else ffi.NULL + options.options = ffi.cast("int", port) if isinstance(options, int) else ffi.cast("int", 0) + self.native_options_listen = native_options #Keep alive native_options + lib.uws_app_listen_with_config(self.SSL, self.app, options, uws_generic_listen_handler, self._ptr) else: native_options = ffi.new("uws_app_listen_config_t *") options = native_options[0] @@ -980,7 +1667,7 @@ class AppListenOptions: self.port = port self.host = host self.options = options - + class AppOptions: def __init__(self, key_file_name=None, cert_file_name=None, passphrase=None, dh_params_file_name=None, ca_file_name=None, ssl_ciphers=None, ssl_prefer_low_memory_usage=0): if key_file_name != None and not isinstance(key_file_name, str): raise RuntimeError("key_file_name must be an String or None") diff --git a/src/socketify/uWebSockets b/src/socketify/uWebSockets index 6da3fd5..4536669 160000 --- a/src/socketify/uWebSockets +++ b/src/socketify/uWebSockets @@ -1 +1 @@ -Subproject commit 6da3fd5b0260d56664d85d62cc4e3dfeaa528e7d +Subproject commit 4536669a85a8cd4ba72e489f33f44a57ff825cfd diff --git a/src/tests.py b/src/tests.py index 99c2697..8baf79c 100644 --- a/src/tests.py +++ b/src/tests.py @@ -3,73 +3,23 @@ # import os.path -# def in_directory(file, directory): -# #make both absolute -# directory = os.path.join(os.path.realpath(directory), '') -# file = os.path.realpath(file) + # DLL_EXPORT typedef void (*uws_listen_domain_handler)(struct us_listen_socket_t *listen_socket, const char* domain, size_t domain_length, int options, void *user_data); + # DLL_EXPORT typedef void (*uws_missing_server_handler)(const char *hostname, size_t hostname_length, void *user_data); -# #return true, if the common prefix of both is equal to directory -# #e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b -# return os.path.commonprefix([file, directory]) == directory +# DLL_EXPORT void uws_app_listen_domain(int ssl, uws_app_t *app, const char *domain,size_t server_name_length, uws_listen_domain_handler handler, void *user_data); + # DLL_EXPORT void uws_app_listen_domain_with_options(int ssl, uws_app_t *app, const char *domain,size_t server_name_length, int options, uws_listen_domain_handler handler, void *user_data); + # DLL_EXPORT void uws_app_domain(int ssl, uws_app_t *app, const char* server_name, size_t server_name_length); -# application/x-www-form-urlencoded -# application/x-www-form-urlencoded -# multipart/form-data + # DLL_EXPORT void uws_remove_server_name(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length); + # DLL_EXPORT void uws_add_server_name(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length); + # DLL_EXPORT void uws_add_server_name_with_options(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length, struct us_socket_context_options_t options); + # DLL_EXPORT void uws_missing_server_name(int ssl, uws_app_t *app, uws_missing_server_handler handler, void *user_data); + # DLL_EXPORT void uws_filter(int ssl, uws_app_t *app, uws_filter_handler handler, void *user_data); -# void uws_res_prepare_for_sendfile(int ssl, uws_res_t *res) { -# if (ssl) { -# uWS::HttpResponse *uwsRes = (uWS::HttpResponse *)res; -# auto pair = uwsRes->getSendBuffer(2); -# char *ptr = pair.first; -# ptr[0] = '\r'; -# ptr[1] = '\n'; -# uwsRes->uncork(); -# } else { -# uWS::HttpResponse *uwsRes = (uWS::HttpResponse *)res; -# auto pair = uwsRes->getSendBuffer(2); -# char *ptr = pair.first; -# ptr[0] = '\r'; -# ptr[1] = '\n'; -# uwsRes->uncork(); -# } -# } - -# int uws_res_state(int ssl, uws_res_t *res) { -# if (ssl) { -# uWS::HttpResponse *uwsRes = (uWS::HttpResponse *)res; -# return uwsRes->getHttpResponseData()->state; -# } else { -# uWS::HttpResponse *uwsRes = (uWS::HttpResponse *)res; -# return uwsRes->getHttpResponseData()->state; -# } -# } - -#uws_res_get_native_handle - - -# void *uws_res_get_native_handle(int ssl, uws_res_t *res) { -# if (ssl) { -# uWS::HttpResponse *uwsRes = (uWS::HttpResponse *)res; -# return uwsRes->getNativeHandle(); -# } else { -# uWS::HttpResponse *uwsRes = (uWS::HttpResponse *)res; -# return uwsRes->getNativeHandle(); -# } -# } -# unsigned int uws_num_subscribers(int ssl, uws_app_t *app, const char *topic); -# bool uws_publish(int ssl, uws_app_t *app, const char *topic, size_t topic_length, const char *message, size_t message_length, uws_opcode_t opcode, bool compress); -# void *uws_get_native_handle(int ssl, uws_app_t *app); -# void uws_remove_server_name(int ssl, uws_app_t *app, const char *hostname_pattern); -# void uws_add_server_name(int ssl, uws_app_t *app, const char *hostname_pattern); -# void uws_add_server_name_with_options(int ssl, uws_app_t *app, const char *hostname_pattern, struct us_socket_context_options_t options); -# void uws_missing_server_name(int ssl, uws_app_t *app, uws_missing_server_handler handler, void *user_data); -# void uws_filter(int ssl, uws_app_t *app, uws_filter_handler handler, void *user_data); - -# https://github.com/uNetworking/uWebSockets.js/blob/master/examples/VideoStreamer.js # from socketify import App -from socketify import App, AppOptions +from socketify import App, AppOptions, OpCode # import os import multiprocessing @@ -78,32 +28,47 @@ import time import mimetypes mimetypes.init() -#need to fix get_data using sel._data etc + async def home(res, req): - print("full", req.get_full_url()) - print("normal", req.get_url()) - print("method", req.get_method()) + res.end("Home") - res.end("Test") +def ws_open(ws): + print("Upgrated!") + print(ws.send("Xablau!", OpCode.TEXT)) -def run_app(): - app = App() - app.get("/", home) - app.listen(3000, lambda config: print("Listening on port http://localhost:%d now\n" % (config.port))) - app.run() +def ws_message(ws, message, opcode): + print(message, opcode) -def create_fork(): - n = os.fork() - # n greater than 0 means parent process - if not n > 0: - run_app() +async def ws_upgrade(res, req, socket_context): + key = req.get_header("sec-websocket-key") + protocol = req.get_header("sec-websocket-protocol") + extensions = req.get_header("sec-websocket-extensions") + await asyncio.sleep(2) + print("request upgrade!") + res.upgrade(key, protocol, extensions, socket_context) + +app = App() +app.ws("/*", { + 'open': ws_open, + 'message': ws_message, + 'upgrade': ws_upgrade +}) +app.get("/", home) +app.listen(3000, lambda config: print("Listening on port http://localhost:%d now\n" % (config.port))) +app.run() + +# def create_fork(): +# n = os.fork() +# # n greater than 0 means parent process +# if not n > 0: +# run_app() #openssl req -newkey rsa:2048 -new -nodes -x509 -days 3650 -passout pass:1234 -keyout ./misc/key.pem -out ./misc/cert.pem # # fork limiting the cpu count - 1 # for i in range(1, multiprocessing.cpu_count()): # create_fork() -run_app() # run app on the main process too :) +# run_app() # run app on the main process too :) # from datetime import datetime # raw = "_ga=GA1.1.1871393672.1649875681; affclick=null; __udf_j=d31b9af0d332fec181c1a893320322c0cb33ce95d7bdbd21a4cc4ee66d6d8c23817686b4ba59dd0e015cb95e8196157c"