contextlib: Make compatible with micropython

pull/20/merge
nvbn 2014-12-24 21:28:23 +03:00 zatwierdzone przez Paul Sokolovsky
rodzic 5557382b5e
commit 92ef77c3d2
4 zmienionych plików z 67 dodań i 1002 usunięć

Wyświetl plik

@ -1,12 +1,12 @@
"""Utilities for with-statement contexts. See PEP 343."""
"""Utilities for with-statement contexts. See PEP 343.
import sys
from collections import deque
from functools import wraps
Original source code: https://hg.python.org/cpython/file/3.4/Lib/contextlib.py
__all__ = ["contextmanager", "closing", "ContextDecorator", "ExitStack",
"redirect_stdout", "suppress"]
Not implemented:
- redirect_stdout;
- ExitStack.
"""
class ContextDecorator(object):
"A base class or mixin that enables context managers to work as decorators."
@ -24,7 +24,6 @@ class ContextDecorator(object):
return self
def __call__(self, func):
@wraps(func)
def inner(*args, **kwds):
with self._recreate_cm():
return func(*args, **kwds)
@ -37,16 +36,6 @@ class _GeneratorContextManager(ContextDecorator):
def __init__(self, func, *args, **kwds):
self.gen = func(*args, **kwds)
self.func, self.args, self.kwds = func, args, kwds
# Issue 19330: ensure context manager instances have good docstrings
doc = getattr(func, "__doc__", None)
if doc is None:
doc = type(self).__doc__
self.__doc__ = doc
# Unfortunately, this still doesn't provide good help output when
# inspecting the created context manager instances, since pydoc
# currently bypasses the instance docstring and shows the docstring
# for the class instead.
# See http://bugs.python.org/issue19404 for more details.
def _recreate_cm(self):
# _GCM instances are one-shot context managers, so the
@ -81,16 +70,6 @@ class _GeneratorContextManager(ContextDecorator):
# was passed to throw(). This prevents a StopIteration
# raised inside the "with" statement from being suppressed
return exc is not value
except:
# only re-raise if it's *not* the exception that was
# passed to throw(), because __exit__() must not raise
# an exception unless __exit__() itself failed. But throw()
# has to raise the exception to signal propagation, so this
# fixes the impedance mismatch between the throw() protocol
# and the __exit__() protocol.
#
if sys.exc_info()[1] is not value:
raise
def contextmanager(func):
@ -121,7 +100,6 @@ def contextmanager(func):
<cleanup>
"""
@wraps(func)
def helper(*args, **kwds):
return _GeneratorContextManager(func, *args, **kwds)
return helper
@ -151,32 +129,6 @@ class closing(object):
def __exit__(self, *exc_info):
self.thing.close()
class redirect_stdout:
"""Context manager for temporarily redirecting stdout to another file
# How to send help() to stderr
with redirect_stdout(sys.stderr):
help(dir)
# How to write help() to a file
with open('help.txt', 'w') as f:
with redirect_stdout(f):
help(pow)
"""
def __init__(self, new_target):
self._new_target = new_target
# We use a list of old targets to make this CM re-entrant
self._old_targets = []
def __enter__(self):
self._old_targets.append(sys.stdout)
sys.stdout = self._new_target
return self._new_target
def __exit__(self, exctype, excinst, exctb):
sys.stdout = self._old_targets.pop()
class suppress:
"""Context manager to suppress specified exceptions
@ -206,135 +158,3 @@ class suppress:
#
# See http://bugs.python.org/issue12029 for more details
return exctype is not None and issubclass(exctype, self._exceptions)
# Inspired by discussions on http://bugs.python.org/issue13585
class ExitStack(object):
"""Context manager for dynamic management of a stack of exit callbacks
For example:
with ExitStack() as stack:
files = [stack.enter_context(open(fname)) for fname in filenames]
# All opened files will automatically be closed at the end of
# the with statement, even if attempts to open files later
# in the list raise an exception
"""
def __init__(self):
self._exit_callbacks = deque()
def pop_all(self):
"""Preserve the context stack by transferring it to a new instance"""
new_stack = type(self)()
new_stack._exit_callbacks = self._exit_callbacks
self._exit_callbacks = deque()
return new_stack
def _push_cm_exit(self, cm, cm_exit):
"""Helper to correctly register callbacks to __exit__ methods"""
def _exit_wrapper(*exc_details):
return cm_exit(cm, *exc_details)
_exit_wrapper.__self__ = cm
self.push(_exit_wrapper)
def push(self, exit):
"""Registers a callback with the standard __exit__ method signature
Can suppress exceptions the same way __exit__ methods can.
Also accepts any object with an __exit__ method (registering a call
to the method instead of the object itself)
"""
# We use an unbound method rather than a bound method to follow
# the standard lookup behaviour for special methods
_cb_type = type(exit)
try:
exit_method = _cb_type.__exit__
except AttributeError:
# Not a context manager, so assume its a callable
self._exit_callbacks.append(exit)
else:
self._push_cm_exit(exit, exit_method)
return exit # Allow use as a decorator
def callback(self, callback, *args, **kwds):
"""Registers an arbitrary callback and arguments.
Cannot suppress exceptions.
"""
def _exit_wrapper(exc_type, exc, tb):
callback(*args, **kwds)
# We changed the signature, so using @wraps is not appropriate, but
# setting __wrapped__ may still help with introspection
_exit_wrapper.__wrapped__ = callback
self.push(_exit_wrapper)
return callback # Allow use as a decorator
def enter_context(self, cm):
"""Enters the supplied context manager
If successful, also pushes its __exit__ method as a callback and
returns the result of the __enter__ method.
"""
# We look up the special methods on the type to match the with statement
_cm_type = type(cm)
_exit = _cm_type.__exit__
result = _cm_type.__enter__(cm)
self._push_cm_exit(cm, _exit)
return result
def close(self):
"""Immediately unwind the context stack"""
self.__exit__(None, None, None)
def __enter__(self):
return self
def __exit__(self, *exc_details):
received_exc = exc_details[0] is not None
# We manipulate the exception state so it behaves as though
# we were actually nesting multiple with statements
frame_exc = sys.exc_info()[1]
def _fix_exception_context(new_exc, old_exc):
# Context may not be correct, so find the end of the chain
while 1:
exc_context = new_exc.__context__
if exc_context is old_exc:
# Context is already set correctly (see issue 20317)
return
if exc_context is None or exc_context is frame_exc:
break
new_exc = exc_context
# Change the end of the chain to point to the exception
# we expect it to reference
new_exc.__context__ = old_exc
# Callbacks are invoked in LIFO order to match the behaviour of
# nested context managers
suppressed_exc = False
pending_raise = False
while self._exit_callbacks:
cb = self._exit_callbacks.pop()
try:
if cb(*exc_details):
suppressed_exc = True
pending_raise = False
exc_details = (None, None, None)
except:
new_exc_details = sys.exc_info()
# simulate the stack of exceptions by setting the context
_fix_exception_context(new_exc_details[1], exc_details[1])
pending_raise = True
exc_details = new_exc_details
if pending_raise:
try:
# bare "raise exc_details[1]" replaces our carefully
# set-up context
fixed_ctx = exc_details[1].__context__
raise exc_details[1]
except BaseException:
exc_details[1].__context__ = fixed_ctx
raise
return received_exc and suppressed_exc

