2019-04-21 05:28:15 +00:00
|
|
|
import asyncio
|
|
|
|
from contextlib import contextmanager
|
2022-02-05 05:19:49 +00:00
|
|
|
from contextvars import ContextVar
|
2021-06-05 21:49:16 +00:00
|
|
|
from markupsafe import escape
|
2019-04-21 05:28:15 +00:00
|
|
|
import time
|
2019-06-24 03:13:09 +00:00
|
|
|
import json
|
2019-05-11 19:06:22 +00:00
|
|
|
import traceback
|
2019-04-21 05:28:15 +00:00
|
|
|
|
|
|
|
# Registry of active trace collectors: maps a task id (as returned by
# get_task_id()) to the list that capture_traces() registered for that task.
tracers = {}
|
|
|
|
|
2019-05-11 19:06:22 +00:00
|
|
|
# Keys that trace() fills in itself on every trace record; callers may not
# pass any of them as keyword arguments to trace().
TRACE_RESERVED_KEYS = {"type", "start", "end", "duration_ms", "traceback"}
|
|
|
|
|
2022-02-05 05:19:49 +00:00
|
|
|
# Context variable holding an explicitly pinned task id. When it is not None,
# get_task_id() returns it instead of the id of the current asyncio task.
# It is set (and later reset) by trace_child_tasks().
trace_task_id = ContextVar("trace_task_id", default=None)
|
2019-11-11 03:45:34 +00:00
|
|
|
|
|
|
|
|
2019-04-21 05:28:15 +00:00
|
|
|
def get_task_id():
    """Return the identifier that traces should be filed under, or None.

    Resolution order:

    1. The value stored in the ``trace_task_id`` context variable, if it
       is not None.
    2. ``id()`` of the asyncio task currently running in this thread.

    Returns:
        An int identifying the current tracing scope, or None when there
        is neither a pinned id nor a current asyncio task (for example when
        called from plain synchronous code or from a thread that has no
        running event loop).
    """
    pinned = trace_task_id.get(None)
    if pinned is not None:
        return pinned
    try:
        # current_task() looks up the *running* loop, which avoids the
        # deprecated implicit loop creation of asyncio.get_event_loop().
        task = asyncio.current_task()
    except RuntimeError:
        # No event loop is running in this thread.
        return None
    if task is None:
        # A loop is running but we are not inside a task. Returning
        # id(None) here would hand every such caller the same bogus key.
        return None
    return id(task)
|
|
|
|
|
|
|
|
|
|
|
|
@contextmanager
def trace_child_tasks():
    """Pin the current task id in ``trace_task_id`` for the wrapped block.

    While the block runs, ``get_task_id()`` returns the id that was current
    when the block was entered, because it consults ``trace_task_id`` first.
    Presumably this lets tasks spawned inside the block (which inherit the
    context) report their traces under the parent's id -- confirm against
    the call sites.

    The previous value of the context variable is restored on exit, even
    when the wrapped block raises.
    """
    token = trace_task_id.set(get_task_id())
    try:
        yield
    finally:
        # Always undo the pin so a failing block cannot leave a stale id
        # behind in this context.
        trace_task_id.reset(token)
|
2019-04-21 05:28:15 +00:00
|
|
|
|
|
|
|
|
|
|
|
@contextmanager
def trace(type, **kwargs):
    """Time the wrapped block and append a record to the active tracer.

    If no tracer list is registered in ``tracers`` for the current task id
    (or there is no task id at all) the block simply runs untraced.

    Args:
        type: Label stored under the ``"type"`` key of the trace record.
        **kwargs: Extra key/value pairs merged into the trace record. None
            of them may be one of ``TRACE_RESERVED_KEYS``.

    Yields:
        The ``kwargs`` dict itself. Keys the caller adds to it inside the
        block are merged into the record when the block exits.

    Raises:
        AssertionError: If ``kwargs`` contains a reserved key.
    """
    if TRACE_RESERVED_KEYS.intersection(kwargs.keys()):
        # Raised explicitly (rather than via ``assert``) so the check is not
        # stripped when Python runs with -O; the exception type is unchanged.
        raise AssertionError(
            f".trace() keyword parameters cannot include {TRACE_RESERVED_KEYS}"
        )
    task_id = get_task_id()
    tracer = None if task_id is None else tracers.get(task_id)
    if tracer is None:
        # Tracing is not active for this task: run the block untouched.
        yield kwargs
        return
    start = time.perf_counter()
    try:
        yield kwargs
    finally:
        # Record the trace even if the wrapped block raised, so failed
        # operations still show up with their timing.
        end = time.perf_counter()
        trace_info = {
            "type": type,
            "start": start,
            "end": end,
            "duration_ms": (end - start) * 1000,
            # Up to six stack entries with the innermost three dropped --
            # presumably to hide this generator and the context-manager
            # plumbing; confirm before changing the slice.
            "traceback": traceback.format_list(
                traceback.extract_stack(limit=6)[:-3]
            ),
        }
        trace_info.update(kwargs)
        tracer.append(trace_info)
