asgi, wsgi preliminar tests

pull/39/head
Ciro 2022-11-30 21:42:07 -03:00
rodzic 3fe7e17ca0
commit 239302062f
15 zmienionych plików z 770 dodań i 5 usunięć

Wyświetl plik

@ -0,0 +1,28 @@
import falcon.asgi
from socketify import ASGI
class Home:
async def on_get(self, req, resp):
resp.status = falcon.HTTP_200 # This is the default status
resp.content_type = falcon.MEDIA_TEXT # Default is JSON, so override
resp.text = "Hello, World!"
async def on_post(self, req, resp):
raw_data = await req.stream.read()
print("data", raw_data)
resp.status = falcon.HTTP_200 # This is the default status
resp.content_type = falcon.MEDIA_TEXT # Default is JSON, so override
resp.text = raw_data
app = falcon.asgi.App()
home = Home()
app.add_route("/", home)
if __name__ == "__main__":
ASGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()
#pypy3 -m gunicorn uvicorn_guvicorn_plaintext:app -w 1 -k uvicorn.workers.UvicornWorker

Wyświetl plik

@ -0,0 +1,30 @@
import falcon.asgi
import falcon.media
from socketify import ASGI
class SomeResource:
async def on_get(self, req, resp):
resp.status = falcon.HTTP_200 # This is the default status
resp.content_type = falcon.MEDIA_TEXT # Default is JSON, so override
resp.text = "Connect via ws protocol!"
async def on_websocket(self, req, ws):
try:
await ws.accept()
await ws.send_text("hello!")
while True:
payload = await ws.receive_text()
await ws.send_text(payload)
except falcon.WebSocketDisconnected:
pass
app = falcon.asgi.App()
app.add_route("/", SomeResource())
# python3 -m gunicorn falcon_server:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornWorker
# pypy3 -m gunicorn falcon_server:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornH11Worker
if __name__ == "__main__":
ASGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()

Wyświetl plik

