pull/75/head
Ciro 2022-12-18 15:24:45 -03:00
rodzic 76572b15cb
commit 26d0c89a2c
8 zmienionych plików z 221 dodań i 46 usunięć

Wyświetl plik

@ -0,0 +1,36 @@
import falcon
import falcon.asgi
class Home:
def on_get(self, req, resp):
resp.status = falcon.HTTP_200 # This is the default status
resp.content_type = falcon.MEDIA_TEXT # Default is JSON, so override
resp.text = "Hello, World!"
class WebSocket:
async def on_get(self, req, resp):
resp.status = falcon.HTTP_200 # This is the default status
resp.content_type = falcon.MEDIA_TEXT # Default is JSON, so override
resp.text = "Connect via ws protocol!"
async def on_websocket(self, req, ws):
try:
await ws.accept()
while True:
payload = await ws.receive_text()
if payload:
await ws.send_text(payload)
except falcon.WebSocketDisconnected:
print("Disconnected!")
# falcon WSGI APP
app = falcon.App()
home = Home()
app.add_route("/", home)
# ASGI WebSockets Falcon APP
ws = falcon.asgi.App()
ws.add_route("/", WebSocket())

Wyświetl plik

@ -22,6 +22,8 @@ class SomeResource:
app = falcon.asgi.App()
app.ws_options.max_receive_queue = 20_000_000 # this virtual disables queue but adds overhead
app.ws_options.enable_buffered_receiver = False # this disable queue but for now only available on cirospaciari/falcon
app.add_route("/", SomeResource())
# python3 -m gunicorn falcon_server:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornWorker
# pypy3 -m gunicorn falcon_server:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornH11Worker

Wyświetl plik

