kopia lustrzana https://github.com/micropython/micropython-lib
223 wiersze
7.0 KiB
Python
223 wiersze
7.0 KiB
Python
![]() |
"""Utilities for with-statement and async with-statement contexts.

Based on original source code: https://github.com/python/cpython/blob/3.11/Lib/contextlib.py
"""
|
||
|
|
||
|
|
||
|
class ContextDecorator(object):
    """A base class or mixin that enables context managers to work as decorators."""

    def _recreate_cm(self):
        """Return the context manager instance to enter for one decorated call.

        The default implementation returns ``self``.  It allows an
        otherwise one-shot context manager like _GeneratorContextManager
        to support use as a decorator via implicit recreation, by
        overriding this method.

        This is a private interface just for _GeneratorContextManager.
        See issue #11647 for details.
        """
        return self

    def __call__(self, func):
        """Decorate ``func`` so that every call runs inside the context manager.

        Returns a wrapper that forwards all positional and keyword
        arguments to ``func`` and returns its result, with the call made
        inside ``with self._recreate_cm():``.
        """

        def inner(*args, **kwds):
            with self._recreate_cm():
                return func(*args, **kwds)

        return inner


class _GeneratorContextManager(ContextDecorator):
    """Helper for @contextmanager decorator."""

    def __init__(self, func, *args, **kwds):
        """Call ``func(*args, **kwds)`` and keep the resulting generator.

        ``func``, ``args`` and ``kwds`` are stored as well so that
        ``_recreate_cm()`` can build a fresh instance later.
        """
        self.gen = func(*args, **kwds)
        self.func, self.args, self.kwds = func, args, kwds

    def _recreate_cm(self):
        """Return a new instance built from the stored function and arguments."""
        # _GCM instances are one-shot context managers, so the
        # CM must be recreated each time a decorated function is
        # called
        return self.__class__(self.func, *self.args, **self.kwds)

    def __enter__(self):
        """Advance the generator to its first ``yield`` and return that value.

        Raises:
            RuntimeError: if the generator finishes without yielding.
        """
        try:
            return next(self.gen)
        except StopIteration:
            raise RuntimeError("generator didn't yield") from None

    def __exit__(self, type, value, traceback):
        """Resume the generator when the ``with`` block ends.

        When ``type`` is None the block ended normally: the generator is
        resumed and is expected to finish.  Otherwise the exception is
        thrown into the generator.

        Returns:
            True when the generator finished after the exception was thrown
            in (the exception is suppressed); False when there was no
            exception, or when the very same exception came back out of the
            generator.  In the latter case it is not re-raised from here:
            returning False leaves propagation to the caller.

        Raises:
            RuntimeError: if the generator yields again instead of stopping.
            Any *different* exception raised by the generator propagates.
        """
        if type is None:
            try:
                next(self.gen)
            except StopIteration:
                return False
            else:
                raise RuntimeError("generator didn't stop")

        if value is None:
            # Need to force instantiation so we can reliably
            # tell if we get the same exception back
            value = type()
        try:
            self.gen.throw(type, value, traceback)
        except StopIteration as exc:
            # Suppress the exception *unless* it's the same exception that
            # was passed to throw().  This prevents a StopIteration
            # raised inside the "with" statement from being suppressed
            return exc is not value
        except RuntimeError as exc:
            # Don't re-raise the passed in exception.
            if exc is value:
                exc.__traceback__ = traceback
                return False
            # Avoid suppressing if a StopIteration exception was passed to
            # throw() and later wrapped into a RuntimeError (see PEP 479).
            # Only do this when the RuntimeError's cause really is the
            # exception that was passed in.
            if isinstance(value, StopIteration) and exc.__cause__ is value:
                value.__traceback__ = traceback
                return False
            raise
        except BaseException as exc:
            # Only re-raise if it's *not* the exception that was passed to
            # throw(): __exit__() must not raise the passed-in exception
            # itself, it signals propagation by returning a false value.
            if exc is not value:
                raise
            # Reset the traceback to the one handed in by the caller.
            exc.__traceback__ = traceback
            return False
        raise RuntimeError("generator didn't stop after throw()")
|
||
|
|
||
|
|
||
|
def contextmanager(func):
    """@contextmanager decorator.

    Wraps a generator function so that calling the result produces a
    context manager driven by that generator.

    Typical usage:

        @contextmanager
        def some_generator(<arguments>):
            <setup>
            try:
                yield <value>
            finally:
                <cleanup>

    With that definition, the statement

        with some_generator(<arguments>) as <variable>:
            <body>

    behaves like

        <setup>
        try:
            <variable> = <value>
            <body>
        finally:
            <cleanup>
    """

    def helper(*args, **kwds):
        # Each call builds a new manager around a new generator.
        manager = _GeneratorContextManager(func, *args, **kwds)
        return manager

    return helper
