kopia lustrzana https://github.com/micropython/micropython-lib
contextlib: Add contextlib from cpython 3.4.2.
rodzic
8bed308f41
commit
5557382b5e
|
@ -0,0 +1,340 @@
|
||||||
|
"""Utilities for with-statement contexts. See PEP 343."""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
from collections import deque
|
||||||
|
from functools import wraps
|
||||||
|
|
||||||
|
__all__ = ["contextmanager", "closing", "ContextDecorator", "ExitStack",
|
||||||
|
"redirect_stdout", "suppress"]
|
||||||
|
|
||||||
|
|
||||||
|
class ContextDecorator(object):
|
||||||
|
"A base class or mixin that enables context managers to work as decorators."
|
||||||
|
|
||||||
|
def _recreate_cm(self):
|
||||||
|
"""Return a recreated instance of self.
|
||||||
|
|
||||||
|
Allows an otherwise one-shot context manager like
|
||||||
|
_GeneratorContextManager to support use as
|
||||||
|
a decorator via implicit recreation.
|
||||||
|
|
||||||
|
This is a private interface just for _GeneratorContextManager.
|
||||||
|
See issue #11647 for details.
|
||||||
|
"""
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __call__(self, func):
|
||||||
|
@wraps(func)
|
||||||
|
def inner(*args, **kwds):
|
||||||
|
with self._recreate_cm():
|
||||||
|
return func(*args, **kwds)
|
||||||
|
return inner
|
||||||
|
|
||||||
|
|
||||||
|
class _GeneratorContextManager(ContextDecorator):
|
||||||
|
"""Helper for @contextmanager decorator."""
|
||||||
|
|
||||||
|
def __init__(self, func, *args, **kwds):
|
||||||
|
self.gen = func(*args, **kwds)
|
||||||
|
self.func, self.args, self.kwds = func, args, kwds
|
||||||
|
# Issue 19330: ensure context manager instances have good docstrings
|
||||||
|
doc = getattr(func, "__doc__", None)
|
||||||
|
if doc is None:
|
||||||
|
doc = type(self).__doc__
|
||||||
|
self.__doc__ = doc
|
||||||
|
# Unfortunately, this still doesn't provide good help output when
|
||||||
|
# inspecting the created context manager instances, since pydoc
|
||||||
|
# currently bypasses the instance docstring and shows the docstring
|
||||||
|
# for the class instead.
|
||||||
|
# See http://bugs.python.org/issue19404 for more details.
|
||||||
|
|
||||||
|
def _recreate_cm(self):
|
||||||
|
# _GCM instances are one-shot context managers, so the
|
||||||
|
# CM must be recreated each time a decorated function is
|
||||||
|
# called
|
||||||
|
return self.__class__(self.func, *self.args, **self.kwds)
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
try:
|
||||||
|
return next(self.gen)
|
||||||
|
except StopIteration:
|
||||||
|
raise RuntimeError("generator didn't yield") from None
|
||||||
|
|
||||||
|
def __exit__(self, type, value, traceback):
|
||||||
|
if type is None:
|
||||||
|
try:
|
||||||
|
next(self.gen)
|
||||||
|
except StopIteration:
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
raise RuntimeError("generator didn't stop")
|
||||||
|
else:
|
||||||
|
if value is None:
|
||||||
|
# Need to force instantiation so we can reliably
|
||||||
|
# tell if we get the same exception back
|
||||||
|
value = type()
|
||||||
|
try:
|
||||||
|
self.gen.throw(type, value, traceback)
|
||||||
|
raise RuntimeError("generator didn't stop after throw()")
|
||||||
|
except StopIteration as exc:
|
||||||
|
# Suppress the exception *unless* it's the same exception that
|
||||||
|
# was passed to throw(). This prevents a StopIteration
|
||||||
|
# raised inside the "with" statement from being suppressed
|
||||||
|
return exc is not value
|
||||||
|
except:
|
||||||
|
# only re-raise if it's *not* the exception that was
|
||||||
|
# passed to throw(), because __exit__() must not raise
|
||||||
|
# an exception unless __exit__() itself failed. But throw()
|
||||||
|
# has to raise the exception to signal propagation, so this
|
||||||
|
# fixes the impedance mismatch between the throw() protocol
|
||||||
|
# and the __exit__() protocol.
|
||||||
|
#
|
||||||
|
if sys.exc_info()[1] is not value:
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
def contextmanager(func):
|
||||||
|
"""@contextmanager decorator.
|
||||||
|
|
||||||
|
Typical usage:
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def some_generator(<arguments>):
|
||||||
|
<setup>
|
||||||
|
try:
|
||||||
|
yield <value>
|
||||||
|
finally:
|
||||||
|
<cleanup>
|
||||||
|
|
||||||
|
This makes this:
|
||||||
|
|
||||||
|
with some_generator(<arguments>) as <variable>:
|
||||||
|
<body>
|
||||||
|
|
||||||
|
equivalent to this:
|
||||||
|
|
||||||
|
<setup>
|
||||||
|
try:
|
||||||
|
<variable> = <value>
|
||||||
|
<body>
|
||||||
|
finally:
|
||||||
|
<cleanup>
|
||||||
|
|
||||||
|
"""
|
||||||
|
@wraps(func)
|
||||||
|
def helper(*args, **kwds):
|
||||||
|
return _GeneratorContextManager(func, *args, **kwds)
|
||||||
|
return helper
|
||||||
|
|
||||||
|
|
||||||
|
class closing(object):
|
||||||
|
"""Context to automatically close something at the end of a block.
|
||||||
|
|
||||||
|
Code like this:
|
||||||
|
|
||||||
|
with closing(<module>.open(<arguments>)) as f:
|
||||||
|
<block>
|
||||||
|
|
||||||
|
is equivalent to this:
|
||||||
|
|
||||||
|
f = <module>.open(<arguments>)
|
||||||
|
try:
|
||||||
|
<block>
|
||||||
|
finally:
|
||||||
|
f.close()
|
||||||
|
|
||||||
|
"""
|
||||||
|
def __init__(self, thing):
|
||||||
|
self.thing = thing
|
||||||
|
def __enter__(self):
|
||||||
|
return self.thing
|
||||||
|
def __exit__(self, *exc_info):
|
||||||
|
self.thing.close()
|
||||||
|
|
||||||
|
class redirect_stdout:
|
||||||
|
"""Context manager for temporarily redirecting stdout to another file
|
||||||
|
|
||||||
|
# How to send help() to stderr
|
||||||
|
with redirect_stdout(sys.stderr):
|
||||||
|
help(dir)
|
||||||
|
|
||||||
|
# How to write help() to a file
|
||||||
|
with open('help.txt', 'w') as f:
|
||||||
|
with redirect_stdout(f):
|
||||||
|
help(pow)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, new_target):
|
||||||
|
self._new_target = new_target
|
||||||
|
# We use a list of old targets to make this CM re-entrant
|
||||||
|
self._old_targets = []
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self._old_targets.append(sys.stdout)
|
||||||
|
sys.stdout = self._new_target
|
||||||
|
return self._new_target
|
||||||
|
|
||||||
|
def __exit__(self, exctype, excinst, exctb):
|
||||||
|
sys.stdout = self._old_targets.pop()
|
||||||
|
|
||||||
|
|
||||||
|
class suppress:
|
||||||
|
"""Context manager to suppress specified exceptions
|
||||||
|
|
||||||
|
After the exception is suppressed, execution proceeds with the next
|
||||||
|
statement following the with statement.
|
||||||
|
|
||||||
|
with suppress(FileNotFoundError):
|
||||||
|
os.remove(somefile)
|
||||||
|
# Execution still resumes here if the file was already removed
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, *exceptions):
|
||||||
|
self._exceptions = exceptions
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def __exit__(self, exctype, excinst, exctb):
|
||||||
|
# Unlike isinstance and issubclass, CPython exception handling
|
||||||
|
# currently only looks at the concrete type hierarchy (ignoring
|
||||||
|
# the instance and subclass checking hooks). While Guido considers
|
||||||
|
# that a bug rather than a feature, it's a fairly hard one to fix
|
||||||
|
# due to various internal implementation details. suppress provides
|
||||||
|
# the simpler issubclass based semantics, rather than trying to
|
||||||
|
# exactly reproduce the limitations of the CPython interpreter.
|
||||||
|
#
|
||||||
|
# See http://bugs.python.org/issue12029 for more details
|
||||||
|
return exctype is not None and issubclass(exctype, self._exceptions)
|
||||||
|
|
||||||
|
|
||||||
|
# Inspired by discussions on http://bugs.python.org/issue13585
|
||||||
|
class ExitStack(object):
|
||||||
|
"""Context manager for dynamic management of a stack of exit callbacks
|
||||||
|
|
||||||
|
For example:
|
||||||
|
|
||||||
|
with ExitStack() as stack:
|
||||||
|
files = [stack.enter_context(open(fname)) for fname in filenames]
|
||||||
|
# All opened files will automatically be closed at the end of
|
||||||
|
# the with statement, even if attempts to open files later
|
||||||
|
# in the list raise an exception
|
||||||
|
|
||||||
|
"""
|
||||||
|
def __init__(self):
|
||||||
|
self._exit_callbacks = deque()
|
||||||
|
|
||||||
|
def pop_all(self):
|
||||||
|
"""Preserve the context stack by transferring it to a new instance"""
|
||||||
|
new_stack = type(self)()
|
||||||
|
new_stack._exit_callbacks = self._exit_callbacks
|
||||||
|
self._exit_callbacks = deque()
|
||||||
|
return new_stack
|
||||||
|
|
||||||
|
def _push_cm_exit(self, cm, cm_exit):
|
||||||
|
"""Helper to correctly register callbacks to __exit__ methods"""
|
||||||
|
def _exit_wrapper(*exc_details):
|
||||||
|
return cm_exit(cm, *exc_details)
|
||||||
|
_exit_wrapper.__self__ = cm
|
||||||
|
self.push(_exit_wrapper)
|
||||||
|
|
||||||
|
def push(self, exit):
|
||||||
|
"""Registers a callback with the standard __exit__ method signature
|
||||||
|
|
||||||
|
Can suppress exceptions the same way __exit__ methods can.
|
||||||
|
|
||||||
|
Also accepts any object with an __exit__ method (registering a call
|
||||||
|
to the method instead of the object itself)
|
||||||
|
"""
|
||||||
|
# We use an unbound method rather than a bound method to follow
|
||||||
|
# the standard lookup behaviour for special methods
|
||||||
|
_cb_type = type(exit)
|
||||||
|
try:
|
||||||
|
exit_method = _cb_type.__exit__
|
||||||
|
except AttributeError:
|
||||||
|
# Not a context manager, so assume its a callable
|
||||||
|
self._exit_callbacks.append(exit)
|
||||||
|
else:
|
||||||
|
self._push_cm_exit(exit, exit_method)
|
||||||
|
return exit # Allow use as a decorator
|
||||||
|
|
||||||
|
def callback(self, callback, *args, **kwds):
|
||||||
|
"""Registers an arbitrary callback and arguments.
|
||||||
|
|
||||||
|
Cannot suppress exceptions.
|
||||||
|
"""
|
||||||
|
def _exit_wrapper(exc_type, exc, tb):
|
||||||
|
callback(*args, **kwds)
|
||||||
|
# We changed the signature, so using @wraps is not appropriate, but
|
||||||
|
# setting __wrapped__ may still help with introspection
|
||||||
|
_exit_wrapper.__wrapped__ = callback
|
||||||
|
self.push(_exit_wrapper)
|
||||||
|
return callback # Allow use as a decorator
|
||||||
|
|
||||||
|
def enter_context(self, cm):
|
||||||
|
"""Enters the supplied context manager
|
||||||
|
|
||||||
|
If successful, also pushes its __exit__ method as a callback and
|
||||||
|
returns the result of the __enter__ method.
|
||||||
|
"""
|
||||||
|
# We look up the special methods on the type to match the with statement
|
||||||
|
_cm_type = type(cm)
|
||||||
|
_exit = _cm_type.__exit__
|
||||||
|
result = _cm_type.__enter__(cm)
|
||||||
|
self._push_cm_exit(cm, _exit)
|
||||||
|
return result
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
"""Immediately unwind the context stack"""
|
||||||
|
self.__exit__(None, None, None)
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, *exc_details):
|
||||||
|
received_exc = exc_details[0] is not None
|
||||||
|
|
||||||
|
# We manipulate the exception state so it behaves as though
|
||||||
|
# we were actually nesting multiple with statements
|
||||||
|
frame_exc = sys.exc_info()[1]
|
||||||
|
def _fix_exception_context(new_exc, old_exc):
|
||||||
|
# Context may not be correct, so find the end of the chain
|
||||||
|
while 1:
|
||||||
|
exc_context = new_exc.__context__
|
||||||
|
if exc_context is old_exc:
|
||||||
|
# Context is already set correctly (see issue 20317)
|
||||||
|
return
|
||||||
|
if exc_context is None or exc_context is frame_exc:
|
||||||
|
break
|
||||||
|
new_exc = exc_context
|
||||||
|
# Change the end of the chain to point to the exception
|
||||||
|
# we expect it to reference
|
||||||
|
new_exc.__context__ = old_exc
|
||||||
|
|
||||||
|
# Callbacks are invoked in LIFO order to match the behaviour of
|
||||||
|
# nested context managers
|
||||||
|
suppressed_exc = False
|
||||||
|
pending_raise = False
|
||||||
|
while self._exit_callbacks:
|
||||||
|
cb = self._exit_callbacks.pop()
|
||||||
|
try:
|
||||||
|
if cb(*exc_details):
|
||||||
|
suppressed_exc = True
|
||||||
|
pending_raise = False
|
||||||
|
exc_details = (None, None, None)
|
||||||
|
except:
|
||||||
|
new_exc_details = sys.exc_info()
|
||||||
|
# simulate the stack of exceptions by setting the context
|
||||||
|
_fix_exception_context(new_exc_details[1], exc_details[1])
|
||||||
|
pending_raise = True
|
||||||
|
exc_details = new_exc_details
|
||||||
|
if pending_raise:
|
||||||
|
try:
|
||||||
|
# bare "raise exc_details[1]" replaces our carefully
|
||||||
|
# set-up context
|
||||||
|
fixed_ctx = exc_details[1].__context__
|
||||||
|
raise exc_details[1]
|
||||||
|
except BaseException:
|
||||||
|
exc_details[1].__context__ = fixed_ctx
|
||||||
|
raise
|
||||||
|
return received_exc and suppressed_exc
|
|
@ -0,0 +1,4 @@
|
||||||
|
srctype = cpython
|
||||||
|
type = package
|
||||||
|
version = 3.4.2-0
|
||||||
|
long_desc = Port of contextlib for micropython
|
|
@ -0,0 +1,18 @@
|
||||||
|
import sys
|
||||||
|
# Remove current dir from sys.path, otherwise setuptools will peek up our
|
||||||
|
# module instead of system.
|
||||||
|
sys.path.pop(0)
|
||||||
|
from setuptools import setup
|
||||||
|
|
||||||
|
|
||||||
|
setup(name='micropython-contextlib',
|
||||||
|
version='3.4.2-0',
|
||||||
|
description='Port of contextlib for micropython',
|
||||||
|
long_description='Port of contextlib for micropython',
|
||||||
|
author='MicroPython Developers',
|
||||||
|
author_email='micro-python@googlegroups.com',
|
||||||
|
maintainer='MicroPython Developers',
|
||||||
|
maintainer_email='micro-python@googlegroups.com',
|
||||||
|
license='MIT',
|
||||||
|
packages=['contextlib'],
|
||||||
|
install_requires=['micropython-unittest'])
|
|
@ -0,0 +1,828 @@
|
||||||
|
"""Unit tests for contextlib.py, and other context managers."""
|
||||||
|
|
||||||
|
import io
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
import unittest
|
||||||
|
from contextlib import * # Tests __all__
|
||||||
|
from test import support
|
||||||
|
try:
|
||||||
|
import threading
|
||||||
|
except ImportError:
|
||||||
|
threading = None
|
||||||
|
|
||||||
|
|
||||||
|
class ContextManagerTestCase(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_contextmanager_plain(self):
|
||||||
|
state = []
|
||||||
|
@contextmanager
|
||||||
|
def woohoo():
|
||||||
|
state.append(1)
|
||||||
|
yield 42
|
||||||
|
state.append(999)
|
||||||
|
with woohoo() as x:
|
||||||
|
self.assertEqual(state, [1])
|
||||||
|
self.assertEqual(x, 42)
|
||||||
|
state.append(x)
|
||||||
|
self.assertEqual(state, [1, 42, 999])
|
||||||
|
|
||||||
|
def test_contextmanager_finally(self):
|
||||||
|
state = []
|
||||||
|
@contextmanager
|
||||||
|
def woohoo():
|
||||||
|
state.append(1)
|
||||||
|
try:
|
||||||
|
yield 42
|
||||||
|
finally:
|
||||||
|
state.append(999)
|
||||||
|
with self.assertRaises(ZeroDivisionError):
|
||||||
|
with woohoo() as x:
|
||||||
|
self.assertEqual(state, [1])
|
||||||
|
self.assertEqual(x, 42)
|
||||||
|
state.append(x)
|
||||||
|
raise ZeroDivisionError()
|
||||||
|
self.assertEqual(state, [1, 42, 999])
|
||||||
|
|
||||||
|
def test_contextmanager_no_reraise(self):
|
||||||
|
@contextmanager
|
||||||
|
def whee():
|
||||||
|
yield
|
||||||
|
ctx = whee()
|
||||||
|
ctx.__enter__()
|
||||||
|
# Calling __exit__ should not result in an exception
|
||||||
|
self.assertFalse(ctx.__exit__(TypeError, TypeError("foo"), None))
|
||||||
|
|
||||||
|
def test_contextmanager_trap_yield_after_throw(self):
|
||||||
|
@contextmanager
|
||||||
|
def whoo():
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
except:
|
||||||
|
yield
|
||||||
|
ctx = whoo()
|
||||||
|
ctx.__enter__()
|
||||||
|
self.assertRaises(
|
||||||
|
RuntimeError, ctx.__exit__, TypeError, TypeError("foo"), None
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_contextmanager_except(self):
|
||||||
|
state = []
|
||||||
|
@contextmanager
|
||||||
|
def woohoo():
|
||||||
|
state.append(1)
|
||||||
|
try:
|
||||||
|
yield 42
|
||||||
|
except ZeroDivisionError as e:
|
||||||
|
state.append(e.args[0])
|
||||||
|
self.assertEqual(state, [1, 42, 999])
|
||||||
|
with woohoo() as x:
|
||||||
|
self.assertEqual(state, [1])
|
||||||
|
self.assertEqual(x, 42)
|
||||||
|
state.append(x)
|
||||||
|
raise ZeroDivisionError(999)
|
||||||
|
self.assertEqual(state, [1, 42, 999])
|
||||||
|
|
||||||
|
def _create_contextmanager_attribs(self):
|
||||||
|
def attribs(**kw):
|
||||||
|
def decorate(func):
|
||||||
|
for k,v in kw.items():
|
||||||
|
setattr(func,k,v)
|
||||||
|
return func
|
||||||
|
return decorate
|
||||||
|
@contextmanager
|
||||||
|
@attribs(foo='bar')
|
||||||
|
def baz(spam):
|
||||||
|
"""Whee!"""
|
||||||
|
return baz
|
||||||
|
|
||||||
|
def test_contextmanager_attribs(self):
|
||||||
|
baz = self._create_contextmanager_attribs()
|
||||||
|
self.assertEqual(baz.__name__,'baz')
|
||||||
|
self.assertEqual(baz.foo, 'bar')
|
||||||
|
|
||||||
|
@support.requires_docstrings
|
||||||
|
def test_contextmanager_doc_attrib(self):
|
||||||
|
baz = self._create_contextmanager_attribs()
|
||||||
|
self.assertEqual(baz.__doc__, "Whee!")
|
||||||
|
|
||||||
|
@support.requires_docstrings
|
||||||
|
def test_instance_docstring_given_cm_docstring(self):
|
||||||
|
baz = self._create_contextmanager_attribs()(None)
|
||||||
|
self.assertEqual(baz.__doc__, "Whee!")
|
||||||
|
|
||||||
|
|
||||||
|
class ClosingTestCase(unittest.TestCase):
|
||||||
|
|
||||||
|
@support.requires_docstrings
|
||||||
|
def test_instance_docs(self):
|
||||||
|
# Issue 19330: ensure context manager instances have good docstrings
|
||||||
|
cm_docstring = closing.__doc__
|
||||||
|
obj = closing(None)
|
||||||
|
self.assertEqual(obj.__doc__, cm_docstring)
|
||||||
|
|
||||||
|
def test_closing(self):
|
||||||
|
state = []
|
||||||
|
class C:
|
||||||
|
def close(self):
|
||||||
|
state.append(1)
|
||||||
|
x = C()
|
||||||
|
self.assertEqual(state, [])
|
||||||
|
with closing(x) as y:
|
||||||
|
self.assertEqual(x, y)
|
||||||
|
self.assertEqual(state, [1])
|
||||||
|
|
||||||
|
def test_closing_error(self):
|
||||||
|
state = []
|
||||||
|
class C:
|
||||||
|
def close(self):
|
||||||
|
state.append(1)
|
||||||
|
x = C()
|
||||||
|
self.assertEqual(state, [])
|
||||||
|
with self.assertRaises(ZeroDivisionError):
|
||||||
|
with closing(x) as y:
|
||||||
|
self.assertEqual(x, y)
|
||||||
|
1 / 0
|
||||||
|
self.assertEqual(state, [1])
|
||||||
|
|
||||||
|
class FileContextTestCase(unittest.TestCase):
|
||||||
|
|
||||||
|
def testWithOpen(self):
|
||||||
|
tfn = tempfile.mktemp()
|
||||||
|
try:
|
||||||
|
f = None
|
||||||
|
with open(tfn, "w") as f:
|
||||||
|
self.assertFalse(f.closed)
|
||||||
|
f.write("Booh\n")
|
||||||
|
self.assertTrue(f.closed)
|
||||||
|
f = None
|
||||||
|
with self.assertRaises(ZeroDivisionError):
|
||||||
|
with open(tfn, "r") as f:
|
||||||
|
self.assertFalse(f.closed)
|
||||||
|
self.assertEqual(f.read(), "Booh\n")
|
||||||
|
1 / 0
|
||||||
|
self.assertTrue(f.closed)
|
||||||
|
finally:
|
||||||
|
support.unlink(tfn)
|
||||||
|
|
||||||
|
@unittest.skipUnless(threading, 'Threading required for this test.')
|
||||||
|
class LockContextTestCase(unittest.TestCase):
|
||||||
|
|
||||||
|
def boilerPlate(self, lock, locked):
|
||||||
|
self.assertFalse(locked())
|
||||||
|
with lock:
|
||||||
|
self.assertTrue(locked())
|
||||||
|
self.assertFalse(locked())
|
||||||
|
with self.assertRaises(ZeroDivisionError):
|
||||||
|
with lock:
|
||||||
|
self.assertTrue(locked())
|
||||||
|
1 / 0
|
||||||
|
self.assertFalse(locked())
|
||||||
|
|
||||||
|
def testWithLock(self):
|
||||||
|
lock = threading.Lock()
|
||||||
|
self.boilerPlate(lock, lock.locked)
|
||||||
|
|
||||||
|
def testWithRLock(self):
|
||||||
|
lock = threading.RLock()
|
||||||
|
self.boilerPlate(lock, lock._is_owned)
|
||||||
|
|
||||||
|
def testWithCondition(self):
|
||||||
|
lock = threading.Condition()
|
||||||
|
def locked():
|
||||||
|
return lock._is_owned()
|
||||||
|
self.boilerPlate(lock, locked)
|
||||||
|
|
||||||
|
def testWithSemaphore(self):
|
||||||
|
lock = threading.Semaphore()
|
||||||
|
def locked():
|
||||||
|
if lock.acquire(False):
|
||||||
|
lock.release()
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
self.boilerPlate(lock, locked)
|
||||||
|
|
||||||
|
def testWithBoundedSemaphore(self):
|
||||||
|
lock = threading.BoundedSemaphore()
|
||||||
|
def locked():
|
||||||
|
if lock.acquire(False):
|
||||||
|
lock.release()
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
self.boilerPlate(lock, locked)
|
||||||
|
|
||||||
|
|
||||||
|
class mycontext(ContextDecorator):
|
||||||
|
"""Example decoration-compatible context manager for testing"""
|
||||||
|
started = False
|
||||||
|
exc = None
|
||||||
|
catch = False
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self.started = True
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, *exc):
|
||||||
|
self.exc = exc
|
||||||
|
return self.catch
|
||||||
|
|
||||||
|
|
||||||
|
class TestContextDecorator(unittest.TestCase):
|
||||||
|
|
||||||
|
@support.requires_docstrings
|
||||||
|
def test_instance_docs(self):
|
||||||
|
# Issue 19330: ensure context manager instances have good docstrings
|
||||||
|
cm_docstring = mycontext.__doc__
|
||||||
|
obj = mycontext()
|
||||||
|
self.assertEqual(obj.__doc__, cm_docstring)
|
||||||
|
|
||||||
|
def test_contextdecorator(self):
|
||||||
|
context = mycontext()
|
||||||
|
with context as result:
|
||||||
|
self.assertIs(result, context)
|
||||||
|
self.assertTrue(context.started)
|
||||||
|
|
||||||
|
self.assertEqual(context.exc, (None, None, None))
|
||||||
|
|
||||||
|
|
||||||
|
def test_contextdecorator_with_exception(self):
|
||||||
|
context = mycontext()
|
||||||
|
|
||||||
|
with self.assertRaisesRegex(NameError, 'foo'):
|
||||||
|
with context:
|
||||||
|
raise NameError('foo')
|
||||||
|
self.assertIsNotNone(context.exc)
|
||||||
|
self.assertIs(context.exc[0], NameError)
|
||||||
|
|
||||||
|
context = mycontext()
|
||||||
|
context.catch = True
|
||||||
|
with context:
|
||||||
|
raise NameError('foo')
|
||||||
|
self.assertIsNotNone(context.exc)
|
||||||
|
self.assertIs(context.exc[0], NameError)
|
||||||
|
|
||||||
|
|
||||||
|
def test_decorator(self):
|
||||||
|
context = mycontext()
|
||||||
|
|
||||||
|
@context
|
||||||
|
def test():
|
||||||
|
self.assertIsNone(context.exc)
|
||||||
|
self.assertTrue(context.started)
|
||||||
|
test()
|
||||||
|
self.assertEqual(context.exc, (None, None, None))
|
||||||
|
|
||||||
|
|
||||||
|
def test_decorator_with_exception(self):
|
||||||
|
context = mycontext()
|
||||||
|
|
||||||
|
@context
|
||||||
|
def test():
|
||||||
|
self.assertIsNone(context.exc)
|
||||||
|
self.assertTrue(context.started)
|
||||||
|
raise NameError('foo')
|
||||||
|
|
||||||
|
with self.assertRaisesRegex(NameError, 'foo'):
|
||||||
|
test()
|
||||||
|
self.assertIsNotNone(context.exc)
|
||||||
|
self.assertIs(context.exc[0], NameError)
|
||||||
|
|
||||||
|
|
||||||
|
def test_decorating_method(self):
|
||||||
|
context = mycontext()
|
||||||
|
|
||||||
|
class Test(object):
|
||||||
|
|
||||||
|
@context
|
||||||
|
def method(self, a, b, c=None):
|
||||||
|
self.a = a
|
||||||
|
self.b = b
|
||||||
|
self.c = c
|
||||||
|
|
||||||
|
# these tests are for argument passing when used as a decorator
|
||||||
|
test = Test()
|
||||||
|
test.method(1, 2)
|
||||||
|
self.assertEqual(test.a, 1)
|
||||||
|
self.assertEqual(test.b, 2)
|
||||||
|
self.assertEqual(test.c, None)
|
||||||
|
|
||||||
|
test = Test()
|
||||||
|
test.method('a', 'b', 'c')
|
||||||
|
self.assertEqual(test.a, 'a')
|
||||||
|
self.assertEqual(test.b, 'b')
|
||||||
|
self.assertEqual(test.c, 'c')
|
||||||
|
|
||||||
|
test = Test()
|
||||||
|
test.method(a=1, b=2)
|
||||||
|
self.assertEqual(test.a, 1)
|
||||||
|
self.assertEqual(test.b, 2)
|
||||||
|
|
||||||
|
|
||||||
|
def test_typo_enter(self):
|
||||||
|
class mycontext(ContextDecorator):
|
||||||
|
def __unter__(self):
|
||||||
|
pass
|
||||||
|
def __exit__(self, *exc):
|
||||||
|
pass
|
||||||
|
|
||||||
|
with self.assertRaises(AttributeError):
|
||||||
|
with mycontext():
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def test_typo_exit(self):
|
||||||
|
class mycontext(ContextDecorator):
|
||||||
|
def __enter__(self):
|
||||||
|
pass
|
||||||
|
def __uxit__(self, *exc):
|
||||||
|
pass
|
||||||
|
|
||||||
|
with self.assertRaises(AttributeError):
|
||||||
|
with mycontext():
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def test_contextdecorator_as_mixin(self):
|
||||||
|
class somecontext(object):
|
||||||
|
started = False
|
||||||
|
exc = None
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self.started = True
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, *exc):
|
||||||
|
self.exc = exc
|
||||||
|
|
||||||
|
class mycontext(somecontext, ContextDecorator):
|
||||||
|
pass
|
||||||
|
|
||||||
|
context = mycontext()
|
||||||
|
@context
|
||||||
|
def test():
|
||||||
|
self.assertIsNone(context.exc)
|
||||||
|
self.assertTrue(context.started)
|
||||||
|
test()
|
||||||
|
self.assertEqual(context.exc, (None, None, None))
|
||||||
|
|
||||||
|
|
||||||
|
def test_contextmanager_as_decorator(self):
|
||||||
|
@contextmanager
|
||||||
|
def woohoo(y):
|
||||||
|
state.append(y)
|
||||||
|
yield
|
||||||
|
state.append(999)
|
||||||
|
|
||||||
|
state = []
|
||||||
|
@woohoo(1)
|
||||||
|
def test(x):
|
||||||
|
self.assertEqual(state, [1])
|
||||||
|
state.append(x)
|
||||||
|
test('something')
|
||||||
|
self.assertEqual(state, [1, 'something', 999])
|
||||||
|
|
||||||
|
# Issue #11647: Ensure the decorated function is 'reusable'
|
||||||
|
state = []
|
||||||
|
test('something else')
|
||||||
|
self.assertEqual(state, [1, 'something else', 999])
|
||||||
|
|
||||||
|
|
||||||
|
class TestExitStack(unittest.TestCase):
|
||||||
|
|
||||||
|
@support.requires_docstrings
|
||||||
|
def test_instance_docs(self):
|
||||||
|
# Issue 19330: ensure context manager instances have good docstrings
|
||||||
|
cm_docstring = ExitStack.__doc__
|
||||||
|
obj = ExitStack()
|
||||||
|
self.assertEqual(obj.__doc__, cm_docstring)
|
||||||
|
|
||||||
|
def test_no_resources(self):
|
||||||
|
with ExitStack():
|
||||||
|
pass
|
||||||
|
|
||||||
|
def test_callback(self):
|
||||||
|
expected = [
|
||||||
|
((), {}),
|
||||||
|
((1,), {}),
|
||||||
|
((1,2), {}),
|
||||||
|
((), dict(example=1)),
|
||||||
|
((1,), dict(example=1)),
|
||||||
|
((1,2), dict(example=1)),
|
||||||
|
]
|
||||||
|
result = []
|
||||||
|
def _exit(*args, **kwds):
|
||||||
|
"""Test metadata propagation"""
|
||||||
|
result.append((args, kwds))
|
||||||
|
with ExitStack() as stack:
|
||||||
|
for args, kwds in reversed(expected):
|
||||||
|
if args and kwds:
|
||||||
|
f = stack.callback(_exit, *args, **kwds)
|
||||||
|
elif args:
|
||||||
|
f = stack.callback(_exit, *args)
|
||||||
|
elif kwds:
|
||||||
|
f = stack.callback(_exit, **kwds)
|
||||||
|
else:
|
||||||
|
f = stack.callback(_exit)
|
||||||
|
self.assertIs(f, _exit)
|
||||||
|
for wrapper in stack._exit_callbacks:
|
||||||
|
self.assertIs(wrapper.__wrapped__, _exit)
|
||||||
|
self.assertNotEqual(wrapper.__name__, _exit.__name__)
|
||||||
|
self.assertIsNone(wrapper.__doc__, _exit.__doc__)
|
||||||
|
self.assertEqual(result, expected)
|
||||||
|
|
||||||
|
def test_push(self):
|
||||||
|
exc_raised = ZeroDivisionError
|
||||||
|
def _expect_exc(exc_type, exc, exc_tb):
|
||||||
|
self.assertIs(exc_type, exc_raised)
|
||||||
|
def _suppress_exc(*exc_details):
|
||||||
|
return True
|
||||||
|
def _expect_ok(exc_type, exc, exc_tb):
|
||||||
|
self.assertIsNone(exc_type)
|
||||||
|
self.assertIsNone(exc)
|
||||||
|
self.assertIsNone(exc_tb)
|
||||||
|
class ExitCM(object):
|
||||||
|
def __init__(self, check_exc):
|
||||||
|
self.check_exc = check_exc
|
||||||
|
def __enter__(self):
|
||||||
|
self.fail("Should not be called!")
|
||||||
|
def __exit__(self, *exc_details):
|
||||||
|
self.check_exc(*exc_details)
|
||||||
|
with ExitStack() as stack:
|
||||||
|
stack.push(_expect_ok)
|
||||||
|
self.assertIs(stack._exit_callbacks[-1], _expect_ok)
|
||||||
|
cm = ExitCM(_expect_ok)
|
||||||
|
stack.push(cm)
|
||||||
|
self.assertIs(stack._exit_callbacks[-1].__self__, cm)
|
||||||
|
stack.push(_suppress_exc)
|
||||||
|
self.assertIs(stack._exit_callbacks[-1], _suppress_exc)
|
||||||
|
cm = ExitCM(_expect_exc)
|
||||||
|
stack.push(cm)
|
||||||
|
self.assertIs(stack._exit_callbacks[-1].__self__, cm)
|
||||||
|
stack.push(_expect_exc)
|
||||||
|
self.assertIs(stack._exit_callbacks[-1], _expect_exc)
|
||||||
|
stack.push(_expect_exc)
|
||||||
|
self.assertIs(stack._exit_callbacks[-1], _expect_exc)
|
||||||
|
1/0
|
||||||
|
|
||||||
|
def test_enter_context(self):
|
||||||
|
class TestCM(object):
|
||||||
|
def __enter__(self):
|
||||||
|
result.append(1)
|
||||||
|
def __exit__(self, *exc_details):
|
||||||
|
result.append(3)
|
||||||
|
|
||||||
|
result = []
|
||||||
|
cm = TestCM()
|
||||||
|
with ExitStack() as stack:
|
||||||
|
@stack.callback # Registered first => cleaned up last
|
||||||
|
def _exit():
|
||||||
|
result.append(4)
|
||||||
|
self.assertIsNotNone(_exit)
|
||||||
|
stack.enter_context(cm)
|
||||||
|
self.assertIs(stack._exit_callbacks[-1].__self__, cm)
|
||||||
|
result.append(2)
|
||||||
|
self.assertEqual(result, [1, 2, 3, 4])
|
||||||
|
|
||||||
|
def test_close(self):
|
||||||
|
result = []
|
||||||
|
with ExitStack() as stack:
|
||||||
|
@stack.callback
|
||||||
|
def _exit():
|
||||||
|
result.append(1)
|
||||||
|
self.assertIsNotNone(_exit)
|
||||||
|
stack.close()
|
||||||
|
result.append(2)
|
||||||
|
self.assertEqual(result, [1, 2])
|
||||||
|
|
||||||
|
def test_pop_all(self):
|
||||||
|
result = []
|
||||||
|
with ExitStack() as stack:
|
||||||
|
@stack.callback
|
||||||
|
def _exit():
|
||||||
|
result.append(3)
|
||||||
|
self.assertIsNotNone(_exit)
|
||||||
|
new_stack = stack.pop_all()
|
||||||
|
result.append(1)
|
||||||
|
result.append(2)
|
||||||
|
new_stack.close()
|
||||||
|
self.assertEqual(result, [1, 2, 3])
|
||||||
|
|
||||||
|
def test_exit_raise(self):
|
||||||
|
with self.assertRaises(ZeroDivisionError):
|
||||||
|
with ExitStack() as stack:
|
||||||
|
stack.push(lambda *exc: False)
|
||||||
|
1/0
|
||||||
|
|
||||||
|
def test_exit_suppress(self):
|
||||||
|
with ExitStack() as stack:
|
||||||
|
stack.push(lambda *exc: True)
|
||||||
|
1/0
|
||||||
|
|
||||||
|
def test_exit_exception_chaining_reference(self):
|
||||||
|
# Sanity check to make sure that ExitStack chaining matches
|
||||||
|
# actual nested with statements
|
||||||
|
class RaiseExc:
|
||||||
|
def __init__(self, exc):
|
||||||
|
self.exc = exc
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
def __exit__(self, *exc_details):
|
||||||
|
raise self.exc
|
||||||
|
|
||||||
|
class RaiseExcWithContext:
|
||||||
|
def __init__(self, outer, inner):
|
||||||
|
self.outer = outer
|
||||||
|
self.inner = inner
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
def __exit__(self, *exc_details):
|
||||||
|
try:
|
||||||
|
raise self.inner
|
||||||
|
except:
|
||||||
|
raise self.outer
|
||||||
|
|
||||||
|
class SuppressExc:
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
def __exit__(self, *exc_details):
|
||||||
|
type(self).saved_details = exc_details
|
||||||
|
return True
|
||||||
|
|
||||||
|
try:
|
||||||
|
with RaiseExc(IndexError):
|
||||||
|
with RaiseExcWithContext(KeyError, AttributeError):
|
||||||
|
with SuppressExc():
|
||||||
|
with RaiseExc(ValueError):
|
||||||
|
1 / 0
|
||||||
|
except IndexError as exc:
|
||||||
|
self.assertIsInstance(exc.__context__, KeyError)
|
||||||
|
self.assertIsInstance(exc.__context__.__context__, AttributeError)
|
||||||
|
# Inner exceptions were suppressed
|
||||||
|
self.assertIsNone(exc.__context__.__context__.__context__)
|
||||||
|
else:
|
||||||
|
self.fail("Expected IndexError, but no exception was raised")
|
||||||
|
# Check the inner exceptions
|
||||||
|
inner_exc = SuppressExc.saved_details[1]
|
||||||
|
self.assertIsInstance(inner_exc, ValueError)
|
||||||
|
self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
|
||||||
|
|
||||||
|
def test_exit_exception_chaining(self):
|
||||||
|
# Ensure exception chaining matches the reference behaviour
|
||||||
|
def raise_exc(exc):
|
||||||
|
raise exc
|
||||||
|
|
||||||
|
saved_details = None
|
||||||
|
def suppress_exc(*exc_details):
|
||||||
|
nonlocal saved_details
|
||||||
|
saved_details = exc_details
|
||||||
|
return True
|
||||||
|
|
||||||
|
try:
|
||||||
|
with ExitStack() as stack:
|
||||||
|
stack.callback(raise_exc, IndexError)
|
||||||
|
stack.callback(raise_exc, KeyError)
|
||||||
|
stack.callback(raise_exc, AttributeError)
|
||||||
|
stack.push(suppress_exc)
|
||||||
|
stack.callback(raise_exc, ValueError)
|
||||||
|
1 / 0
|
||||||
|
except IndexError as exc:
|
||||||
|
self.assertIsInstance(exc.__context__, KeyError)
|
||||||
|
self.assertIsInstance(exc.__context__.__context__, AttributeError)
|
||||||
|
# Inner exceptions were suppressed
|
||||||
|
self.assertIsNone(exc.__context__.__context__.__context__)
|
||||||
|
else:
|
||||||
|
self.fail("Expected IndexError, but no exception was raised")
|
||||||
|
# Check the inner exceptions
|
||||||
|
inner_exc = saved_details[1]
|
||||||
|
self.assertIsInstance(inner_exc, ValueError)
|
||||||
|
self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
|
||||||
|
|
||||||
|
def test_exit_exception_non_suppressing(self):
|
||||||
|
# http://bugs.python.org/issue19092
|
||||||
|
def raise_exc(exc):
|
||||||
|
raise exc
|
||||||
|
|
||||||
|
def suppress_exc(*exc_details):
|
||||||
|
return True
|
||||||
|
|
||||||
|
try:
|
||||||
|
with ExitStack() as stack:
|
||||||
|
stack.callback(lambda: None)
|
||||||
|
stack.callback(raise_exc, IndexError)
|
||||||
|
except Exception as exc:
|
||||||
|
self.assertIsInstance(exc, IndexError)
|
||||||
|
else:
|
||||||
|
self.fail("Expected IndexError, but no exception was raised")
|
||||||
|
|
||||||
|
try:
|
||||||
|
with ExitStack() as stack:
|
||||||
|
stack.callback(raise_exc, KeyError)
|
||||||
|
stack.push(suppress_exc)
|
||||||
|
stack.callback(raise_exc, IndexError)
|
||||||
|
except Exception as exc:
|
||||||
|
self.assertIsInstance(exc, KeyError)
|
||||||
|
else:
|
||||||
|
self.fail("Expected KeyError, but no exception was raised")
|
||||||
|
|
||||||
|
def test_exit_exception_with_correct_context(self):
|
||||||
|
# http://bugs.python.org/issue20317
|
||||||
|
@contextmanager
|
||||||
|
def gets_the_context_right(exc):
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
raise exc
|
||||||
|
|
||||||
|
exc1 = Exception(1)
|
||||||
|
exc2 = Exception(2)
|
||||||
|
exc3 = Exception(3)
|
||||||
|
exc4 = Exception(4)
|
||||||
|
|
||||||
|
# The contextmanager already fixes the context, so prior to the
|
||||||
|
# fix, ExitStack would try to fix it *again* and get into an
|
||||||
|
# infinite self-referential loop
|
||||||
|
try:
|
||||||
|
with ExitStack() as stack:
|
||||||
|
stack.enter_context(gets_the_context_right(exc4))
|
||||||
|
stack.enter_context(gets_the_context_right(exc3))
|
||||||
|
stack.enter_context(gets_the_context_right(exc2))
|
||||||
|
raise exc1
|
||||||
|
except Exception as exc:
|
||||||
|
self.assertIs(exc, exc4)
|
||||||
|
self.assertIs(exc.__context__, exc3)
|
||||||
|
self.assertIs(exc.__context__.__context__, exc2)
|
||||||
|
self.assertIs(exc.__context__.__context__.__context__, exc1)
|
||||||
|
self.assertIsNone(
|
||||||
|
exc.__context__.__context__.__context__.__context__)
|
||||||
|
|
||||||
|
def test_exit_exception_with_existing_context(self):
|
||||||
|
# Addresses a lack of test coverage discovered after checking in a
|
||||||
|
# fix for issue 20317 that still contained debugging code.
|
||||||
|
def raise_nested(inner_exc, outer_exc):
|
||||||
|
try:
|
||||||
|
raise inner_exc
|
||||||
|
finally:
|
||||||
|
raise outer_exc
|
||||||
|
exc1 = Exception(1)
|
||||||
|
exc2 = Exception(2)
|
||||||
|
exc3 = Exception(3)
|
||||||
|
exc4 = Exception(4)
|
||||||
|
exc5 = Exception(5)
|
||||||
|
try:
|
||||||
|
with ExitStack() as stack:
|
||||||
|
stack.callback(raise_nested, exc4, exc5)
|
||||||
|
stack.callback(raise_nested, exc2, exc3)
|
||||||
|
raise exc1
|
||||||
|
except Exception as exc:
|
||||||
|
self.assertIs(exc, exc5)
|
||||||
|
self.assertIs(exc.__context__, exc4)
|
||||||
|
self.assertIs(exc.__context__.__context__, exc3)
|
||||||
|
self.assertIs(exc.__context__.__context__.__context__, exc2)
|
||||||
|
self.assertIs(
|
||||||
|
exc.__context__.__context__.__context__.__context__, exc1)
|
||||||
|
self.assertIsNone(
|
||||||
|
exc.__context__.__context__.__context__.__context__.__context__)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def test_body_exception_suppress(self):
|
||||||
|
def suppress_exc(*exc_details):
|
||||||
|
return True
|
||||||
|
try:
|
||||||
|
with ExitStack() as stack:
|
||||||
|
stack.push(suppress_exc)
|
||||||
|
1/0
|
||||||
|
except IndexError as exc:
|
||||||
|
self.fail("Expected no exception, got IndexError")
|
||||||
|
|
||||||
|
def test_exit_exception_chaining_suppress(self):
|
||||||
|
with ExitStack() as stack:
|
||||||
|
stack.push(lambda *exc: True)
|
||||||
|
stack.push(lambda *exc: 1/0)
|
||||||
|
stack.push(lambda *exc: {}[1])
|
||||||
|
|
||||||
|
def test_excessive_nesting(self):
|
||||||
|
# The original implementation would die with RecursionError here
|
||||||
|
with ExitStack() as stack:
|
||||||
|
for i in range(10000):
|
||||||
|
stack.callback(int)
|
||||||
|
|
||||||
|
def test_instance_bypass(self):
|
||||||
|
class Example(object): pass
|
||||||
|
cm = Example()
|
||||||
|
cm.__exit__ = object()
|
||||||
|
stack = ExitStack()
|
||||||
|
self.assertRaises(AttributeError, stack.enter_context, cm)
|
||||||
|
stack.push(cm)
|
||||||
|
self.assertIs(stack._exit_callbacks[-1], cm)
|
||||||
|
|
||||||
|
class TestRedirectStdout(unittest.TestCase):
|
||||||
|
|
||||||
|
@support.requires_docstrings
|
||||||
|
def test_instance_docs(self):
|
||||||
|
# Issue 19330: ensure context manager instances have good docstrings
|
||||||
|
cm_docstring = redirect_stdout.__doc__
|
||||||
|
obj = redirect_stdout(None)
|
||||||
|
self.assertEqual(obj.__doc__, cm_docstring)
|
||||||
|
|
||||||
|
def test_no_redirect_in_init(self):
|
||||||
|
orig_stdout = sys.stdout
|
||||||
|
redirect_stdout(None)
|
||||||
|
self.assertIs(sys.stdout, orig_stdout)
|
||||||
|
|
||||||
|
def test_redirect_to_string_io(self):
|
||||||
|
f = io.StringIO()
|
||||||
|
msg = "Consider an API like help(), which prints directly to stdout"
|
||||||
|
orig_stdout = sys.stdout
|
||||||
|
with redirect_stdout(f):
|
||||||
|
print(msg)
|
||||||
|
self.assertIs(sys.stdout, orig_stdout)
|
||||||
|
s = f.getvalue().strip()
|
||||||
|
self.assertEqual(s, msg)
|
||||||
|
|
||||||
|
def test_enter_result_is_target(self):
|
||||||
|
f = io.StringIO()
|
||||||
|
with redirect_stdout(f) as enter_result:
|
||||||
|
self.assertIs(enter_result, f)
|
||||||
|
|
||||||
|
def test_cm_is_reusable(self):
|
||||||
|
f = io.StringIO()
|
||||||
|
write_to_f = redirect_stdout(f)
|
||||||
|
orig_stdout = sys.stdout
|
||||||
|
with write_to_f:
|
||||||
|
print("Hello", end=" ")
|
||||||
|
with write_to_f:
|
||||||
|
print("World!")
|
||||||
|
self.assertIs(sys.stdout, orig_stdout)
|
||||||
|
s = f.getvalue()
|
||||||
|
self.assertEqual(s, "Hello World!\n")
|
||||||
|
|
||||||
|
def test_cm_is_reentrant(self):
|
||||||
|
f = io.StringIO()
|
||||||
|
write_to_f = redirect_stdout(f)
|
||||||
|
orig_stdout = sys.stdout
|
||||||
|
with write_to_f:
|
||||||
|
print("Hello", end=" ")
|
||||||
|
with write_to_f:
|
||||||
|
print("World!")
|
||||||
|
self.assertIs(sys.stdout, orig_stdout)
|
||||||
|
s = f.getvalue()
|
||||||
|
self.assertEqual(s, "Hello World!\n")
|
||||||
|
|
||||||
|
|
||||||
|
class TestSuppress(unittest.TestCase):
|
||||||
|
|
||||||
|
@support.requires_docstrings
|
||||||
|
def test_instance_docs(self):
|
||||||
|
# Issue 19330: ensure context manager instances have good docstrings
|
||||||
|
cm_docstring = suppress.__doc__
|
||||||
|
obj = suppress()
|
||||||
|
self.assertEqual(obj.__doc__, cm_docstring)
|
||||||
|
|
||||||
|
def test_no_result_from_enter(self):
|
||||||
|
with suppress(ValueError) as enter_result:
|
||||||
|
self.assertIsNone(enter_result)
|
||||||
|
|
||||||
|
def test_no_exception(self):
|
||||||
|
with suppress(ValueError):
|
||||||
|
self.assertEqual(pow(2, 5), 32)
|
||||||
|
|
||||||
|
def test_exact_exception(self):
|
||||||
|
with suppress(TypeError):
|
||||||
|
len(5)
|
||||||
|
|
||||||
|
def test_exception_hierarchy(self):
|
||||||
|
with suppress(LookupError):
|
||||||
|
'Hello'[50]
|
||||||
|
|
||||||
|
def test_other_exception(self):
|
||||||
|
with self.assertRaises(ZeroDivisionError):
|
||||||
|
with suppress(TypeError):
|
||||||
|
1/0
|
||||||
|
|
||||||
|
def test_no_args(self):
|
||||||
|
with self.assertRaises(ZeroDivisionError):
|
||||||
|
with suppress():
|
||||||
|
1/0
|
||||||
|
|
||||||
|
def test_multiple_exception_args(self):
|
||||||
|
with suppress(ZeroDivisionError, TypeError):
|
||||||
|
1/0
|
||||||
|
with suppress(ZeroDivisionError, TypeError):
|
||||||
|
len(5)
|
||||||
|
|
||||||
|
def test_cm_is_reentrant(self):
|
||||||
|
ignore_exceptions = suppress(Exception)
|
||||||
|
with ignore_exceptions:
|
||||||
|
pass
|
||||||
|
with ignore_exceptions:
|
||||||
|
len(5)
|
||||||
|
with ignore_exceptions:
|
||||||
|
1/0
|
||||||
|
with ignore_exceptions: # Check nested usage
|
||||||
|
len(5)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
Ładowanie…
Reference in New Issue