import os import time import mimetypes import inspect from os import path mimetypes.init() # We have an version of this using aiofile and aiofiles # This is an sync version without any dependencies is normally much faster in CPython and PyPy3 # In production we highly recomend to use CDN like CloudFlare or/and NGINX or similar for static files async def sendfile(res, req, filename): # read headers before the first await if_modified_since = req.get_header("if-modified-since") range_header = req.get_header("range") bytes_range = None start = 0 end = -1 # parse range header if range_header: bytes_range = range_header.replace("bytes=", "").split("-") start = int(bytes_range[0]) if bytes_range[1]: end = int(bytes_range[1]) try: exists = path.exists(filename) # not found if not exists: return res.write_status(404).end(b"Not Found") # get size and last modified date stats = os.stat(filename) total_size = stats.st_size size = total_size last_modified = time.strftime( "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(stats.st_mtime) ) # check if modified since is provided if if_modified_since == last_modified: return res.write_status(304).end_without_body() # tells the broswer the last modified date res.write_header(b"Last-Modified", last_modified) # add content type (content_type, encoding) = mimetypes.guess_type(filename, strict=True) if content_type and encoding: res.write_header(b"Content-Type", "%s; %s" % (content_type, encoding)) elif content_type: res.write_header(b"Content-Type", content_type) with open(filename, "rb") as fd: # check range and support it if start > 0 or not end == -1: if end < 0 or end >= size: end = size - 1 size = end - start + 1 fd.seek(start) if start > total_size or size > total_size or size < 0 or start < 0: return res.write_status(416).end_without_body() res.write_status(206) else: end = size - 1 res.write_status(200) # tells the browser that we support range res.write_header(b"Accept-Ranges", b"bytes") res.write_header( b"Content-Range", "bytes %d-%d/%d" % (start, end, total_size) ) pending_size = size # keep sending until abort or done while not res.aborted: chunk_size = 16384 # 16kb chunks if chunk_size > pending_size: chunk_size = pending_size buffer = fd.read(chunk_size) pending_size = pending_size - chunk_size (ok, done) = await res.send_chunk(buffer, size) if not ok or done: # if cannot send probably aborted break except Exception as error: res.write_status(500).end("Internal Error") def in_directory(file, directory): # make both absolute directory = path.join(path.realpath(directory), "") file = path.realpath(file) # return true, if the common prefix of both is equal to directory # e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b return path.commonprefix([file, directory]) == directory def static_route(app, route, directory): def route_handler(res, req): url = req.get_url() res.grab_aborted_handler() url = url[len(route) : :] if url.startswith("/"): url = url[1::] filename = path.join(path.realpath(directory), url) if not in_directory(filename, directory): res.write_status(404).end_without_body() return res.run_async(sendfile(res, req, filename)) if route.startswith("/"): route = route[1::] app.get("%s/*" % route, route_handler) def middleware(*functions): # we use Optional data=None at the end so you can use and middleware inside a middleware async def middleware_route(res, req, data=None): some_async_as_run = False # cicle to all middlewares for function in functions: # detect if is coroutine or not if inspect.iscoroutinefunction(function): # in async query string, arguments and headers are only valid until the first await if not some_async_as_run: # preserve queries, headers, parameters, url, full_url and method req.preserve() some_async_as_run = True data = await function(res, req, data) else: # call middlewares data = function(res, req, data) # stops if returns Falsy if not data: break return data return middleware_route class MiddlewareRouter: def __init__(self, app, *middlewares): self.app = app self.middlewares = middlewares def get(self, path, handler): middies = list(self.middlewares) middies.append(handler) self.app.get(path, middleware(*middies)) return self def post(self, path, handler): middies = list(self.middlewares) middies.append(handler) self.app.post(path, middleware(*middies)) return self def options(self, path, handler): middies = list(self.middlewares) middies.append(handler) self.app.options(path, middleware(*middies)) return self def delete(self, path, handler): middies = list(self.middlewares) middies.append(handler) self.app.delete(path, middleware(*middies)) return self def patch(self, path, handler): middies = list(self.middlewares) middies.append(handler) self.app.patch(path, middleware(*middies)) return self def put(self, path, handler): middies = list(self.middlewares) middies.append(handler) self.app.put(path, middleware(*middies)) return self def head(self, path, handler): middies = list(self.middlewares) middies.append(handler) self.app.head(path, middleware(*middies)) return self def connect(self, path, handler): middies = list(self.middlewares) middies.append(handler) self.app.connect(path, middleware(*middies)) return self def trace(self, path, handler): middies = list(self.middlewares) middies.append(handler) self.app.trace(path, middleware(*middies)) return self def any(self, path, handler): middies = list(self.middlewares) middies.append(handler) self.app.any(path, middleware(*middies)) return self