first cli.py and some improvements

pull/75/head
Ciro 2022-12-07 09:38:42 -03:00
rodzic ee5ced290e
commit 7c623703b1
16 zmienionych plików z 568 dodań i 70 usunięć

Wyświetl plik

@ -24,4 +24,4 @@ home = Home()
app.add_route("/", home)
if __name__ == "__main__":
ASGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()
ASGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run(workers=8)

Wyświetl plik

@ -23,5 +23,4 @@ home = Home()
app.add_route("/", home)
if __name__ == "__main__":
WSGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()
WSGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run(workers=2)

Wyświetl plik

@ -0,0 +1,12 @@
from socketify import App
# App will be created by the cli with all things you want configured
def run(app: App):
# add your routes here
app.get("/", lambda res, req: res.end("Hello World!"))
# python -m socketify hello_world_cli:run --port 8080 --workers 2
# python3 -m socketify hello_world_cli:run --port 8080 --workers 2
# pypy3 -m socketify hello_world_cli:run --port 8080 --workers 2
# see options in with: python3 -m socketify --help

Wyświetl plik

@ -0,0 +1,18 @@
from socketify import App, OpCode
def run(app: App):
# add your routes here
app.get("/", lambda res, req: res.end("Hello World!"))
# cli will use this configuration for serving in "/*" route, you can still use .ws("/*", config) if you want but --ws* options will not have effect
websocket = {
"open": lambda ws: ws.send("Hello World!", OpCode.TEXT),
"message": lambda ws, message, opcode: ws.send(message, opcode),
"close": lambda ws, code, message: print("WebSocket closed"),
}
# python -m socketify hello_world_cli_ws:run --ws hello_world_cli_ws:websocket --port 8080 --workers 2
# python3 -m socketify hello_world_cli_ws:run --ws hello_world_cli_ws:websocket--port 8080 --workers 2
# pypy3 -m socketify hello_world_cli_ws:run --ws hello_world_cli_ws:websocket--port 8080 --workers 2
# see options in with: python3 -m socketify --help

Wyświetl plik

@ -0,0 +1,3 @@
import sys
from .cli import execute
execute(sys.argv)

Wyświetl plik

