kopia lustrzana https://github.com/cirospaciari/socketify.py
more tweaks, custom tasks for PyPy and cli doc
rodzic
4745dd29bc
commit
2713403faf
|
@ -0,0 +1,83 @@
|
|||
|
||||
# CLI Tools
|
||||
|
||||
You can use CLI tools to run ASGI, WSGI or socketify.py apps.
|
||||
```bash
|
||||
python3 -m socketify --help
|
||||
```
|
||||
```bash
|
||||
Usage: python -m socketify APP [OPTIONS]
|
||||
python3 -m socketify APP [OPTIONS]
|
||||
pypy3 -m socketify APP [OPTIONS]
|
||||
|
||||
Options:
|
||||
--help Show this Help
|
||||
--host or -h TEXT Bind socket to this host. [default:127.0.0.1]
|
||||
--port or -p INTEGER Bind socket to this port. [default: 8000]
|
||||
--workers or -w INTEGER Number of worker processes. Defaults to the WEB_CONCURRENCY
|
||||
environment variable if available, or 1
|
||||
|
||||
--uds TEXT Bind to a UNIX domain socket, this options disables --host or -h and --port or -p.
|
||||
--ws [auto|none|module:ws] If informed module:ws will auto detect to use socketify.py or ASGI websockets
|
||||
interface and disabled if informed none [default: auto]
|
||||
--ws-max-size INTEGER WebSocket max size message in bytes [default: 16777216]
|
||||
--ws-auto-ping BOOLEAN WebSocket auto ping sending [default: True]
|
||||
--ws-idle-timeout INT WebSocket idle timeout [default: 20]
|
||||
--ws-reset-idle-on-send BOOLEAN Reset WebSocket idle timeout on send [default: True]
|
||||
--ws-per-message-deflate BOOLEAN WebSocket per-message-deflate compression [default: True]
|
||||
--ws-max-lifetime INT Websocket maximum socket lifetime in seconds before forced closure, 0 to disable [default: 0]
|
||||
--ws-max-backpressure INT WebSocket maximum backpressure in bytes [default: 16777216]
|
||||
--ws-close-on-backpressure-limit BOOLEAN Close connections that hits maximum backpressure [default: False]
|
||||
--lifespan [auto|on|off] Lifespan implementation. [default: auto]
|
||||
--interface [auto|asgi|asgi3|wsgi|ssgi|socketify] Select ASGI (same as ASGI3), ASGI3, WSGI or SSGI as the application interface. [default: auto]
|
||||
--disable-listen-log BOOLEAN Disable log when start listenning [default: False]
|
||||
--version or -v Display the socketify.py version and exit.
|
||||
--ssl-keyfile TEXT SSL key file
|
||||
--ssl-certfile TEXT SSL certificate file
|
||||
--ssl-keyfile-password TEXT SSL keyfile password
|
||||
--ssl-ca-certs TEXT CA certificates file
|
||||
--ssl-ciphers TEXT Ciphers to use (see stdlib ssl module's) [default: TLSv1]
|
||||
--req-res-factory-maxitems INT Pre allocated instances of Response and Request objects for socketify interface [default: 0]
|
||||
--ws-factory-maxitems INT Pre allocated instances of WebSockets objects for socketify interface [default: 0]
|
||||
|
||||
Example:
|
||||
python3 -m socketify main:app -w 8 -p 8181
|
||||
|
||||
```
|
||||
|
||||
|
||||
Socketify apps requires you to pass an run function, the cli will create the instance for you
|
||||
|
||||
```python
|
||||
from socketify import App
|
||||
# App will be created by the cli with all things you want configured
|
||||
def run(app: App):
|
||||
# add your routes here
|
||||
app.get("/", lambda res, req: res.end("Hello World!"))
|
||||
```
|
||||
|
||||
|
||||
WebSockets can be in the same or another module, you can still use .ws("/*) to serve Websockets
|
||||
```bash
|
||||
python3 -m socketify hello_world_cli:run --ws hello_world_cli:websocket --port 8080 --workers 2
|
||||
```
|
||||
```python
|
||||
websocket = {
|
||||
"open": lambda ws: ws.send("Hello World!", OpCode.TEXT),
|
||||
"message": lambda ws, message, opcode: ws.send(message, opcode),
|
||||
"close": lambda ws, code, message: print("WebSocket closed"),
|
||||
}
|
||||
```
|
||||
|
||||
When running ASGI websocket will be served by default, but you can disabled it
|
||||
```bash
|
||||
python3 -m socketify falcon_asgi:app --ws none --port 8080 --workers 2
|
||||
```
|
||||
|
||||
When running WSGI or ASGI you can still use socketify.py or ASGI websockets in the same server, mixing all available methods
|
||||
You can use WSGI to more throughput in HTTP and use ASGI for websockets for example or you can use ASGI/WSGI for HTTP to keep compatibility and just re-write the websockets to use socketify interface with pub/sub and all features
|
||||
```bash
|
||||
python3 -m socketify falcon_wsgi:app --ws falcon:ws none --port 8080 --workers 2
|
||||
```
|
||||
|
||||
### Next [API Reference](api.md)
|
|
@ -0,0 +1,610 @@
|
|||
from asyncio import (
|
||||
base_futures,
|
||||
base_tasks,
|
||||
format_helpers,
|
||||
exceptions,
|
||||
futures,
|
||||
_register_task,
|
||||
_enter_task,
|
||||
_leave_task,
|
||||
_unregister_task,
|
||||
)
|
||||
import contextvars
|
||||
import sys
|
||||
import inspect
|
||||
import platform
|
||||
|
||||
is_pypy = platform.python_implementation() == "PyPy"
|
||||
|
||||
# GenericAlias
|
||||
if sys.version_info >= (3, 9):
|
||||
GenericAlias = type(list[int])
|
||||
else:
|
||||
GenericAlias = None
|
||||
|
||||
isfuture = base_futures.isfuture
|
||||
|
||||
|
||||
_PENDING = base_futures._PENDING
|
||||
_CANCELLED = base_futures._CANCELLED
|
||||
_FINISHED = base_futures._FINISHED
|
||||
|
||||
|
||||
class RequestTask:
|
||||
"""This class is *almost* compatible with concurrent.futures.Future.
|
||||
|
||||
|
||||
Differences:
|
||||
|
||||
- This class is only used by socketify.py loop.run_async
|
||||
|
||||
- This class is not thread-safe.
|
||||
|
||||
- result() and exception() do not take a timeout argument and
|
||||
raise an exception when the future isn't done yet.
|
||||
|
||||
- Callbacks registered with add_done_callback() are always called
|
||||
via the event loop's call_soon().
|
||||
|
||||
- This class is not compatible with the wait() and as_completed()
|
||||
methods in the concurrent.futures package.
|
||||
|
||||
(In Python 3.4 or later we may be able to unify the implementations.)
|
||||
"""
|
||||
|
||||
# Class variables serving as defaults for instance variables.
|
||||
_state = _PENDING
|
||||
_result = None
|
||||
_exception = None
|
||||
_loop = None
|
||||
_source_traceback = None
|
||||
_cancel_message = None
|
||||
# A saved CancelledError for later chaining as an exception context.
|
||||
_cancelled_exc = None
|
||||
|
||||
# This field is used for a dual purpose:
|
||||
# - Its presence is a marker to declare that a class implements
|
||||
# the Future protocol (i.e. is intended to be duck-type compatible).
|
||||
# The value must also be not-None, to enable a subclass to declare
|
||||
# that it is not compatible by setting this to None.
|
||||
# - It is set by __iter__() below so that Task._step() can tell
|
||||
# the difference between
|
||||
# `await Future()` or`yield from Future()` (correct) vs.
|
||||
# `yield Future()` (incorrect).
|
||||
_asyncio_future_blocking = False
|
||||
|
||||
__log_traceback = False
|
||||
|
||||
"""A coroutine wrapped in a Future."""
|
||||
# An important invariant maintained while a Task not done:
|
||||
#
|
||||
# - Either _fut_waiter is None, and _step() is scheduled;
|
||||
# - or _fut_waiter is some Future, and _step() is *not* scheduled.
|
||||
#
|
||||
# The only transition from the latter to the former is through
|
||||
# _wakeup(). When _fut_waiter is not None, one of its callbacks
|
||||
# must be _wakeup().
|
||||
|
||||
# If False, don't log a message if the task is destroyed whereas its
|
||||
# status is still pending
|
||||
_log_destroy_pending = True
|
||||
|
||||
def __init__(self, coro, loop, default_done_callback=None, no_register=False, context=None):
|
||||
"""Initialize the future.
|
||||
|
||||
The optional event_loop argument allows explicitly setting the event
|
||||
loop object used by the future. If it's not provided, the future uses
|
||||
the default event loop.
|
||||
"""
|
||||
self._loop = loop
|
||||
self._name = "socketify.py-request-task" # fixed name for compatibility
|
||||
self._context = context if context else contextvars.copy_context()
|
||||
|
||||
if default_done_callback:
|
||||
self._callbacks = [(default_done_callback, self._context)]
|
||||
else:
|
||||
self._callbacks = []
|
||||
|
||||
self._num_cancels_requested = 0
|
||||
self._must_cancel = False
|
||||
self._fut_waiter = None
|
||||
self._coro = coro
|
||||
if not no_register:
|
||||
self._log_destroy_pending = False
|
||||
if self._loop.get_debug():
|
||||
self._source_traceback = format_helpers.extract_stack(sys._getframe(1))
|
||||
self._loop.call_soon(self.__step, context=self._context)
|
||||
_register_task(self)
|
||||
|
||||
def _reuse(self, coro, loop, default_done_callback=None):
|
||||
"""Reuse an future that is not pending anymore."""
|
||||
self._state = _PENDING
|
||||
self._result = None
|
||||
self._exception = None
|
||||
self._source_traceback = None
|
||||
self._cancel_message = None
|
||||
self._cancelled_exc = None
|
||||
self._asyncio_future_blocking = False
|
||||
self._log_traceback = False
|
||||
_unregister_task(self)
|
||||
|
||||
self._loop = loop
|
||||
self._name = "socketify.py-request-task" # fixed name for compatibility
|
||||
self._context = contextvars.copy_context()
|
||||
|
||||
if default_done_callback:
|
||||
self._callbacks = [(default_done_callback, self._context)]
|
||||
else:
|
||||
self._callbacks = []
|
||||
|
||||
if self._loop.get_debug():
|
||||
self._source_traceback = format_helpers.extract_stack(sys._getframe(1))
|
||||
|
||||
self._num_cancels_requested = 0
|
||||
self._must_cancel = False
|
||||
self._fut_waiter = None
|
||||
self._coro = coro
|
||||
|
||||
self._loop.call_soon(self.__step, context=self._context)
|
||||
_register_task(self)
|
||||
|
||||
def __repr__(self):
|
||||
return base_tasks._task_repr(self)
|
||||
|
||||
def __del__(self):
|
||||
|
||||
if self._state == _PENDING and self._log_destroy_pending:
|
||||
context = {
|
||||
"task": self,
|
||||
"message": "Task was destroyed but it is pending!",
|
||||
}
|
||||
if self._source_traceback:
|
||||
context["source_traceback"] = self._source_traceback
|
||||
self._loop.call_exception_handler(context)
|
||||
if not self.__log_traceback:
|
||||
# set_exception() was not called, or result() or exception()
|
||||
# has consumed the exception
|
||||
return
|
||||
exc = self._exception
|
||||
context = {
|
||||
"message": f"{self.__class__.__name__} exception was never retrieved",
|
||||
"exception": exc,
|
||||
"future": self,
|
||||
}
|
||||
if self._source_traceback:
|
||||
context["source_traceback"] = self._source_traceback
|
||||
self._loop.call_exception_handler(context)
|
||||
|
||||
__class_getitem__ = classmethod(GenericAlias)
|
||||
|
||||
def get_coro(self):
|
||||
return self._coro
|
||||
|
||||
def get_context(self):
|
||||
return self._context
|
||||
|
||||
def get_name(self):
|
||||
return self._name
|
||||
|
||||
def set_name(self, value):
|
||||
self._name = str(value)
|
||||
|
||||
def set_result(self, result):
|
||||
raise RuntimeError("Task does not support set_result operation")
|
||||
|
||||
def set_exception(self, exception):
|
||||
raise RuntimeError("Task does not support set_exception operation")
|
||||
|
||||
def get_stack(self, *, limit=None):
|
||||
"""Return the list of stack frames for this task's coroutine.
|
||||
|
||||
If the coroutine is not done, this returns the stack where it is
|
||||
suspended. If the coroutine has completed successfully or was
|
||||
cancelled, this returns an empty list. If the coroutine was
|
||||
terminated by an exception, this returns the list of traceback
|
||||
frames.
|
||||
|
||||
The frames are always ordered from oldest to newest.
|
||||
|
||||
The optional limit gives the maximum number of frames to
|
||||
return; by default all available frames are returned. Its
|
||||
meaning differs depending on whether a stack or a traceback is
|
||||
returned: the newest frames of a stack are returned, but the
|
||||
oldest frames of a traceback are returned. (This matches the
|
||||
behavior of the traceback module.)
|
||||
|
||||
For reasons beyond our control, only one stack frame is
|
||||
returned for a suspended coroutine.
|
||||
"""
|
||||
return base_tasks._task_get_stack(self, limit)
|
||||
|
||||
def print_stack(self, *, limit=None, file=None):
|
||||
"""Print the stack or traceback for this task's coroutine.
|
||||
|
||||
This produces output similar to that of the traceback module,
|
||||
for the frames retrieved by get_stack(). The limit argument
|
||||
is passed to get_stack(). The file argument is an I/O stream
|
||||
to which the output is written; by default output is written
|
||||
to sys.stderr.
|
||||
"""
|
||||
return base_tasks._task_print_stack(self, limit, file)
|
||||
|
||||
@property
|
||||
def _log_traceback(self):
|
||||
return self.__log_traceback
|
||||
|
||||
@_log_traceback.setter
|
||||
def _log_traceback(self, val):
|
||||
if val:
|
||||
raise ValueError("_log_traceback can only be set to False")
|
||||
self.__log_traceback = False
|
||||
|
||||
def get_loop(self):
|
||||
"""Return the event loop the Future is bound to."""
|
||||
loop = self._loop
|
||||
if loop is None:
|
||||
raise RuntimeError("Future object is not initialized.")
|
||||
return loop
|
||||
|
||||
def _make_cancelled_error(self):
|
||||
"""Create the CancelledError to raise if the Future is cancelled.
|
||||
|
||||
This should only be called once when handling a cancellation since
|
||||
it erases the saved context exception value.
|
||||
"""
|
||||
if self._cancelled_exc is not None:
|
||||
exc = self._cancelled_exc
|
||||
self._cancelled_exc = None
|
||||
return exc
|
||||
|
||||
if self._cancel_message is None:
|
||||
exc = exceptions.CancelledError()
|
||||
else:
|
||||
exc = exceptions.CancelledError(self._cancel_message)
|
||||
exc.__context__ = self._cancelled_exc
|
||||
# Remove the reference since we don't need this anymore.
|
||||
self._cancelled_exc = None
|
||||
return exc
|
||||
|
||||
def cancel(self, msg=None):
|
||||
"""Request that this task cancel itself.
|
||||
|
||||
This arranges for a CancelledError to be thrown into the
|
||||
wrapped coroutine on the next cycle through the event loop.
|
||||
The coroutine then has a chance to clean up or even deny
|
||||
the request using try/except/finally.
|
||||
|
||||
Unlike Future.cancel, this does not guarantee that the
|
||||
task will be cancelled: the exception might be caught and
|
||||
acted upon, delaying cancellation of the task or preventing
|
||||
cancellation completely. The task may also return a value or
|
||||
raise a different exception.
|
||||
|
||||
Immediately after this method is called, Task.cancelled() will
|
||||
not return True (unless the task was already cancelled). A
|
||||
task will be marked as cancelled when the wrapped coroutine
|
||||
terminates with a CancelledError exception (even if cancel()
|
||||
was not called).
|
||||
|
||||
This also increases the task's count of cancellation requests.
|
||||
"""
|
||||
self._log_traceback = False
|
||||
if self.done():
|
||||
return False
|
||||
self._num_cancels_requested += 1
|
||||
# These two lines are controversial. See discussion starting at
|
||||
# https://github.com/python/cpython/pull/31394#issuecomment-1053545331
|
||||
# Also remember that this is duplicated in _asynciomodule.c.
|
||||
# if self._num_cancels_requested > 1:
|
||||
# return False
|
||||
if self._fut_waiter is not None:
|
||||
if self._fut_waiter.cancel(msg=msg):
|
||||
# Leave self._fut_waiter; it may be a Task that
|
||||
# catches and ignores the cancellation so we may have
|
||||
# to cancel it again later.
|
||||
return True
|
||||
# It must be the case that self.__step is already scheduled.
|
||||
self._must_cancel = True
|
||||
self._cancel_message = msg
|
||||
return True
|
||||
|
||||
def _cancel(self, msg=None):
|
||||
"""Cancel the future and schedule callbacks.
|
||||
|
||||
If the future is already done or cancelled, return False. Otherwise,
|
||||
change the future's state to cancelled, schedule the callbacks and
|
||||
return True.
|
||||
"""
|
||||
self.__log_traceback = False
|
||||
if self._state != _PENDING:
|
||||
return False
|
||||
self._state = _CANCELLED
|
||||
self._cancel_message = msg
|
||||
self.__schedule_callbacks()
|
||||
return True
|
||||
|
||||
def cancelling(self):
|
||||
"""Return the count of the task's cancellation requests.
|
||||
|
||||
This count is incremented when .cancel() is called
|
||||
and may be decremented using .uncancel().
|
||||
"""
|
||||
return self._num_cancels_requested
|
||||
|
||||
def uncancel(self):
|
||||
"""Decrement the task's count of cancellation requests.
|
||||
|
||||
This should be called by the party that called `cancel()` on the task
|
||||
beforehand.
|
||||
|
||||
Returns the remaining number of cancellation requests.
|
||||
"""
|
||||
if self._num_cancels_requested > 0:
|
||||
self._num_cancels_requested -= 1
|
||||
return self._num_cancels_requested
|
||||
|
||||
def __schedule_callbacks(self):
|
||||
"""Internal: Ask the event loop to call all callbacks.
|
||||
|
||||
The callbacks are scheduled to be called as soon as possible. Also
|
||||
clears the callback list.
|
||||
"""
|
||||
callbacks = self._callbacks[:]
|
||||
if not callbacks:
|
||||
return
|
||||
|
||||
self._callbacks[:] = []
|
||||
for callback, ctx in callbacks:
|
||||
self._loop.call_soon(callback, self, context=ctx)
|
||||
|
||||
def cancelled(self):
|
||||
"""Return True if the future was cancelled."""
|
||||
return self._state == _CANCELLED
|
||||
|
||||
# Don't implement running(); see http://bugs.python.org/issue18699
|
||||
|
||||
def done(self):
|
||||
"""Return True if the future is done.
|
||||
|
||||
Done means either that a result / exception are available, or that the
|
||||
future was cancelled.
|
||||
"""
|
||||
return self._state != _PENDING
|
||||
|
||||
def result(self):
|
||||
"""Return the result this future represents.
|
||||
|
||||
If the future has been cancelled, raises CancelledError. If the
|
||||
future's result isn't yet available, raises InvalidStateError. If
|
||||
the future is done and has an exception set, this exception is raised.
|
||||
"""
|
||||
if self._state == _CANCELLED:
|
||||
exc = self._make_cancelled_error()
|
||||
raise exc
|
||||
if self._state != _FINISHED:
|
||||
raise exceptions.InvalidStateError("Result is not ready.")
|
||||
self.__log_traceback = False
|
||||
if self._exception is not None:
|
||||
raise self._exception.with_traceback(self._exception_tb)
|
||||
return self._result
|
||||
|
||||
def exception(self):
|
||||
"""Return the exception that was set on this future.
|
||||
|
||||
The exception (or None if no exception was set) is returned only if
|
||||
the future is done. If the future has been cancelled, raises
|
||||
CancelledError. If the future isn't done yet, raises
|
||||
InvalidStateError.
|
||||
"""
|
||||
if self._state == _CANCELLED:
|
||||
exc = self._make_cancelled_error()
|
||||
raise exc
|
||||
if self._state != _FINISHED:
|
||||
raise exceptions.InvalidStateError("Exception is not set.")
|
||||
self.__log_traceback = False
|
||||
return self._exception
|
||||
|
||||
def add_done_callback(self, fn, *, context=None):
|
||||
"""Add a callback to be run when the future becomes done.
|
||||
|
||||
The callback is called with a single argument - the future object. If
|
||||
the future is already done when this is called, the callback is
|
||||
scheduled with call_soon.
|
||||
"""
|
||||
if self._state != _PENDING:
|
||||
self._loop.call_soon(fn, self, context=context)
|
||||
else:
|
||||
if context is None:
|
||||
context = contextvars.copy_context()
|
||||
self._callbacks.append((fn, context))
|
||||
|
||||
# New method not in PEP 3148.
|
||||
|
||||
def remove_done_callback(self, fn):
|
||||
"""Remove all instances of a callback from the "call when done" list.
|
||||
|
||||
Returns the number of callbacks removed.
|
||||
"""
|
||||
filtered_callbacks = [(f, ctx) for (f, ctx) in self._callbacks if f != fn]
|
||||
removed_count = len(self._callbacks) - len(filtered_callbacks)
|
||||
if removed_count:
|
||||
self._callbacks[:] = filtered_callbacks
|
||||
return removed_count
|
||||
|
||||
# So-called internal methods (note: no set_running_or_notify_cancel()).
|
||||
|
||||
def _set_result(self, result):
|
||||
"""Mark the future done and set its result.
|
||||
|
||||
If the future is already done when this method is called, raises
|
||||
InvalidStateError.
|
||||
"""
|
||||
if self._state != _PENDING:
|
||||
raise exceptions.InvalidStateError(f"{self._state}: {self!r}")
|
||||
self._result = result
|
||||
self._state = _FINISHED
|
||||
self.__schedule_callbacks()
|
||||
|
||||
def _set_exception(self, exception):
|
||||
"""Mark the future done and set an exception.
|
||||
|
||||
If the future is already done when this method is called, raises
|
||||
InvalidStateError.
|
||||
"""
|
||||
if self._state != _PENDING:
|
||||
raise exceptions.InvalidStateError(f"{self._state}: {self!r}")
|
||||
if isinstance(exception, type):
|
||||
exception = exception()
|
||||
if type(exception) is StopIteration:
|
||||
raise TypeError(
|
||||
"StopIteration interacts badly with generators "
|
||||
"and cannot be raised into a Future"
|
||||
)
|
||||
self._exception = exception
|
||||
self._exception_tb = exception.__traceback__
|
||||
self._state = _FINISHED
|
||||
self.__schedule_callbacks()
|
||||
self.__log_traceback = True
|
||||
|
||||
def __await__(self):
|
||||
if not self.done():
|
||||
self._asyncio_future_blocking = True
|
||||
yield self # This tells Task to wait for completion.
|
||||
if not self.done():
|
||||
raise RuntimeError("await wasn't used with future")
|
||||
return self.result() # May raise too.
|
||||
|
||||
def __step(self, exc=None):
|
||||
if self.done():
|
||||
raise exceptions.InvalidStateError(
|
||||
f"_step(): already done: {self!r}, {exc!r}"
|
||||
)
|
||||
if self._must_cancel:
|
||||
if not isinstance(exc, exceptions.CancelledError):
|
||||
exc = self._make_cancelled_error()
|
||||
self._must_cancel = False
|
||||
coro = self._coro
|
||||
self._fut_waiter = None
|
||||
|
||||
_enter_task(self._loop, self)
|
||||
# Call either coro.throw(exc) or coro.send(None).
|
||||
try:
|
||||
if exc is None:
|
||||
# We use the `send` method directly, because coroutines
|
||||
# don't have `__iter__` and `__next__` methods.
|
||||
result = coro.send(None)
|
||||
else:
|
||||
result = coro.throw(exc)
|
||||
except StopIteration as exc:
|
||||
if self._must_cancel:
|
||||
# Task is cancelled right before coro stops.
|
||||
self._must_cancel = False
|
||||
self._cancel(msg=self._cancel_message)
|
||||
else:
|
||||
self._set_result(exc.value)
|
||||
except exceptions.CancelledError as exc:
|
||||
# Save the original exception so we can chain it later.
|
||||
self._cancelled_exc = exc
|
||||
self._cancel() # I.e., Future.cancel(self).
|
||||
except (KeyboardInterrupt, SystemExit) as exc:
|
||||
self._set_exception(exc)
|
||||
raise
|
||||
except BaseException as exc:
|
||||
self._set_exception(exc)
|
||||
else:
|
||||
blocking = getattr(result, "_asyncio_future_blocking", None)
|
||||
if blocking is not None:
|
||||
# Yielded Future must come from Future.__iter__().
|
||||
if futures._get_loop(result) is not self._loop:
|
||||
new_exc = RuntimeError(
|
||||
f"Task {self!r} got Future "
|
||||
f"{result!r} attached to a different loop"
|
||||
)
|
||||
self._loop.call_soon(self.__step, new_exc, context=self._context)
|
||||
elif blocking:
|
||||
if result is self:
|
||||
new_exc = RuntimeError(f"Task cannot await on itself: {self!r}")
|
||||
self._loop.call_soon(
|
||||
self.__step, new_exc, context=self._context
|
||||
)
|
||||
else:
|
||||
result._asyncio_future_blocking = False
|
||||
result.add_done_callback(self.__wakeup, context=self._context)
|
||||
self._fut_waiter = result
|
||||
if self._must_cancel:
|
||||
if self._fut_waiter.cancel(msg=self._cancel_message):
|
||||
self._must_cancel = False
|
||||
else:
|
||||
new_exc = RuntimeError(
|
||||
f"yield was used instead of yield from "
|
||||
f"in task {self!r} with {result!r}"
|
||||
)
|
||||
self._loop.call_soon(self.__step, new_exc, context=self._context)
|
||||
|
||||
elif result is None:
|
||||
# Bare yield relinquishes control for one event loop iteration.
|
||||
self._loop.call_soon(self.__step, context=self._context)
|
||||
elif inspect.isgenerator(result):
|
||||
# Yielding a generator is just wrong.
|
||||
new_exc = RuntimeError(
|
||||
f"yield was used instead of yield from for "
|
||||
f"generator in task {self!r} with {result!r}"
|
||||
)
|
||||
self._loop.call_soon(self.__step, new_exc, context=self._context)
|
||||
else:
|
||||
# Yielding something else is an error.
|
||||
new_exc = RuntimeError(f"Task got bad yield: {result!r}")
|
||||
self._loop.call_soon(self.__step, new_exc, context=self._context)
|
||||
finally:
|
||||
_leave_task(self._loop, self)
|
||||
self = None # Needed to break cycles when an exception occurs.
|
||||
|
||||
def __wakeup(self, future):
|
||||
try:
|
||||
future.result()
|
||||
except BaseException as exc:
|
||||
# This may also be a cancellation.
|
||||
self.__step(exc)
|
||||
else:
|
||||
# Don't pass the value of `future.result()` explicitly,
|
||||
# as `Future.__iter__` and `Future.__await__` don't need it.
|
||||
# If we call `_step(value, None)` instead of `_step()`,
|
||||
# Python eval loop would use `.send(value)` method call,
|
||||
# instead of `__next__()`, which is slower for futures
|
||||
# that return non-generator iterators from their `__iter__`.
|
||||
self.__step()
|
||||
self = None # Needed to break cycles when an exception occurs.
|
||||
|
||||
__iter__ = __await__ # make compatible with 'yield from'.
|
||||
|
||||
|
||||
def create_task_with_factory(task_factory_max_items=100_000):
|
||||
items = []
|
||||
for _ in range(0, task_factory_max_items):
|
||||
task = RequestTask(None, None, None, True)
|
||||
if task._source_traceback:
|
||||
del task._source_traceback[-1]
|
||||
items.append(task)
|
||||
|
||||
def factory(loop, coro, default_done_callback=None):
|
||||
if len(items) == 0:
|
||||
return create_task(loop, coro, default_done_callback)
|
||||
task = items.pop()
|
||||
def done(f):
|
||||
if default_done_callback is not None:
|
||||
default_done_callback(f)
|
||||
items.append(f)
|
||||
task._reuse(coro, loop, done)
|
||||
return task
|
||||
|
||||
return factory
|
||||
|
||||
|
||||
def create_task(loop, coro, default_done_callback=None, context=None):
|
||||
"""Schedule a coroutine object.
|
||||
Return a task object.
|
||||
"""
|
||||
task = RequestTask(coro, loop, default_done_callback, context=context)
|
||||
if task._source_traceback:
|
||||
del task._source_traceback[-1]
|
||||
return task
|
Ładowanie…
Reference in New Issue