kopia lustrzana https://github.com/cirospaciari/socketify.py
added a lot of native optimizations
rodzic
2043f84f3b
commit
5158cc13da
|
@ -37,6 +37,7 @@
|
||||||
- Middlewares
|
- Middlewares
|
||||||
- Templates Support (examples with [`Mako`](https://github.com/cirospaciari/socketify.py/tree/main/examples/template_mako.py) and [`Jinja2`](https://github.com/cirospaciari/socketify.py/tree/main/examples/template_jinja2.py))
|
- Templates Support (examples with [`Mako`](https://github.com/cirospaciari/socketify.py/tree/main/examples/template_mako.py) and [`Jinja2`](https://github.com/cirospaciari/socketify.py/tree/main/examples/template_jinja2.py))
|
||||||
- ASGI Server with pub/sub extension for Falcon
|
- ASGI Server with pub/sub extension for Falcon
|
||||||
|
- WSGI Server
|
||||||
|
|
||||||
## :mag_right: Upcoming Features
|
## :mag_right: Upcoming Features
|
||||||
- In-Memory Cache Tools
|
- In-Memory Cache Tools
|
||||||
|
@ -45,7 +46,6 @@
|
||||||
- Full asyncio integration with libuv
|
- Full asyncio integration with libuv
|
||||||
- SSGI Server spec and support
|
- SSGI Server spec and support
|
||||||
- RSGI Server support
|
- RSGI Server support
|
||||||
- WSGI Server compatibility
|
|
||||||
- Full Http3 support
|
- Full Http3 support
|
||||||
- [`HPy`](https://hpyproject.org/) integration to better support [`CPython`](https://github.com/python/cpython), [`PyPy`](https://www.pypy.org/) and [`GraalPython`](https://github.com/oracle/graalpython)
|
- [`HPy`](https://hpyproject.org/) integration to better support [`CPython`](https://github.com/python/cpython), [`PyPy`](https://www.pypy.org/) and [`GraalPython`](https://github.com/oracle/graalpython)
|
||||||
- Hot Reloading
|
- Hot Reloading
|
||||||
|
|
|
@ -0,0 +1,125 @@
|
||||||
|
# First Ideas for SSGI
|
||||||
|
|
||||||
|
```python
|
||||||
|
from typing import Union, Callable, Awaitable, Optional
|
||||||
|
|
||||||
|
class SSGIHttpResponse:
|
||||||
|
aborted: bool = False, # detect if the connection was aborted
|
||||||
|
extensions: Optional[dict] = None # extensions for http
|
||||||
|
|
||||||
|
# if payload is None, request ends without body
|
||||||
|
# if has_more is True, data is written but connection will not end
|
||||||
|
def send(self, payload: Union[str, bytes, bytearray, memoryview, None], has_more: Optional[bool] = False):
|
||||||
|
pass
|
||||||
|
|
||||||
|
# send chunk of data, can be used to perform with less backpressure than using send
|
||||||
|
# total_size is the sum of all lenghts in bytes of all chunks to be sended
|
||||||
|
# connection will end when total_size is met
|
||||||
|
# returns tuple(bool, bool) first bool represents if the chunk is succefully sended, the second if the connection has ended
|
||||||
|
def send_chunk(self, chunk: Union[str, bytes, bytearray, memoryview], total_size: int = False) -> Awaitable:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# send status code
|
||||||
|
def send_status(self, status_code: Optional[int] = 200):
|
||||||
|
pass
|
||||||
|
|
||||||
|
# send headers to the http response
|
||||||
|
def send_headers(self, headers: iter(tuple(str, str))):
|
||||||
|
pass
|
||||||
|
|
||||||
|
# ensure async call for the handler, passing any arguments to it
|
||||||
|
def run_async(self, handler: Awaitable, *arguments) -> Awaitable:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# get an all data
|
||||||
|
# returns an BytesIO() or None if no payload is availabl
|
||||||
|
def get_data(self) -> Awaitable:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# get an chunk of data (chunk size is decided by the Server implementation)
|
||||||
|
# returns the BytesIO or None if no more chunks are sent
|
||||||
|
def get_chunk(self) -> Awaitable:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# on aborted event, calle when the connection abort
|
||||||
|
def on_aborted(self, handler: Union[Awaitable, Callable], *arguments):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class SSGIWebSocket:
|
||||||
|
status: int = 0 # 0 pending upgrade, 1 rejected, 2 closed, 3 accepted
|
||||||
|
extensions: Optional[dict] = None # extensions for websocket
|
||||||
|
|
||||||
|
# accept the connection upgrade
|
||||||
|
# can pass the protocol to accept if None is informed will use sec-websocket-protocol header if available
|
||||||
|
def accept(self, protocol: str = None) -> Awaitable:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# reject the connection upgrade, you can send status_code, payload and headers if you want, all optional
|
||||||
|
def reject(self, status_code: Optional[int] = 403, payload: Union[bytes, bytearray, memoryview, None] = None, headers: Optional[iter(tuple(str, str))] = None) -> Awaitable:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# if returns an future, this can be awaited or not
|
||||||
|
def send(self, payload: Union[bytes, bytearray, memoryview]):
|
||||||
|
pass
|
||||||
|
|
||||||
|
# close connection
|
||||||
|
def close(self, code: Optional[int] = 1000):
|
||||||
|
pass
|
||||||
|
|
||||||
|
# ensure async call for the handler, passing any arguments to it
|
||||||
|
def run_async(self, handler: Awaitable, *arguments) -> Awaitable:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# on receive event, called when the socket disconnect
|
||||||
|
# passes ws: SSGIWebSocket, msg: Union[str, bytes, bytearray, memoryview], *arguments
|
||||||
|
def on_receive(self, handler: Union[Awaitable, Callable], *arguments):
|
||||||
|
pass
|
||||||
|
|
||||||
|
# on close event, called when the socket disconnect
|
||||||
|
# passes ws: SSGIWebSocket, code: int and reason: Optional[str] = None, *arguments
|
||||||
|
def on_close(self, handler: Union[Awaitable, Callable], *arguments):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# only accepts sync
|
||||||
|
def wsgi(environ, start_response):
|
||||||
|
pass
|
||||||
|
# only accepts async
|
||||||
|
async def asgi(scope, receive, send):
|
||||||
|
pass
|
||||||
|
# async with less overhead
|
||||||
|
async def rsgi(scope, proto):
|
||||||
|
pass
|
||||||
|
# async and sync can be used
|
||||||
|
def ssgi(type: str, server_address: str, remote_address: str, method: str, path: str, query_string: str, get_header: Callable[[Optional[str]=None], [Union[str, iter(tuple(str, str))]], res: Union[SSGIHttpResponse, SSGIWebSocket]):
|
||||||
|
# this is called once every HTTP request, or when an websocket connection wants to upgrade
|
||||||
|
# type can be http or websocket
|
||||||
|
|
||||||
|
# server_address contains {ipv4|ipv6}:{port} being :{port} optional
|
||||||
|
# remote_address contains {ipv4|ipv6}:{port} being :{port} optional
|
||||||
|
|
||||||
|
# here routers can work without call any header, this can improve performance because headers are not allocated in
|
||||||
|
# if passed get_header() without arguments or None, must return all headers in an dict
|
||||||
|
# all headers must be lowercase
|
||||||
|
# headers will only be preserved until the end of this call or if res.run_async is called
|
||||||
|
# headers are not preserved after websocket accept or reject
|
||||||
|
# if this function is an coroutine, data will be preserved, run_async is automatic
|
||||||
|
pass
|
||||||
|
|
||||||
|
# SSGI do not require that SSGI it self to be implemented, allowing other interfaces to be supported by the Server and Framework as will
|
||||||
|
class SSGIFramework:
|
||||||
|
def get_supported(self, supported_interfaces: dict) -> dict:
|
||||||
|
# supported_interfaces { "asgi": "2.3", "wsgi": "2.0", "ssgi": "1.0", "rsgi": "1.0" }
|
||||||
|
# you can use this to check what interface is available
|
||||||
|
|
||||||
|
# returns http and websocket interface supported by the Web Framework
|
||||||
|
# you can use multiple interfaces one for http and other for websockets with SSGI if the Web Framework and Server supports it
|
||||||
|
# if None is passed, Server will not serve the protocol
|
||||||
|
# tuple(interface_name, interface_handler)
|
||||||
|
return {
|
||||||
|
"http": ( "ssgi", ssgi), #or "asgi", "rsgi", "wsgi",
|
||||||
|
"websockets": ("ssgi", ssgi) #or "asgi", "rsgi"
|
||||||
|
}
|
||||||
|
|
||||||
|
```
|
|
@ -24,5 +24,3 @@ app.add_route("/", home)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
ASGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()
|
ASGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()
|
||||||
|
|
||||||
#pypy3 -m gunicorn uvicorn_guvicorn_plaintext:app -w 1 -k uvicorn.workers.UvicornWorker
|
|
|
@ -31,8 +31,9 @@ class SomeResource:
|
||||||
remaining_clients = remaining_clients + 1
|
remaining_clients = remaining_clients + 1
|
||||||
print("remaining_clients", remaining_clients)
|
print("remaining_clients", remaining_clients)
|
||||||
|
|
||||||
|
|
||||||
app = falcon.asgi.App()
|
app = falcon.asgi.App()
|
||||||
|
app.ws_options.max_receive_queue = 20_000_000 # this virtual disables queue but adds overhead
|
||||||
|
app.ws_options.enable_buffered_receiver = False # this disable queue but for now only available on cirospaciari/falcon
|
||||||
app.add_route("/", SomeResource())
|
app.add_route("/", SomeResource())
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
|
@ -25,23 +25,25 @@ class SomeResource:
|
||||||
try:
|
try:
|
||||||
await ws.accept()
|
await ws.accept()
|
||||||
clients.add(ws)
|
clients.add(ws)
|
||||||
remaining_clients = remaining_clients - 1
|
remaining_clients -= 1
|
||||||
|
print("remaining_clients", remaining_clients)
|
||||||
if remaining_clients == 0:
|
if remaining_clients == 0:
|
||||||
await broadcast("ready")
|
await broadcast("ready")
|
||||||
else:
|
|
||||||
print("remaining_clients", remaining_clients)
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
payload = await ws.receive_text()
|
payload = await ws.receive_text()
|
||||||
await broadcast(payload)
|
if payload:
|
||||||
|
await broadcast(payload)
|
||||||
|
|
||||||
except falcon.WebSocketDisconnected:
|
except falcon.WebSocketDisconnected:
|
||||||
clients.remove(ws)
|
clients.remove(ws)
|
||||||
remaining_clients = remaining_clients + 1
|
remaining_clients += 1
|
||||||
print("remaining_clients", remaining_clients)
|
print("remaining_clients", remaining_clients)
|
||||||
|
|
||||||
|
|
||||||
app = falcon.asgi.App()
|
app = falcon.asgi.App()
|
||||||
|
app.ws_options.max_receive_queue = 20_000_000# this virtual disables queue but adds overhead
|
||||||
|
app.ws_options.enable_buffered_receiver = True # this disable queue but for now only available on cirospaciari/falcon
|
||||||
app.add_route("/", SomeResource())
|
app.add_route("/", SomeResource())
|
||||||
# python3 -m gunicorn falcon_server:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornWorker
|
# python3 -m gunicorn falcon_server:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornWorker
|
||||||
# pypy3 -m gunicorn falcon_server:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornH11Worker
|
# pypy3 -m gunicorn falcon_server:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornH11Worker
|
||||||
|
|
|
@ -24,5 +24,4 @@ app.add_route("/", home)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
WSGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()
|
WSGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()
|
||||||
|
|
||||||
#pypy3 -m gunicorn uvicorn_guvicorn_plaintext:app -w 1 -k uvicorn.workers.UvicornWorker
|
|
|
@ -0,0 +1,12 @@
|
||||||
|
from flask import Flask
|
||||||
|
from socketify import WSGI
|
||||||
|
|
||||||
|
app = Flask(__name__)
|
||||||
|
|
||||||
|
@app.route('/')
|
||||||
|
def index():
|
||||||
|
return 'Hello, World!'
|
||||||
|
|
||||||
|
def run_app():
|
||||||
|
WSGI(app, request_response_factory_max_itens=200_000).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()
|
||||||
|
|
|
@ -21,5 +21,3 @@ async def app(scope, receive, send):
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
ASGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()
|
ASGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()
|
||||||
|
|
||||||
# python3 -m gunicorn test:app -w 1 -k uvicorn.workers.UvicornWorker
|
|
|
@ -47,9 +47,10 @@ async def app(scope, receive, send):
|
||||||
else:
|
else:
|
||||||
print("remaining_clients", remaining_clients)
|
print("remaining_clients", remaining_clients)
|
||||||
|
|
||||||
scope = await receive()
|
|
||||||
# get data
|
# get data
|
||||||
while True:
|
while True:
|
||||||
|
scope = await receive()
|
||||||
|
|
||||||
type = scope['type']
|
type = scope['type']
|
||||||
# disconnected!
|
# disconnected!
|
||||||
if type == 'websocket.disconnect':
|
if type == 'websocket.disconnect':
|
||||||
|
@ -57,13 +58,14 @@ async def app(scope, receive, send):
|
||||||
print("remaining_clients", remaining_clients)
|
print("remaining_clients", remaining_clients)
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
||||||
await send({
|
await send({
|
||||||
'type': 'websocket.publish',
|
'type': 'websocket.publish',
|
||||||
'topic': "all",
|
'topic': "all",
|
||||||
'text': scope.get('text', '')
|
'text': scope.get('text', '')
|
||||||
})
|
})
|
||||||
|
|
||||||
scope = await receive()
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -71,4 +73,4 @@ async def app(scope, receive, send):
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
ASGI(app).listen(4001, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()
|
ASGI(app).listen(4001, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()
|
||||||
|
|
||||||
# python3 -m gunicorn test-ws-bench:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornWorker
|
# python3 -m gunicorn raw-ws-bench-pubsub:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornWorker
|
||||||
|
|
|
@ -44,12 +44,12 @@ async def app(scope, receive, send):
|
||||||
|
|
||||||
if remaining_clients == 0:
|
if remaining_clients == 0:
|
||||||
await broadcast("ready")
|
await broadcast("ready")
|
||||||
else:
|
|
||||||
print("remaining_clients", remaining_clients)
|
|
||||||
|
|
||||||
scope = await receive()
|
|
||||||
# get data
|
# get data
|
||||||
while True:
|
while True:
|
||||||
|
scope = await receive()
|
||||||
|
|
||||||
type = scope['type']
|
type = scope['type']
|
||||||
# disconnected!
|
# disconnected!
|
||||||
if type == 'websocket.disconnect':
|
if type == 'websocket.disconnect':
|
||||||
|
@ -58,7 +58,7 @@ async def app(scope, receive, send):
|
||||||
break
|
break
|
||||||
|
|
||||||
await broadcast(scope.get('text', ''))
|
await broadcast(scope.get('text', ''))
|
||||||
scope = await receive()
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -66,4 +66,5 @@ async def app(scope, receive, send):
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
ASGI(app).listen(4001, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()
|
ASGI(app).listen(4001, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()
|
||||||
|
|
||||||
|
|
||||||
# python3 -m gunicorn test-ws-bench:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornWorker
|
# python3 -m gunicorn test-ws-bench:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornWorker
|
||||||
|
|
|
@ -6,4 +6,3 @@ def app(environ, start_response):
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
WSGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()
|
WSGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
from wsgiref.simple_server import make_server
|
from wsgiref.simple_server import make_server
|
||||||
|
|
||||||
import falcon
|
import falcon
|
||||||
|
# check ./asgi_wsgi/falcon-ws-pubsub.py for pub/sub extension
|
||||||
|
|
||||||
class Home:
|
class Home:
|
||||||
def on_get(self, req, resp):
|
def on_get(self, req, resp):
|
||||||
|
|
|
@ -11,4 +11,4 @@ def h(request):
|
||||||
app.start(port=8000)
|
app.start(port=8000)
|
||||||
|
|
||||||
# python3 ./robyn_plaintext.py --processes 4 --log-level CRITICAL
|
# python3 ./robyn_plaintext.py --processes 4 --log-level CRITICAL
|
||||||
# pypy3 did not compile
|
# pypy3 did not compile
|
|
@ -1,11 +1,12 @@
|
||||||
from socketify import App
|
from socketify import App
|
||||||
import os
|
import os
|
||||||
import multiprocessing
|
import multiprocessing
|
||||||
|
|
||||||
|
|
||||||
def run_app():
|
def run_app():
|
||||||
app = App(request_response_factory_max_itens=200_000)
|
app = App(request_response_factory_max_itens=200_000)
|
||||||
app.get("/", lambda res, req: res.end("Hello, World!"))
|
def home(res, req):
|
||||||
|
res.end("Hello, World!")
|
||||||
|
|
||||||
|
app.get("/", home)
|
||||||
app.listen(
|
app.listen(
|
||||||
8000,
|
8000,
|
||||||
lambda config: print(
|
lambda config: print(
|
||||||
|
@ -26,5 +27,5 @@ def create_fork():
|
||||||
# fork limiting the cpu count - 1
|
# fork limiting the cpu count - 1
|
||||||
for i in range(1, multiprocessing.cpu_count()):
|
for i in range(1, multiprocessing.cpu_count()):
|
||||||
create_fork()
|
create_fork()
|
||||||
|
|
||||||
run_app() # run app on the main process too :)
|
run_app() # run app on the main process too :)
|
||||||
|
|
|
@ -0,0 +1,23 @@
|
||||||
|
from socketify import SSGI
|
||||||
|
|
||||||
|
class Application:
|
||||||
|
def get_supported(self, supported_interfaces):
|
||||||
|
|
||||||
|
def ssgi(type, method, path, query_string, get_header, res):
|
||||||
|
# if type == "http":
|
||||||
|
res.send(b'Hello, World!')
|
||||||
|
# else:
|
||||||
|
# res.reject() # reject websocket connections
|
||||||
|
|
||||||
|
return {
|
||||||
|
"http": ("ssgi" if supported_interfaces.get("ssgi", None) else None, ssgi),
|
||||||
|
# "websocket": ("ssgi" if supported_interfaces.get("ssgi", None) else None, ssgi)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
app = Application()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
SSGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()
|
||||||
|
|
||||||
|
# python3 -m gunicorn test:app -w 1 -k uvicorn.workers.UvicornWorker
|
|
@ -18,5 +18,5 @@ async def app(scope, receive, send):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# python3 -m gunicorn uvicorn_guvicorn_plaintext:app -w 1 -k uvicorn.workers.UvicornWorker
|
# python3 -m gunicorn uvicorn_plaintext:app -w 1 -k uvicorn.workers.UvicornWorker
|
||||||
# pypy3 -m gunicorn uvicorn_guvicorn_plaintext:app -w 1 -k uvicorn.workers.UvicornWorker
|
# pypy3 -m gunicorn uvicorn_plaintext:app -w 1 -k uvicorn.workers.UvicornWorker
|
||||||
|
|
|
@ -12,4 +12,7 @@ from .asgi import (
|
||||||
from .wsgi import (
|
from .wsgi import (
|
||||||
WSGI
|
WSGI
|
||||||
)
|
)
|
||||||
|
from .ssgi import (
|
||||||
|
SSGI
|
||||||
|
)
|
||||||
from .helpers import sendfile, middleware, MiddlewareRouter
|
from .helpers import sendfile, middleware, MiddlewareRouter
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
from socketify import App, CompressOptions, OpCode
|
from socketify import App, CompressOptions, OpCode
|
||||||
import asyncio
|
|
||||||
from queue import SimpleQueue
|
from queue import SimpleQueue
|
||||||
|
from .native import lib, ffi
|
||||||
|
|
||||||
# Just an IDEA, must be implemented in native code (Cython or HPy), is really slow use this way
|
# Just an IDEA, must be implemented in native code (Cython or HPy), is really slow use this way
|
||||||
# re encoding data and headers is really dummy (can be consumed directly by ffi), dict ops are really slow
|
# re encoding data and headers is really dummy (can be consumed directly by ffi), dict ops are really slow
|
||||||
EMPTY_RESPONSE = { 'type': 'http.request', 'body': b'', 'more_body': False }
|
EMPTY_RESPONSE = { 'type': 'http.request', 'body': b'', 'more_body': False }
|
||||||
|
@ -10,11 +11,14 @@ class ASGIWebSocket:
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
self.accept_future = None
|
self.accept_future = None
|
||||||
self.ws = None
|
self.ws = None
|
||||||
|
self._disconnected = False
|
||||||
self.receive_queue = SimpleQueue()
|
self.receive_queue = SimpleQueue()
|
||||||
self.miss_receive_queue = SimpleQueue()
|
self.miss_receive_queue = SimpleQueue()
|
||||||
self.miss_receive_queue.put({
|
self.miss_receive_queue.put({
|
||||||
'type': 'websocket.connect'
|
'type': 'websocket.connect'
|
||||||
}, False)
|
}, False)
|
||||||
|
self._code = None
|
||||||
|
self._message = None
|
||||||
|
|
||||||
def accept(self):
|
def accept(self):
|
||||||
self.accept_future = self.loop.create_future()
|
self.accept_future = self.loop.create_future()
|
||||||
|
@ -30,13 +34,23 @@ class ASGIWebSocket:
|
||||||
if not self.miss_receive_queue.empty():
|
if not self.miss_receive_queue.empty():
|
||||||
future.set_result(self.miss_receive_queue.get(False))
|
future.set_result(self.miss_receive_queue.get(False))
|
||||||
return future
|
return future
|
||||||
|
if self._disconnected:
|
||||||
|
future.set_result({
|
||||||
|
'type': 'websocket.disconnect',
|
||||||
|
'code': self._code,
|
||||||
|
'message': self._message
|
||||||
|
})
|
||||||
|
return future
|
||||||
|
|
||||||
self.receive_queue.put(future, False)
|
self.receive_queue.put(future, False)
|
||||||
return future
|
return future
|
||||||
|
|
||||||
|
|
||||||
def disconnect(self, code, message):
|
def disconnect(self, code, message):
|
||||||
self.ws = None
|
self.ws = None
|
||||||
|
self._disconnected = True
|
||||||
|
self._code = code
|
||||||
|
self._message = message
|
||||||
if not self.receive_queue.empty():
|
if not self.receive_queue.empty():
|
||||||
future = self.receive_queue.get(False)
|
future = self.receive_queue.get(False)
|
||||||
future.set_result({
|
future.set_result({
|
||||||
|
@ -44,18 +58,12 @@ class ASGIWebSocket:
|
||||||
'code': code,
|
'code': code,
|
||||||
'message': message
|
'message': message
|
||||||
})
|
})
|
||||||
else:
|
|
||||||
self.miss_receive_queue.put({
|
|
||||||
'type': 'websocket.disconnect',
|
|
||||||
'code': code,
|
|
||||||
'message': message
|
|
||||||
}, False)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def message(self, ws, value, opcode):
|
def message(self, ws, value, opcode):
|
||||||
self.ws = ws
|
self.ws = ws
|
||||||
if self.receive_queue.empty():
|
if self.receive_queue.empty():
|
||||||
if opcode == OpCode.TEXT:
|
if opcode == OpCode.TEXT:
|
||||||
self.miss_receive_queue.put({
|
self.miss_receive_queue.put({
|
||||||
'type': 'websocket.receive',
|
'type': 'websocket.receive',
|
||||||
|
@ -82,6 +90,111 @@ class ASGIWebSocket:
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def write_header(ssl, res, key, value):
|
||||||
|
if isinstance(key, str):
|
||||||
|
key_data = key.encode("utf-8")
|
||||||
|
elif isinstance(key, bytes):
|
||||||
|
key_data = key
|
||||||
|
|
||||||
|
if isinstance(value, int):
|
||||||
|
lib.uws_res_write_header_int(
|
||||||
|
ssl,
|
||||||
|
res,
|
||||||
|
key_data,
|
||||||
|
len(key_data),
|
||||||
|
ffi.cast("uint64_t", value),
|
||||||
|
)
|
||||||
|
elif isinstance(value, str):
|
||||||
|
value_data = value.encode("utf-8")
|
||||||
|
elif isinstance(value, bytes):
|
||||||
|
value_data = value
|
||||||
|
lib.uws_res_write_header(
|
||||||
|
ssl, res, key_data, len(key_data), value_data, len(value_data)
|
||||||
|
)
|
||||||
|
|
||||||
|
@ffi.callback("void(uws_res_t*, void*)")
|
||||||
|
def uws_asgi_corked_response_start_handler(res, user_data):
|
||||||
|
(ssl, status, headers) = ffi.from_handle(user_data)
|
||||||
|
lib.socketify_res_write_int_status(ssl, res, status)
|
||||||
|
for name, value in headers:
|
||||||
|
write_header(ssl, res, name, value)
|
||||||
|
|
||||||
|
|
||||||
|
@ffi.callback("void(int, uws_res_t*, socketify_asgi_data request, void*, bool*)")
|
||||||
|
def asgi(ssl, response, info, user_data, aborted):
|
||||||
|
app = ffi.from_handle(user_data)
|
||||||
|
|
||||||
|
headers = []
|
||||||
|
next_header = info.header_list
|
||||||
|
while next_header != ffi.NULL:
|
||||||
|
header = next_header[0]
|
||||||
|
headers.append((ffi.unpack(header.name, header.name_size),ffi.unpack(header.value, header.value_size)))
|
||||||
|
next_header = ffi.cast("socketify_header*", next_header.next)
|
||||||
|
url = ffi.unpack(info.url, info.url_size)
|
||||||
|
scope = {
|
||||||
|
'type': 'http',
|
||||||
|
'asgi': {
|
||||||
|
'version': '3.0',
|
||||||
|
'spec_version': '2.3'
|
||||||
|
},
|
||||||
|
'http_version': '1.1',
|
||||||
|
'server': (app.SERVER_HOST, app.SERVER_PORT),
|
||||||
|
'client': (ffi.unpack(info.remote_address, info.remote_address_size).decode('utf8'), None),
|
||||||
|
'scheme': app.SERVER_SCHEME,
|
||||||
|
'method': ffi.unpack(info.method, info.method_size).decode('utf8'),
|
||||||
|
'root_path': '',
|
||||||
|
'path': url.decode('utf8'),
|
||||||
|
'raw_path': url,
|
||||||
|
'query_string': ffi.unpack(info.query_string, info.query_string_size),
|
||||||
|
'headers': headers
|
||||||
|
}
|
||||||
|
async def receive():
|
||||||
|
if bool(aborted[0]):
|
||||||
|
return { 'type': 'http.disconnect'}
|
||||||
|
# if scope.get("content-length", False) or scope.get("transfer-encoding", False):
|
||||||
|
# data = await res.get_data()
|
||||||
|
# if data:
|
||||||
|
# # all at once but could get in chunks
|
||||||
|
# return {
|
||||||
|
# 'type': 'http.request',
|
||||||
|
# 'body': data.getvalue(),
|
||||||
|
# 'more_body': False
|
||||||
|
# }
|
||||||
|
# no body, just empty
|
||||||
|
return EMPTY_RESPONSE
|
||||||
|
async def send(options):
|
||||||
|
if bool(aborted[0]):
|
||||||
|
return False
|
||||||
|
type = options['type']
|
||||||
|
if type == 'http.response.start':
|
||||||
|
#can also be more native optimized to do it in one GIL call
|
||||||
|
#try socketify_res_write_int_status_with_headers and create and socketify_res_cork_write_int_status_with_headers
|
||||||
|
status_code = options.get('status', 200)
|
||||||
|
headers = options.get('headers', [])
|
||||||
|
cork_data = ffi.new_handle((ssl, status_code, headers))
|
||||||
|
lib.uws_res_cork(ssl, response, uws_asgi_corked_response_start_handler, cork_data)
|
||||||
|
return True
|
||||||
|
|
||||||
|
if type == 'http.response.body':
|
||||||
|
|
||||||
|
#native optimized end/send
|
||||||
|
message = options.get('body', b'')
|
||||||
|
|
||||||
|
if isinstance(message, str):
|
||||||
|
data = message.encode("utf-8")
|
||||||
|
elif isinstance(message, bytes):
|
||||||
|
data = message
|
||||||
|
else:
|
||||||
|
data = b''
|
||||||
|
if options.get('more_body', False):
|
||||||
|
lib.socketify_res_cork_write(ssl, response, data, len(data))
|
||||||
|
else:
|
||||||
|
lib.socketify_res_cork_end(ssl, response, data, len(data), 0)
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
app.server.loop.run_async(app.app(scope, receive, send))
|
||||||
class ASGI:
|
class ASGI:
|
||||||
def __init__(self, app, options=None, request_response_factory_max_itens=0, websocket_factory_max_itens=0):
|
def __init__(self, app, options=None, request_response_factory_max_itens=0, websocket_factory_max_itens=0):
|
||||||
self.server = App(options, request_response_factory_max_itens, websocket_factory_max_itens)
|
self.server = App(options, request_response_factory_max_itens, websocket_factory_max_itens)
|
||||||
|
@ -91,80 +204,40 @@ class ASGI:
|
||||||
self.SERVER_WS_SCHEME = 'wss' if self.server.options else 'ws'
|
self.SERVER_WS_SCHEME = 'wss' if self.server.options else 'ws'
|
||||||
|
|
||||||
self.app = app
|
self.app = app
|
||||||
|
# optimized in native
|
||||||
def asgi(res, req):
|
self._ptr = ffi.new_handle(self)
|
||||||
PATH_INFO = req.get_url()
|
self.asgi_http_info = lib.socketify_add_asgi_http_handler(
|
||||||
FULL_PATH_INFO = req.get_full_url()
|
self.server.SSL,
|
||||||
headers = []
|
self.server.app,
|
||||||
req.for_each_header(lambda name, value: headers.append((name.encode('utf8'), value.encode('utf8'))))
|
asgi,
|
||||||
|
self._ptr
|
||||||
scope = {
|
)
|
||||||
'type': 'http',
|
|
||||||
'asgi': {
|
|
||||||
'version': '3.0',
|
|
||||||
'spec_version': '2.0'
|
|
||||||
},
|
|
||||||
'http_version': '1.1',
|
|
||||||
'server': (self.SERVER_HOST, self.SERVER_PORT),
|
|
||||||
'client': (res.get_remote_address(), None),
|
|
||||||
'scheme': self.SERVER_SCHEME,
|
|
||||||
'method': req.get_method(),
|
|
||||||
'root_path': '',
|
|
||||||
'path': PATH_INFO,
|
|
||||||
'raw_path': PATH_INFO.encode('utf8'),
|
|
||||||
'query_string': FULL_PATH_INFO[len(PATH_INFO):].encode('utf8'),
|
|
||||||
'headers': headers
|
|
||||||
}
|
|
||||||
|
|
||||||
async def receive():
|
|
||||||
if res.aborted:
|
|
||||||
return { 'type': 'http.disconnect'}
|
|
||||||
|
|
||||||
if scope.get("content-length", False) or scope.get("transfer-encoding", False):
|
|
||||||
data = await res.get_data()
|
|
||||||
if data:
|
|
||||||
# all at once but could get in chunks
|
|
||||||
return {
|
|
||||||
'type': 'http.request',
|
|
||||||
'body': data.getvalue(),
|
|
||||||
'more_body': False
|
|
||||||
}
|
|
||||||
# no body, just empty
|
|
||||||
return EMPTY_RESPONSE
|
|
||||||
|
|
||||||
async def send(options):
|
|
||||||
if res.aborted: return False
|
|
||||||
type = options['type']
|
|
||||||
if type == 'http.response.start':
|
|
||||||
res.write_status(options.get('status', 200))
|
|
||||||
for header in options.get('headers', []):
|
|
||||||
res.write_header(header[0], header[1])
|
|
||||||
return True
|
|
||||||
|
|
||||||
if type == 'http.response.body':
|
|
||||||
if options.get('more_body', False):
|
|
||||||
res.write(options.get('body', ""))
|
|
||||||
else:
|
|
||||||
res.cork_end(options.get('body', ""))
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
#grab handler
|
|
||||||
res.grab_aborted_handler()
|
|
||||||
asyncio.ensure_future(app(scope, receive, send))
|
|
||||||
self.server.any("/*", asgi)
|
|
||||||
|
|
||||||
def ws_upgrade(res, req, socket_context):
|
def ws_upgrade(res, req, socket_context):
|
||||||
PATH_INFO = req.get_url()
|
info = lib.socketify_asgi_ws_request(res.SSL, req.req, res.res)
|
||||||
FULL_PATH_INFO = req.get_full_url()
|
|
||||||
headers = []
|
headers = []
|
||||||
def filtered_headers(name, value):
|
next_header = info.header_list
|
||||||
if name != "sec-websocket-protocol":
|
while next_header != ffi.NULL:
|
||||||
headers.append((name.encode('utf8'), value.encode('utf8')))
|
header = next_header[0]
|
||||||
|
headers.append((ffi.unpack(header.name, header.name_size),ffi.unpack(header.value, header.value_size)))
|
||||||
|
next_header = ffi.cast("socketify_header*", next_header.next)
|
||||||
|
|
||||||
req.for_each_header(filtered_headers)
|
url = ffi.unpack(info.url, info.url_size)
|
||||||
key = req.get_header("sec-websocket-key")
|
|
||||||
protocol = req.get_header("sec-websocket-protocol")
|
if info.key == ffi.NULL:
|
||||||
extensions = req.get_header("sec-websocket-extensions")
|
key = None
|
||||||
|
else:
|
||||||
|
key = ffi.unpack(info.key, info.key_size).decode('utf8')
|
||||||
|
|
||||||
|
if info.protocol == ffi.NULL:
|
||||||
|
protocol = None
|
||||||
|
else:
|
||||||
|
protocol = ffi.unpack(info.protocol, info.protocol_size).decode('utf8')
|
||||||
|
if info.extensions == ffi.NULL:
|
||||||
|
extensions = None
|
||||||
|
else:
|
||||||
|
extensions = ffi.unpack(info.extensions, info.extensions_size).decode('utf8')
|
||||||
|
|
||||||
ws = ASGIWebSocket(self.server.loop)
|
ws = ASGIWebSocket(self.server.loop)
|
||||||
|
|
||||||
|
@ -172,30 +245,29 @@ class ASGI:
|
||||||
'type': 'websocket',
|
'type': 'websocket',
|
||||||
'asgi': {
|
'asgi': {
|
||||||
'version': '3.0',
|
'version': '3.0',
|
||||||
'spec_version': '2.0'
|
'spec_version': '2.3'
|
||||||
},
|
},
|
||||||
'http_version': '1.1',
|
'http_version': '1.1',
|
||||||
'server': (self.SERVER_HOST, self.SERVER_PORT),
|
'server': (self.SERVER_HOST, self.SERVER_PORT),
|
||||||
'client': (res.get_remote_address(), None),
|
'client': (ffi.unpack(info.remote_address, info.remote_address_size).decode('utf8'), None),
|
||||||
'scheme': self.SERVER_WS_SCHEME,
|
'scheme': self.SERVER_WS_SCHEME,
|
||||||
'method': req.get_method(),
|
'method': ffi.unpack(info.method, info.method_size).decode('utf8'),
|
||||||
'root_path': '',
|
'root_path': '',
|
||||||
'path': PATH_INFO,
|
'path': url.decode('utf8'),
|
||||||
'raw_path': PATH_INFO.encode('utf8'),
|
'raw_path': url,
|
||||||
'query_string': FULL_PATH_INFO[len(PATH_INFO):].encode('utf8'),
|
'query_string': ffi.unpack(info.query_string, info.query_string_size),
|
||||||
'headers': headers,
|
'headers': headers,
|
||||||
'subprotocols': [protocol] if protocol else [],
|
'subprotocols': [protocol] if protocol else [],
|
||||||
'extensions': { 'websocket.publish': True, 'websocket.subscribe': True, 'websocket.unsubscribe': True }
|
'extensions': { 'websocket.publish': True, 'websocket.subscribe': True, 'websocket.unsubscribe': True }
|
||||||
}
|
}
|
||||||
server = self.server
|
lib.socketify_destroy_headers(info.header_list)
|
||||||
async def send(options):
|
async def send(options):
|
||||||
nonlocal ws, res, server
|
|
||||||
|
|
||||||
if res.aborted: return False
|
if res.aborted: return False
|
||||||
type = options['type']
|
type = options['type']
|
||||||
if type == 'websocket.send':
|
if type == 'websocket.send':
|
||||||
bytes = options.get("bytes", None)
|
bytes = options.get("bytes", None)
|
||||||
if ws.ws:
|
|
||||||
|
if ws.ws:
|
||||||
if bytes:
|
if bytes:
|
||||||
ws.ws.cork_send(bytes, OpCode.BINARY)
|
ws.ws.cork_send(bytes, OpCode.BINARY)
|
||||||
else:
|
else:
|
||||||
|
@ -204,8 +276,13 @@ class ASGI:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if type == 'websocket.accept': # upgrade!
|
if type == 'websocket.accept': # upgrade!
|
||||||
for header in options.get('headers', []):
|
res_headers = options.get('headers', None)
|
||||||
res.write_header(header[0], header[1])
|
def corked(res):
|
||||||
|
for header in res_headers:
|
||||||
|
res.write_header(header[0], header[1])
|
||||||
|
if res_headers:
|
||||||
|
res.cork(corked)
|
||||||
|
|
||||||
future = ws.accept()
|
future = ws.accept()
|
||||||
upgrade_protocol = options.get('subprotocol', protocol)
|
upgrade_protocol = options.get('subprotocol', protocol)
|
||||||
res.upgrade(key, upgrade_protocol if upgrade_protocol else "", extensions, socket_context, ws)
|
res.upgrade(key, upgrade_protocol if upgrade_protocol else "", extensions, socket_context, ws)
|
||||||
|
@ -213,29 +290,26 @@ class ASGI:
|
||||||
|
|
||||||
if type == 'websocket.close': # code and reason?
|
if type == 'websocket.close': # code and reason?
|
||||||
if ws.ws: ws.ws.close()
|
if ws.ws: ws.ws.close()
|
||||||
else: res.write_status(403).end_without_body()
|
else: res.cork(lambda res: res.write_status(403).end_without_body())
|
||||||
return True
|
return True
|
||||||
if type == 'websocket.publish': # publish extension
|
if type == 'websocket.publish': # publish extension
|
||||||
bytes = options.get("bytes", None)
|
bytes = options.get("bytes", None)
|
||||||
if bytes:
|
if bytes:
|
||||||
server.publish(options.get('topic'), bytes)
|
self.server.publish(options.get('topic'), bytes)
|
||||||
else:
|
else:
|
||||||
server.publish(options.get('topic'), options.get('text'), OpCode.TEXT)
|
self.server.publish(options.get('topic'), options.get('text', ''), OpCode.TEXT)
|
||||||
return True
|
return True
|
||||||
if type == 'websocket.subscribe': # subscribe extension
|
if type == 'websocket.subscribe': # subscribe extension
|
||||||
if ws.ws: ws.ws.subscribe(options.get('topic'))
|
if ws.ws: ws.ws.subscribe(options.get('topic'))
|
||||||
else: res.write_status(403).end_without_body()
|
else: res.cork(lambda res: res.write_status(403).end_without_body())
|
||||||
return True
|
return True
|
||||||
if type == 'websocket.unsubscribe': # unsubscribe extension
|
if type == 'websocket.unsubscribe': # unsubscribe extension
|
||||||
if ws.ws: ws.ws.unsubscribe(options.get('topic'))
|
if ws.ws: ws.ws.unsubscribe(options.get('topic'))
|
||||||
else: res.write_status(403).end_without_body()
|
else: res.cork(lambda res: res.write_status(403).end_without_body())
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
res.run_async(app(scope, ws.receive, send))
|
||||||
#grab handler
|
|
||||||
res.grab_aborted_handler()
|
|
||||||
asyncio.ensure_future(app(scope, ws.receive, send))
|
|
||||||
|
|
||||||
|
|
||||||
self.server.ws("/*", {
|
self.server.ws("/*", {
|
||||||
|
@ -255,4 +329,8 @@ class ASGI:
|
||||||
return self
|
return self
|
||||||
def run(self):
|
def run(self):
|
||||||
self.server.run()
|
self.server.run()
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
if self.asgi_http_info:
|
||||||
|
lib.socketify_destroy_asgi_app_info(self.asgi_http_info)
|
Plik binarny nie jest wyświetlany.
|
@ -25,7 +25,7 @@ def future_handler(future, loop, exception_handler, response):
|
||||||
|
|
||||||
class Loop:
|
class Loop:
|
||||||
def __init__(self, exception_handler=None):
|
def __init__(self, exception_handler=None):
|
||||||
self.loop = asyncio.new_event_loop()
|
self.loop = asyncio.get_event_loop()
|
||||||
self.uv_loop = UVLoop()
|
self.uv_loop = UVLoop()
|
||||||
|
|
||||||
if hasattr(exception_handler, "__call__"):
|
if hasattr(exception_handler, "__call__"):
|
||||||
|
@ -36,9 +36,7 @@ class Loop:
|
||||||
else:
|
else:
|
||||||
self.exception_handler = None
|
self.exception_handler = None
|
||||||
|
|
||||||
asyncio.set_event_loop(self.loop)
|
|
||||||
self.started = False
|
self.started = False
|
||||||
self.last_defer = False
|
|
||||||
|
|
||||||
def set_timeout(self, timeout, callback, user_data):
|
def set_timeout(self, timeout, callback, user_data):
|
||||||
return self.uv_loop.create_timer(timeout, 0, callback, user_data)
|
return self.uv_loop.create_timer(timeout, 0, callback, user_data)
|
||||||
|
@ -46,14 +44,14 @@ class Loop:
|
||||||
def create_future(self):
|
def create_future(self):
|
||||||
return self.loop.create_future()
|
return self.loop.create_future()
|
||||||
|
|
||||||
def keep_alive(self):
|
def _keep_alive(self):
|
||||||
if self.started:
|
if self.started:
|
||||||
self.uv_loop.run_once()
|
self.uv_loop.run_once()
|
||||||
self.loop.call_soon(self.keep_alive)
|
self.loop.call_soon(self._keep_alive)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.started = True
|
self.started = True
|
||||||
self.loop.call_soon(self.keep_alive)
|
self.loop.call_soon(self._keep_alive)
|
||||||
self.loop.run_forever()
|
self.loop.run_forever()
|
||||||
# clean up uvloop
|
# clean up uvloop
|
||||||
self.uv_loop.stop()
|
self.uv_loop.stop()
|
||||||
|
@ -74,6 +72,7 @@ class Loop:
|
||||||
# Exposes native loop for uWS
|
# Exposes native loop for uWS
|
||||||
def get_native_loop(self):
|
def get_native_loop(self):
|
||||||
return self.uv_loop.get_native_loop()
|
return self.uv_loop.get_native_loop()
|
||||||
|
|
||||||
|
|
||||||
def run_async(self, task, response=None):
|
def run_async(self, task, response=None):
|
||||||
# with run_once
|
# with run_once
|
||||||
|
|
|
@ -0,0 +1,386 @@
|
||||||
|
import cffi
|
||||||
|
import platform
|
||||||
|
import os
|
||||||
|
|
||||||
|
ffi = cffi.FFI()
|
||||||
|
ffi.cdef(
|
||||||
|
"""
|
||||||
|
|
||||||
|
struct us_socket_context_options_t {
|
||||||
|
const char *key_file_name;
|
||||||
|
const char *cert_file_name;
|
||||||
|
const char *passphrase;
|
||||||
|
const char *dh_params_file_name;
|
||||||
|
const char *ca_file_name;
|
||||||
|
const char *ssl_ciphers;
|
||||||
|
int ssl_prefer_low_memory_usage;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
struct us_socket_context_t {
|
||||||
|
struct us_loop_t *loop;
|
||||||
|
unsigned short timestamp;
|
||||||
|
struct us_socket_t *head;
|
||||||
|
struct us_socket_t *iterator;
|
||||||
|
struct us_socket_context_t *prev, *next;
|
||||||
|
struct us_socket_t *(*on_open)(struct us_socket_t *, int is_client, char *ip, int ip_length);
|
||||||
|
struct us_socket_t *(*on_data)(struct us_socket_t *, char *data, int length);
|
||||||
|
struct us_socket_t *(*on_writable)(struct us_socket_t *);
|
||||||
|
struct us_socket_t *(*on_close)(struct us_socket_t *, int code, void *reason);
|
||||||
|
struct us_socket_t *(*on_socket_timeout)(struct us_socket_t *);
|
||||||
|
struct us_socket_t *(*on_end)(struct us_socket_t *);
|
||||||
|
struct us_socket_t *(*on_connect_error)(struct us_socket_t *, int code);
|
||||||
|
int (*is_low_prio)(struct us_socket_t *);
|
||||||
|
};
|
||||||
|
|
||||||
|
struct us_poll_t {
|
||||||
|
struct {
|
||||||
|
signed int fd : 28;
|
||||||
|
unsigned int poll_type : 4;
|
||||||
|
} state;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
struct us_socket_t {
|
||||||
|
struct us_poll_t p;
|
||||||
|
struct us_socket_context_t *context;
|
||||||
|
struct us_socket_t *prev, *next;
|
||||||
|
unsigned short timeout : 14;
|
||||||
|
unsigned short low_prio_state : 2;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct us_listen_socket_t {
|
||||||
|
struct us_socket_t s;
|
||||||
|
unsigned int socket_ext_size;
|
||||||
|
};
|
||||||
|
void us_listen_socket_close(int ssl, struct us_listen_socket_t *ls);
|
||||||
|
int us_socket_local_port(int ssl, struct us_listen_socket_t *ls);
|
||||||
|
struct us_loop_t *uws_get_loop();
|
||||||
|
struct us_loop_t *uws_get_loop_with_native(void* existing_native_loop);
|
||||||
|
typedef enum
|
||||||
|
{
|
||||||
|
_COMPRESSOR_MASK = 0x00FF,
|
||||||
|
_DECOMPRESSOR_MASK = 0x0F00,
|
||||||
|
DISABLED = 0,
|
||||||
|
SHARED_COMPRESSOR = 1,
|
||||||
|
SHARED_DECOMPRESSOR = 1 << 8,
|
||||||
|
DEDICATED_DECOMPRESSOR_32KB = 15 << 8,
|
||||||
|
DEDICATED_DECOMPRESSOR_16KB = 14 << 8,
|
||||||
|
DEDICATED_DECOMPRESSOR_8KB = 13 << 8,
|
||||||
|
DEDICATED_DECOMPRESSOR_4KB = 12 << 8,
|
||||||
|
DEDICATED_DECOMPRESSOR_2KB = 11 << 8,
|
||||||
|
DEDICATED_DECOMPRESSOR_1KB = 10 << 8,
|
||||||
|
DEDICATED_DECOMPRESSOR_512B = 9 << 8,
|
||||||
|
DEDICATED_DECOMPRESSOR = 15 << 8,
|
||||||
|
DEDICATED_COMPRESSOR_3KB = 9 << 4 | 1,
|
||||||
|
DEDICATED_COMPRESSOR_4KB = 9 << 4 | 2,
|
||||||
|
DEDICATED_COMPRESSOR_8KB = 10 << 4 | 3,
|
||||||
|
DEDICATED_COMPRESSOR_16KB = 11 << 4 | 4,
|
||||||
|
DEDICATED_COMPRESSOR_32KB = 12 << 4 | 5,
|
||||||
|
DEDICATED_COMPRESSOR_64KB = 13 << 4 | 6,
|
||||||
|
DEDICATED_COMPRESSOR_128KB = 14 << 4 | 7,
|
||||||
|
DEDICATED_COMPRESSOR_256KB = 15 << 4 | 8,
|
||||||
|
DEDICATED_COMPRESSOR = 15 << 4 | 8
|
||||||
|
} uws_compress_options_t;
|
||||||
|
|
||||||
|
typedef enum
|
||||||
|
{
|
||||||
|
CONTINUATION = 0,
|
||||||
|
TEXT = 1,
|
||||||
|
BINARY = 2,
|
||||||
|
CLOSE = 8,
|
||||||
|
PING = 9,
|
||||||
|
PONG = 10
|
||||||
|
} uws_opcode_t;
|
||||||
|
|
||||||
|
typedef enum
|
||||||
|
{
|
||||||
|
BACKPRESSURE,
|
||||||
|
SUCCESS,
|
||||||
|
DROPPED
|
||||||
|
} uws_sendstatus_t;
|
||||||
|
|
||||||
|
typedef struct
|
||||||
|
{
|
||||||
|
int port;
|
||||||
|
const char *host;
|
||||||
|
int options;
|
||||||
|
} uws_app_listen_config_t;
|
||||||
|
|
||||||
|
struct uws_app_s;
|
||||||
|
struct uws_req_s;
|
||||||
|
struct uws_res_s;
|
||||||
|
struct uws_websocket_s;
|
||||||
|
struct uws_header_iterator_s;
|
||||||
|
typedef struct uws_app_s uws_app_t;
|
||||||
|
typedef struct uws_req_s uws_req_t;
|
||||||
|
typedef struct uws_res_s uws_res_t;
|
||||||
|
typedef struct uws_socket_context_s uws_socket_context_t;
|
||||||
|
typedef struct uws_websocket_s uws_websocket_t;
|
||||||
|
|
||||||
|
typedef void (*uws_websocket_handler)(uws_websocket_t *ws, void* user_data);
|
||||||
|
typedef void (*uws_websocket_message_handler)(uws_websocket_t *ws, const char *message, size_t length, uws_opcode_t opcode, void* user_data);
|
||||||
|
typedef void (*uws_websocket_ping_pong_handler)(uws_websocket_t *ws, const char *message, size_t length, void* user_data);
|
||||||
|
typedef void (*uws_websocket_close_handler)(uws_websocket_t *ws, int code, const char *message, size_t length, void* user_data);
|
||||||
|
typedef void (*uws_websocket_upgrade_handler)(uws_res_t *response, uws_req_t *request, uws_socket_context_t *context, void* user_data);
|
||||||
|
typedef struct
|
||||||
|
{
|
||||||
|
uws_compress_options_t compression;
|
||||||
|
unsigned int maxPayloadLength;
|
||||||
|
unsigned short idleTimeout;
|
||||||
|
unsigned int maxBackpressure;
|
||||||
|
bool closeOnBackpressureLimit;
|
||||||
|
bool resetIdleTimeoutOnSend;
|
||||||
|
bool sendPingsAutomatically;
|
||||||
|
unsigned short maxLifetime;
|
||||||
|
uws_websocket_upgrade_handler upgrade;
|
||||||
|
uws_websocket_handler open;
|
||||||
|
uws_websocket_message_handler message;
|
||||||
|
uws_websocket_handler drain;
|
||||||
|
uws_websocket_ping_pong_handler ping;
|
||||||
|
uws_websocket_ping_pong_handler pong;
|
||||||
|
uws_websocket_close_handler close;
|
||||||
|
} uws_socket_behavior_t;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
bool ok;
|
||||||
|
bool has_responded;
|
||||||
|
} uws_try_end_result_t;
|
||||||
|
|
||||||
|
typedef void (*uws_listen_handler)(struct us_listen_socket_t *listen_socket, uws_app_listen_config_t config, void *user_data);
|
||||||
|
typedef void (*uws_method_handler)(uws_res_t *response, uws_req_t *request, void *user_data);
|
||||||
|
typedef void (*uws_filter_handler)(uws_res_t *response, int, void *user_data);
|
||||||
|
typedef void (*uws_missing_server_handler)(const char *hostname, size_t hostname_length, void *user_data);
|
||||||
|
typedef void (*uws_get_headers_server_handler)(const char *header_name, size_t header_name_size, const char *header_value, size_t header_value_size, void *user_data);
|
||||||
|
|
||||||
|
|
||||||
|
uws_app_t *uws_create_app(int ssl, struct us_socket_context_options_t options);
|
||||||
|
void uws_app_destroy(int ssl, uws_app_t *app);
|
||||||
|
void uws_app_get(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data);
|
||||||
|
void uws_app_post(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data);
|
||||||
|
void uws_app_options(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data);
|
||||||
|
void uws_app_delete(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data);
|
||||||
|
void uws_app_patch(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data);
|
||||||
|
void uws_app_put(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data);
|
||||||
|
void uws_app_head(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data);
|
||||||
|
void uws_app_connect(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data);
|
||||||
|
void uws_app_trace(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data);
|
||||||
|
void uws_app_any(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data);
|
||||||
|
|
||||||
|
void uws_app_run(int ssl, uws_app_t *);
|
||||||
|
|
||||||
|
void uws_app_listen(int ssl, uws_app_t *app, int port, uws_listen_handler handler, void *user_data);
|
||||||
|
void uws_app_listen_with_config(int ssl, uws_app_t *app, uws_app_listen_config_t config, uws_listen_handler handler, void *user_data);
|
||||||
|
bool uws_constructor_failed(int ssl, uws_app_t *app);
|
||||||
|
unsigned int uws_num_subscribers(int ssl, uws_app_t *app, const char *topic, size_t topic_length);
|
||||||
|
bool uws_publish(int ssl, uws_app_t *app, const char *topic, size_t topic_length, const char *message, size_t message_length, uws_opcode_t opcode, bool compress);
|
||||||
|
void *uws_get_native_handle(int ssl, uws_app_t *app);
|
||||||
|
void uws_remove_server_name(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length);
|
||||||
|
void uws_add_server_name(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length);
|
||||||
|
void uws_add_server_name_with_options(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length, struct us_socket_context_options_t options);
|
||||||
|
void uws_missing_server_name(int ssl, uws_app_t *app, uws_missing_server_handler handler, void *user_data);
|
||||||
|
void uws_filter(int ssl, uws_app_t *app, uws_filter_handler handler, void *user_data);
|
||||||
|
|
||||||
|
|
||||||
|
void uws_res_end(int ssl, uws_res_t *res, const char *data, size_t length, bool close_connection);
|
||||||
|
void uws_res_pause(int ssl, uws_res_t *res);
|
||||||
|
void uws_res_resume(int ssl, uws_res_t *res);
|
||||||
|
void uws_res_write_continue(int ssl, uws_res_t *res);
|
||||||
|
void uws_res_write_status(int ssl, uws_res_t *res, const char *status, size_t length);
|
||||||
|
void uws_res_write_header(int ssl, uws_res_t *res, const char *key, size_t key_length, const char *value, size_t value_length);
|
||||||
|
void uws_res_override_write_offset(int ssl, uws_res_t *res, uintmax_t offset);
|
||||||
|
|
||||||
|
void uws_res_write_header_int(int ssl, uws_res_t *res, const char *key, size_t key_length, uint64_t value);
|
||||||
|
void uws_res_end_without_body(int ssl, uws_res_t *res, bool close_connection);
|
||||||
|
bool uws_res_write(int ssl, uws_res_t *res, const char *data, size_t length);
|
||||||
|
uintmax_t uws_res_get_write_offset(int ssl, uws_res_t *res);
|
||||||
|
void *uws_res_get_native_handle(int ssl, uws_res_t *res);
|
||||||
|
bool uws_res_has_responded(int ssl, uws_res_t *res);
|
||||||
|
void uws_res_on_writable(int ssl, uws_res_t *res, bool (*handler)(uws_res_t *res, uintmax_t, void *opcional_data), void *user_data);
|
||||||
|
void uws_res_on_aborted(int ssl, uws_res_t *res, void (*handler)(uws_res_t *res, void *opcional_data), void *opcional_data);
|
||||||
|
void uws_res_on_data(int ssl, uws_res_t *res, void (*handler)(uws_res_t *res, const char *chunk, size_t chunk_length, bool is_end, void *opcional_data), void *opcional_data);
|
||||||
|
void uws_res_upgrade(int ssl, uws_res_t *res, void *data, const char *sec_web_socket_key, size_t sec_web_socket_key_length, const char *sec_web_socket_protocol, size_t sec_web_socket_protocol_length, const char *sec_web_socket_extensions, size_t sec_web_socket_extensions_length, uws_socket_context_t *ws);
|
||||||
|
uws_try_end_result_t uws_res_try_end(int ssl, uws_res_t *res, const char *data, size_t length, uintmax_t total_size, bool close_connection);
|
||||||
|
void uws_res_cork(int ssl, uws_res_t *res,void(*callback)(uws_res_t *res, void* user_data) ,void* user_data);
|
||||||
|
size_t uws_res_get_remote_address(int ssl, uws_res_t *res, const char **dest);
|
||||||
|
size_t uws_res_get_remote_address_as_text(int ssl, uws_res_t *res, const char **dest);
|
||||||
|
size_t uws_res_get_proxied_remote_address(int ssl, uws_res_t *res, const char **dest);
|
||||||
|
size_t uws_res_get_proxied_remote_address_as_text(int ssl, uws_res_t *res, const char **dest);
|
||||||
|
|
||||||
|
bool uws_req_is_ancient(uws_req_t *res);
|
||||||
|
bool uws_req_get_yield(uws_req_t *res);
|
||||||
|
void uws_req_set_field(uws_req_t *res, bool yield);
|
||||||
|
size_t uws_req_get_url(uws_req_t *res, const char **dest);
|
||||||
|
size_t uws_req_get_method(uws_req_t *res, const char **dest);
|
||||||
|
size_t uws_req_get_case_sensitive_method(uws_req_t *res, const char **dest);
|
||||||
|
|
||||||
|
size_t uws_req_get_header(uws_req_t *res, const char *lower_case_header, size_t lower_case_header_length, const char **dest);
|
||||||
|
size_t uws_req_get_query(uws_req_t *res, const char *key, size_t key_length, const char **dest);
|
||||||
|
size_t uws_req_get_parameter(uws_req_t *res, unsigned short index, const char **dest);
|
||||||
|
size_t uws_req_get_full_url(uws_req_t *res, const char **dest);
|
||||||
|
void uws_req_for_each_header(uws_req_t *res, uws_get_headers_server_handler handler, void *user_data);
|
||||||
|
|
||||||
|
void uws_ws(int ssl, uws_app_t *app, const char *pattern, uws_socket_behavior_t behavior, void* user_data);
|
||||||
|
void *uws_ws_get_user_data(int ssl, uws_websocket_t *ws);
|
||||||
|
void uws_ws_close(int ssl, uws_websocket_t *ws);
|
||||||
|
uws_sendstatus_t uws_ws_send(int ssl, uws_websocket_t *ws, const char *message, size_t length, uws_opcode_t opcode);
|
||||||
|
uws_sendstatus_t uws_ws_send_with_options(int ssl, uws_websocket_t *ws, const char *message, size_t length, uws_opcode_t opcode, bool compress, bool fin);
|
||||||
|
uws_sendstatus_t uws_ws_send_fragment(int ssl, uws_websocket_t *ws, const char *message, size_t length, bool compress);
|
||||||
|
uws_sendstatus_t uws_ws_send_first_fragment(int ssl, uws_websocket_t *ws, const char *message, size_t length, bool compress);
|
||||||
|
uws_sendstatus_t uws_ws_send_first_fragment_with_opcode(int ssl, uws_websocket_t *ws, const char *message, size_t length, uws_opcode_t opcode, bool compress);
|
||||||
|
uws_sendstatus_t uws_ws_send_last_fragment(int ssl, uws_websocket_t *ws, const char *message, size_t length, bool compress);
|
||||||
|
void uws_ws_end(int ssl, uws_websocket_t *ws, int code, const char *message, size_t length);
|
||||||
|
void uws_ws_cork(int ssl, uws_websocket_t *ws, void (*handler)(void *user_data), void *user_data);
|
||||||
|
|
||||||
|
bool uws_ws_subscribe(int ssl, uws_websocket_t *ws, const char *topic, size_t length);
|
||||||
|
bool uws_ws_unsubscribe(int ssl, uws_websocket_t *ws, const char *topic, size_t length);
|
||||||
|
bool uws_ws_is_subscribed(int ssl, uws_websocket_t *ws, const char *topic, size_t length);
|
||||||
|
void uws_ws_iterate_topics(int ssl, uws_websocket_t *ws, void (*callback)(const char *topic, size_t length, void *user_data), void *user_data);
|
||||||
|
bool uws_ws_publish(int ssl, uws_websocket_t *ws, const char *topic, size_t topic_length, const char *message, size_t message_length);
|
||||||
|
bool uws_ws_publish_with_options(int ssl, uws_websocket_t *ws, const char *topic, size_t topic_length, const char *message, size_t message_length, uws_opcode_t opcode, bool compress);
|
||||||
|
int uws_ws_get_buffered_amount(int ssl, uws_websocket_t *ws);
|
||||||
|
size_t uws_ws_get_remote_address(int ssl, uws_websocket_t *ws, const char **dest);
|
||||||
|
size_t uws_ws_get_remote_address_as_text(int ssl, uws_websocket_t *ws, const char **dest);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
typedef void (*socketify_prepare_handler)(void* user_data);
|
||||||
|
typedef void (*socketify_timer_handler)(void* user_data);
|
||||||
|
typedef void (*socketify_async_handler)(void* user_data);
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
SOCKETIFY_RUN_DEFAULT = 0,
|
||||||
|
SOCKETIFY_RUN_ONCE,
|
||||||
|
SOCKETIFY_RUN_NOWAIT
|
||||||
|
} socketify_run_mode;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
void* uv_prepare_ptr;
|
||||||
|
socketify_prepare_handler on_prepare_handler;
|
||||||
|
void* on_prepare_data;
|
||||||
|
void* uv_loop;
|
||||||
|
} socketify_loop;
|
||||||
|
|
||||||
|
typedef struct{
|
||||||
|
void* uv_timer_ptr;
|
||||||
|
socketify_timer_handler handler;
|
||||||
|
void* user_data;
|
||||||
|
} socketify_timer;
|
||||||
|
|
||||||
|
typedef struct{
|
||||||
|
void* uv_async_ptr;
|
||||||
|
socketify_async_handler handler;
|
||||||
|
void* user_data;
|
||||||
|
} socketify_async;
|
||||||
|
|
||||||
|
socketify_loop * socketify_create_loop();
|
||||||
|
bool socketify_constructor_failed(socketify_loop* loop);
|
||||||
|
bool socketify_on_prepare(socketify_loop* loop, socketify_prepare_handler handler, void* user_data);
|
||||||
|
bool socketify_prepare_unbind(socketify_loop* loop);
|
||||||
|
void socketify_destroy_loop(socketify_loop* loop);
|
||||||
|
void* socketify_get_native_loop(socketify_loop* loop);
|
||||||
|
|
||||||
|
int socketify_loop_run(socketify_loop* loop, socketify_run_mode mode);
|
||||||
|
void socketify_loop_stop(socketify_loop* loop);
|
||||||
|
|
||||||
|
socketify_timer* socketify_create_timer(socketify_loop* loop, uint64_t timeout, uint64_t repeat, socketify_timer_handler handler, void* user_data);
|
||||||
|
void socketify_timer_destroy(socketify_timer* timer);
|
||||||
|
bool socketify_async_call(socketify_loop* loop, socketify_async_handler handler, void* user_data);
|
||||||
|
void socketify_timer_set_repeat(socketify_timer* timer, uint64_t repeat);
|
||||||
|
|
||||||
|
|
||||||
|
socketify_timer* socketify_create_check(socketify_loop* loop, socketify_timer_handler handler, void* user_data);
|
||||||
|
void socketify_check_destroy(socketify_timer* timer);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
|
||||||
|
const char* name;
|
||||||
|
const char* value;
|
||||||
|
|
||||||
|
size_t name_size;
|
||||||
|
size_t value_size;
|
||||||
|
|
||||||
|
void* next;
|
||||||
|
} socketify_header;
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
|
||||||
|
const char* full_url;
|
||||||
|
const char* url;
|
||||||
|
const char* query_string;
|
||||||
|
const char* method;
|
||||||
|
const char* remote_address;
|
||||||
|
|
||||||
|
size_t full_url_size;
|
||||||
|
size_t url_size;
|
||||||
|
size_t query_string_size;
|
||||||
|
size_t method_size;
|
||||||
|
size_t remote_address_size;
|
||||||
|
|
||||||
|
socketify_header* header_list;
|
||||||
|
} socketify_asgi_data;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
|
||||||
|
const char* full_url;
|
||||||
|
const char* url;
|
||||||
|
const char* query_string;
|
||||||
|
const char* method;
|
||||||
|
const char* remote_address;
|
||||||
|
|
||||||
|
size_t full_url_size;
|
||||||
|
size_t url_size;
|
||||||
|
size_t query_string_size;
|
||||||
|
size_t method_size;
|
||||||
|
size_t remote_address_size;
|
||||||
|
|
||||||
|
const char* protocol;
|
||||||
|
const char* key;
|
||||||
|
const char* extensions;
|
||||||
|
size_t protocol_size;
|
||||||
|
size_t key_size;
|
||||||
|
size_t extensions_size;
|
||||||
|
|
||||||
|
socketify_header* header_list;
|
||||||
|
} socketify_asgi_ws_data;
|
||||||
|
|
||||||
|
typedef void (*socketify_asgi_method_handler)(int ssl, uws_res_t *response, socketify_asgi_data request, void *user_data, bool* aborted);
|
||||||
|
typedef struct {
|
||||||
|
int ssl;
|
||||||
|
uws_app_t* app;
|
||||||
|
socketify_asgi_method_handler handler;
|
||||||
|
void * user_data;
|
||||||
|
} socksocketify_asgi_app_info;
|
||||||
|
|
||||||
|
|
||||||
|
socketify_asgi_data socketify_asgi_request(int ssl, uws_req_t *req, uws_res_t *res);
|
||||||
|
void socketify_destroy_headers(socketify_header* headers);
|
||||||
|
bool socketify_res_write_int_status_with_headers(int ssl, uws_res_t* res, int code, socketify_header* headers);
|
||||||
|
void socketify_res_write_headers(int ssl, uws_res_t* res, socketify_header* headers);
|
||||||
|
socketify_asgi_ws_data socketify_asgi_ws_request(int ssl, uws_req_t *req, uws_res_t *res);
|
||||||
|
|
||||||
|
socksocketify_asgi_app_info* socketify_add_asgi_http_handler(int ssl, uws_app_t* app, socketify_asgi_method_handler handler, void* user_data);
|
||||||
|
void socketify_destroy_asgi_app_info(socksocketify_asgi_app_info* app);
|
||||||
|
|
||||||
|
void socketify_res_cork_write(int ssl, uws_res_t *response, const char* data, size_t length);
|
||||||
|
void socketify_res_cork_end(int ssl, uws_res_t *response, const char* data, size_t length, bool close_connection);
|
||||||
|
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
library_extension = "dll" if platform.system().lower() == "windows" else "so"
|
||||||
|
library_path = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"libsocketify_%s_%s.%s"
|
||||||
|
% (
|
||||||
|
platform.system().lower(),
|
||||||
|
"arm64" if "arm" in platform.processor().lower() else "amd64",
|
||||||
|
library_extension,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
lib = ffi.dlopen(library_path)
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,7 @@ import uuid
|
||||||
from urllib.parse import parse_qs, quote_plus, unquote_plus
|
from urllib.parse import parse_qs, quote_plus, unquote_plus
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
from .native import ffi, lib
|
||||||
from .loop import Loop
|
from .loop import Loop
|
||||||
from .status_codes import status_codes
|
from .status_codes import status_codes
|
||||||
from .helpers import static_route
|
from .helpers import static_route
|
||||||
|
@ -22,261 +23,6 @@ mimetypes.init()
|
||||||
|
|
||||||
is_python = platform.python_implementation() == "CPython"
|
is_python = platform.python_implementation() == "CPython"
|
||||||
|
|
||||||
ffi = cffi.FFI()
|
|
||||||
ffi.cdef(
|
|
||||||
"""
|
|
||||||
|
|
||||||
struct us_socket_context_options_t {
|
|
||||||
const char *key_file_name;
|
|
||||||
const char *cert_file_name;
|
|
||||||
const char *passphrase;
|
|
||||||
const char *dh_params_file_name;
|
|
||||||
const char *ca_file_name;
|
|
||||||
const char *ssl_ciphers;
|
|
||||||
int ssl_prefer_low_memory_usage;
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
struct us_socket_context_t {
|
|
||||||
struct us_loop_t *loop;
|
|
||||||
unsigned short timestamp;
|
|
||||||
struct us_socket_t *head;
|
|
||||||
struct us_socket_t *iterator;
|
|
||||||
struct us_socket_context_t *prev, *next;
|
|
||||||
struct us_socket_t *(*on_open)(struct us_socket_t *, int is_client, char *ip, int ip_length);
|
|
||||||
struct us_socket_t *(*on_data)(struct us_socket_t *, char *data, int length);
|
|
||||||
struct us_socket_t *(*on_writable)(struct us_socket_t *);
|
|
||||||
struct us_socket_t *(*on_close)(struct us_socket_t *, int code, void *reason);
|
|
||||||
struct us_socket_t *(*on_socket_timeout)(struct us_socket_t *);
|
|
||||||
struct us_socket_t *(*on_end)(struct us_socket_t *);
|
|
||||||
struct us_socket_t *(*on_connect_error)(struct us_socket_t *, int code);
|
|
||||||
int (*is_low_prio)(struct us_socket_t *);
|
|
||||||
};
|
|
||||||
|
|
||||||
struct us_poll_t {
|
|
||||||
struct {
|
|
||||||
signed int fd : 28;
|
|
||||||
unsigned int poll_type : 4;
|
|
||||||
} state;
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
struct us_socket_t {
|
|
||||||
struct us_poll_t p;
|
|
||||||
struct us_socket_context_t *context;
|
|
||||||
struct us_socket_t *prev, *next;
|
|
||||||
unsigned short timeout : 14;
|
|
||||||
unsigned short low_prio_state : 2;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct us_listen_socket_t {
|
|
||||||
struct us_socket_t s;
|
|
||||||
unsigned int socket_ext_size;
|
|
||||||
};
|
|
||||||
void us_listen_socket_close(int ssl, struct us_listen_socket_t *ls);
|
|
||||||
int us_socket_local_port(int ssl, struct us_listen_socket_t *ls);
|
|
||||||
struct us_loop_t *uws_get_loop();
|
|
||||||
struct us_loop_t *uws_get_loop_with_native(void* existing_native_loop);
|
|
||||||
typedef enum
|
|
||||||
{
|
|
||||||
_COMPRESSOR_MASK = 0x00FF,
|
|
||||||
_DECOMPRESSOR_MASK = 0x0F00,
|
|
||||||
DISABLED = 0,
|
|
||||||
SHARED_COMPRESSOR = 1,
|
|
||||||
SHARED_DECOMPRESSOR = 1 << 8,
|
|
||||||
DEDICATED_DECOMPRESSOR_32KB = 15 << 8,
|
|
||||||
DEDICATED_DECOMPRESSOR_16KB = 14 << 8,
|
|
||||||
DEDICATED_DECOMPRESSOR_8KB = 13 << 8,
|
|
||||||
DEDICATED_DECOMPRESSOR_4KB = 12 << 8,
|
|
||||||
DEDICATED_DECOMPRESSOR_2KB = 11 << 8,
|
|
||||||
DEDICATED_DECOMPRESSOR_1KB = 10 << 8,
|
|
||||||
DEDICATED_DECOMPRESSOR_512B = 9 << 8,
|
|
||||||
DEDICATED_DECOMPRESSOR = 15 << 8,
|
|
||||||
DEDICATED_COMPRESSOR_3KB = 9 << 4 | 1,
|
|
||||||
DEDICATED_COMPRESSOR_4KB = 9 << 4 | 2,
|
|
||||||
DEDICATED_COMPRESSOR_8KB = 10 << 4 | 3,
|
|
||||||
DEDICATED_COMPRESSOR_16KB = 11 << 4 | 4,
|
|
||||||
DEDICATED_COMPRESSOR_32KB = 12 << 4 | 5,
|
|
||||||
DEDICATED_COMPRESSOR_64KB = 13 << 4 | 6,
|
|
||||||
DEDICATED_COMPRESSOR_128KB = 14 << 4 | 7,
|
|
||||||
DEDICATED_COMPRESSOR_256KB = 15 << 4 | 8,
|
|
||||||
DEDICATED_COMPRESSOR = 15 << 4 | 8
|
|
||||||
} uws_compress_options_t;
|
|
||||||
|
|
||||||
typedef enum
|
|
||||||
{
|
|
||||||
CONTINUATION = 0,
|
|
||||||
TEXT = 1,
|
|
||||||
BINARY = 2,
|
|
||||||
CLOSE = 8,
|
|
||||||
PING = 9,
|
|
||||||
PONG = 10
|
|
||||||
} uws_opcode_t;
|
|
||||||
|
|
||||||
typedef enum
|
|
||||||
{
|
|
||||||
BACKPRESSURE,
|
|
||||||
SUCCESS,
|
|
||||||
DROPPED
|
|
||||||
} uws_sendstatus_t;
|
|
||||||
|
|
||||||
typedef struct
|
|
||||||
{
|
|
||||||
int port;
|
|
||||||
const char *host;
|
|
||||||
int options;
|
|
||||||
} uws_app_listen_config_t;
|
|
||||||
|
|
||||||
struct uws_app_s;
|
|
||||||
struct uws_req_s;
|
|
||||||
struct uws_res_s;
|
|
||||||
struct uws_websocket_s;
|
|
||||||
struct uws_header_iterator_s;
|
|
||||||
typedef struct uws_app_s uws_app_t;
|
|
||||||
typedef struct uws_req_s uws_req_t;
|
|
||||||
typedef struct uws_res_s uws_res_t;
|
|
||||||
typedef struct uws_socket_context_s uws_socket_context_t;
|
|
||||||
typedef struct uws_websocket_s uws_websocket_t;
|
|
||||||
|
|
||||||
typedef void (*uws_websocket_handler)(uws_websocket_t *ws, void* user_data);
|
|
||||||
typedef void (*uws_websocket_message_handler)(uws_websocket_t *ws, const char *message, size_t length, uws_opcode_t opcode, void* user_data);
|
|
||||||
typedef void (*uws_websocket_ping_pong_handler)(uws_websocket_t *ws, const char *message, size_t length, void* user_data);
|
|
||||||
typedef void (*uws_websocket_close_handler)(uws_websocket_t *ws, int code, const char *message, size_t length, void* user_data);
|
|
||||||
typedef void (*uws_websocket_upgrade_handler)(uws_res_t *response, uws_req_t *request, uws_socket_context_t *context, void* user_data);
|
|
||||||
typedef struct
|
|
||||||
{
|
|
||||||
uws_compress_options_t compression;
|
|
||||||
unsigned int maxPayloadLength;
|
|
||||||
unsigned short idleTimeout;
|
|
||||||
unsigned int maxBackpressure;
|
|
||||||
bool closeOnBackpressureLimit;
|
|
||||||
bool resetIdleTimeoutOnSend;
|
|
||||||
bool sendPingsAutomatically;
|
|
||||||
unsigned short maxLifetime;
|
|
||||||
uws_websocket_upgrade_handler upgrade;
|
|
||||||
uws_websocket_handler open;
|
|
||||||
uws_websocket_message_handler message;
|
|
||||||
uws_websocket_handler drain;
|
|
||||||
uws_websocket_ping_pong_handler ping;
|
|
||||||
uws_websocket_ping_pong_handler pong;
|
|
||||||
uws_websocket_close_handler close;
|
|
||||||
} uws_socket_behavior_t;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
bool ok;
|
|
||||||
bool has_responded;
|
|
||||||
} uws_try_end_result_t;
|
|
||||||
|
|
||||||
typedef void (*uws_listen_handler)(struct us_listen_socket_t *listen_socket, uws_app_listen_config_t config, void *user_data);
|
|
||||||
typedef void (*uws_method_handler)(uws_res_t *response, uws_req_t *request, void *user_data);
|
|
||||||
typedef void (*uws_filter_handler)(uws_res_t *response, int, void *user_data);
|
|
||||||
typedef void (*uws_missing_server_handler)(const char *hostname, size_t hostname_length, void *user_data);
|
|
||||||
typedef void (*uws_get_headers_server_handler)(const char *header_name, size_t header_name_size, const char *header_value, size_t header_value_size, void *user_data);
|
|
||||||
|
|
||||||
|
|
||||||
uws_app_t *uws_create_app(int ssl, struct us_socket_context_options_t options);
|
|
||||||
void uws_app_destroy(int ssl, uws_app_t *app);
|
|
||||||
void uws_app_get(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data);
|
|
||||||
void uws_app_post(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data);
|
|
||||||
void uws_app_options(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data);
|
|
||||||
void uws_app_delete(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data);
|
|
||||||
void uws_app_patch(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data);
|
|
||||||
void uws_app_put(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data);
|
|
||||||
void uws_app_head(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data);
|
|
||||||
void uws_app_connect(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data);
|
|
||||||
void uws_app_trace(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data);
|
|
||||||
void uws_app_any(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data);
|
|
||||||
|
|
||||||
void uws_app_run(int ssl, uws_app_t *);
|
|
||||||
|
|
||||||
void uws_app_listen(int ssl, uws_app_t *app, int port, uws_listen_handler handler, void *user_data);
|
|
||||||
void uws_app_listen_with_config(int ssl, uws_app_t *app, uws_app_listen_config_t config, uws_listen_handler handler, void *user_data);
|
|
||||||
bool uws_constructor_failed(int ssl, uws_app_t *app);
|
|
||||||
unsigned int uws_num_subscribers(int ssl, uws_app_t *app, const char *topic, size_t topic_length);
|
|
||||||
bool uws_publish(int ssl, uws_app_t *app, const char *topic, size_t topic_length, const char *message, size_t message_length, uws_opcode_t opcode, bool compress);
|
|
||||||
void *uws_get_native_handle(int ssl, uws_app_t *app);
|
|
||||||
void uws_remove_server_name(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length);
|
|
||||||
void uws_add_server_name(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length);
|
|
||||||
void uws_add_server_name_with_options(int ssl, uws_app_t *app, const char *hostname_pattern, size_t hostname_pattern_length, struct us_socket_context_options_t options);
|
|
||||||
void uws_missing_server_name(int ssl, uws_app_t *app, uws_missing_server_handler handler, void *user_data);
|
|
||||||
void uws_filter(int ssl, uws_app_t *app, uws_filter_handler handler, void *user_data);
|
|
||||||
|
|
||||||
|
|
||||||
void uws_res_end(int ssl, uws_res_t *res, const char *data, size_t length, bool close_connection);
|
|
||||||
void uws_res_pause(int ssl, uws_res_t *res);
|
|
||||||
void uws_res_resume(int ssl, uws_res_t *res);
|
|
||||||
void uws_res_write_continue(int ssl, uws_res_t *res);
|
|
||||||
void uws_res_write_status(int ssl, uws_res_t *res, const char *status, size_t length);
|
|
||||||
void uws_res_write_header(int ssl, uws_res_t *res, const char *key, size_t key_length, const char *value, size_t value_length);
|
|
||||||
void uws_res_override_write_offset(int ssl, uws_res_t *res, uintmax_t offset);
|
|
||||||
|
|
||||||
void uws_res_write_header_int(int ssl, uws_res_t *res, const char *key, size_t key_length, uint64_t value);
|
|
||||||
void uws_res_end_without_body(int ssl, uws_res_t *res, bool close_connection);
|
|
||||||
bool uws_res_write(int ssl, uws_res_t *res, const char *data, size_t length);
|
|
||||||
uintmax_t uws_res_get_write_offset(int ssl, uws_res_t *res);
|
|
||||||
void *uws_res_get_native_handle(int ssl, uws_res_t *res);
|
|
||||||
bool uws_res_has_responded(int ssl, uws_res_t *res);
|
|
||||||
void uws_res_on_writable(int ssl, uws_res_t *res, bool (*handler)(uws_res_t *res, uintmax_t, void *opcional_data), void *user_data);
|
|
||||||
void uws_res_on_aborted(int ssl, uws_res_t *res, void (*handler)(uws_res_t *res, void *opcional_data), void *opcional_data);
|
|
||||||
void uws_res_on_data(int ssl, uws_res_t *res, void (*handler)(uws_res_t *res, const char *chunk, size_t chunk_length, bool is_end, void *opcional_data), void *opcional_data);
|
|
||||||
void uws_res_upgrade(int ssl, uws_res_t *res, void *data, const char *sec_web_socket_key, size_t sec_web_socket_key_length, const char *sec_web_socket_protocol, size_t sec_web_socket_protocol_length, const char *sec_web_socket_extensions, size_t sec_web_socket_extensions_length, uws_socket_context_t *ws);
|
|
||||||
uws_try_end_result_t uws_res_try_end(int ssl, uws_res_t *res, const char *data, size_t length, uintmax_t total_size, bool close_connection);
|
|
||||||
void uws_res_cork(int ssl, uws_res_t *res,void(*callback)(uws_res_t *res, void* user_data) ,void* user_data);
|
|
||||||
size_t uws_res_get_remote_address(int ssl, uws_res_t *res, const char **dest);
|
|
||||||
size_t uws_res_get_remote_address_as_text(int ssl, uws_res_t *res, const char **dest);
|
|
||||||
size_t uws_res_get_proxied_remote_address(int ssl, uws_res_t *res, const char **dest);
|
|
||||||
size_t uws_res_get_proxied_remote_address_as_text(int ssl, uws_res_t *res, const char **dest);
|
|
||||||
|
|
||||||
bool uws_req_is_ancient(uws_req_t *res);
|
|
||||||
bool uws_req_get_yield(uws_req_t *res);
|
|
||||||
void uws_req_set_field(uws_req_t *res, bool yield);
|
|
||||||
size_t uws_req_get_url(uws_req_t *res, const char **dest);
|
|
||||||
size_t uws_req_get_method(uws_req_t *res, const char **dest);
|
|
||||||
size_t uws_req_get_case_sensitive_method(uws_req_t *res, const char **dest);
|
|
||||||
|
|
||||||
size_t uws_req_get_header(uws_req_t *res, const char *lower_case_header, size_t lower_case_header_length, const char **dest);
|
|
||||||
size_t uws_req_get_query(uws_req_t *res, const char *key, size_t key_length, const char **dest);
|
|
||||||
size_t uws_req_get_parameter(uws_req_t *res, unsigned short index, const char **dest);
|
|
||||||
size_t uws_req_get_full_url(uws_req_t *res, const char **dest);
|
|
||||||
void uws_req_for_each_header(uws_req_t *res, uws_get_headers_server_handler handler, void *user_data);
|
|
||||||
|
|
||||||
void uws_ws(int ssl, uws_app_t *app, const char *pattern, uws_socket_behavior_t behavior, void* user_data);
|
|
||||||
void *uws_ws_get_user_data(int ssl, uws_websocket_t *ws);
|
|
||||||
void uws_ws_close(int ssl, uws_websocket_t *ws);
|
|
||||||
uws_sendstatus_t uws_ws_send(int ssl, uws_websocket_t *ws, const char *message, size_t length, uws_opcode_t opcode);
|
|
||||||
uws_sendstatus_t uws_ws_send_with_options(int ssl, uws_websocket_t *ws, const char *message, size_t length, uws_opcode_t opcode, bool compress, bool fin);
|
|
||||||
uws_sendstatus_t uws_ws_send_fragment(int ssl, uws_websocket_t *ws, const char *message, size_t length, bool compress);
|
|
||||||
uws_sendstatus_t uws_ws_send_first_fragment(int ssl, uws_websocket_t *ws, const char *message, size_t length, bool compress);
|
|
||||||
uws_sendstatus_t uws_ws_send_first_fragment_with_opcode(int ssl, uws_websocket_t *ws, const char *message, size_t length, uws_opcode_t opcode, bool compress);
|
|
||||||
uws_sendstatus_t uws_ws_send_last_fragment(int ssl, uws_websocket_t *ws, const char *message, size_t length, bool compress);
|
|
||||||
void uws_ws_end(int ssl, uws_websocket_t *ws, int code, const char *message, size_t length);
|
|
||||||
void uws_ws_cork(int ssl, uws_websocket_t *ws, void (*handler)(void *user_data), void *user_data);
|
|
||||||
|
|
||||||
bool uws_ws_subscribe(int ssl, uws_websocket_t *ws, const char *topic, size_t length);
|
|
||||||
bool uws_ws_unsubscribe(int ssl, uws_websocket_t *ws, const char *topic, size_t length);
|
|
||||||
bool uws_ws_is_subscribed(int ssl, uws_websocket_t *ws, const char *topic, size_t length);
|
|
||||||
void uws_ws_iterate_topics(int ssl, uws_websocket_t *ws, void (*callback)(const char *topic, size_t length, void *user_data), void *user_data);
|
|
||||||
bool uws_ws_publish(int ssl, uws_websocket_t *ws, const char *topic, size_t topic_length, const char *message, size_t message_length);
|
|
||||||
bool uws_ws_publish_with_options(int ssl, uws_websocket_t *ws, const char *topic, size_t topic_length, const char *message, size_t message_length, uws_opcode_t opcode, bool compress);
|
|
||||||
int uws_ws_get_buffered_amount(int ssl, uws_websocket_t *ws);
|
|
||||||
size_t uws_ws_get_remote_address(int ssl, uws_websocket_t *ws, const char **dest);
|
|
||||||
size_t uws_ws_get_remote_address_as_text(int ssl, uws_websocket_t *ws, const char **dest);
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
|
|
||||||
library_extension = "dll" if platform.system().lower() == "windows" else "so"
|
|
||||||
library_path = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"libsocketify_%s_%s.%s"
|
|
||||||
% (
|
|
||||||
platform.system().lower(),
|
|
||||||
"arm64" if "arm" in platform.processor().lower() else "amd64",
|
|
||||||
library_extension,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
lib = ffi.dlopen(library_path)
|
|
||||||
|
|
||||||
|
|
||||||
@ffi.callback("void(const char*, size_t, void*)")
|
@ffi.callback("void(const char*, size_t, void*)")
|
||||||
|
@ -1838,12 +1584,12 @@ class AppResponse:
|
||||||
def write_status(self, status_or_status_text):
|
def write_status(self, status_or_status_text):
|
||||||
if not self.aborted:
|
if not self.aborted:
|
||||||
if isinstance(status_or_status_text, int):
|
if isinstance(status_or_status_text, int):
|
||||||
try:
|
if bool(lib.socketify_res_write_int_status(self.SSL, self.res, status_or_status_text)):
|
||||||
data = status_codes[status_or_status_text]
|
return self
|
||||||
except: # invalid status
|
raise RuntimeError(
|
||||||
raise RuntimeError(
|
'"%d" Is not an valid Status Code' % status_or_status_text
|
||||||
'"%d" Is not an valid Status Code' % status_or_status_text
|
)
|
||||||
)
|
|
||||||
elif isinstance(status_or_status_text, str):
|
elif isinstance(status_or_status_text, str):
|
||||||
data = status_or_status_text.encode("utf-8")
|
data = status_or_status_text.encode("utf-8")
|
||||||
elif isinstance(status_or_status_text, bytes):
|
elif isinstance(status_or_status_text, bytes):
|
||||||
|
|
|
@ -0,0 +1,292 @@
|
||||||
|
from socketify import App, CompressOptions, OpCode
|
||||||
|
from typing import Union, Callable, Awaitable, Optional
|
||||||
|
import inspect
|
||||||
|
from queue import SimpleQueue
|
||||||
|
|
||||||
|
|
||||||
|
class SSGIHttpResponse:
|
||||||
|
extensions: Optional[dict] = None # extensions for http
|
||||||
|
|
||||||
|
def __init__(self, res, req, extensions = None):
|
||||||
|
self.res = res
|
||||||
|
self.req = req
|
||||||
|
self._need_cork = False
|
||||||
|
self._received_queue = None
|
||||||
|
self._miss_receive_queue = None
|
||||||
|
self.extensions = extensions
|
||||||
|
# if payload is None, request ends without body
|
||||||
|
# if has_more is True, data is written but connection will not end
|
||||||
|
def send(self, payload: Union[str, bytes, bytearray, memoryview, None], has_more: Optional[bool] = False):
|
||||||
|
if has_more:
|
||||||
|
self.res.write(payload)
|
||||||
|
else:
|
||||||
|
self.res.end(payload)
|
||||||
|
|
||||||
|
# send chunk of data, can be used to perform with less backpressure than using send
|
||||||
|
# total_size is the sum of all lenghts in bytes of all chunks to be sended
|
||||||
|
# connection will end when total_size is met
|
||||||
|
# returns tuple(bool, bool) first bool represents if the chunk is succefully sended, the second if the connection has ended
|
||||||
|
def send_chunk(self, chunk: Union[bytes, bytearray, memoryview], total_size: int) -> Awaitable:
|
||||||
|
return self.res.send_chunk(chunk, total_size)
|
||||||
|
|
||||||
|
# send status code
|
||||||
|
def send_status(self, status_code: Optional[Union[int, str]] = '200 OK'):
|
||||||
|
self.res.write_status(status_code)
|
||||||
|
|
||||||
|
# send headers to the http response
|
||||||
|
def send_headers(self, headers):
|
||||||
|
for name, value in headers:
|
||||||
|
self.res.write_header(name, value)
|
||||||
|
|
||||||
|
# ensure async call for the handler, passing any arguments to it
|
||||||
|
def run_async(self, handler: Awaitable, *arguments) -> Awaitable:
|
||||||
|
self.req.get_headers() # preserve headers
|
||||||
|
return self.res.run_async(handler(*arguments))
|
||||||
|
|
||||||
|
# get an all data
|
||||||
|
# returns an BytesIO() or None if no payload is available
|
||||||
|
def get_data(self) -> Awaitable:
|
||||||
|
if self.res.get_header("content-length", False) or self.res.get_header("transfer-encoding", False):
|
||||||
|
return self.res.get_data()
|
||||||
|
|
||||||
|
#return empty result
|
||||||
|
future = self.res.loop.create_future()
|
||||||
|
future.set_result(None)
|
||||||
|
return future
|
||||||
|
|
||||||
|
# get an chunk of data (chunk size is decided by the Server implementation)
|
||||||
|
# returns the bytes or None if no more chunks are sent
|
||||||
|
def get_chunk(self) -> Awaitable:
|
||||||
|
if not self._received_queue:
|
||||||
|
self._miss_receive_queue = SimpleQueue()
|
||||||
|
self._received_queue = SimpleQueue()
|
||||||
|
def on_data(res, chunk, is_end):
|
||||||
|
if not self._received_queue.empty():
|
||||||
|
future = self._received_queue.get(False)
|
||||||
|
future.set_result(chunk)
|
||||||
|
if not self._received_queue.empty() and is_end and chunk:
|
||||||
|
future = self._received_queue.get(False)
|
||||||
|
future.set_result(None)
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
self._miss_receive_queue.put(chunk, False)
|
||||||
|
|
||||||
|
if is_end and chunk:
|
||||||
|
self._miss_receive_queue.put(None, False)
|
||||||
|
|
||||||
|
future = self.res.loop.create_future()
|
||||||
|
self._received_queue.put(future, False)
|
||||||
|
self.res.on_data(on_data)
|
||||||
|
return future
|
||||||
|
else:
|
||||||
|
future = self.res.loop.create_future()
|
||||||
|
if not self._miss_receive_queue.empty():
|
||||||
|
future.set_result(self._miss_receive_queue.get(False))
|
||||||
|
return future
|
||||||
|
self._received_queue.put(future, False)
|
||||||
|
return future
|
||||||
|
|
||||||
|
# on aborted event, calle when the connection abort
|
||||||
|
def on_aborted(self, handler: Union[Awaitable, Callable], *arguments):
|
||||||
|
def on_aborted(res):
|
||||||
|
res.aborted = True
|
||||||
|
if inspect.iscoroutinefunction(handler):
|
||||||
|
res.run_async(handler(*arguments))
|
||||||
|
else:
|
||||||
|
handler(*arguments)
|
||||||
|
|
||||||
|
self.res.on_aborted(on_aborted)
|
||||||
|
|
||||||
|
class SSGIWebSocket:
|
||||||
|
status: int = 0 # 0 pending upgrade, 1 rejected, 2 closed, 3 accepted
|
||||||
|
extensions: Optional[dict] = None # extensions for websocket
|
||||||
|
def __init__(self, res, req, socket_context, ws, extensions = None):
|
||||||
|
self.res = res
|
||||||
|
self.req = req
|
||||||
|
self.status = 0
|
||||||
|
self.extensions = extensions
|
||||||
|
self._socket_context = socket_context
|
||||||
|
self._key = self.req.get_header("sec-websocket-key")
|
||||||
|
self._protocol = self.req.get_header("sec-websocket-protocol")
|
||||||
|
self._extensions = self.req.get_header("sec-websocket-extensions")
|
||||||
|
self._close_handler = None
|
||||||
|
self._receive_handler = None
|
||||||
|
self._need_cork = False
|
||||||
|
self._accept_future = None
|
||||||
|
|
||||||
|
# accept the connection upgrade
|
||||||
|
# can pass the protocol to accept if None is informed will use sec-websocket-protocol header if available
|
||||||
|
def accept(self, protocol: str = None) -> Awaitable:
|
||||||
|
if self.status == 0:
|
||||||
|
self._accept_future = self.res.loop.create_future()
|
||||||
|
upgrade_protocol = protocol if protocol else self._protocol
|
||||||
|
|
||||||
|
self.res.upgrade(self._key, upgrade_protocol if upgrade_protocol else "", self._extensions, self._socket_context, self)
|
||||||
|
return self._accept_future
|
||||||
|
future = self.res.loop.create_future()
|
||||||
|
future.set_result(False)
|
||||||
|
return future
|
||||||
|
|
||||||
|
# reject the connection upgrade, you can send status_code, payload and headers if you want, all optional
|
||||||
|
def reject(self, status_code: Optional[int] = 403, payload = None, headers = None) -> Awaitable:
|
||||||
|
|
||||||
|
future = self.res.loop.create_future()
|
||||||
|
if self.status < 1:
|
||||||
|
self.status = 1
|
||||||
|
if headers:
|
||||||
|
for name, value in headers:
|
||||||
|
self.res.write_header(name, value)
|
||||||
|
if not payload:
|
||||||
|
self.res.write_status(status_code).end_without_body()
|
||||||
|
else:
|
||||||
|
self.res.write_status(status_code).cork_end(payload)
|
||||||
|
future.set_result(True)
|
||||||
|
else:
|
||||||
|
future.set_result(False)
|
||||||
|
return future
|
||||||
|
|
||||||
|
# if returns an future, this can be awaited or not
|
||||||
|
def send(self, payload: Union[bytes, bytearray, memoryview]):
|
||||||
|
if self.status == 3:
|
||||||
|
if self._need_cork:
|
||||||
|
self.ws.cork_send(payload)
|
||||||
|
else:
|
||||||
|
self.ws.send(payload)
|
||||||
|
|
||||||
|
# close connection
|
||||||
|
def close(self, code: Optional[int] = 1000):
|
||||||
|
if self.status == 3:
|
||||||
|
self.ws.close()
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
# ensure async call for the handler, passing any arguments to it
|
||||||
|
def run_async(self, handler: Awaitable, *arguments) -> Awaitable:
|
||||||
|
self.req.get_headers()
|
||||||
|
self._need_cork = True
|
||||||
|
return self.res.run_async(handler(*arguments))
|
||||||
|
|
||||||
|
# on receive event, called when the socket disconnect
|
||||||
|
# passes ws: SSGIWebSocket, msg: Union[str, bytes, bytearray, memoryview, None], *arguments
|
||||||
|
def on_receive(self, handler: Union[Awaitable, Callable], *arguments):
|
||||||
|
def on_receive_handler(ws, message, opcode):
|
||||||
|
if inspect.iscoroutinefunction(handler):
|
||||||
|
ws.res.run_async(handler(ws, message, *arguments))
|
||||||
|
else:
|
||||||
|
handler(ws, message, *arguments)
|
||||||
|
self._receive_handler = on_receive_handler
|
||||||
|
|
||||||
|
# on close event, called when the socket disconnect
|
||||||
|
# passes ws: SSGIWebSocket, code: int and reason: Optional[str] = None, *arguments
|
||||||
|
def on_close(self, handler: Union[Awaitable, Callable], *arguments):
|
||||||
|
def on_close_handler(ws, code, message):
|
||||||
|
if inspect.iscoroutinefunction(handler):
|
||||||
|
ws.res.run_async(handler(ws, code, message, *arguments))
|
||||||
|
else:
|
||||||
|
handler(ws, code, message, *arguments)
|
||||||
|
|
||||||
|
self._close_handler = on_close_handler
|
||||||
|
|
||||||
|
class SSGI:
|
||||||
|
def __init__(self, app, options=None, request_response_factory_max_itens=0, websocket_factory_max_itens=0):
|
||||||
|
self.server = App(options, request_response_factory_max_itens, websocket_factory_max_itens)
|
||||||
|
self.SERVER_PORT = None
|
||||||
|
self.SERVER_HOST = ''
|
||||||
|
self.SERVER_SCHEME = 'https' if self.server.options else 'http'
|
||||||
|
self.SERVER_WS_SCHEME = 'wss' if self.server.options else 'ws'
|
||||||
|
self.SERVER_ADDRESS = ''
|
||||||
|
|
||||||
|
self.app = app
|
||||||
|
support = app.get_supported({ "ssgi": "1.0" })
|
||||||
|
http, middleware = support.get('http', (None, None))
|
||||||
|
websocket, ws_middleware = support.get('websocket', (None, None))
|
||||||
|
|
||||||
|
def ssgi(res, req):
|
||||||
|
|
||||||
|
response = SSGIHttpResponse(res, req)
|
||||||
|
PATH_INFO = req.get_url()
|
||||||
|
# FULL_PATH_INFO = req.get_full_url()
|
||||||
|
METHOD = req.get_method()
|
||||||
|
QUERY_STRING = "" #FULL_PATH_INFO[len(PATH_INFO):]
|
||||||
|
|
||||||
|
# REMOTE_ADDRESS = res.get_remote_address()
|
||||||
|
def get_header(name = None):
|
||||||
|
if name:
|
||||||
|
return req.get_header(name)
|
||||||
|
else:
|
||||||
|
return req.get_headers()
|
||||||
|
# self.SERVER_SCHEME, self.SERVER_ADDRESS,
|
||||||
|
# self.SERVER_SCHEME, self.SERVER_ADDRESS,
|
||||||
|
if inspect.iscoroutinefunction(middleware):
|
||||||
|
req.get_headers() # preserve
|
||||||
|
res.run_async(middleware('http', METHOD, PATH_INFO, QUERY_STRING, get_header, response))
|
||||||
|
else:
|
||||||
|
middleware('http', METHOD, PATH_INFO, QUERY_STRING, get_header, response)
|
||||||
|
# if not response._responded:
|
||||||
|
# res.grab_aborted_handler()
|
||||||
|
|
||||||
|
if http == "ssgi" and middleware:
|
||||||
|
self.server.any("/*", ssgi)
|
||||||
|
|
||||||
|
def ws_upgrade(res, req, socket_context):
|
||||||
|
response = SSGIWebSocket(res, req, socket_context, None)
|
||||||
|
PATH_INFO = req.get_url()
|
||||||
|
FULL_PATH_INFO = req.get_full_url()
|
||||||
|
METHOD = req.get_method()
|
||||||
|
|
||||||
|
REMOTE_ADDRESS = req.get_remote_address()
|
||||||
|
def get_header(name = None):
|
||||||
|
if name:
|
||||||
|
return req.get_header(name)
|
||||||
|
else:
|
||||||
|
return req.get_headers()
|
||||||
|
if inspect.iscoroutinefunction(ws_middleware):
|
||||||
|
req.get_headers() # preserve
|
||||||
|
res.run_async(ws_middleware('websocket', self.SERVER_SCHEME, self.SERVER_ADDRESS, REMOTE_ADDRESS, METHOD, PATH_INFO, FULL_PATH_INFO[len(PATH_INFO):], get_header, response))
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
ws_middleware('websocket', self.SERVER_WS_SCHEME, self.SERVER_HOST, REMOTE_ADDRESS, METHOD, PATH_INFO, FULL_PATH_INFO[len(PATH_INFO):], get_header, response)
|
||||||
|
# not accepted (async?)
|
||||||
|
if response.status == 0 and not response._accept_future:
|
||||||
|
res.grab_aborted_handler()
|
||||||
|
|
||||||
|
if websocket == "ssgi" and ws_middleware:
|
||||||
|
def ws_open(ws):
|
||||||
|
res = ws.get_user_data()
|
||||||
|
res.ws = ws
|
||||||
|
res.status = 3
|
||||||
|
res._accept_future.set_result(True)
|
||||||
|
|
||||||
|
def ws_message(ws, message, op):
|
||||||
|
res = ws.get_user_data()
|
||||||
|
if res._receive_handler:
|
||||||
|
res._receive_handler(res, message, op)
|
||||||
|
|
||||||
|
def ws_close(ws, code, message):
|
||||||
|
res = ws.get_user_data()
|
||||||
|
if res._close_handler:
|
||||||
|
res._close_handler(res, code, message)
|
||||||
|
|
||||||
|
|
||||||
|
self.server.ws("/*", {
|
||||||
|
"compression": CompressOptions.DISABLED,
|
||||||
|
"max_payload_length": 16 * 1024 * 1024,
|
||||||
|
"idle_timeout": 0,
|
||||||
|
"upgrade": ws_upgrade,
|
||||||
|
"open": ws_open,
|
||||||
|
"message": ws_message,
|
||||||
|
"close": ws_close
|
||||||
|
})
|
||||||
|
|
||||||
|
def listen(self, port_or_options, handler):
|
||||||
|
self.SERVER_PORT = port_or_options if isinstance(port_or_options, int) else port_or_options.port
|
||||||
|
self.SERVER_HOST = "0.0.0.0" if isinstance(port_or_options, int) else port_or_options.host
|
||||||
|
if self.SERVER_PORT:
|
||||||
|
self.SERVER_ADDRESS = f"{self.SERVER_HOST}:{self.SERVER_PORT}"
|
||||||
|
else:
|
||||||
|
self.SERVER_ADDRESS = self.SERVER_HOST
|
||||||
|
|
||||||
|
self.server.listen(port_or_options, handler)
|
||||||
|
return self
|
||||||
|
def run(self):
|
||||||
|
self.server.run()
|
||||||
|
return self
|
|
@ -1,73 +1,4 @@
|
||||||
import cffi
|
from .native import ffi, lib
|
||||||
import os
|
|
||||||
import platform
|
|
||||||
|
|
||||||
ffi = cffi.FFI()
|
|
||||||
ffi.cdef(
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
typedef void (*socketify_prepare_handler)(void* user_data);
|
|
||||||
typedef void (*socketify_timer_handler)(void* user_data);
|
|
||||||
typedef void (*socketify_async_handler)(void* user_data);
|
|
||||||
|
|
||||||
typedef enum {
|
|
||||||
SOCKETIFY_RUN_DEFAULT = 0,
|
|
||||||
SOCKETIFY_RUN_ONCE,
|
|
||||||
SOCKETIFY_RUN_NOWAIT
|
|
||||||
} socketify_run_mode;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
void* uv_prepare_ptr;
|
|
||||||
socketify_prepare_handler on_prepare_handler;
|
|
||||||
void* on_prepare_data;
|
|
||||||
void* uv_loop;
|
|
||||||
} socketify_loop;
|
|
||||||
|
|
||||||
typedef struct{
|
|
||||||
void* uv_timer_ptr;
|
|
||||||
socketify_timer_handler handler;
|
|
||||||
void* user_data;
|
|
||||||
} socketify_timer;
|
|
||||||
|
|
||||||
typedef struct{
|
|
||||||
void* uv_async_ptr;
|
|
||||||
socketify_async_handler handler;
|
|
||||||
void* user_data;
|
|
||||||
} socketify_async;
|
|
||||||
|
|
||||||
socketify_loop * socketify_create_loop();
|
|
||||||
bool socketify_constructor_failed(socketify_loop* loop);
|
|
||||||
bool socketify_on_prepare(socketify_loop* loop, socketify_prepare_handler handler, void* user_data);
|
|
||||||
bool socketify_prepare_unbind(socketify_loop* loop);
|
|
||||||
void socketify_destroy_loop(socketify_loop* loop);
|
|
||||||
void* socketify_get_native_loop(socketify_loop* loop);
|
|
||||||
|
|
||||||
int socketify_loop_run(socketify_loop* loop, socketify_run_mode mode);
|
|
||||||
void socketify_loop_stop(socketify_loop* loop);
|
|
||||||
|
|
||||||
socketify_timer* socketify_create_timer(socketify_loop* loop, uint64_t timeout, uint64_t repeat, socketify_timer_handler handler, void* user_data);
|
|
||||||
void socketify_timer_destroy(socketify_timer* timer);
|
|
||||||
bool socketify_async_call(socketify_loop* loop, socketify_async_handler handler, void* user_data);
|
|
||||||
void socketify_timer_set_repeat(socketify_timer* timer, uint64_t repeat);
|
|
||||||
|
|
||||||
|
|
||||||
socketify_timer* socketify_create_check(socketify_loop* loop, socketify_timer_handler handler, void* user_data);
|
|
||||||
void socketify_check_destroy(socketify_timer* timer);
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
library_extension = "dll" if platform.system().lower() == "windows" else "so"
|
|
||||||
library_path = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"libsocketify_%s_%s.%s"
|
|
||||||
% (
|
|
||||||
platform.system().lower(),
|
|
||||||
"arm64" if "arm" in platform.processor().lower() else "amd64",
|
|
||||||
library_extension,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
lib = ffi.dlopen(library_path)
|
|
||||||
|
|
||||||
|
|
||||||
@ffi.callback("void(void *)")
|
@ffi.callback("void(void *)")
|
||||||
|
|
|
@ -3,66 +3,146 @@
|
||||||
import os
|
import os
|
||||||
from socketify import App
|
from socketify import App
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
|
from .native import lib, ffi
|
||||||
|
|
||||||
# Just an IDEA, must be implemented in native code (Cython or HPy), is really slow use this way
|
# Just an IDEA, must be implemented in native code (Cython or HPy), is really slow use this way
|
||||||
# re formatting headers is really slow and dummy, dict ops are really slow
|
# re formatting headers is really slow and dummy, dict ops are really slow
|
||||||
|
|
||||||
|
@ffi.callback("void(uws_res_t*, const char*, size_t, bool, void*)")
|
||||||
|
def wsgi_on_data_handler(res, chunk, chunk_length, is_end, user_data):
|
||||||
|
data_response = ffi.from_handle(user_data)
|
||||||
|
if chunk != ffi.NULL:
|
||||||
|
data_response.buffer.write(ffi.unpack(chunk, chunk_length))
|
||||||
|
if bool(is_end):
|
||||||
|
lib.uws_res_cork(data_response.app.server.SSL, res, wsgi_corked_response_start_handler, data_response._ptr)
|
||||||
|
|
||||||
|
|
||||||
|
class WSGIDataResponse:
|
||||||
|
def __init__(self, app, environ, start_response, aborted, buffer, on_data):
|
||||||
|
self.buffer = buffer
|
||||||
|
self.aborted = aborted
|
||||||
|
self._ptr = ffi.new_handle(self)
|
||||||
|
self.on_data = on_data
|
||||||
|
self.environ = environ
|
||||||
|
self.app = app
|
||||||
|
self.start_response = start_response
|
||||||
|
|
||||||
|
def write(ssl, res, message):
|
||||||
|
if isinstance(message, str):
|
||||||
|
data = message.encode("utf-8")
|
||||||
|
elif isinstance(message, bytes):
|
||||||
|
data = message
|
||||||
|
else:
|
||||||
|
data = b''
|
||||||
|
lib.uws_res_write(ssl, res, data, len(data))
|
||||||
|
|
||||||
|
def write_status(ssl, res, status_text):
|
||||||
|
if isinstance(status_text, str):
|
||||||
|
data = status_text.encode("utf-8")
|
||||||
|
elif isinstance(status_text, bytes):
|
||||||
|
data = status_text
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
lib.uws_res_write_status(ssl, res, data, len(data))
|
||||||
|
return True
|
||||||
|
|
||||||
|
def write_header(ssl, res, key, value):
|
||||||
|
if isinstance(key, str):
|
||||||
|
key_data = key.encode("utf-8")
|
||||||
|
elif isinstance(key, bytes):
|
||||||
|
key_data = key
|
||||||
|
|
||||||
|
if isinstance(value, int):
|
||||||
|
lib.uws_res_write_header_int(
|
||||||
|
ssl,
|
||||||
|
res,
|
||||||
|
key_data,
|
||||||
|
len(key_data),
|
||||||
|
ffi.cast("uint64_t", value),
|
||||||
|
)
|
||||||
|
elif isinstance(value, str):
|
||||||
|
value_data = value.encode("utf-8")
|
||||||
|
elif isinstance(value, bytes):
|
||||||
|
value_data = value
|
||||||
|
lib.uws_res_write_header(
|
||||||
|
ssl, res, key_data, len(key_data), value_data, len(value_data)
|
||||||
|
)
|
||||||
|
|
||||||
|
@ffi.callback("void(uws_res_t*, void*)")
|
||||||
|
def wsgi_corked_response_start_handler(res, user_data):
|
||||||
|
data_response = ffi.from_handle(user_data)
|
||||||
|
data_response.on_data(data_response, res)
|
||||||
|
|
||||||
|
@ffi.callback("void(int, uws_res_t*, socketify_asgi_data request, void*, bool*)")
|
||||||
|
def wsgi(ssl, response, info, user_data, aborted):
|
||||||
|
app = ffi.from_handle(user_data)
|
||||||
|
|
||||||
|
environ = dict(app.BASIC_ENVIRON)
|
||||||
|
|
||||||
|
environ['REQUEST_METHOD'] = ffi.unpack(info.method, info.method_size).decode('utf8')
|
||||||
|
environ['PATH_INFO'] = ffi.unpack(info.url, info.url_size).decode('utf8')
|
||||||
|
environ['QUERY_STRING'] = ffi.unpack(info.query_string, info.query_string_size).decode('utf8')
|
||||||
|
environ['REMOTE_ADDR'] = ffi.unpack(info.remote_address, info.remote_address_size).decode('utf8')
|
||||||
|
|
||||||
|
next_header = info.header_list
|
||||||
|
while next_header != ffi.NULL:
|
||||||
|
header = next_header[0]
|
||||||
|
name = ffi.unpack(header.name, header.name_size).decode('utf8')
|
||||||
|
value = ffi.unpack(header.value, header.value_size).decode('utf8')
|
||||||
|
environ[f"HTTP_{name.replace('-', '_').upper()}"]=value
|
||||||
|
next_header = ffi.cast("socketify_header*", next_header.next)
|
||||||
|
def start_response(status, headers):
|
||||||
|
write_status(ssl, response, status)
|
||||||
|
for (name, value) in headers:
|
||||||
|
write_header(ssl, response, name, value)
|
||||||
|
# #check for body
|
||||||
|
if environ.get("HTTP_CONTENT_LENGTH", False) or environ.get("HTTP_TRANSFER_ENCODING", False):
|
||||||
|
WSGI_INPUT = BytesIO()
|
||||||
|
environ['wsgi.input'] = WSGI_INPUT
|
||||||
|
def on_data(data_response, response):
|
||||||
|
if bool(data_response.aborted[0]):
|
||||||
|
return
|
||||||
|
|
||||||
|
ssl = data_response.app.server.SSL
|
||||||
|
app_iter = data_response.app.app(data_response.environ, data_response.start_response)
|
||||||
|
try:
|
||||||
|
for data in app_iter:
|
||||||
|
write(ssl, response, data)
|
||||||
|
finally:
|
||||||
|
if hasattr(app_iter, 'close'):
|
||||||
|
app_iter.close()
|
||||||
|
lib.uws_res_end_without_body(ssl, response, 0)
|
||||||
|
|
||||||
|
data_response = WSGIDataResponse(app, environ, start_response, aborted, WSGI_INPUT, on_data)
|
||||||
|
|
||||||
|
lib.uws_res_on_data(
|
||||||
|
ssl, response, wsgi_on_data_handler, data_response._ptr
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
environ['wsgi.input'] = None
|
||||||
|
app_iter = app.app(environ, start_response)
|
||||||
|
try:
|
||||||
|
for data in app_iter:
|
||||||
|
write(ssl, response, data)
|
||||||
|
finally:
|
||||||
|
if hasattr(app_iter, 'close'):
|
||||||
|
app_iter.close()
|
||||||
|
lib.uws_res_end_without_body(ssl, response, 0)
|
||||||
|
|
||||||
class WSGI:
|
class WSGI:
|
||||||
def __init__(self, app, options=None, request_response_factory_max_itens=0, websocket_factory_max_itens=0):
|
def __init__(self, app, options=None, request_response_factory_max_itens=0, websocket_factory_max_itens=0):
|
||||||
self.server = App(options, request_response_factory_max_itens, websocket_factory_max_itens)
|
self.server = App(options, request_response_factory_max_itens, websocket_factory_max_itens)
|
||||||
self.SERVER_PORT = None
|
self.SERVER_PORT = None
|
||||||
self.app = app
|
self.app = app
|
||||||
self.BASIC_ENVIRON = dict(os.environ)
|
self.BASIC_ENVIRON = dict(os.environ)
|
||||||
def wsgi(res, req):
|
|
||||||
# create environ
|
|
||||||
PATH_INFO = req.get_url()
|
|
||||||
FULL_PATH_INFO = req.get_full_url()
|
|
||||||
environ = dict(self.BASIC_ENVIRON)
|
|
||||||
environ['REQUEST_METHOD'] = req.get_method()
|
|
||||||
environ['PATH_INFO'] = PATH_INFO
|
|
||||||
environ['QUERY_STRING'] = FULL_PATH_INFO[len(PATH_INFO):]
|
|
||||||
environ['REMOTE_ADDR'] = res.get_remote_address()
|
|
||||||
|
|
||||||
|
|
||||||
def start_response(status, headers):
|
|
||||||
res.write_status(status)
|
|
||||||
for (name, value) in headers:
|
|
||||||
res.write_header(name, value)
|
|
||||||
|
|
||||||
|
|
||||||
def set_http(name, value):
|
|
||||||
environ[f"HTTP_{name.replace('-', '_').upper()}"]=value
|
|
||||||
req.for_each_header(set_http)
|
|
||||||
|
|
||||||
#check for body
|
|
||||||
if environ.get("HTTP_CONTENT_LENGTH", False) or environ.get("HTTP_TRANSFER_ENCODING", False):
|
|
||||||
WSGI_INPUT = BytesIO()
|
|
||||||
environ['wsgi.input'] = WSGI_INPUT
|
|
||||||
def on_data(res, chunk, is_end):
|
|
||||||
if chunk:
|
|
||||||
WSGI_INPUT.write(chunk)
|
|
||||||
if is_end:
|
|
||||||
app_iter = self.app(environ, start_response)
|
|
||||||
try:
|
|
||||||
for data in app_iter:
|
|
||||||
res.write(data)
|
|
||||||
finally:
|
|
||||||
if hasattr(app_iter, 'close'):
|
|
||||||
app_iter.close()
|
|
||||||
res.end_without_body()
|
|
||||||
res.on_data(on_data)
|
|
||||||
else:
|
|
||||||
environ['wsgi.input'] = None
|
|
||||||
app_iter = self.app(environ, start_response)
|
|
||||||
try:
|
|
||||||
for data in app_iter:
|
|
||||||
res.write(data)
|
|
||||||
finally:
|
|
||||||
if hasattr(app_iter, 'close'):
|
|
||||||
app_iter.close()
|
|
||||||
res.end_without_body()
|
|
||||||
|
|
||||||
self.server.any("/*", wsgi)
|
|
||||||
|
|
||||||
|
self._ptr = ffi.new_handle(self)
|
||||||
|
self.asgi_http_info = lib.socketify_add_asgi_http_handler(
|
||||||
|
self.server.SSL,
|
||||||
|
self.server.app,
|
||||||
|
wsgi,
|
||||||
|
self._ptr
|
||||||
|
)
|
||||||
|
|
||||||
def listen(self, port_or_options, handler):
|
def listen(self, port_or_options, handler):
|
||||||
self.SERVER_PORT = port_or_options if isinstance(port_or_options, int) else port_or_options.port
|
self.SERVER_PORT = port_or_options if isinstance(port_or_options, int) else port_or_options.port
|
||||||
|
|
Ładowanie…
Reference in New Issue