add async upgrade

pull/39/head
Ciro 2022-11-07 14:35:06 -03:00
rodzic c12825506c
commit 3872860684
4 zmienionych plików z 125 dodań i 105 usunięć

Wyświetl plik

@ -0,0 +1,31 @@
from socketify import App, AppOptions, OpCode, CompressOptions
import asyncio
def ws_open(ws):
print('A WebSocket got connected!')
ws.send("Hello World!", OpCode.TEXT)
def ws_message(ws, message, opcode):
print(message, opcode)
#Ok is false if backpressure was built up, wait for drain
ok = ws.send(message, opcode)
async def ws_upgrade(res, req, socket_context):
key = req.get_header("sec-websocket-key")
protocol = req.get_header("sec-websocket-protocol")
extensions = req.get_header("sec-websocket-extensions")
await asyncio.sleep(2)
res.upgrade(key, protocol, extensions, socket_context)
app = App()
app.ws("/*", {
'compression': CompressOptions.SHARED_COMPRESSOR,
'max_payload_length': 16 * 1024 * 1024,
'idle_timeout': 12,
'open': ws_open,
'message': ws_message,
'upgrade': ws_upgrade
})
app.any("/", lambda res,req: res.end("Nothing to see here!'"))
app.listen(3000, lambda config: print("Listening on port http://localhost:%d now\n" % (config.port)))
app.run()

Wyświetl plik

@ -1,2 +1,2 @@
from .socketify import App, AppOptions, AppListenOptions, OpCode, SendStatus
from .socketify import App, AppOptions, AppListenOptions, OpCode, SendStatus, CompressOptions
from .helpers import sendfile

Wyświetl plik