|
||
|
|
||
|
|
||
|
class AsyncContextDecorator(object):
|
||
|
"A base class or mixin that enables async context managers to work as decorators."
|
||
|
|
||
|
def _recreate_cm(self):
|
||
|
"""Return a recreated instance of self."""
|
||
|
return self
|
||
|
|
||
|
def __call__(self, func):
|
||
|
async def inner(*args, **kwds):
|
||
|
async with self._recreate_cm():
|
||
|
return await func(*args, **kwds)
|
||
|
|
||
|
return inner
|
||
|
|
||
|
|
||
|
class _AsyncGeneratorContextManager(AsyncContextDecorator):
|
||
|
"""Helper for @asynccontextmanager decorator."""
|
||
|
|
||
|
def __init__(self, func, args, kwds):
|
||
|
self.gen = func(*args, **kwds)
|
||
|
self.func, self.args, self.kwds = func, args, kwds
|
||
|
|
||
|
async def __aenter__(self):
|
||
|
# do not keep args and kwds alive unnecessarily
|
||
|
# they are only needed for recreation, which is not possible anymore
|
||
|
del self.args, self.kwds, self.func
|
||
|
try:
|
||
|
return await anext(self.gen)
|
||
|
except StopAsyncIteration:
|
||
|
raise RuntimeError("generator didn't yield") from None
|
||
|
|
||
|
async def __aexit__(self, typ, value, traceback):
|
||
|
if typ is None:
|
||
|
try:
|
||
|
await anext(self.gen)
|
||
|
except StopAsyncIteration:
|
||
|
return False
|
||
|
else:
|
||
|
raise RuntimeError("generator didn't stop")
|
||
|
else:
|
||
|
if value is None:
|
||
|
# Need to force instantiation so we can reliably
|
||
|
# tell if we get the same exception back
|
||
|
value = typ()
|
||
|
try:
|
||
|
await self.gen.athrow(typ, value, traceback)
|
||
|
except StopAsyncIteration as exc:
|
||
|
# Suppress StopIteration *unless* it's the same exception that
|
||
|
# was passed to throw(). This prevents a StopIteration
|
||
|
# raised inside the "with" statement from being suppressed.
|
||
|
return exc is not value
|
||
|
except RuntimeError as exc:
|
||
|
# Don't re-raise the passed in exception. (issue27122)
|
||
|
if exc is value:
|
||
|
exc.__traceback__ = traceback
|
||
|
return False
|
||
|
# Avoid suppressing if a Stop(Async)Iteration exception
|
||
|
# was passed to athrow() and later wrapped into a RuntimeError
|
||
|
# (see PEP 479 for sync generators; async generators also
|
||
|
# have this behavior). But do this only if the exception wrapped
|
||
|
# by the RuntimeError is actually Stop(Async)Iteration (see
|
||
|
# issue29692).
|
||
|
if (
|
||
|
isinstance(value, (StopIteration, StopAsyncIteration))
|
||
|
and exc.__cause__ is value
|
||
|
):
|
||
|
value.__traceback__ = traceback
|
||
|
return False
|
||
|
raise
|
||
|
except BaseException as exc:
|
||
|
# only re-raise if it's *not* the exception that was
|
||
|
# passed to throw(), because __exit__() must not raise
|
||
|
# an exception unless __exit__() itself failed. But throw()
|
||
|
# has to raise the exception to signal propagation, so this
|
||
|
# fixes the impedance mismatch between the throw() protocol
|
||
|
# and the __exit__() protocol.
|
||
|
if exc is not value:
|
||
|
raise
|
||
|
exc.__traceback__ = traceback
|
||
|
return False
|
||
|
raise RuntimeError("generator didn't stop after athrow()")
|
||
|
|
||
|
|
||
|
def asynccontextmanager(func):
    """@asynccontextmanager decorator.

    Wraps an async generator function so that calling the result produces
    an async context manager driven by that generator.

    Typical usage:

        @asynccontextmanager
        async def some_async_generator(<arguments>):
            <setup>
            try:
                yield <value>
            finally:
                <cleanup>

    With that definition, the statement

        async with some_async_generator(<arguments>) as <variable>:
            <body>

    behaves like

        <setup>
        try:
            <variable> = <value>
            <body>
        finally:
            <cleanup>
    """

    def helper(*args, **kwds):
        # The manager takes the positional tuple and keyword dict as-is,
        # not unpacked.
        manager = _AsyncGeneratorContextManager(func, args, kwds)
        return manager

    return helper
|