|
2019-04-21 05:28:15 +00:00
|
|
|
|
|
|
|
|
|
|
|
@contextmanager
def capture_traces(tracer):
    """Register ``tracer`` as the trace collector for the current task.

    While the wrapped block runs, ``trace()`` calls made under the same
    task id append their records to ``tracer``.

    Args:
        tracer: A list that trace records are appended to.

    If ``get_task_id()`` returns None the block runs without any
    registration. On exit the registration is removed (or the previously
    registered tracer for this task id is put back), even when the wrapped
    block raises.
    """
    task_id = get_task_id()
    if task_id is None:
        yield
        return
    # Remember any tracer already registered for this task so that nested
    # use restores it instead of dropping it.
    previous = tracers.get(task_id)
    tracers[task_id] = tracer
    try:
        yield
    finally:
        if previous is None:
            # pop() with a default: never fail if the entry is already gone.
            tracers.pop(task_id, None)
        else:
            tracers[task_id] = previous
|
2019-06-24 03:13:09 +00:00
|
|
|
|
|
|
|
|
|
|
|
class AsgiTracer:
    """ASGI middleware that appends trace data to a response on request.

    Tracing is enabled per request by including ``_trace=1`` as a
    query-string component. For such requests the response body is buffered
    (up to ``max_body_bytes``) and, once complete, the collected traces are
    added to it:

    * ``text/html`` responses containing ``</body>`` get a ``<pre>`` block
      with the JSON-encoded trace inserted before the last ``</body>``.
    * JSON responses whose body starts with ``{`` get a ``"_trace"`` key
      (unless one is already present).

    Any other response is forwarded unchanged.

    NOTE(review): the ``http.response.start`` message is forwarded before
    the body is rewritten, so a ``Content-Length`` header set by the wrapped
    app is not adjusted -- confirm whether the wrapped app ever sets one.
    """

    # If the body is larger than this we don't attempt to append the trace
    max_body_bytes = 1024 * 256  # 256 KB

    def __init__(self, app):
        """Wrap ``app``, the ASGI application to delegate to."""
        self.app = app

    async def __call__(self, scope, receive, send):
        """Handle one ASGI call, tracing it when ``_trace=1`` is present."""
        if b"_trace=1" not in scope.get("query_string", b"").split(b"&"):
            await self.app(scope, receive, send)
            return
        trace_start = time.perf_counter()
        traces = []

        accumulated_body = b""
        size_limit_exceeded = False
        response_headers = []

        async def wrapped_send(message):
            nonlocal accumulated_body, size_limit_exceeded, response_headers
            if message["type"] == "http.response.start":
                # "headers" is optional in the ASGI response-start message.
                response_headers = message.get("headers") or []
                await send(message)
                return

            if message["type"] != "http.response.body" or size_limit_exceeded:
                await send(message)
                return

            # Accumulate body until the end or until size is exceeded.
            # "body" is optional in the ASGI response-body message.
            accumulated_body += message.get("body", b"")
            more_body = bool(message.get("more_body"))
            if len(accumulated_body) > self.max_body_bytes:
                # Too big to rewrite: flush what we have and pass the rest
                # through. Propagate the real more_body flag -- if this was
                # the final chunk, claiming more_body=True would leave the
                # response unterminated.
                await send(
                    {
                        "type": "http.response.body",
                        "body": accumulated_body,
                        "more_body": more_body,
                    }
                )
                size_limit_exceeded = True
                return

            if more_body:
                # Keep buffering until the final chunk arrives.
                return

            # We have all the body - modify it and send the result
            # TODO: What to do about Content-Type or other cases?
            trace_info = {
                "request_duration_ms": 1000 * (time.perf_counter() - trace_start),
                "sum_trace_duration_ms": sum(t["duration_ms"] for t in traces),
                "num_traces": len(traces),
                "traces": traces,
            }
            content_type = next(
                (
                    value.decode("utf8")
                    for name, value in response_headers
                    if name.lower() == b"content-type"
                ),
                "",
            )
            if "text/html" in content_type and b"</body>" in accumulated_body:
                extra = escape(json.dumps(trace_info, indent=2))
                extra_html = f"<pre>{extra}</pre></body>".encode("utf8")
                # Only the last </body> closes the document; earlier
                # occurrences (e.g. inside a script string) are left alone.
                head, _, tail = accumulated_body.rpartition(b"</body>")
                accumulated_body = head + extra_html + tail
            elif "json" in content_type and accumulated_body.startswith(b"{"):
                try:
                    data = json.loads(accumulated_body.decode("utf8"))
                except ValueError:
                    # Not valid UTF-8 / JSON after all: forward the body
                    # untouched instead of failing the response.
                    data = None
                if data is not None:
                    if "_trace" not in data:
                        data["_trace"] = trace_info
                    accumulated_body = json.dumps(data).encode("utf8")
            await send({"type": "http.response.body", "body": accumulated_body})

        with capture_traces(traces):
            await self.app(scope, receive, wrapped_send)
|