@ -255,11 +255,13 @@ library_path = os.path.join(os.path.dirname(__file__), "libsocketify_%s_%s.%s" %
lib = ffi.dlopen(library_path)
@ffi.callback("void(const char *, size_t, void *)")
@ffi.callback("void(const char*, size_t, void*)")
def uws_missing_server_name(hostname, hostname_length, user_data):
if not user_data == ffi.NULL:
app = ffi.from_handle(user_data)
try:
app = ffi.from_handle(user_data)
if hostname == ffi.NULL:
data = None
else:
@ -274,12 +276,12 @@ def uws_missing_server_name(hostname, hostname_length, user_data):
except Exception as err:
print("Uncaught Exception: %s" % str(err)) #just log in console the error to call attention
@ffi.callback("void(uws_websocket_t *, void *)")
@ffi.callback("void(uws_websocket_t*, void*)")
def uws_websocket_drain_handler(ws, user_data):
if not user_data == ffi.NULL:
(handlers, app) = ffi.from_handle(user_data)
ws = WebSocket(ws, app.SSL, app.loop)
try:
(handlers, app) = ffi.from_handle(user_data)
ws = WebSocket(ws, app.SSL, app.loop)
handler = handlers.drain
if inspect.iscoroutinefunction(handler):
response.grab_aborted_handler()
@ -289,12 +291,13 @@ def uws_websocket_drain_handler(ws, user_data):
except Exception as err:
print("Uncaught Exception: %s" % str(err)) #just log in console the error to call attention
@ffi.callback("void(uws_websocket_t *, void *)")
@ffi.callback("void(uws_websocket_t*, void*)")
def uws_websocket_open_handler(ws, user_data):
if not user_data == ffi.NULL:
(handlers, app) = ffi.from_handle(user_data)
ws = WebSocket(ws, app.SSL, app.loop)
if not user_data == ffi.NULL:
try:
(handlers, app) = ffi.from_handle(user_data)
ws = WebSocket(ws, app.SSL, app.loop)
handler = handlers.open
if inspect.iscoroutinefunction(handler):
response.grab_aborted_handler()
@ -304,13 +307,13 @@ def uws_websocket_open_handler(ws, user_data):
except Exception as err:
print("Uncaught Exception: %s" % str(err)) #just log in console the error to call attention
@ffi.callback("void(uws_websocket_t*, const char *,size_t, uws_opcode_t, void*)")
@ffi.callback("void(uws_websocket_t*, const char*, size_t, uws_opcode_t, void*)")
def uws_websocket_message_handler(ws, message, length, opcode, user_data):
if not user_data == ffi.NULL:
(handlers, app) = ffi.from_handle(user_data)
ws = WebSocket(ws, app.SSL, app.loop)
try:
(handlers, app) = ffi.from_handle(user_data)
ws = WebSocket(ws, app.SSL, app.loop)
if message == ffi.NULL:
data = None
else:
@ -332,10 +335,9 @@ def uws_websocket_message_handler(ws, message, length, opcode, user_data):
@ffi.callback("void(uws_websocket_t*, const char*, size_t, void*)")
def uws_websocket_pong_handler(ws, message, length, user_data):
if not user_data == ffi.NULL:
(handlers, app) = ffi.from_handle(user_data)
ws = WebSocket(ws, app.SSL, app.loop)
try:
(handlers, app) = ffi.from_handle(user_data)
ws = WebSocket(ws, app.SSL, app.loop)
if message == ffi.NULL:
data = None
else:
@ -354,10 +356,10 @@ def uws_websocket_pong_handler(ws, message, length, user_data):
@ffi.callback("void(uws_websocket_t*, const char*, size_t, void*)")
def uws_websocket_ping_handler(ws, message,length, user_data):
if not user_data == ffi.NULL:
(handlers, app) = ffi.from_handle(user_data)
ws = WebSocket(ws, app.SSL, app.loop)
try:
(handlers, app) = ffi.from_handle(user_data)
ws = WebSocket(ws, app.SSL, app.loop)
if message == ffi.NULL:
data = None
else:
@ -377,11 +379,11 @@ def uws_websocket_ping_handler(ws, message,length, user_data):
@ffi.callback("void(uws_websocket_t*, int, const char*, size_t, void*)")
def uws_websocket_close_handler(ws, code, message, length, user_data):
if not user_data == ffi.NULL:
(handlers, app) = ffi.from_handle(user_data)
#pass to free data on WebSocket if needed
ws = WebSocket(ws, app.SSL, app.loop, True)
try:
(handlers, app) = ffi.from_handle(user_data)
#pass to free data on WebSocket if needed
ws = WebSocket(ws, app.SSL, app.loop, True)
if message == ffi.NULL:
data = None
else:
@ -404,10 +406,10 @@ def uws_websocket_close_handler(ws, code, message, length, user_data):
@ffi.callback("void(uws_res_t*, uws_req_t*, uws_socket_context_t*, void*)")
def uws_websocket_upgrade_handler(res, req, context, user_data):
if not user_data == ffi.NULL:
(handlers, app) = ffi.from_handle(user_data)
response = AppResponse(res, app.loop, app.SSL)
request = AppRequest(req)
try:
(handlers, app) = ffi.from_handle(user_data)
response = AppResponse(res, app.loop, app.SSL)
request = AppRequest(req)
handler = handlers.upgrade
if inspect.iscoroutinefunction(handler):
response.grab_aborted_handler()
@ -419,22 +421,21 @@ def uws_websocket_upgrade_handler(res, req, context, user_data):
print("Uncaught Exception: %s" % str(err)) #just log in console the error to call attention
@ffi.callback("void(const char *topic, size_t length, void *user_data)")
@ffi.callback("void(const char*, size_t, void*)")
def uws_req_for_each_topic_handler(topic, topic_size, user_data):
if not user_data == ffi.NULL:
ws = ffi.from_handle(user_data)
try:
ws = ffi.from_handle(user_data)
header_name = ffi.unpack(topic, topic_size).decode("utf-8")
ws.trigger_for_each_topic_handler(header_name, header_value)
except Exception: #invalid utf-8
return
@ffi.callback("void(const char *, size_t, const char *, size_t, void *)")
@ffi.callback("void(const char*, size_t, const char*, size_t, void*)")
def uws_req_for_each_header_handler(header_name, header_name_size, header_value, header_value_size, user_data):
if not user_data == ffi.NULL:
req = ffi.from_handle(user_data)
try:
req = ffi.from_handle(user_data)
header_name = ffi.unpack(header_name, header_name_size).decode("utf-8")
header_value = ffi.unpack(header_value, header_value_size).decode("utf-8")
@ -443,13 +444,13 @@ def uws_req_for_each_header_handler(header_name, header_name_size, header_value,
return
@ffi.callback("void(uws_res_t *, uws_req_t *, void *)")
@ffi.callback("void(uws_res_t*, uws_req_t*, void*)")
def uws_generic_method_handler(res, req, user_data):
if not user_data == ffi.NULL:
(handler, app) = ffi.from_handle(user_data)
response = AppResponse(res, app.loop, app.SSL)
request = AppRequest(req)
try:
(handler, app) = ffi.from_handle(user_data)
response = AppResponse(res, app.loop, app.SSL)
request = AppRequest(req)
if inspect.iscoroutinefunction(handler):
response.grab_aborted_handler()
response.run_async(handler(response, request))
@ -460,7 +461,7 @@ def uws_generic_method_handler(res, req, user_data):
app.trigger_error(err, response, request)
@ffi.callback("void(struct us_listen_socket_t *, uws_app_listen_config_t, void *)")
@ffi.callback("void(struct us_listen_socket_t*, uws_app_listen_config_t, void*)")
def uws_generic_listen_handler(listen_socket, config, user_data):
if listen_socket == ffi.NULL:
raise RuntimeError("Failed to listen on port %d" % int(config.port))
@ -472,7 +473,7 @@ def uws_generic_listen_handler(listen_socket, config, user_data):
app.socket = listen_socket
app._listen_handler(None if config == ffi.NULL else AppListenOptions(port=int(config.port),host=None if config.host == ffi.NULL else ffi.string(config.host).decode("utf-8"), options=int(config.options)))
@ffi.callback("void(uws_res_t *, void*)")
@ffi.callback("void(uws_res_t*, void*)")
def uws_generic_aborted_handler(response, user_data):
if not user_data == ffi.NULL:
try:
@ -480,7 +481,7 @@ def uws_generic_aborted_handler(response, user_data):
res.trigger_aborted()
except:
pass
@ffi.callback("void(uws_res_t *, const char *, size_t, bool, void*)")
@ffi.callback("void(uws_res_t*, const char*, size_t, bool, void*)")
def uws_generic_on_data_handler(res, chunk, chunk_length, is_end, user_data):
if not user_data == ffi.NULL:
res = ffi.from_handle(user_data)
@ -491,7 +492,7 @@ def uws_generic_on_data_handler(res, chunk, chunk_length, is_end, user_data):
res.trigger_data_handler(data, bool(is_end))
@ffi.callback("bool(uws_res_t *, uintmax_t, void*)")
@ffi.callback("bool(uws_res_t*, uintmax_t, void*)")
def uws_generic_on_writable_handler(res, offset, user_data):
if not user_data == ffi.NULL:
res = ffi.from_handle(user_data)
@ -500,7 +501,7 @@ def uws_generic_on_writable_handler(res, offset, user_data):
return False
@ffi.callback("void(uws_res_t *, void*)")
@ffi.callback("void(uws_res_t*, void*)")
def uws_generic_cork_handler(res, user_data):
if not user_data == ffi.NULL:
response = ffi.from_handle(user_data)
@ -524,6 +525,38 @@ def uws_ws_cork_handler(user_data):
print("Error on cork handler %s" % str(err))
# Compressor mode is 8 lowest bits where HIGH4(windowBits), LOW4(memLevel).
# Decompressor mode is 8 highest bits LOW4(windowBits).
# If compressor or decompressor bits are 1, then they are shared.
# If everything is just simply 0, then everything is disabled.
class CompressOptions(IntEnum):
#Disabled, shared, shared are "special" values
DISABLED = lib.DISABLED
SHARED_COMPRESSOR = lib.SHARED_COMPRESSOR
SHARED_DECOMPRESSOR = lib.SHARED_DECOMPRESSOR
#Highest 4 bits describe decompressor
DEDICATED_DECOMPRESSOR_32KB = lib.DEDICATED_DECOMPRESSOR_32KB
DEDICATED_DECOMPRESSOR_16KB = lib.DEDICATED_DECOMPRESSOR_16KB
DEDICATED_DECOMPRESSOR_8KB = lib.DEDICATED_DECOMPRESSOR_8KB
DEDICATED_DECOMPRESSOR_4KB = lib.DEDICATED_DECOMPRESSOR_4KB
DEDICATED_DECOMPRESSOR_2KB = lib.DEDICATED_DECOMPRESSOR_2KB
DEDICATED_DECOMPRESSOR_1KB = lib.DEDICATED_DECOMPRESSOR_1KB
DEDICATED_DECOMPRESSOR_512B = lib.DEDICATED_DECOMPRESSOR_512B
#Same as 32kb
DEDICATED_DECOMPRESSOR = lib.DEDICATED_DECOMPRESSOR,
#Lowest 8 bit describe compressor
DEDICATED_COMPRESSOR_3KB = lib.DEDICATED_COMPRESSOR_3KB
DEDICATED_COMPRESSOR_4KB = lib.DEDICATED_COMPRESSOR_4KB
DEDICATED_COMPRESSOR_8KB = lib.DEDICATED_COMPRESSOR_8KB
DEDICATED_COMPRESSOR_16KB = lib.DEDICATED_COMPRESSOR_16KB
DEDICATED_COMPRESSOR_32KB = lib.DEDICATED_COMPRESSOR_32KB
DEDICATED_COMPRESSOR_64KB = lib.DEDICATED_COMPRESSOR_64KB
DEDICATED_COMPRESSOR_128KB = lib.DEDICATED_COMPRESSOR_128KB
DEDICATED_COMPRESSOR_256KB = lib.DEDICATED_COMPRESSOR_256KB
#Same as 256kb
DEDICATED_COMPRESSOR = lib.DEDICATED_COMPRESSOR
class OpCode(IntEnum):
CONTINUATION = 0
@ -1275,27 +1308,26 @@ class AppResponse:
if self.aborted:
return False
if isinstance(sec_web_socket_key, str):
sec_web_socket_key_data = sec_web_socket_key.encode('utf-8')
elif isinstance(sec_web_socket_key, bytes):
sec_web_socket_key_data = sec_web_socket_key
else:
raise RuntimeError("sec_web_socket_key need to be an String or Bytes")
sec_web_socket_key_data = b''
if isinstance(sec_web_socket_protocol, str):
sec_web_socket_protocol_data = sec_web_socket_protocol.encode('utf-8')
elif isinstance(sec_web_socket_protocol, bytes):
sec_web_socket_protocol_data = sec_web_socket_protocol
else:
raise RuntimeError("sec_web_socket_protocol need to be an String or Bytes")
sec_web_socket_protocol_data = b''
if isinstance(sec_web_socket_extensions, str):
sec_web_socket_extensions_data = sec_web_socket_extensions.encode('utf-8')
elif isinstance(sec_web_socket_extensions, bytes):
sec_web_socket_extensions_data = sec_web_socket_extensions
else:
raise RuntimeError("sec_web_socket_protocol need to be an String or Bytes")
sec_web_socket_extensions_data = b''
user_data_ptr = ffi.NULL
if not user_data is None:
@ -1305,7 +1337,7 @@ class AppResponse:
SocketRefs[_id] = user_data_ptr
lib.uws_res_upgrade(self.SSL, self.res, user_data_ptr, sec_web_socket_key_data, len(sec_web_socket_key_data),sec_web_socket_protocol_data, len(sec_web_socket_protocol_data),sec_web_socket_extensions_data, len(sec_web_socket_extensions_data), socket_context)
return True
def on_writable(self, handler):
if not self.aborted:

Wyświetl plik

@ -3,85 +3,42 @@
# import os.path
# DLL_EXPORT typedef void (*uws_listen_domain_handler)(struct us_listen_socket_t *listen_socket, const char* domain, size_t domain_length, int options, void *user_data);
# DLL_EXPORT typedef void (*uws_missing_server_handler)(const char *hostname, size_t hostname_length, void *user_data);
# DLL_EXPORT void uws_app_listen_domain(int ssl, uws_app_t *app, const char *domain,size_t server_name_length, uws_listen_domain_handler handler, void *user_data);
# DLL_EXPORT void uws_app_listen_domain_with_options(int ssl, uws_app_t *app, const char *domain,size_t server_name_length, int options, uws_listen_domain_handler handler, void *user_data);
# DLL_EXPORT void uws_app_domain(int ssl, uws_app_t *app, const char* server_name, size_t server_name_length);
# DLL_EXPORT void uws_remove_server_name(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length);
# DLL_EXPORT void uws_add_server_name(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length);
# DLL_EXPORT void uws_add_server_name_with_options(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length, struct us_socket_context_options_t options);
# DLL_EXPORT void uws_missing_server_name(int ssl, uws_app_t *app, uws_missing_server_handler handler, void *user_data);
# DLL_EXPORT void uws_filter(int ssl, uws_app_t *app, uws_filter_handler handler, void *user_data);
# DLL_EXPORT typedef void (*uws_listen_domain_handler)(struct us_listen_socket_t *listen_socket, const char* domain, size_t domain_length, int options, void *user_data);
# DLL_EXPORT void uws_app_listen_domain(int ssl, uws_app_t *app, const char *domain,size_t server_name_length,_listen_domain_handler handler, void *user_data);
# DLL_EXPORT void uws_app_listen_domain_with_options(int ssl, uws_app_t *app, const char *domain,size_t servere_length, int options, uws_listen_domain_handler handler, void *user_data);
# DLL_EXPORT void uws_app_domain(int ssl, uws_app_t *app, const char* server_name, size_t server_name_length);
# DLL_EXPORT void uws_filter(int ssl, uws_app_t *app, uws_filter_handler handler, void *user_data);
# from socketify import App
from socketify import App, AppOptions, OpCode
# import os
import multiprocessing
from socketify import App, AppOptions, OpCode, CompressOptions
import asyncio
import time
import mimetypes
mimetypes.init()
async def home(res, req):
res.end("Home")
def ws_open(ws):
print("Upgrated!")
print(ws.send("Xablau!", OpCode.TEXT))
print('A WebSocket got connected!')
ws.send("Hello World!", OpCode.TEXT)
def ws_message(ws, message, opcode):
print(message, opcode)
#Ok is false if backpressure was built up, wait for drain
ok = ws.send(message, opcode)
async def ws_upgrade(res, req, socket_context):
key = req.get_header("sec-websocket-key")
protocol = req.get_header("sec-websocket-protocol")
extensions = req.get_header("sec-websocket-extensions")
await asyncio.sleep(2)
print("request upgrade!")
res.upgrade(key, protocol, extensions, socket_context)
app = App()
app.ws("/*", {
'compression': CompressOptions.SHARED_COMPRESSOR,
'max_payload_length': 16 * 1024 * 1024,
'idle_timeout': 12,
'open': ws_open,
'message': ws_message,
'upgrade': ws_upgrade
})
app.get("/", home)
app.any("/", lambda res,req: res.end("Nothing to see here!'"))
app.listen(3000, lambda config: print("Listening on port http://localhost:%d now\n" % (config.port)))
app.run()
# def create_fork():
# n = os.fork()
# # n greater than 0 means parent process
# if not n > 0:
# run_app()
#openssl req -newkey rsa:2048 -new -nodes -x509 -days 3650 -passout pass:1234 -keyout ./misc/key.pem -out ./misc/cert.pem
# # fork limiting the cpu count - 1
# for i in range(1, multiprocessing.cpu_count()):
# create_fork()
# run_app() # run app on the main process too :)
# from datetime import datetime
# raw = "_ga=GA1.1.1871393672.1649875681; affclick=null; __udf_j=d31b9af0d332fec181c1a893320322c0cb33ce95d7bdbd21a4cc4ee66d6d8c23817686b4ba59dd0e015cb95e8196157c"
# jar = Cookies(None)
# jar.set("session_id", "123132", {
# "path": "/",
# "domain": "*.test.com",
# "httponly": True,
# "expires": datetime.now()
# })
# print(jar.output())
# jar = cookies.SimpleCookie(raw)
# print(jar["_gaasasd"])
# print(split_header_words(raw))
#git submodule sync
app.run()