Wyświetl plik

@ -1,4 +1,4 @@
srctype = cpython
type = package
version = 3.4.2-0
type = module
version = 3.4.2-1
long_desc = Port of contextlib for micropython

Wyświetl plik

@ -6,13 +6,13 @@ from setuptools import setup
setup(name='micropython-contextlib',
version='3.4.2-0',
description='Port of contextlib for micropython',
long_description='Port of contextlib for micropython',
author='MicroPython Developers',
author_email='micro-python@googlegroups.com',
version='3.4.2-1',
description='CPython contextlib module ported to MicroPython',
long_description='This is a module ported from CPython standard library to be compatible with\nMicroPython interpreter. Usually, this means applying small patches for\nfeatures not supported (yet, or at all) in MicroPython. Sometimes, heavier\nchanges are required. Note that CPython modules are written with availability\nof vast resources in mind, and may not work for MicroPython ports with\nlimited heap. If you are affected by such a case, please help reimplement\nthe module from scratch.',
url='https://github.com/micropython/micropython/issues/405',
author='CPython Developers',
author_email='python-dev@python.org',
maintainer='MicroPython Developers',
maintainer_email='micro-python@googlegroups.com',
license='MIT',
packages=['contextlib'],
install_requires=['micropython-unittest'])
license='Python',
py_modules=['contextlib'])