@ -0,0 +1,39 @@
import falcon.asgi
import falcon.media
from socketify import ASGI
remaining_clients = 16
class SomeResource:
async def on_get(self, req, resp):
resp.status = falcon.HTTP_200 # This is the default status
resp.content_type = falcon.MEDIA_TEXT # Default is JSON, so override
resp.text = "Connect via ws protocol!"
async def on_websocket(self, req, ws):
global remaining_clients
try:
await ws.accept()
await ws.subscribe('all')
remaining_clients = remaining_clients - 1
if remaining_clients == 0:
await ws.publish_text('all', 'ready')
else:
print("remaining_clients", remaining_clients)
while True:
payload = await ws.receive_text()
await ws.publish_text('all', payload)
except falcon.WebSocketDisconnected:
remaining_clients = remaining_clients + 1
print("remaining_clients", remaining_clients)
app = falcon.asgi.App()
app.add_route("/", SomeResource())
if __name__ == "__main__":
ASGI(app).listen(4001, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()

Wyświetl plik

@ -0,0 +1,50 @@
import falcon.asgi
import falcon.media
from socketify import ASGI
clients = set([])
remaining_clients = 16
async def broadcast(message):
# some clients got disconnected if we tried to to all async :/
# tasks = [ws.send_text(message) for ws in client]
# return await asyncio.wait(tasks, return_when=ALL_COMPLETED)
for ws in clients:
await ws.send_text(message)
class SomeResource:
async def on_get(self, req, resp):
resp.status = falcon.HTTP_200 # This is the default status
resp.content_type = falcon.MEDIA_TEXT # Default is JSON, so override
resp.text = "Connect via ws protocol!"
async def on_websocket(self, req, ws):
global remaining_clients
try:
await ws.accept()
clients.add(ws)
remaining_clients = remaining_clients - 1
if remaining_clients == 0:
await broadcast("ready")
else:
print("remaining_clients", remaining_clients)
while True:
payload = await ws.receive_text()
await broadcast(payload)
except falcon.WebSocketDisconnected:
clients.remove(ws)
remaining_clients = remaining_clients + 1
print("remaining_clients", remaining_clients)
app = falcon.asgi.App()
app.add_route("/", SomeResource())
# python3 -m gunicorn falcon_server:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornWorker
# pypy3 -m gunicorn falcon_server:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornH11Worker
if __name__ == "__main__":
ASGI(app).listen(4001, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()

Wyświetl plik

@ -0,0 +1,28 @@
import falcon.asgi
from socketify import WSGI
class Home:
def on_get(self, req, resp):
resp.status = falcon.HTTP_200 # This is the default status
resp.content_type = falcon.MEDIA_TEXT # Default is JSON, so override
resp.text = "Hello, World!"
def on_post(self, req, resp):
raw_data = req.stream.getvalue()
print("data", raw_data)
resp.status = falcon.HTTP_200 # This is the default status
resp.content_type = falcon.MEDIA_TEXT # Default is JSON, so override
resp.text = raw_data
app = falcon.App()
home = Home()
app.add_route("/", home)
if __name__ == "__main__":
WSGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()
#pypy3 -m gunicorn uvicorn_guvicorn_plaintext:app -w 1 -k uvicorn.workers.UvicornWorker

Wyświetl plik

@ -0,0 +1,25 @@
from socketify import ASGI
async def app(scope, receive, send):
assert scope['type'] == 'http'
await send({
'type': 'http.response.start',
'status': 200,
'headers': [
[b'content-type', b'text/plain'],
],
})
await send({
'type': 'http.response.body',
'body': b'Hello, world!',
})
if __name__ == "__main__":
ASGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()
# python3 -m gunicorn test:app -w 1 -k uvicorn.workers.UvicornWorker

Wyświetl plik

@ -0,0 +1,83 @@
from socketify import ASGI
clients = set([])
remaining_clients = 16
async def broadcast(message):
for send in clients:
await send({
'type': 'websocket.send',
'text': message
})
async def app(scope, receive, send):
global remaining_clients
# handle non websocket
if scope['type'] != 'websocket':
await send({
'type': 'http.response.start',
'status': 200,
'headers': [
[b'content-type', b'text/plain'],
],
})
await send({
'type': 'http.response.body',
'body': b'Connect via ws protocol!',
})
# handle websocket
protocols = scope['subprotocols']
scope = await receive()
# get connection
assert scope['type'] == 'websocket.connect'
# accept connection
await send({
'type': 'websocket.accept',
'subprotocol': protocols[0] if len(protocols) > 0 else None
})
clients.add(send)
remaining_clients -= 1
print("remaining_clients", remaining_clients)
await send({
'type': 'websocket.subscribe',
'topic':"all"
})
if remaining_clients == 0:
# await broadcast("ready")
await send({
'type': 'websocket.publish',
'topic': "all",
'text': 'ready'
})
else:
print("remaining_clients", remaining_clients)
scope = await receive()
# get data
while True:
type = scope['type']
# disconnected!
if type == 'websocket.disconnect':
remaining_clients += 1
print("remaining_clients", remaining_clients)
break
# await broadcast(scope.get('text', ''))
await send({
'type': 'websocket.publish',
'topic': "all",
'text': scope.get('text', '')
})
scope = await receive()
if __name__ == "__main__":
ASGI(app).listen(4001, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()
# python3 -m gunicorn test-ws-bench:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornWorker

Wyświetl plik

@ -0,0 +1,52 @@
from socketify import ASGI
async def app(scope, receive, send):
# handle non websocket
if scope['type'] != 'websocket':
await send({
'type': 'http.response.start',
'status': 200,
'headers': [
[b'content-type', b'text/plain'],
],
})
await send({
'type': 'http.response.body',
'body': b'Connect via ws protocol!',
})
protocols = scope['subprotocols']
scope = await receive()
# get connection
assert scope['type'] == 'websocket.connect'
# accept connection
await send({
'type': 'websocket.accept',
'subprotocol': protocols[0] if len(protocols) > 0 else None
})
# get data
while True:
scope = await receive()
type = scope['type']
# disconnected!
if type == 'websocket.disconnect':
print("disconnected!", scope)
break
# echo!
await send({
'type': 'websocket.send',
'bytes': scope.get('bytes', None),
'text': scope.get('text', '')
})
if __name__ == "__main__":
ASGI(app).listen(4001, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()
# python3 -m gunicorn test:app -w 1 -k uvicorn.workers.UvicornWorker

Wyświetl plik

@ -0,0 +1,9 @@
from socketify import WSGI
def app(environ, start_response):
start_response('200 OK', [('Content-Type', 'text/plain')])
yield b'Hello, World!\n'
if __name__ == "__main__":
WSGI(app).listen(8000, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()

Wyświetl plik

@ -0,0 +1,69 @@
from socketify import ASGI
clients = set([])
remaining_clients = 16
async def broadcast(message):
for send in clients:
await send({
'type': 'websocket.send',
'text': message
})
async def app(scope, receive, send):
global remaining_clients
# handle non websocket
if scope['type'] != 'websocket':
await send({
'type': 'http.response.start',
'status': 200,
'headers': [
[b'content-type', b'text/plain'],
],
})
await send({
'type': 'http.response.body',
'body': b'Connect via ws protocol!',
})
# handle websocket
protocols = scope['subprotocols']
scope = await receive()
# get connection
assert scope['type'] == 'websocket.connect'
# accept connection
await send({
'type': 'websocket.accept',
'subprotocol': protocols[0] if len(protocols) > 0 else None
})
clients.add(send)
remaining_clients -= 1
print("remaining_clients", remaining_clients)
if remaining_clients == 0:
await broadcast("ready")
else:
print("remaining_clients", remaining_clients)
scope = await receive()
# get data
while True:
type = scope['type']
# disconnected!
if type == 'websocket.disconnect':
remaining_clients += 1
print("remaining_clients", remaining_clients)
break
await broadcast(scope.get('text', ''))
scope = await receive()
if __name__ == "__main__":
ASGI(app).listen(4001, lambda config: print(f"Listening on port http://localhost:{config.port} now\n")).run()
# python3 -m gunicorn test-ws-bench:app -b 127.0.0.1:4001 -w 1 -k uvicorn.workers.UvicornWorker

Wyświetl plik

@ -4,8 +4,8 @@ app = Robyn(__file__)
@app.get("/")
async def h(request):
return "Hello, world!"
def h(request):
return "Hello, World!"
app.start(port=8000)

Wyświetl plik

@ -4,7 +4,7 @@ import multiprocessing
def run_app():
app = App(None, 200_000)
app = App(request_response_factory_max_itens=200_000)
app.get("/", lambda res, req: res.end("Hello, World!"))
app.listen(
8000,
@ -26,6 +26,5 @@ def create_fork():
# fork limiting the cpu count - 1
for i in range(1, multiprocessing.cpu_count()):
create_fork()
run_app() # run app on the main process too :)

Wyświetl plik

@ -6,4 +6,10 @@ from .socketify import (
SendStatus,
CompressOptions,
)
from .asgi import (
ASGI
)
from .wsgi import (
WSGI
)
from .helpers import sendfile, middleware, MiddlewareRouter

Wyświetl plik

@ -0,0 +1,258 @@
from socketify import App, CompressOptions, OpCode
import asyncio
from queue import SimpleQueue
# Just an IDEA, must be implemented in native code (Cython or HPy), is really slow use this way
# re encoding data and headers is really dummy (can be consumed directly by ffi), dict ops are really slow
EMPTY_RESPONSE = { 'type': 'http.request', 'body': b'', 'more_body': False }
class ASGIWebSocket:
def __init__(self, loop):
self.loop = loop
self.accept_future = None
self.ws = None
self.receive_queue = SimpleQueue()
self.miss_receive_queue = SimpleQueue()
self.miss_receive_queue.put({
'type': 'websocket.connect'
}, False)
def accept(self):
self.accept_future = self.loop.create_future()
return self.accept_future
def open(self, ws):
self.ws = ws
if not self.accept_future.done():
self.accept_future.set_result(True)
def receive(self):
future = self.loop.create_future()
if not self.miss_receive_queue.empty():
future.set_result(self.miss_receive_queue.get(False))
return future
self.receive_queue.put(future, False)
return future
def disconnect(self, code, message):
self.ws = None
if not self.receive_queue.empty():
future = self.receive_queue.get(False)
future.set_result({
'type': 'websocket.disconnect',
'code': code,
'message': message
})
else:
self.miss_receive_queue.put({
'type': 'websocket.disconnect',
'code': code,
'message': message
}, False)
def message(self, ws, value, opcode):
self.ws = ws
if self.receive_queue.empty():
if opcode == OpCode.TEXT:
self.miss_receive_queue.put({
'type': 'websocket.receive',
'text': value
}, False)
elif opcode == OpCode.BINARY:
self.miss_receive_queue.put({
'type': 'websocket.receive',
'bytes': value
}, False)
return True
future = self.receive_queue.get(False)
if opcode == OpCode.TEXT:
future.set_result({
'type': 'websocket.receive',
'text': value
})
elif opcode == OpCode.BINARY:
future.set_result({
'type': 'websocket.receive',
'bytes': value
})
class ASGI:
def __init__(self, app, options=None, request_response_factory_max_itens=0, websocket_factory_max_itens=0):
self.server = App(options, request_response_factory_max_itens, websocket_factory_max_itens)
self.SERVER_PORT = None
self.SERVER_HOST = ''
self.SERVER_SCHEME = 'https' if self.server.options else 'http'
self.SERVER_WS_SCHEME = 'wss' if self.server.options else 'ws'
self.app = app
def asgi(res, req):
PATH_INFO = req.get_url()
FULL_PATH_INFO = req.get_full_url()
headers = []
req.for_each_header(lambda name, value: headers.append((name.encode('utf8'), value.encode('utf8'))))
scope = {
'type': 'http',
'asgi': {
'version': '3.0',
'spec_version': '2.0'
},
'http_version': '1.1',
'server': (self.SERVER_HOST, self.SERVER_PORT),
'client': (res.get_remote_address(), None),
'scheme': self.SERVER_SCHEME,
'method': req.get_method(),
'root_path': '',
'path': PATH_INFO,
'raw_path': PATH_INFO.encode('utf8'),
'query_string': FULL_PATH_INFO[len(PATH_INFO):].encode('utf8'),
'headers': headers
}
async def receive():
if res.aborted:
return { 'type': 'http.disconnect'}
if scope.get("content-length", False) or scope.get("transfer-encoding", False):
data = await res.get_data()
if data:
# all at once but could get in chunks
return {
'type': 'http.request',
'body': data.getvalue(),
'more_body': False
}
# no body, just empty
return EMPTY_RESPONSE
async def send(options):
if res.aborted: return False
type = options['type']
if type == 'http.response.start':
res.write_status(options.get('status', 200))
for header in options.get('headers', []):
res.write_header(header[0], header[1])
return True
if type == 'http.response.body':
if options.get('more_body', False):
res.write(options.get('body', ""))
else:
res.cork_end(options.get('body', ""))
return True
return False
#grab handler
res.grab_aborted_handler()
asyncio.ensure_future(app(scope, receive, send))
self.server.any("/*", asgi)
def ws_upgrade(res, req, socket_context):
PATH_INFO = req.get_url()
FULL_PATH_INFO = req.get_full_url()
headers = []
def filtered_headers(name, value):
if name != "sec-websocket-protocol":
headers.append((name.encode('utf8'), value.encode('utf8')))
req.for_each_header(filtered_headers)
key = req.get_header("sec-websocket-key")
protocol = req.get_header("sec-websocket-protocol")
extensions = req.get_header("sec-websocket-extensions")
ws = ASGIWebSocket(self.server.loop)
scope = {
'type': 'websocket',
'asgi': {
'version': '3.0',
'spec_version': '2.0'
},
'http_version': '1.1',
'server': (self.SERVER_HOST, self.SERVER_PORT),
'client': (res.get_remote_address(), None),
'scheme': self.SERVER_WS_SCHEME,
'method': req.get_method(),
'root_path': '',
'path': PATH_INFO,
'raw_path': PATH_INFO.encode('utf8'),
'query_string': FULL_PATH_INFO[len(PATH_INFO):].encode('utf8'),
'headers': headers,
'subprotocols': [protocol] if protocol else [],
'extensions': { 'websocket.publish': True, 'websocket.subscribe': True, 'websocket.unsubscribe': True }
}
server = self.server
async def send(options):
nonlocal ws, res, server
if res.aborted: return False
type = options['type']
if type == 'websocket.send':
bytes = options.get("bytes", None)
if ws.ws:
if bytes:
ws.ws.cork_send(bytes, OpCode.BINARY)
else:
ws.ws.cork_send(options.get('text', ''), OpCode.TEXT)
return True
return False
if type == 'websocket.accept': # upgrade!
for header in options.get('headers', []):
res.write_header(header[0], header[1])
future = ws.accept()
upgrade_protocol = options.get('subprotocol', protocol)
res.upgrade(key, upgrade_protocol if upgrade_protocol else "", extensions, socket_context, ws)
return await future
if type == 'websocket.close': # code and reason?
if ws.ws: ws.ws.close()
else: res.write_status(403).end_without_body()
return True
if type == 'websocket.publish': # publish extension
bytes = options.get("bytes", None)
if bytes:
server.publish(options.get('topic'), bytes)
else:
server.publish(options.get('topic'), options.get('text'), OpCode.TEXT)
return True
if type == 'websocket.subscribe': # subscribe extension
if ws.ws: ws.ws.subscribe(options.get('topic'))
else: res.write_status(403).end_without_body()
return True
if type == 'websocket.unsubscribe': # unsubscribe extension
if ws.ws: ws.ws.unsubscribe(options.get('topic'))
else: res.write_status(403).end_without_body()
return True
return False
#grab handler
res.grab_aborted_handler()
asyncio.ensure_future(app(scope, ws.receive, send))
self.server.ws("/*", {
"compression": CompressOptions.DISABLED,
"max_payload_length": 16 * 1024 * 1024,
"idle_timeout": 0,
"upgrade": ws_upgrade,
"open": lambda ws: ws.get_user_data().open(ws),
"message": lambda ws, msg, opcode: ws.get_user_data().message(ws, msg, opcode),
"close": lambda ws, code, message: ws.get_user_data().disconnect(code, message)
})
def listen(self, port_or_options, handler):
self.SERVER_PORT = port_or_options if isinstance(port_or_options, int) else port_or_options.port
self.SERVER_HOST = "0.0.0.0" if isinstance(port_or_options, int) else port_or_options.host
self.server.listen(port_or_options, handler)
return self
def run(self):
self.server.run()
return self

Wyświetl plik

@ -0,0 +1,89 @@
import os
from socketify import App
from io import BytesIO
# Just an IDEA, must be implemented in native code (Cython or HPy), is really slow use this way
# re formatting headers is really slow and dummy, dict ops are really slow
class WSGI:
def __init__(self, app, options=None, request_response_factory_max_itens=0, websocket_factory_max_itens=0):
self.server = App(options, request_response_factory_max_itens, websocket_factory_max_itens)
self.SERVER_PORT = None
self.app = app
self.BASIC_ENVIRON = dict(os.environ)
def wsgi(res, req):
# create environ
PATH_INFO = req.get_url()
FULL_PATH_INFO = req.get_full_url()
environ = dict(self.BASIC_ENVIRON)
environ['REQUEST_METHOD'] = req.get_method()
environ['PATH_INFO'] = PATH_INFO
environ['QUERY_STRING'] = FULL_PATH_INFO[len(PATH_INFO):]
environ['REMOTE_ADDR'] = res.get_remote_address()
def start_response(status, headers):
res.write_status(status)
for (name, value) in headers:
res.write_header(name, value)
def set_http(name, value):
environ[f"HTTP_{name.replace('-', '_').upper()}"]=value
req.for_each_header(set_http)
#check for body
if environ.get("HTTP_CONTENT_LENGTH", False) or environ.get("HTTP_TRANSFER_ENCODING", False):
WSGI_INPUT = BytesIO()
environ['wsgi.input'] = WSGI_INPUT
def on_data(res, chunk, is_end):
if chunk:
WSGI_INPUT.write(chunk)
if is_end:
app_iter = self.app(environ, start_response)
try:
for data in app_iter:
res.write(data)
finally:
if hasattr(app_iter, 'close'):
app_iter.close()
res.end_without_body()
res.on_data(on_data)
else:
environ['wsgi.input'] = None
app_iter = self.app(environ, start_response)
try:
for data in app_iter:
res.write(data)
finally:
if hasattr(app_iter, 'close'):
app_iter.close()
res.end_without_body()
self.server.any("/*", wsgi)
def listen(self, port_or_options, handler):
self.SERVER_PORT = port_or_options if isinstance(port_or_options, int) else port_or_options.port
self.BASIC_ENVIRON.update({
'GATEWAY_INTERFACE': 'CGI/1.1',
'SERVER_PORT': str(self.SERVER_PORT),
'SERVER_SOFTWARE': 'WSGIServer/0.2',
'wsgi.input': None,
'wsgi.errors': None,
'wsgi.version': (1, 0),
'wsgi.run_once': False,
'wsgi.url_scheme': 'https' if self.server.options else 'http',
'wsgi.multithread': False,
'wsgi.multiprocess': False,
'wsgi.file_wrapper': None, # No file wrapper support for now
'SCRIPT_NAME': '',
'SERVER_PROTOCOL': 'HTTP/1.1',
'REMOTE_HOST': '',
})
self.server.listen(port_or_options, handler)
return self
def run(self):
self.server.run()
return self