@ -1,7 +1,7 @@
from socketify import App, CompressOptions, OpCode
from queue import SimpleQueue
from .native import lib, ffi
import asyncio
import os
EMPTY_RESPONSE = { 'type': 'http.request', 'body': b'', 'more_body': False }
@ -50,6 +50,7 @@ def ws_upgrade(ssl, response, info, socket_context, user_data, aborted):
extensions = None
else:
extensions = ffi.unpack(info.extensions, info.extensions_size).decode('utf8')
compress = app.ws_compression
ws = ASGIWebSocket(app.server.loop)
scope = {
'type': 'websocket',
@ -77,10 +78,10 @@ def ws_upgrade(ssl, response, info, socket_context, user_data, aborted):
data = options.get("bytes", None)
if ws.ws:
if data:
lib.socketify_ws_cork_send(ssl, ws.ws, data, len(data), int(OpCode.BINARY))
lib.socketify_ws_cork_send_with_options(ssl, ws.ws, data, len(data), int(OpCode.BINARY), int(compress), 0)
else:
data = options.get('text', '').encode('utf8')
lib.socketify_ws_cork_send(ssl, ws.ws, data, len(data), int(OpCode.TEXT))
lib.socketify_ws_cork_send_with_options(ssl, ws.ws, data, len(data), int(OpCode.TEXT), int(compress), 0)
return True
return False
if type == 'websocket.accept': # upgrade!
@ -136,9 +137,9 @@ def ws_upgrade(ssl, response, info, socket_context, user_data, aborted):
if type == 'websocket.publish': # publish extension
data = options.get("bytes", None)
if data:
app.server.publish(options.get('topic'), data)
app.server.publish(options.get('topic'), data, OpCode.BINARY, compress)
else:
app.server.publish(options.get('topic'), options.get('text', ''), OpCode.TEXT)
app.server.publish(options.get('topic'), options.get('text', ''), OpCode.TEXT, compress)
return True
if type == 'websocket.subscribe': # subscribe extension
if ws.ws:
@ -421,15 +422,16 @@ def asgi(ssl, response, info, user_data, aborted):
return False
app.server.loop.run_async(app.app(scope, receive, send))
class ASGI:
def __init__(self, app, options=None, request_response_factory_max_itens=0, websocket_factory_max_itens=0):
self.server = App(options, request_response_factory_max_itens, websocket_factory_max_itens)
class _ASGI:
def __init__(self, app, options=None, websocket=True, websocket_options=None):
self.server = App(options)
self.SERVER_PORT = None
self.SERVER_HOST = ''
self.SERVER_SCHEME = 'https' if self.server.options else 'http'
self.SERVER_WS_SCHEME = 'wss' if self.server.options else 'ws'
self.app = app
self.ws_compression = False
# optimized in native
self._ptr = ffi.new_handle(self)
self.asgi_http_info = lib.socketify_add_asgi_http_handler(
@ -438,53 +440,65 @@ class ASGI:
asgi,
self._ptr
)
self.asgi_ws_info = None
if isinstance(websocket, dict): #serve websocket as socketify.py
if websocket_options:
websocket.update(websocket_options)
native_options = ffi.new("uws_socket_behavior_t *")
native_behavior = native_options[0]
native_behavior.maxPayloadLength = ffi.cast(
"unsigned int",
16 * 1024 * 1024,
)
native_behavior.idleTimeout = ffi.cast(
"unsigned short",
0,
)
native_behavior.maxBackpressure = ffi.cast(
"unsigned int",
1024 * 1024 * 1024,
)
native_behavior.compression = ffi.cast(
"uws_compress_options_t", 0
)
native_behavior.maxLifetime = ffi.cast(
"unsigned short", 0
)
native_behavior.closeOnBackpressureLimit = ffi.cast(
"int", 0
)
native_behavior.resetIdleTimeoutOnSend = ffi.cast(
"int", 0
)
native_behavior.sendPingsAutomatically = ffi.cast(
"int", 0
)
self.server.ws("/*", websocket)
elif websocket: #serve websocket as ASGI
native_options = ffi.new("uws_socket_behavior_t *")
native_behavior = native_options[0]
native_behavior.upgrade = ffi.NULL # will be set first on C++
if not websocket_options:
websocket_options = {}
native_behavior.open = ws_open
native_behavior.message = ws_message
native_behavior.ping = ffi.NULL
native_behavior.pong = ffi.NULL
native_behavior.close = ws_close
self.asgi_ws_info = lib.socketify_add_asgi_ws_handler(
self.server.SSL,
self.server.app,
native_behavior,
ws_upgrade,
self._ptr
)
self.ws_compression = bool(websocket_options.get('compression', False))
native_behavior.maxPayloadLength = ffi.cast(
"unsigned int",
int(websocket_options.get('max_payload_length', 16777216)),
)
native_behavior.idleTimeout = ffi.cast(
"unsigned short",
int(websocket_options.get('idle_timeout', 20)),
)
native_behavior.maxBackpressure = ffi.cast(
"unsigned int",
int(websocket_options.get('max_backpressure', 16777216)),
)
native_behavior.compression = ffi.cast(
"uws_compress_options_t", int(self.ws_compression)
)
native_behavior.maxLifetime = ffi.cast(
"unsigned short", int(websocket_options.get('max_lifetime', 0))
)
native_behavior.closeOnBackpressureLimit = ffi.cast(
"int", int(websocket_options.get('close_on_backpressure_limit', 0)),
)
native_behavior.resetIdleTimeoutOnSend = ffi.cast(
"int", bool(websocket_options.get('reset_idle_timeout_on_send', True))
)
native_behavior.sendPingsAutomatically = ffi.cast(
"int", bool(websocket_options.get('send_pings_automatically', True))
)
native_behavior.upgrade = ffi.NULL # will be set first on C++
native_behavior.open = ws_open
native_behavior.message = ws_message
native_behavior.ping = ffi.NULL
native_behavior.pong = ffi.NULL
native_behavior.close = ws_close
self.asgi_ws_info = lib.socketify_add_asgi_ws_handler(
self.server.SSL,
self.server.app,
native_behavior,
ws_upgrade,
self._ptr
)
def listen(self, port_or_options, handler=None):
self.SERVER_PORT = port_or_options if isinstance(port_or_options, int) else port_or_options.port
@ -492,11 +506,47 @@ class ASGI:
self.server.listen(port_or_options, handler)
return self
def run(self):
self.server.run()
self.server.run() # run app on the main process too :)
return self
def __del__(self):
if self.asgi_http_info:
lib.socketify_destroy_asgi_app_info(self.asgi_http_info)
if self.asgi_ws_info:
lib.socketify_destroy_asgi_ws_app_info(self.asgi_ws_info)
lib.socketify_destroy_asgi_ws_app_info(self.asgi_ws_info)
# "Public" ASGI interface to allow easy forks/workers
class ASGI:
def __init__(self, app, options=None, websocket=True, websocket_options=None):
self.app = app
self.options = options
self.websocket = websocket
self.websocket_options = websocket_options
self.listen_options = None
def listen(self, port_or_options, handler=None):
self.listen_options = (port_or_options, handler)
return self
def run(self, workers=1):
def run_app():
server = _ASGI(self.app, self.options, self.websocket, self.websocket_options)
if self.listen_options:
(port_or_options, handler) = self.listen_options
server.listen(port_or_options, handler)
server.run()
def create_fork():
n = os.fork()
# n greater than 0 means parent process
if not n > 0:
run_app()
# fork limiting the cpu count - 1
for i in range(1, workers):
create_fork()
run_app() # run app on the main process too :)
return self

Wyświetl plik

@ -0,0 +1,255 @@
import inspect
import os
from . import App, AppOptions, AppListenOptions
help = """
Usage: python -m socketify APP [OPTIONS]
python3 -m socketify APP [OPTIONS]
pypy3 -m socketify APP [OPTIONS]
Options:
--help Show this Help
--host or -h TEXT Bind socket to this host. [default:127.0.0.1]
--port or -p INTEGER Bind socket to this port. [default: 8000]
--workers or -w INTEGER Number of worker processes. Defaults to the WEB_CONCURRENCY
environment variable if available, or 1
--ws [auto|none|module:ws] If informed module:ws will auto detect to use socketify.py or ASGI websockets
interface and disabled if informed none [default: auto]
--ws-max-size INTEGER WebSocket max size message in bytes [default: 16777216]
--ws-auto-ping BOOLEAN WebSocket auto ping sending [default: True]
--ws-idle-timeout INT WebSocket idle timeout [default: 20]
--ws-reset-idle-on-send BOOLEAN Reset WebSocket idle timeout on send [default: True]
--ws-per-message-deflate BOOLEAN WebSocket per-message-deflate compression [default: True]
--ws-max-lifetime INT Websocket maximum socket lifetime in seconds before forced closure, 0 to disable [default: 0]
--ws-max-backpressure INT WebSocket maximum backpressure in bytes [default: 16777216]
--ws-close-on-backpressure-limit BOOLEAN Close connections that hits maximum backpressure [default: False]
--lifespan [auto|on|off] Lifespan implementation. [default: auto]
--interface [auto|asgi|asgi3|wsgi|ssgi|socketify] Select ASGI (same as ASGI3), ASGI3, WSGI or SSGI as the application interface. [default: auto]
--disable-listen-log BOOLEAN Disable log when start listenning [default: False]
--version or -v Display the socketify.py version and exit.
--ssl-keyfile TEXT SSL key file
--ssl-certfile TEXT SSL certificate file
--ssl-keyfile-password TEXT SSL keyfile password
--ssl-ca-certs TEXT CA certificates file
--ssl-ciphers TEXT Ciphers to use (see stdlib ssl module's) [default: TLSv1]
--req-res-factory-maxitems INT Pre allocated instances of Response and Request objects for socketify interface [default: 0]
--ws-factory-maxitems INT Pre allocated instances of WebSockets objects for socketify interface [default: 0]
--uds TEXT Bind to a UNIX domain socket.
--reload Enable auto-reload. This options also disable --workers or -w option.
--reload-dir PATH Set reload directories explicitly, instead of using the current working directory.
--reload-include TEXT Set extensions to include while watching for files.
Includes '.py,.html,.js,.png,.jpeg,.jpg and .webp' by default;
these defaults can be overridden with `--reload-exclude`.
--reload-exclude TEXT Set extensions to include while watching for files.
--reload-delay INT Milliseconds to delay reload between file changes. [default: 1000]
Example:
python3 -m socketify main:app -w 8 -p 8181
"""
def is_wsgi(module):
return hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 2
def is_asgi(module):
return hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 3
def is_ssgi(module):
return False # no spec yet
def is_ssgi(module):
return False # no spec yet
def is_socketify(module):
return hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 1
def is_factory(module):
return hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 0
def str_bool(text):
text = text.lower()
return text == "true"
def load_module(file, reload=False):
try:
[full_module, app] = file.split(':')
import importlib
module =importlib.import_module(full_module)
if reload:
importlib.reload(module)
app = getattr(module, app)
# if is an factory just auto call
if is_factory(module):
app = app()
return app
except:
return None
def execute(args):
arguments_length = len(args)
if arguments_length <= 2:
if arguments_length == 2 and (args[1] == "--help"):
return print(help)
if arguments_length == 2 and (args[1] == "--version" or args[1] == "-v"):
import pkg_resources # part of setuptools
import platform
version = pkg_resources.require("socketify")[0].version
return print(f"Running socketify {version} with {platform.python_implementation()} {platform.python_version()} on {platform.system()}")
elif arguments_length < 2:
return print(help)
file = (args[1]).lower()
module = load_module(file)
if not module:
return print(f"Cannot load module {file}")
options_list = args[2:]
options = {}
selected_option = None
for option in options_list:
if selected_option:
options[selected_option] = option
selected_option = None
else:
selected_option = option
if selected_option: # --factory
options[selected_option] = True
interface = (options.get("--interface", "auto")).lower()
if interface == "auto":
if is_asgi(module):
from . import ASGI as Interface
interface = "asgi"
elif is_wsgi(module):
from . import WSGI as Interface
interface = "wsgi"
elif is_ssgi(module):
from . import SSGI as Interface
interface = "ssgi"
else:
interface = "socketify"
elif interface == "asgi" or interface == "asgi3":
from . import ASGI as Interface
interface = "asgi"
# you may use ASGI in SSGI so no checks here
if is_wsgi(module):
return print("Cannot use WSGI interface as ASGI interface")
if not is_asgi(module):
return print("ASGI interface must be callable with 3 parameters async def app(scope, receive and send)")
elif interface == "wsgi":
from . import WSGI as Interface
# you may use WSGI in SSGI so no checks here
if is_asgi(module):
return print("Cannot use ASGI interface as WSGI interface")
if not is_wsgi(module):
return print("WSGI interface must be callable with 2 parameters def app(environ, start_response)")
elif interface == "ssgi":
# if not is_ssgi(module):
# return print("SSGI is in development yet but is comming soon")
# from . import SSGI as Interface
# interface = "ssgi"
return print("SSGI is in development yet but is comming soon")
elif interface != "socketify":
return print(f"{interface} interface is not supported yet")
workers = int(options.get("--workers", options.get("-w", os.environ.get('WEB_CONCURRENCY', 1))))
if workers < 1:
workers = 1
port = int(options.get("--port", options.get("-p", 8000)))
host = options.get("--host", options.get("-h", "127.0.0.1"))
disable_listen_log = options.get("--disable-listen-log", False)
websockets = options.get("--ws", "auto")
if websockets == "none":
# disable websockets
websockets = None
elif websockets == "auto":
# if is ASGI serve websocket by default
if is_asgi(module):
websockets = True
elif is_wsgi(module):
# if is WSGI no websockets using auto
websockets = False
else: # if is socketify websockets must be set in app
websockets = False
else:
#websocket dedicated module
ws_module = load_module(websockets)
if not ws_module:
return print(f"Cannot load websocket module {websockets}")
websockets = ws_module
key_file_name = options.get('--ssl-keyfile', None)
if key_file_name:
ssl_options = AppOptions(
key_file_name=options.get('--ssl-keyfile', None),
cert_file_name=options.get('--ssl-certfile', None),
passphrase=options.get('--ssl-keyfile-password', None),
ca_file_name=options.get('--ssl-ca-certs', None),
ssl_ciphers=options.get('--ssl-ciphers', None)
)
else:
ssl_options = None
def listen_log(config):
if not disable_listen_log:
print(f"Listening on {'https' if ssl_options else 'http'}://{config.host if config.host and len(config.host) > 1 else '127.0.0.1' }:{config.port} now\n")
if websockets:
websocket_options = {
'compression': int(1 if options.get('--ws-per-message-deflate', True) else 0),
'max_payload_length': int(options.get('--ws-max-size', 16777216)),
'idle_timeout': int(options.get('--ws-idle-timeout', 20)),
'send_pings_automatically': str_bool(options.get('--ws-auto-ping', True)),
'reset_idle_timeout_on_send': str_bool(options.get('--ws-reset-idle-on-send', True)),
'max_lifetime': int(options.get('--ws-max-lifetime', 0)),
'max_backpressure': int(options.get('--max-backpressure', 16777216)),
'close_on_backpressure_limit': str_bool(options.get('--ws-close-on-backpressure-limit', False))
}
else:
websocket_options = None
if interface == "socketify":
if is_asgi(websockets):
return print("Cannot mix ASGI websockets with socketify.py interface yet")
if is_asgi(module):
return print("Cannot use ASGI interface as socketify interface")
elif is_wsgi(module):
return print("Cannot use WSGI interface as socketify interface")
elif not is_socketify(module):
return print("socketify interface must be callable with 1 parameter def run(app: App)")
# run app with the settings desired
def run_app():
fork_app = App(ssl_options, int(options.get("--req-res-factory-maxitems", 0)), int(options.get("--ws-factory-maxitems", 0)))
module(fork_app) # call module factory
if websockets: # if socketify websockets are added using --ws in socketify interface we can set here
websockets.update(websocket_options) # set websocket options
fork_app.ws("/*", websockets)
fork_app.listen(AppListenOptions(port=port, host=host), listen_log)
fork_app.run()
# now we can start all over again
def create_fork(_):
n = os.fork()
# n greater than 0 means parent process
if not n > 0:
run_app()
return n
# run in all forks
pid_list = list(map(create_fork, range(1, workers)))
# run in this process
run_app()
# sigint everything to gracefull shutdown
import signal
for pid in pid_list:
os.kill(pid, signal.SIGINT)
else:
#Generic WSGI, ASGI, SSGI Interface
Interface(module,options=ssl_options, websocket=websockets, websocket_options=websocket_options).listen(AppListenOptions(port=port, host=host), listen_log).run(workers=workers)

Plik binarny nie jest wyświetlany.

Wyświetl plik

@ -65,9 +65,10 @@ class Loop:
def stop(self):
# Just mark as started = False and wait
self.started = False
self.loop.stop()
if self.started:
# Just mark as started = False and wait
self.started = False
self.loop.stop()
# Exposes native loop for uWS
def get_native_loop(self):
@ -87,6 +88,10 @@ class Loop:
return future
def dispose(self):
if self.uv_loop:
self.uv_loop.dispose()
self.uv_loop = None
# if sys.version_info >= (3, 11)
# with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner:

Wyświetl plik

@ -376,6 +376,9 @@ void socketify_destroy_asgi_ws_app_info(socksocketify_asgi_ws_app_info* app);
void socketify_res_cork_write(int ssl, uws_res_t *response, const char* data, size_t length);
void socketify_res_cork_end(int ssl, uws_res_t *response, const char* data, size_t length, bool close_connection);
void socketify_ws_cork_send(int ssl, uws_websocket_t *ws, const char* data, size_t length, uws_opcode_t opcode);
void socketify_ws_cork_send_with_options(int ssl, uws_websocket_t *ws, const char* data, size_t length, uws_opcode_t opcode, bool compress, bool close_connection);
"""
)

Wyświetl plik

@ -329,6 +329,23 @@ void socketify_ws_cork_send(int ssl, uws_websocket_t *ws, const char* data, size
}
void socketify_ws_cork_send_with_options(int ssl, uws_websocket_t *ws, const char* data, size_t length, uws_opcode_t opcode, bool compress, bool fin){
if (ssl)
{
uWS::WebSocket<true, true, void *> *uws = (uWS::WebSocket<true, true, void *> *)ws;
uws->cork([&](){
uws->send(std::string_view(data, length), (uWS::OpCode)(unsigned char) opcode, compress, fin);
});
}
else
{
uWS::WebSocket<false, true, void *> *uws = (uWS::WebSocket<false, true, void *> *)ws;
uws->cork([&](){
uws->send(std::string_view(data, length), (uWS::OpCode)(unsigned char) opcode, compress, fin);
});
}
}
socksocketify_asgi_ws_app_info* socketify_add_asgi_ws_handler(int ssl, uws_app_t* app, uws_socket_behavior_t behavior, socketify_asgi_ws_method_handler handler, void* user_data){
socksocketify_asgi_ws_app_info* info = (socksocketify_asgi_ws_app_info*)malloc(sizeof(socksocketify_asgi_ws_app_info));

Wyświetl plik

@ -137,6 +137,7 @@ DLL_EXPORT void socketify_res_cork_end(int ssl, uws_res_t *response, const char*
DLL_EXPORT socksocketify_asgi_ws_app_info* socketify_add_asgi_ws_handler(int ssl, uws_app_t* app, uws_socket_behavior_t behavior, socketify_asgi_ws_method_handler handler, void* user_data);
DLL_EXPORT void socketify_destroy_asgi_ws_app_info(socksocketify_asgi_ws_app_info* app);
DLL_EXPORT void socketify_ws_cork_send(int ssl, uws_websocket_t *ws, const char* data, size_t length, uws_opcode_t opcode);
DLL_EXPORT void socketify_ws_cork_send_with_options(int ssl, uws_websocket_t *ws, const char* data, size_t length, uws_opcode_t opcode, bool compress, bool fin);
#endif
#ifdef __cplusplus
}

Wyświetl plik

@ -540,12 +540,13 @@ def uws_generic_listen_handler(listen_socket, config, user_data):
app.socket = listen_socket
app._listen_handler(
None
if config == ffi.NULL
else AppListenOptions(
port=int(config.port),
host=None
if config.host == ffi.NULL
else ffi.string(config.host).decode("utf-8"),
else ffi.string(config.host).decode("utf8"),
options=int(config.options),
)
)
@ -2261,7 +2262,8 @@ class App:
if hasattr(self, "socket"):
if not self.socket == ffi.NULL:
lib.us_listen_socket_close(self.SSL, self.socket)
self.loop.stop()
self.socket = ffi.NULL
self.loop.stop()
return self
def set_error_handler(self, handler):
@ -2297,9 +2299,24 @@ class App:
finally:
pass
def dispose(self):
if self.app: #only destroy if exists
self.close()
lib.uws_app_destroy(self.SSL, self.app)
self.app = None
if self.loop:
self.loop.dispose()
self.loop = None
def __del__(self):
if self.app: #only destroy if exists
self.close()
lib.uws_app_destroy(self.SSL, self.app)
if self.loop:
self.loop.dispose()
self.loop = None
class AppListenOptions:
@ -2348,3 +2365,4 @@ class AppOptions:
self.ca_file_name = ca_file_name
self.ssl_ciphers = ssl_ciphers
self.ssl_prefer_low_memory_usage = ssl_prefer_low_memory_usage

@ -1 +1 @@
Subproject commit 8fef69b3a3903b771839ad6ea4a0c9bd3c009a9e
Subproject commit 5773238f673348d560e769b10a424c6dbe598730

Wyświetl plik

@ -70,9 +70,17 @@ class UVLoop:
def get_native_loop(self):
return lib.socketify_get_native_loop(self._loop)
def dispose(self):
if self._loop != ffi.NULL:
lib.socketify_destroy_loop(self._loop)
self._handler_data = None
self._loop = ffi.NULL
def __del__(self):
lib.socketify_destroy_loop(self._loop)
self._handler_data = None
if self._loop != ffi.NULL:
lib.socketify_destroy_loop(self._loop)
self._handler_data = None
self._loop = ffi.NULL
def run_nowait(self):
return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_NOWAIT)

Wyświetl plik

@ -1,7 +1,7 @@
import os
import inspect
from socketify import App
from .asgi import ws_close, ws_upgrade, ws_open, ws_message
from io import BytesIO
from .native import lib, ffi
@ -132,12 +132,17 @@ def wsgi(ssl, response, info, user_data, aborted):
app_iter.close()
lib.uws_res_end_without_body(ssl, response, 0)
class WSGI:
def __init__(self, app, options=None, request_response_factory_max_itens=0, websocket_factory_max_itens=0):
self.server = App(options, request_response_factory_max_itens, websocket_factory_max_itens)
class _WSGI:
def __init__(self, app, options=None, websocket=None, websocket_options=None):
self.server = App(options)
self.SERVER_HOST = None
self.SERVER_PORT = None
self.SERVER_WS_SCHEME = 'wss' if self.server.options else 'ws'
self.app = app
self.BASIC_ENVIRON = dict(os.environ)
self.ws_compression = False
self._ptr = ffi.new_handle(self)
self.asgi_http_info = lib.socketify_add_asgi_http_handler(
@ -146,9 +151,70 @@ class WSGI:
wsgi,
self._ptr
)
self.asgi_ws_info = None
if isinstance(websocket, dict): #serve websocket as socketify.py
if websocket_options:
websocket.update(websocket_options)
self.server.ws("/*", websocket)
elif inspect.iscoroutine(websocket):
# detect ASGI to use as WebSocket as mixed protocol
native_options = ffi.new("uws_socket_behavior_t *")
native_behavior = native_options[0]
if not websocket_options:
websocket_options = {}
self.ws_compression = websocket_options.get('compression', False)
native_behavior.maxPayloadLength = ffi.cast(
"unsigned int",
int(websocket_options.get('max_payload_length', 16777216)),
)
native_behavior.idleTimeout = ffi.cast(
"unsigned short",
int(websocket_options.get('idle_timeout', 20)),
)
native_behavior.maxBackpressure = ffi.cast(
"unsigned int",
int(websocket_options.get('max_backpressure', 16777216)),
)
native_behavior.compression = ffi.cast(
"uws_compress_options_t", int(self.ws_compression)
)
native_behavior.maxLifetime = ffi.cast(
"unsigned short", int(websocket_options.get('max_lifetime', 0))
)
native_behavior.closeOnBackpressureLimit = ffi.cast(
"int", int(websocket_options.get('close_on_backpressure_limit', 0)),
)
native_behavior.resetIdleTimeoutOnSend = ffi.cast(
"int", bool(websocket_options.get('reset_idle_timeout_on_send', True))
)
native_behavior.sendPingsAutomatically = ffi.cast(
"int", bool(websocket_options.get('send_pings_automatically', True))
)
native_behavior.upgrade = ffi.NULL # will be set first on C++
native_behavior.open = ws_open
native_behavior.message = ws_message
native_behavior.ping = ffi.NULL
native_behavior.pong = ffi.NULL
native_behavior.close = ws_close
self.asgi_ws_info = lib.socketify_add_asgi_ws_handler(
self.server.SSL,
self.server.app,
native_behavior,
ws_upgrade,
self._ptr
)
def listen(self, port_or_options, handler=None):
self.SERVER_PORT = port_or_options if isinstance(port_or_options, int) else port_or_options.port
self.SERVER_HOST = "0.0.0.0" if isinstance(port_or_options, int) else port_or_options.host
self.BASIC_ENVIRON.update({
'GATEWAY_INTERFACE': 'CGI/1.1',
'SERVER_PORT': str(self.SERVER_PORT),
@ -170,3 +236,46 @@ class WSGI:
def run(self):
self.server.run()
return self
def __del__(self):
if self.asgi_http_info:
lib.socketify_destroy_asgi_app_info(self.asgi_http_info)
if self.asgi_ws_info:
lib.socketify_destroy_asgi_ws_app_info(self.asgi_ws_info)
# "Public" WSGI interface to allow easy forks/workers
class WSGI:
def __init__(self, app, options=None, websocket=None, websocket_options=None):
self.app = app
self.options = options
self.websocket = websocket
self.websocket_options = websocket_options
self.listen_options = None
def listen(self, port_or_options, handler=None):
self.listen_options = (port_or_options, handler)
return self
def run(self, workers=1):
def run_app():
server = _WSGI(self.app, self.options, self.websocket, self.websocket_options)
if self.listen_options:
(port_or_options, handler) = self.listen_options
server.listen(port_or_options, handler)
server.run()
def create_fork():
n = os.fork()
# n greater than 0 means parent process
if not n > 0:
run_app()
# fork limiting the cpu count - 1
for i in range(1, workers):
create_fork()
run_app() # run app on the main process too :)
return self