Wyświetl plik

@ -1,828 +1,73 @@
"""Unit tests for contextlib.py, and other context managers."""
import io
import sys
import tempfile
import unittest
from contextlib import * # Tests __all__
from test import support
try:
import threading
except ImportError:
threading = None
from unittest import TestCase, run_class
from contextlib import contextmanager, closing, suppress
class ContextManagerTestCase(unittest.TestCase):
class ContextManagerTestCase(TestCase):
def setUp(self):
self._history = []
def test_contextmanager_plain(self):
state = []
@contextmanager
def woohoo():
state.append(1)
yield 42
state.append(999)
with woohoo() as x:
self.assertEqual(state, [1])
self.assertEqual(x, 42)
state.append(x)
self.assertEqual(state, [1, 42, 999])
def test_contextmanager_finally(self):
state = []
@contextmanager
def woohoo():
state.append(1)
def manager(x):
self._history.append('start')
try:
yield 42
yield x
finally:
state.append(999)
with self.assertRaises(ZeroDivisionError):
with woohoo() as x:
self.assertEqual(state, [1])
self.assertEqual(x, 42)
state.append(x)
raise ZeroDivisionError()
self.assertEqual(state, [1, 42, 999])
self._history.append('finish')
def test_contextmanager_no_reraise(self):
@contextmanager
def whee():
yield
ctx = whee()
ctx.__enter__()
# Calling __exit__ should not result in an exception
self.assertFalse(ctx.__exit__(TypeError, TypeError("foo"), None))
self._manager = manager
def test_contextmanager_trap_yield_after_throw(self):
@contextmanager
def whoo():
def test_context_manager(self):
with self._manager(123) as x:
self.assertEqual(x, 123)
self.assertEqual(self._history, ['start', 'finish'])
def test_context_manager_on_error(self):
exc = Exception()
try:
yield
except:
yield
ctx = whoo()
ctx.__enter__()
self.assertRaises(
RuntimeError, ctx.__exit__, TypeError, TypeError("foo"), None
)
def test_contextmanager_except(self):
state = []
@contextmanager
def woohoo():
state.append(1)
try:
yield 42
except ZeroDivisionError as e:
state.append(e.args[0])
self.assertEqual(state, [1, 42, 999])
with woohoo() as x:
self.assertEqual(state, [1])
self.assertEqual(x, 42)
state.append(x)
raise ZeroDivisionError(999)
self.assertEqual(state, [1, 42, 999])
def _create_contextmanager_attribs(self):
def attribs(**kw):
def decorate(func):
for k,v in kw.items():
setattr(func,k,v)
return func
return decorate
@contextmanager
@attribs(foo='bar')
def baz(spam):
"""Whee!"""
return baz
def test_contextmanager_attribs(self):
baz = self._create_contextmanager_attribs()
self.assertEqual(baz.__name__,'baz')
self.assertEqual(baz.foo, 'bar')
@support.requires_docstrings
def test_contextmanager_doc_attrib(self):
baz = self._create_contextmanager_attribs()
self.assertEqual(baz.__doc__, "Whee!")
@support.requires_docstrings
def test_instance_docstring_given_cm_docstring(self):
baz = self._create_contextmanager_attribs()(None)
self.assertEqual(baz.__doc__, "Whee!")
with self._manager(123) as x:
raise exc
except Exception as e:
self.assertEqual(exc, e)
self.assertEqual(self._history, ['start', 'finish'])
class ClosingTestCase(unittest.TestCase):
class ClosingTestCase(TestCase):
@support.requires_docstrings
def test_instance_docs(self):
# Issue 19330: ensure context manager instances have good docstrings
cm_docstring = closing.__doc__
obj = closing(None)
self.assertEqual(obj.__doc__, cm_docstring)
class Closable:
def __init__(self):
self.closed = False
def close(self):
self.closed = True
def test_closing(self):
state = []
class C:
def close(self):
state.append(1)
x = C()
self.assertEqual(state, [])
with closing(x) as y:
self.assertEqual(x, y)
self.assertEqual(state, [1])
closable = self.Closable()
with closing(closable) as c:
self.assertFalse(c.closed)
self.assertTrue(closable.closed)
def test_closing_error(self):
state = []
class C:
def close(self):
state.append(1)
x = C()
self.assertEqual(state, [])
with self.assertRaises(ZeroDivisionError):
with closing(x) as y:
self.assertEqual(x, y)
1 / 0
self.assertEqual(state, [1])
class FileContextTestCase(unittest.TestCase):
def testWithOpen(self):
tfn = tempfile.mktemp()
def test_closing_after_error(self):
closable = self.Closable()
exc = Exception()
try:
f = None
with open(tfn, "w") as f:
self.assertFalse(f.closed)
f.write("Booh\n")
self.assertTrue(f.closed)
f = None
with self.assertRaises(ZeroDivisionError):
with open(tfn, "r") as f:
self.assertFalse(f.closed)
self.assertEqual(f.read(), "Booh\n")
1 / 0
self.assertTrue(f.closed)
finally:
support.unlink(tfn)
@unittest.skipUnless(threading, 'Threading required for this test.')
class LockContextTestCase(unittest.TestCase):
def boilerPlate(self, lock, locked):
self.assertFalse(locked())
with lock:
self.assertTrue(locked())
self.assertFalse(locked())
with self.assertRaises(ZeroDivisionError):
with lock:
self.assertTrue(locked())
1 / 0
self.assertFalse(locked())
def testWithLock(self):
lock = threading.Lock()
self.boilerPlate(lock, lock.locked)
def testWithRLock(self):
lock = threading.RLock()
self.boilerPlate(lock, lock._is_owned)
def testWithCondition(self):
lock = threading.Condition()
def locked():
return lock._is_owned()
self.boilerPlate(lock, locked)
def testWithSemaphore(self):
lock = threading.Semaphore()
def locked():
if lock.acquire(False):
lock.release()
return False
else:
return True
self.boilerPlate(lock, locked)
def testWithBoundedSemaphore(self):
lock = threading.BoundedSemaphore()
def locked():
if lock.acquire(False):
lock.release()
return False
else:
return True
self.boilerPlate(lock, locked)
class mycontext(ContextDecorator):
"""Example decoration-compatible context manager for testing"""
started = False
exc = None
catch = False
def __enter__(self):
self.started = True
return self
def __exit__(self, *exc):
self.exc = exc
return self.catch
class TestContextDecorator(unittest.TestCase):
@support.requires_docstrings
def test_instance_docs(self):
# Issue 19330: ensure context manager instances have good docstrings
cm_docstring = mycontext.__doc__
obj = mycontext()
self.assertEqual(obj.__doc__, cm_docstring)
def test_contextdecorator(self):
context = mycontext()
with context as result:
self.assertIs(result, context)
self.assertTrue(context.started)
self.assertEqual(context.exc, (None, None, None))
def test_contextdecorator_with_exception(self):
context = mycontext()
with self.assertRaisesRegex(NameError, 'foo'):
with context:
raise NameError('foo')
self.assertIsNotNone(context.exc)
self.assertIs(context.exc[0], NameError)
context = mycontext()
context.catch = True
with context:
raise NameError('foo')
self.assertIsNotNone(context.exc)
self.assertIs(context.exc[0], NameError)
def test_decorator(self):
context = mycontext()
@context
def test():
self.assertIsNone(context.exc)
self.assertTrue(context.started)
test()
self.assertEqual(context.exc, (None, None, None))
def test_decorator_with_exception(self):
context = mycontext()
@context
def test():
self.assertIsNone(context.exc)
self.assertTrue(context.started)
raise NameError('foo')
with self.assertRaisesRegex(NameError, 'foo'):
test()
self.assertIsNotNone(context.exc)
self.assertIs(context.exc[0], NameError)
def test_decorating_method(self):
context = mycontext()
class Test(object):
@context
def method(self, a, b, c=None):
self.a = a
self.b = b
self.c = c
# these tests are for argument passing when used as a decorator
test = Test()
test.method(1, 2)
self.assertEqual(test.a, 1)
self.assertEqual(test.b, 2)
self.assertEqual(test.c, None)
test = Test()
test.method('a', 'b', 'c')
self.assertEqual(test.a, 'a')
self.assertEqual(test.b, 'b')
self.assertEqual(test.c, 'c')
test = Test()
test.method(a=1, b=2)
self.assertEqual(test.a, 1)
self.assertEqual(test.b, 2)
def test_typo_enter(self):
class mycontext(ContextDecorator):
def __unter__(self):
pass
def __exit__(self, *exc):
pass
with self.assertRaises(AttributeError):
with mycontext():
pass
def test_typo_exit(self):
class mycontext(ContextDecorator):
def __enter__(self):
pass
def __uxit__(self, *exc):
pass
with self.assertRaises(AttributeError):
with mycontext():
pass
def test_contextdecorator_as_mixin(self):
class somecontext(object):
started = False
exc = None
def __enter__(self):
self.started = True
return self
def __exit__(self, *exc):
self.exc = exc
class mycontext(somecontext, ContextDecorator):
pass
context = mycontext()
@context
def test():
self.assertIsNone(context.exc)
self.assertTrue(context.started)
test()
self.assertEqual(context.exc, (None, None, None))
def test_contextmanager_as_decorator(self):
@contextmanager
def woohoo(y):
state.append(y)
yield
state.append(999)
state = []
@woohoo(1)
def test(x):
self.assertEqual(state, [1])
state.append(x)
test('something')
self.assertEqual(state, [1, 'something', 999])
# Issue #11647: Ensure the decorated function is 'reusable'
state = []
test('something else')
self.assertEqual(state, [1, 'something else', 999])
class TestExitStack(unittest.TestCase):
@support.requires_docstrings
def test_instance_docs(self):
# Issue 19330: ensure context manager instances have good docstrings
cm_docstring = ExitStack.__doc__
obj = ExitStack()
self.assertEqual(obj.__doc__, cm_docstring)
def test_no_resources(self):
with ExitStack():
pass
def test_callback(self):
expected = [
((), {}),
((1,), {}),
((1,2), {}),
((), dict(example=1)),
((1,), dict(example=1)),
((1,2), dict(example=1)),
]
result = []
def _exit(*args, **kwds):
"""Test metadata propagation"""
result.append((args, kwds))
with ExitStack() as stack:
for args, kwds in reversed(expected):
if args and kwds:
f = stack.callback(_exit, *args, **kwds)
elif args:
f = stack.callback(_exit, *args)
elif kwds:
f = stack.callback(_exit, **kwds)
else:
f = stack.callback(_exit)
self.assertIs(f, _exit)
for wrapper in stack._exit_callbacks:
self.assertIs(wrapper.__wrapped__, _exit)
self.assertNotEqual(wrapper.__name__, _exit.__name__)
self.assertIsNone(wrapper.__doc__, _exit.__doc__)
self.assertEqual(result, expected)
def test_push(self):
exc_raised = ZeroDivisionError
def _expect_exc(exc_type, exc, exc_tb):
self.assertIs(exc_type, exc_raised)
def _suppress_exc(*exc_details):
return True
def _expect_ok(exc_type, exc, exc_tb):
self.assertIsNone(exc_type)
self.assertIsNone(exc)
self.assertIsNone(exc_tb)
class ExitCM(object):
def __init__(self, check_exc):
self.check_exc = check_exc
def __enter__(self):
self.fail("Should not be called!")
def __exit__(self, *exc_details):
self.check_exc(*exc_details)
with ExitStack() as stack:
stack.push(_expect_ok)
self.assertIs(stack._exit_callbacks[-1], _expect_ok)
cm = ExitCM(_expect_ok)
stack.push(cm)
self.assertIs(stack._exit_callbacks[-1].__self__, cm)
stack.push(_suppress_exc)
self.assertIs(stack._exit_callbacks[-1], _suppress_exc)
cm = ExitCM(_expect_exc)
stack.push(cm)
self.assertIs(stack._exit_callbacks[-1].__self__, cm)
stack.push(_expect_exc)
self.assertIs(stack._exit_callbacks[-1], _expect_exc)
stack.push(_expect_exc)
self.assertIs(stack._exit_callbacks[-1], _expect_exc)
1/0
def test_enter_context(self):
class TestCM(object):
def __enter__(self):
result.append(1)
def __exit__(self, *exc_details):
result.append(3)
result = []
cm = TestCM()
with ExitStack() as stack:
@stack.callback # Registered first => cleaned up last
def _exit():
result.append(4)
self.assertIsNotNone(_exit)
stack.enter_context(cm)
self.assertIs(stack._exit_callbacks[-1].__self__, cm)
result.append(2)
self.assertEqual(result, [1, 2, 3, 4])
def test_close(self):
result = []
with ExitStack() as stack:
@stack.callback
def _exit():
result.append(1)
self.assertIsNotNone(_exit)
stack.close()
result.append(2)
self.assertEqual(result, [1, 2])
def test_pop_all(self):
result = []
with ExitStack() as stack:
@stack.callback
def _exit():
result.append(3)
self.assertIsNotNone(_exit)
new_stack = stack.pop_all()
result.append(1)
result.append(2)
new_stack.close()
self.assertEqual(result, [1, 2, 3])
def test_exit_raise(self):
with self.assertRaises(ZeroDivisionError):
with ExitStack() as stack:
stack.push(lambda *exc: False)
1/0
def test_exit_suppress(self):
with ExitStack() as stack:
stack.push(lambda *exc: True)
1/0
def test_exit_exception_chaining_reference(self):
# Sanity check to make sure that ExitStack chaining matches
# actual nested with statements
class RaiseExc:
def __init__(self, exc):
self.exc = exc
def __enter__(self):
return self
def __exit__(self, *exc_details):
raise self.exc
class RaiseExcWithContext:
def __init__(self, outer, inner):
self.outer = outer
self.inner = inner
def __enter__(self):
return self
def __exit__(self, *exc_details):
try:
raise self.inner
except:
raise self.outer
class SuppressExc:
def __enter__(self):
return self
def __exit__(self, *exc_details):
type(self).saved_details = exc_details
return True
try:
with RaiseExc(IndexError):
with RaiseExcWithContext(KeyError, AttributeError):
with SuppressExc():
with RaiseExc(ValueError):
1 / 0
except IndexError as exc:
self.assertIsInstance(exc.__context__, KeyError)
self.assertIsInstance(exc.__context__.__context__, AttributeError)
# Inner exceptions were suppressed
self.assertIsNone(exc.__context__.__context__.__context__)
else:
self.fail("Expected IndexError, but no exception was raised")
# Check the inner exceptions
inner_exc = SuppressExc.saved_details[1]
self.assertIsInstance(inner_exc, ValueError)
self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
def test_exit_exception_chaining(self):
# Ensure exception chaining matches the reference behaviour
def raise_exc(exc):
with closing(closable) as c:
raise exc
saved_details = None
def suppress_exc(*exc_details):
nonlocal saved_details
saved_details = exc_details
return True
try:
with ExitStack() as stack:
stack.callback(raise_exc, IndexError)
stack.callback(raise_exc, KeyError)
stack.callback(raise_exc, AttributeError)
stack.push(suppress_exc)
stack.callback(raise_exc, ValueError)
1 / 0
except IndexError as exc:
self.assertIsInstance(exc.__context__, KeyError)
self.assertIsInstance(exc.__context__.__context__, AttributeError)
# Inner exceptions were suppressed
self.assertIsNone(exc.__context__.__context__.__context__)
else:
self.fail("Expected IndexError, but no exception was raised")
# Check the inner exceptions
inner_exc = saved_details[1]
self.assertIsInstance(inner_exc, ValueError)
self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
def test_exit_exception_non_suppressing(self):
# http://bugs.python.org/issue19092
def raise_exc(exc):
raise exc
def suppress_exc(*exc_details):
return True
try:
with ExitStack() as stack:
stack.callback(lambda: None)
stack.callback(raise_exc, IndexError)
except Exception as exc:
self.assertIsInstance(exc, IndexError)
else:
self.fail("Expected IndexError, but no exception was raised")
try:
with ExitStack() as stack:
stack.callback(raise_exc, KeyError)
stack.push(suppress_exc)
stack.callback(raise_exc, IndexError)
except Exception as exc:
self.assertIsInstance(exc, KeyError)
else:
self.fail("Expected KeyError, but no exception was raised")
def test_exit_exception_with_correct_context(self):
# http://bugs.python.org/issue20317
@contextmanager
def gets_the_context_right(exc):
try:
yield
finally:
raise exc
exc1 = Exception(1)
exc2 = Exception(2)
exc3 = Exception(3)
exc4 = Exception(4)
# The contextmanager already fixes the context, so prior to the
# fix, ExitStack would try to fix it *again* and get into an
# infinite self-referential loop
try:
with ExitStack() as stack:
stack.enter_context(gets_the_context_right(exc4))
stack.enter_context(gets_the_context_right(exc3))
stack.enter_context(gets_the_context_right(exc2))
raise exc1
except Exception as exc:
self.assertIs(exc, exc4)
self.assertIs(exc.__context__, exc3)
self.assertIs(exc.__context__.__context__, exc2)
self.assertIs(exc.__context__.__context__.__context__, exc1)
self.assertIsNone(
exc.__context__.__context__.__context__.__context__)
def test_exit_exception_with_existing_context(self):
# Addresses a lack of test coverage discovered after checking in a
# fix for issue 20317 that still contained debugging code.
def raise_nested(inner_exc, outer_exc):
try:
raise inner_exc
finally:
raise outer_exc
exc1 = Exception(1)
exc2 = Exception(2)
exc3 = Exception(3)
exc4 = Exception(4)
exc5 = Exception(5)
try:
with ExitStack() as stack:
stack.callback(raise_nested, exc4, exc5)
stack.callback(raise_nested, exc2, exc3)
raise exc1
except Exception as exc:
self.assertIs(exc, exc5)
self.assertIs(exc.__context__, exc4)
self.assertIs(exc.__context__.__context__, exc3)
self.assertIs(exc.__context__.__context__.__context__, exc2)
self.assertIs(
exc.__context__.__context__.__context__.__context__, exc1)
self.assertIsNone(
exc.__context__.__context__.__context__.__context__.__context__)
except Exception as e:
self.assertEqual(exc, e)
self.assertTrue(closable.closed)
class SuppressTestCase(TestCase):
def test_body_exception_suppress(self):
def suppress_exc(*exc_details):
return True
try:
with ExitStack() as stack:
stack.push(suppress_exc)
1/0
except IndexError as exc:
self.fail("Expected no exception, got IndexError")
def test_exit_exception_chaining_suppress(self):
with ExitStack() as stack:
stack.push(lambda *exc: True)
stack.push(lambda *exc: 1/0)
stack.push(lambda *exc: {}[1])
def test_excessive_nesting(self):
# The original implementation would die with RecursionError here
with ExitStack() as stack:
for i in range(10000):
stack.callback(int)
def test_instance_bypass(self):
class Example(object): pass
cm = Example()
cm.__exit__ = object()
stack = ExitStack()
self.assertRaises(AttributeError, stack.enter_context, cm)
stack.push(cm)
self.assertIs(stack._exit_callbacks[-1], cm)
class TestRedirectStdout(unittest.TestCase):
@support.requires_docstrings
def test_instance_docs(self):
# Issue 19330: ensure context manager instances have good docstrings
cm_docstring = redirect_stdout.__doc__
obj = redirect_stdout(None)
self.assertEqual(obj.__doc__, cm_docstring)
def test_no_redirect_in_init(self):
orig_stdout = sys.stdout
redirect_stdout(None)
self.assertIs(sys.stdout, orig_stdout)
def test_redirect_to_string_io(self):
f = io.StringIO()
msg = "Consider an API like help(), which prints directly to stdout"
orig_stdout = sys.stdout
with redirect_stdout(f):
print(msg)
self.assertIs(sys.stdout, orig_stdout)
s = f.getvalue().strip()
self.assertEqual(s, msg)
def test_enter_result_is_target(self):
f = io.StringIO()
with redirect_stdout(f) as enter_result:
self.assertIs(enter_result, f)
def test_cm_is_reusable(self):
f = io.StringIO()
write_to_f = redirect_stdout(f)
orig_stdout = sys.stdout
with write_to_f:
print("Hello", end=" ")
with write_to_f:
print("World!")
self.assertIs(sys.stdout, orig_stdout)
s = f.getvalue()
self.assertEqual(s, "Hello World!\n")
def test_cm_is_reentrant(self):
f = io.StringIO()
write_to_f = redirect_stdout(f)
orig_stdout = sys.stdout
with write_to_f:
print("Hello", end=" ")
with write_to_f:
print("World!")
self.assertIs(sys.stdout, orig_stdout)
s = f.getvalue()
self.assertEqual(s, "Hello World!\n")
def test_suppress(self):
with suppress(ValueError, TypeError):
raise ValueError()
raise TypeError()
self.assertTrue(True)
class TestSuppress(unittest.TestCase):
@support.requires_docstrings
def test_instance_docs(self):
# Issue 19330: ensure context manager instances have good docstrings
cm_docstring = suppress.__doc__
obj = suppress()
self.assertEqual(obj.__doc__, cm_docstring)
def test_no_result_from_enter(self):
with suppress(ValueError) as enter_result:
self.assertIsNone(enter_result)
def test_no_exception(self):
with suppress(ValueError):
self.assertEqual(pow(2, 5), 32)
def test_exact_exception(self):
with suppress(TypeError):
len(5)
def test_exception_hierarchy(self):
with suppress(LookupError):
'Hello'[50]
def test_other_exception(self):
with self.assertRaises(ZeroDivisionError):
with suppress(TypeError):
1/0
def test_no_args(self):
with self.assertRaises(ZeroDivisionError):
with suppress():
1/0
def test_multiple_exception_args(self):
with suppress(ZeroDivisionError, TypeError):
1/0
with suppress(ZeroDivisionError, TypeError):
len(5)
def test_cm_is_reentrant(self):
ignore_exceptions = suppress(Exception)
with ignore_exceptions:
pass
with ignore_exceptions:
len(5)
with ignore_exceptions:
1/0
with ignore_exceptions: # Check nested usage
len(5)
if __name__ == "__main__":
unittest.main()
if __name__ == '__main__':
run_class(ContextManagerTestCase)
run_class(ClosingTestCase)
run_class(SuppressTestCase)