@ -24,7 +24,7 @@ Options:
--ws-auto-ping BOOLEAN WebSocket auto ping sending [default: True]
--ws-idle-timeout INT WebSocket idle timeout [default: 20]
--ws-reset-idle-on-send BOOLEAN Reset WebSocket idle timeout on send [default: True]
--ws-per-message-deflate BOOLEAN WebSocket per-message-deflate compression [default: True]
--ws-per-message-deflate BOOLEAN WebSocket per-message-deflate compression [default: False]
--ws-max-lifetime INT Websocket maximum socket lifetime in seconds before forced closure, 0 to disable [default: 0]
--ws-max-backpressure INT WebSocket maximum backpressure in bytes [default: 16777216]
--ws-close-on-backpressure-limit BOOLEAN Close connections that hits maximum backpressure [default: False]
@ -39,7 +39,7 @@ Options:
--ssl-ciphers TEXT Ciphers to use (see stdlib ssl module's) [default: TLSv1]
--req-res-factory-maxitems INT Pre allocated instances of Response and Request objects for socketify interface [default: 0]
--ws-factory-maxitems INT Pre allocated instances of WebSockets objects for socketify interface [default: 0]
--task-factory-maxitems INT Pre allocated instances of Task objects for socketify, ASGI interface [default: 100000]
Example:
python3 -m socketify main:app -w 8 -p 8181
@ -59,10 +59,19 @@ def run(app: App):
WebSockets can be in the same or another module, you can still use .ws("/*) to serve Websockets
```bash
python3 -m socketify hello_world_cli:run --ws hello_world_cli:websocket --port 8080 --workers 2
python3 -m socketify hello_world_cli:run --ws hello_world_cli:ws --port 8080 --workers 2
```
Socketify.py hello world + websockets:
```python
websocket = {
from socketify import App, OpCode
# App will be created by the cli with all things you want configured
def run(app: App):
# add your routes here
app.get("/", lambda res, req: res.end("Hello World!"))
ws = {
"open": lambda ws: ws.send("Hello World!", OpCode.TEXT),
"message": lambda ws, message, opcode: ws.send(message, opcode),
"close": lambda ws, code, message: print("WebSocket closed"),
@ -71,13 +80,76 @@ def run(app: App):
When running ASGI websocket will be served by default, but you can disabled it
```bash
python3 -m socketify falcon_asgi:app --ws none --port 8080 --workers 2
python3 -m socketify falcon_asgi:app --ws none --port 8080 --workers 2
```
When running WSGI or ASGI you can still use socketify.py or ASGI websockets in the same server, mixing all available methods
When running WSGI or ASGI you can still use socketify.py or ASGI websockets in the same server, mixing all available methods
You can use WSGI to more throughput in HTTP and use ASGI for websockets for example or you can use ASGI/WSGI for HTTP to keep compatibility and just re-write the websockets to use socketify interface with pub/sub and all features
```bash
python3 -m socketify falcon_wsgi:app --ws falcon:ws none --port 8080 --workers 2
python3 -m socketify falcon_wsgi:app --ws falcon_wsgi:ws --port 8080 --workers 2
```
Falcon WSGI + socketify websocket code sample
```python
import falcon
from socketify import OpCode
class Home:
def on_get(self, req, resp):
resp.status = falcon.HTTP_200 # This is the default status
resp.content_type = falcon.MEDIA_TEXT # Default is JSON, so override
resp.text = "Hello, World!"
# falcon APP
app = falcon.App()
home = Home()
app.add_route("/", home)
# socketify websocket app
ws = {
"open": lambda ws: ws.send("Hello World!", OpCode.TEXT),
"message": lambda ws, message, opcode: ws.send(message, opcode),
"close": lambda ws, code, message: print("WebSocket closed"),
}
```
Mixing ASGI websockets + WSGI HTTP
```bash
python3 -m socketify main:app --ws main:ws --port 8080 --workers 2
```
```python
import falcon
import falcon.asgi
class Home:
def on_get(self, req, resp):
resp.status = falcon.HTTP_200 # This is the default status
resp.content_type = falcon.MEDIA_TEXT # Default is JSON, so override
resp.text = "Hello, World!"
class WebSocket:
async def on_websocket(self, req, ws):
try:
await ws.accept()
while True:
payload = await ws.receive_text()
if payload:
await ws.send_text(payload)
except falcon.WebSocketDisconnected:
print("Disconnected!")
# falcon WSGI APP
app = falcon.App()
home = Home()
app.add_route("/", home)
# ASGI WebSockets Falcon APP
ws = falcon.asgi.App()
ws.add_route("/", WebSocket())
```
### Next [API Reference](api.md)

Wyświetl plik

@ -1,12 +1,22 @@
from socketify import App, CompressOptions, OpCode
from socketify import App, OpCode
from queue import SimpleQueue
from .native import lib, ffi
from .tasks import create_task, create_task_with_factory
import os
import platform
import sys
import logging
import uuid
is_pypy = platform.python_implementation() == "PyPy"
async def task_wrapper(task):
try:
return await task
except Exception as error:
try:
# just log in console the error to call attention
logging.error("Uncaught Exception: %s" % str(error))
finally:
return None
EMPTY_RESPONSE = {"type": "http.request", "body": b"", "more_body": False}
@ -19,7 +29,7 @@ def ws_message(ws, message, length, opcode, user_data):
message = message.decode("utf8")
socket_data.message(ws, message, OpCode(opcode))
@ffi.callback("void(uws_websocket_t*, int, const char*, size_t, void*)")
def ws_close(ws, code, message, length, user_data):
@ -68,6 +78,7 @@ def ws_upgrade(ssl, response, info, socket_context, user_data, aborted):
extensions = ffi.unpack(info.extensions, info.extensions_size).decode("utf8")
compress = app.ws_compression
ws = ASGIWebSocket(app.server.loop)
scope = {
"type": "websocket",
"asgi": {"version": "3.0", "spec_version": "2.3"},
@ -107,12 +118,12 @@ def ws_upgrade(ssl, response, info, socket_context, user_data, aborted):
len(data),
int(OpCode.BINARY),
int(compress),
0,
1,
)
else:
data = options.get("text", "").encode("utf8")
lib.socketify_ws_cork_send_with_options(
ssl, ws.ws, data, len(data), int(OpCode.TEXT), int(compress), 0
ssl, ws.ws, data, len(data), int(OpCode.TEXT), int(compress), 1
)
return True
return False
@ -147,7 +158,12 @@ def ws_upgrade(ssl, response, info, socket_context, user_data, aborted):
sec_web_socket_extensions_data = extensions
else:
sec_web_socket_extensions_data = b""
_id = uuid.uuid4()
app.server._socket_refs[_id] = ws
def unregister():
app.server._socket_refs.pop(_id, None)
ws.unregister = unregister
lib.uws_res_upgrade(
ssl,
response,
@ -249,6 +265,7 @@ class ASGIWebSocket:
self._code = None
self._message = None
self._ptr = ffi.new_handle(self)
self.unregister = None
def accept(self):
self.accept_future = self.loop.create_future()
@ -287,6 +304,8 @@ class ASGIWebSocket:
future.set_result(
{"type": "websocket.disconnect", "code": code, "message": message}
)
if self.unregister is not None:
self.unregister()
def message(self, ws, value, opcode):
self.ws = ws
@ -478,7 +497,7 @@ def asgi(ssl, response, info, user_data, aborted):
class _ASGI:
def __init__(self, app, options=None, websocket=True, websocket_options=None, task_factory_max_items=0):
def __init__(self, app, options=None, websocket=True, websocket_options=None, task_factory_max_items=100_000):
self.server = App(options)
self.SERVER_PORT = None
self.SERVER_HOST = ""
@ -493,24 +512,26 @@ class _ASGI:
factory = create_task_with_factory(task_factory_max_items)
def run_task(task):
factory(loop, task)
factory(loop, task_wrapper(task))
loop._run_once()
self._run_task = run_task
else:
def run_task(task):
create_task(loop, task)
create_task(loop, task_wrapper(task))
loop._run_once()
self._run_task = run_task
else:
if sys.version_info >= (3, 8): # name fixed to avoid dynamic name
def run_task(task):
loop.create_task(task, name='socketify.py-request-task')
future = loop.create_task(task_wrapper(task), name='socketify.py-request-task')
future._log_destroy_pending = False
loop._run_once()
self._run_task = run_task
else:
def run_task(task):
loop.create_task(task)
future = loop.create_task(task_wrapper(task))
future._log_destroy_pending = False
loop._run_once()
self._run_task = run_task

Wyświetl plik

@ -20,7 +20,7 @@ Options:
--ws-auto-ping BOOLEAN WebSocket auto ping sending [default: True]
--ws-idle-timeout INT WebSocket idle timeout [default: 20]
--ws-reset-idle-on-send BOOLEAN Reset WebSocket idle timeout on send [default: True]
--ws-per-message-deflate BOOLEAN WebSocket per-message-deflate compression [default: True]
--ws-per-message-deflate BOOLEAN WebSocket per-message-deflate compression [default: False]
--ws-max-lifetime INT Websocket maximum socket lifetime in seconds before forced closure, 0 to disable [default: 0]
--ws-max-backpressure INT WebSocket maximum backpressure in bytes [default: 16777216]
--ws-close-on-backpressure-limit BOOLEAN Close connections that hits maximum backpressure [default: False]
@ -35,6 +35,7 @@ Options:
--ssl-ciphers TEXT Ciphers to use (see stdlib ssl module's) [default: TLSv1]
--req-res-factory-maxitems INT Pre allocated instances of Response and Request objects for socketify interface [default: 0]
--ws-factory-maxitems INT Pre allocated instances of WebSockets objects for socketify interface [default: 0]
--task-factory-maxitems INT Pre allocated instances of Task objects for socketify, ASGI interface [default: 100000]
Example:
python3 -m socketify main:app -w 8 -p 8181
@ -64,7 +65,7 @@ def is_factory(module):
return hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 0
def str_bool(text):
text = text.lower()
text = str(text).lower()
return text == "true"
def load_module(file, reload=False):
@ -166,7 +167,7 @@ def execute(args):
port = int(options.get("--port", options.get("-p", 8000)))
host = options.get("--host", options.get("-h", "127.0.0.1"))
uds = options.get('--uds', None)
task_factory_maxitems = int(options.get("--task-factory-maxitems", 100000))
disable_listen_log = options.get("--disable-listen-log", False)
websockets = options.get("--ws", "auto")
@ -213,7 +214,7 @@ def execute(args):
if websockets:
websocket_options = {
'compression': int(1 if options.get('--ws-per-message-deflate', True) else 0),
'compression': int(1 if options.get('--ws-per-message-deflate', False) else 0),
'max_payload_length': int(options.get('--ws-max-size', 16777216)),
'idle_timeout': int(options.get('--ws-idle-timeout', 20)),
'send_pings_automatically': str_bool(options.get('--ws-auto-ping', True)),
@ -236,7 +237,7 @@ def execute(args):
return print("socketify interface must be callable with 1 parameter def run(app: App)")
# run app with the settings desired
def run_app():
fork_app = App(ssl_options, int(options.get("--req-res-factory-maxitems", 0)), int(options.get("--ws-factory-maxitems", 0)))
fork_app = App(ssl_options, int(options.get("--req-res-factory-maxitems", 0)), int(options.get("--ws-factory-maxitems", 0)), task_factory_maxitems)
module(fork_app) # call module factory
if websockets: # if socketify websockets are added using --ws in socketify interface we can set here
@ -268,6 +269,6 @@ def execute(args):
else:
if uds:
Interface(module,options=ssl_options, websocket=websockets, websocket_options=websocket_options).listen(AppListenOptions(domain=uds), listen_log).run(workers=workers)
Interface(module,options=ssl_options, websocket=websockets, websocket_options=websocket_options, task_factory_max_items=task_factory_maxitems).listen(AppListenOptions(domain=uds), listen_log).run(workers=workers)
else:
Interface(module,options=ssl_options, websocket=websockets, websocket_options=websocket_options).listen(AppListenOptions(port=port, host=host), listen_log).run(workers=workers)
Interface(module,options=ssl_options, websocket=websockets, websocket_options=websocket_options, task_factory_max_items=task_factory_maxitems).listen(AppListenOptions(port=port, host=host), listen_log).run(workers=workers)

Wyświetl plik

@ -346,7 +346,7 @@ def uws_websocket_factory_close_handler(ws, code, message, length, user_data):
if inspect.iscoroutinefunction(handler):
async def wrapper(app, instances, handler, ws, data, code, dispose):
try:
await handler(ws, code, data)
return await handler(ws, code, data)
finally:
key = ws.get_user_data_uuid()
if key is not None:
@ -389,7 +389,7 @@ def uws_websocket_close_handler(ws, code, message, length, user_data):
if inspect.iscoroutinefunction(handler):
async def wrapper(app, handler, ws, data, code, dispose):
try:
await handler(ws, code, data)
return await handler(ws, code, data)
finally:
key = ws.get_user_data_uuid()
if key is not None:
@ -473,7 +473,7 @@ def uws_websocket_factory_upgrade_handler(res, req, context, user_data):
def uws_websocket_upgrade_handler(res, req, context, user_data):
if user_data != ffi.NULL:
handlers, app = ffi.from_handle(user_data)
response = AppResponse(res, app.loop, app.SSL, app._template)
response = AppResponse(res, app.loop, app.SSL, app._template, app._socket_refs)
request = AppRequest(req)
try:
handler = handlers.upgrade
@ -548,7 +548,7 @@ def uws_generic_factory_method_handler(res, req, user_data):
def uws_generic_method_handler(res, req, user_data):
if user_data != ffi.NULL:
(handler, app) = ffi.from_handle(user_data)
response = AppResponse(res, app.loop, app.SSL, app._template)
response = AppResponse(res, app.loop, app.SSL, app._template, app._socket_refs)
request = AppRequest(req)
try:
@ -1039,13 +1039,13 @@ class RequestResponseFactory:
def __init__(self, app, max_size):
self.factory_queue = []
for _ in range(0, max_size):
response = AppResponse(None, app.loop, app.SSL, app._template)
response = AppResponse(None, app.loop, app.SSL, app._template, app._socket_refs)
request = AppRequest(None)
self.factory_queue.append((response, request, True))
def get(self, app, res, req):
if len(self.factory_queue) == 0:
response = AppResponse(res, app.loop, app.SSL, app._template)
response = AppResponse(res, app.loop, app.SSL, app._template, app._socket_refs)
request = AppRequest(req)
return response, request, False
@ -1310,9 +1310,10 @@ class AppRequest:
class AppResponse:
def __init__(self, response, loop, ssl, render=None):
def __init__(self, response, loop, ssl, render, socket_refs):
self.res = response
self.SSL = ssl
self._socket_refs = socket_refs
self.aborted = False
self.loop = loop
self._aborted_handler = None
@ -1764,7 +1765,7 @@ class AppResponse:
_id = uuid.uuid4()
user_data_ptr = ffi.new_handle((user_data, _id))
# keep alive data
SocketRefs[_id] = user_data_ptr
self._socket_refs[_id] = user_data_ptr
lib.uws_res_upgrade(
self.SSL,

Wyświetl plik

@ -83,13 +83,17 @@ class UVLoop:
self._loop = ffi.NULL
def run_nowait(self):
return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_NOWAIT)
if self._loop != ffi.NULL:
return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_NOWAIT)
def run(self):
return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_DEFAULT)
if self._loop != ffi.NULL:
return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_DEFAULT)
def run_once(self):
return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_ONCE)
if self._loop != ffi.NULL:
return lib.socketify_loop_run(self._loop, lib.SOCKETIFY_RUN_ONCE)
def stop(self):
lib.socketify_loop_stop(self._loop)
if self._loop != ffi.NULL:
lib.socketify_loop_stop(self._loop)

Wyświetl plik

@ -4,7 +4,10 @@ from socketify import App
from .asgi import ws_close, ws_upgrade, ws_open, ws_message
from io import BytesIO
from .native import lib, ffi
import platform
is_pypy = platform.python_implementation() == "PyPy"
from .tasks import create_task, create_task_with_factory
import sys
@ffi.callback("void(uws_res_t*, const char*, size_t, bool, void*)")
def wsgi_on_data_handler(res, chunk, chunk_length, is_end, user_data):
@ -124,7 +127,7 @@ def wsgi(ssl, response, info, user_data, aborted):
return
ssl = data_response.app.server.SSL
app_iter = data_response.app.app(
app_iter = data_response.app.wsgi(
data_response.environ, data_response.start_response
)
try:
@ -148,7 +151,7 @@ def wsgi(ssl, response, info, user_data, aborted):
lib.uws_res_on_data(ssl, response, wsgi_on_data_handler, data_response._ptr)
else:
environ["wsgi.input"] = None
app_iter = app.app(environ, start_response)
app_iter = app.wsgi(environ, start_response)
try:
for data in app_iter:
if isinstance(data, bytes):
@ -161,14 +164,16 @@ def wsgi(ssl, response, info, user_data, aborted):
app_iter.close()
lib.uws_res_end_without_body(ssl, response, 0)
def is_asgi(module):
return hasattr(module, "__call__") and len(inspect.signature(module).parameters) == 3
class _WSGI:
def __init__(self, app, options=None, websocket=None, websocket_options=None):
def __init__(self, app, options=None, websocket=None, websocket_options=None, task_factory_max_items=100_000):
self.server = App(options)
self.SERVER_HOST = None
self.SERVER_PORT = None
self.SERVER_WS_SCHEME = "wss" if self.server.options else "ws"
self.app = app
self.wsgi = app
self.BASIC_ENVIRON = dict(os.environ)
self.ws_compression = False
@ -177,12 +182,43 @@ class _WSGI:
self.server.SSL, self.server.app, wsgi, self._ptr
)
self.asgi_ws_info = None
if isinstance(websocket, dict): # serve websocket as socketify.py
if websocket_options:
websocket.update(websocket_options)
self.server.ws("/*", websocket)
elif inspect.iscoroutine(websocket):
elif is_asgi(websocket):
self.app = websocket # set ASGI app
loop = self.server.loop.loop
# ASGI do not use app.run_async to not add any overhead from socketify.py WebFramework
# internally will still use custom task factory for pypy because of Loop
if is_pypy:
if task_factory_max_items > 0:
factory = create_task_with_factory(task_factory_max_items)
def run_task(task):
factory(loop, task)
loop._run_once()
self._run_task = run_task
else:
def run_task(task):
create_task(loop, task)
loop._run_once()
self._run_task = run_task
else:
if sys.version_info >= (3, 8): # name fixed to avoid dynamic name
def run_task(task):
loop.create_task(task, name='socketify.py-request-task')
loop._run_once()
self._run_task = run_task
else:
def run_task(task):
loop.create_task(task)
loop._run_once()
self._run_task = run_task
# detect ASGI to use as WebSocket as mixed protocol
native_options = ffi.new("uws_socket_behavior_t *")
native_behavior = native_options[0]
@ -228,6 +264,7 @@ class _WSGI:
native_behavior.ping = ffi.NULL
native_behavior.pong = ffi.NULL
native_behavior.close = ws_close
self.asgi_ws_info = lib.socketify_add_asgi_ws_handler(
self.server.SSL, self.server.app, native_behavior, ws_upgrade, self._ptr
@ -276,12 +313,13 @@ class _WSGI:
# "Public" WSGI interface to allow easy forks/workers
class WSGI:
def __init__(self, app, options=None, websocket=None, websocket_options=None):
def __init__(self, app, options=None, websocket=None, websocket_options=None, task_factory_max_items=100_000):
self.app = app
self.options = options
self.websocket = websocket
self.websocket_options = websocket_options
self.listen_options = None
self.task_factory_max_items = task_factory_max_items
def listen(self, port_or_options, handler=None):
self.listen_options = (port_or_options, handler)
@ -290,7 +328,7 @@ class WSGI:
def run(self, workers=1):
def run_app():
server = _WSGI(
self.app, self.options, self.websocket, self.websocket_options
self.app, self.options, self.websocket, self.websocket_options, self.task_factory_max_items
)
if self.listen_options:
(port_or_options, handler) = self.listen_options