diff --git a/contextlib/contextlib.py b/contextlib/contextlib.py new file mode 100644 index 00000000..c4384fe0 --- /dev/null +++ b/contextlib/contextlib.py @@ -0,0 +1,340 @@ +"""Utilities for with-statement contexts. See PEP 343.""" + +import sys +from collections import deque +from functools import wraps + +__all__ = ["contextmanager", "closing", "ContextDecorator", "ExitStack", + "redirect_stdout", "suppress"] + + +class ContextDecorator(object): + "A base class or mixin that enables context managers to work as decorators." + + def _recreate_cm(self): + """Return a recreated instance of self. + + Allows an otherwise one-shot context manager like + _GeneratorContextManager to support use as + a decorator via implicit recreation. + + This is a private interface just for _GeneratorContextManager. + See issue #11647 for details. + """ + return self + + def __call__(self, func): + @wraps(func) + def inner(*args, **kwds): + with self._recreate_cm(): + return func(*args, **kwds) + return inner + + +class _GeneratorContextManager(ContextDecorator): + """Helper for @contextmanager decorator.""" + + def __init__(self, func, *args, **kwds): + self.gen = func(*args, **kwds) + self.func, self.args, self.kwds = func, args, kwds + # Issue 19330: ensure context manager instances have good docstrings + doc = getattr(func, "__doc__", None) + if doc is None: + doc = type(self).__doc__ + self.__doc__ = doc + # Unfortunately, this still doesn't provide good help output when + # inspecting the created context manager instances, since pydoc + # currently bypasses the instance docstring and shows the docstring + # for the class instead. + # See http://bugs.python.org/issue19404 for more details. + + def _recreate_cm(self): + # _GCM instances are one-shot context managers, so the + # CM must be recreated each time a decorated function is + # called + return self.__class__(self.func, *self.args, **self.kwds) + + def __enter__(self): + try: + return next(self.gen) + except StopIteration: + raise RuntimeError("generator didn't yield") from None + + def __exit__(self, type, value, traceback): + if type is None: + try: + next(self.gen) + except StopIteration: + return + else: + raise RuntimeError("generator didn't stop") + else: + if value is None: + # Need to force instantiation so we can reliably + # tell if we get the same exception back + value = type() + try: + self.gen.throw(type, value, traceback) + raise RuntimeError("generator didn't stop after throw()") + except StopIteration as exc: + # Suppress the exception *unless* it's the same exception that + # was passed to throw(). This prevents a StopIteration + # raised inside the "with" statement from being suppressed + return exc is not value + except: + # only re-raise if it's *not* the exception that was + # passed to throw(), because __exit__() must not raise + # an exception unless __exit__() itself failed. But throw() + # has to raise the exception to signal propagation, so this + # fixes the impedance mismatch between the throw() protocol + # and the __exit__() protocol. + # + if sys.exc_info()[1] is not value: + raise + + +def contextmanager(func): + """@contextmanager decorator. + + Typical usage: + + @contextmanager + def some_generator(): + + try: + yield + finally: + + + This makes this: + + with some_generator() as : + + + equivalent to this: + + + try: + = + + finally: + + + """ + @wraps(func) + def helper(*args, **kwds): + return _GeneratorContextManager(func, *args, **kwds) + return helper + + +class closing(object): + """Context to automatically close something at the end of a block. + + Code like this: + + with closing(.open()) as f: + + + is equivalent to this: + + f = .open() + try: + + finally: + f.close() + + """ + def __init__(self, thing): + self.thing = thing + def __enter__(self): + return self.thing + def __exit__(self, *exc_info): + self.thing.close() + +class redirect_stdout: + """Context manager for temporarily redirecting stdout to another file + + # How to send help() to stderr + with redirect_stdout(sys.stderr): + help(dir) + + # How to write help() to a file + with open('help.txt', 'w') as f: + with redirect_stdout(f): + help(pow) + """ + + def __init__(self, new_target): + self._new_target = new_target + # We use a list of old targets to make this CM re-entrant + self._old_targets = [] + + def __enter__(self): + self._old_targets.append(sys.stdout) + sys.stdout = self._new_target + return self._new_target + + def __exit__(self, exctype, excinst, exctb): + sys.stdout = self._old_targets.pop() + + +class suppress: + """Context manager to suppress specified exceptions + + After the exception is suppressed, execution proceeds with the next + statement following the with statement. + + with suppress(FileNotFoundError): + os.remove(somefile) + # Execution still resumes here if the file was already removed + """ + + def __init__(self, *exceptions): + self._exceptions = exceptions + + def __enter__(self): + pass + + def __exit__(self, exctype, excinst, exctb): + # Unlike isinstance and issubclass, CPython exception handling + # currently only looks at the concrete type hierarchy (ignoring + # the instance and subclass checking hooks). While Guido considers + # that a bug rather than a feature, it's a fairly hard one to fix + # due to various internal implementation details. suppress provides + # the simpler issubclass based semantics, rather than trying to + # exactly reproduce the limitations of the CPython interpreter. + # + # See http://bugs.python.org/issue12029 for more details + return exctype is not None and issubclass(exctype, self._exceptions) + + +# Inspired by discussions on http://bugs.python.org/issue13585 +class ExitStack(object): + """Context manager for dynamic management of a stack of exit callbacks + + For example: + + with ExitStack() as stack: + files = [stack.enter_context(open(fname)) for fname in filenames] + # All opened files will automatically be closed at the end of + # the with statement, even if attempts to open files later + # in the list raise an exception + + """ + def __init__(self): + self._exit_callbacks = deque() + + def pop_all(self): + """Preserve the context stack by transferring it to a new instance""" + new_stack = type(self)() + new_stack._exit_callbacks = self._exit_callbacks + self._exit_callbacks = deque() + return new_stack + + def _push_cm_exit(self, cm, cm_exit): + """Helper to correctly register callbacks to __exit__ methods""" + def _exit_wrapper(*exc_details): + return cm_exit(cm, *exc_details) + _exit_wrapper.__self__ = cm + self.push(_exit_wrapper) + + def push(self, exit): + """Registers a callback with the standard __exit__ method signature + + Can suppress exceptions the same way __exit__ methods can. + + Also accepts any object with an __exit__ method (registering a call + to the method instead of the object itself) + """ + # We use an unbound method rather than a bound method to follow + # the standard lookup behaviour for special methods + _cb_type = type(exit) + try: + exit_method = _cb_type.__exit__ + except AttributeError: + # Not a context manager, so assume its a callable + self._exit_callbacks.append(exit) + else: + self._push_cm_exit(exit, exit_method) + return exit # Allow use as a decorator + + def callback(self, callback, *args, **kwds): + """Registers an arbitrary callback and arguments. + + Cannot suppress exceptions. + """ + def _exit_wrapper(exc_type, exc, tb): + callback(*args, **kwds) + # We changed the signature, so using @wraps is not appropriate, but + # setting __wrapped__ may still help with introspection + _exit_wrapper.__wrapped__ = callback + self.push(_exit_wrapper) + return callback # Allow use as a decorator + + def enter_context(self, cm): + """Enters the supplied context manager + + If successful, also pushes its __exit__ method as a callback and + returns the result of the __enter__ method. + """ + # We look up the special methods on the type to match the with statement + _cm_type = type(cm) + _exit = _cm_type.__exit__ + result = _cm_type.__enter__(cm) + self._push_cm_exit(cm, _exit) + return result + + def close(self): + """Immediately unwind the context stack""" + self.__exit__(None, None, None) + + def __enter__(self): + return self + + def __exit__(self, *exc_details): + received_exc = exc_details[0] is not None + + # We manipulate the exception state so it behaves as though + # we were actually nesting multiple with statements + frame_exc = sys.exc_info()[1] + def _fix_exception_context(new_exc, old_exc): + # Context may not be correct, so find the end of the chain + while 1: + exc_context = new_exc.__context__ + if exc_context is old_exc: + # Context is already set correctly (see issue 20317) + return + if exc_context is None or exc_context is frame_exc: + break + new_exc = exc_context + # Change the end of the chain to point to the exception + # we expect it to reference + new_exc.__context__ = old_exc + + # Callbacks are invoked in LIFO order to match the behaviour of + # nested context managers + suppressed_exc = False + pending_raise = False + while self._exit_callbacks: + cb = self._exit_callbacks.pop() + try: + if cb(*exc_details): + suppressed_exc = True + pending_raise = False + exc_details = (None, None, None) + except: + new_exc_details = sys.exc_info() + # simulate the stack of exceptions by setting the context + _fix_exception_context(new_exc_details[1], exc_details[1]) + pending_raise = True + exc_details = new_exc_details + if pending_raise: + try: + # bare "raise exc_details[1]" replaces our carefully + # set-up context + fixed_ctx = exc_details[1].__context__ + raise exc_details[1] + except BaseException: + exc_details[1].__context__ = fixed_ctx + raise + return received_exc and suppressed_exc \ No newline at end of file diff --git a/contextlib/metadata.txt b/contextlib/metadata.txt new file mode 100644 index 00000000..ab1a0321 --- /dev/null +++ b/contextlib/metadata.txt @@ -0,0 +1,4 @@ +srctype = cpython +type = package +version = 3.4.2-0 +long_desc = Port of contextlib for micropython diff --git a/contextlib/setup.py b/contextlib/setup.py new file mode 100644 index 00000000..77265d44 --- /dev/null +++ b/contextlib/setup.py @@ -0,0 +1,18 @@ +import sys +# Remove current dir from sys.path, otherwise setuptools will peek up our +# module instead of system. +sys.path.pop(0) +from setuptools import setup + + +setup(name='micropython-contextlib', + version='3.4.2-0', + description='Port of contextlib for micropython', + long_description='Port of contextlib for micropython', + author='MicroPython Developers', + author_email='micro-python@googlegroups.com', + maintainer='MicroPython Developers', + maintainer_email='micro-python@googlegroups.com', + license='MIT', + packages=['contextlib'], + install_requires=['micropython-unittest']) diff --git a/contextlib/tests.py b/contextlib/tests.py new file mode 100644 index 00000000..0b41b210 --- /dev/null +++ b/contextlib/tests.py @@ -0,0 +1,828 @@ +"""Unit tests for contextlib.py, and other context managers.""" + +import io +import sys +import tempfile +import unittest +from contextlib import * # Tests __all__ +from test import support +try: + import threading +except ImportError: + threading = None + + +class ContextManagerTestCase(unittest.TestCase): + + def test_contextmanager_plain(self): + state = [] + @contextmanager + def woohoo(): + state.append(1) + yield 42 + state.append(999) + with woohoo() as x: + self.assertEqual(state, [1]) + self.assertEqual(x, 42) + state.append(x) + self.assertEqual(state, [1, 42, 999]) + + def test_contextmanager_finally(self): + state = [] + @contextmanager + def woohoo(): + state.append(1) + try: + yield 42 + finally: + state.append(999) + with self.assertRaises(ZeroDivisionError): + with woohoo() as x: + self.assertEqual(state, [1]) + self.assertEqual(x, 42) + state.append(x) + raise ZeroDivisionError() + self.assertEqual(state, [1, 42, 999]) + + def test_contextmanager_no_reraise(self): + @contextmanager + def whee(): + yield + ctx = whee() + ctx.__enter__() + # Calling __exit__ should not result in an exception + self.assertFalse(ctx.__exit__(TypeError, TypeError("foo"), None)) + + def test_contextmanager_trap_yield_after_throw(self): + @contextmanager + def whoo(): + try: + yield + except: + yield + ctx = whoo() + ctx.__enter__() + self.assertRaises( + RuntimeError, ctx.__exit__, TypeError, TypeError("foo"), None + ) + + def test_contextmanager_except(self): + state = [] + @contextmanager + def woohoo(): + state.append(1) + try: + yield 42 + except ZeroDivisionError as e: + state.append(e.args[0]) + self.assertEqual(state, [1, 42, 999]) + with woohoo() as x: + self.assertEqual(state, [1]) + self.assertEqual(x, 42) + state.append(x) + raise ZeroDivisionError(999) + self.assertEqual(state, [1, 42, 999]) + + def _create_contextmanager_attribs(self): + def attribs(**kw): + def decorate(func): + for k,v in kw.items(): + setattr(func,k,v) + return func + return decorate + @contextmanager + @attribs(foo='bar') + def baz(spam): + """Whee!""" + return baz + + def test_contextmanager_attribs(self): + baz = self._create_contextmanager_attribs() + self.assertEqual(baz.__name__,'baz') + self.assertEqual(baz.foo, 'bar') + + @support.requires_docstrings + def test_contextmanager_doc_attrib(self): + baz = self._create_contextmanager_attribs() + self.assertEqual(baz.__doc__, "Whee!") + + @support.requires_docstrings + def test_instance_docstring_given_cm_docstring(self): + baz = self._create_contextmanager_attribs()(None) + self.assertEqual(baz.__doc__, "Whee!") + + +class ClosingTestCase(unittest.TestCase): + + @support.requires_docstrings + def test_instance_docs(self): + # Issue 19330: ensure context manager instances have good docstrings + cm_docstring = closing.__doc__ + obj = closing(None) + self.assertEqual(obj.__doc__, cm_docstring) + + def test_closing(self): + state = [] + class C: + def close(self): + state.append(1) + x = C() + self.assertEqual(state, []) + with closing(x) as y: + self.assertEqual(x, y) + self.assertEqual(state, [1]) + + def test_closing_error(self): + state = [] + class C: + def close(self): + state.append(1) + x = C() + self.assertEqual(state, []) + with self.assertRaises(ZeroDivisionError): + with closing(x) as y: + self.assertEqual(x, y) + 1 / 0 + self.assertEqual(state, [1]) + +class FileContextTestCase(unittest.TestCase): + + def testWithOpen(self): + tfn = tempfile.mktemp() + try: + f = None + with open(tfn, "w") as f: + self.assertFalse(f.closed) + f.write("Booh\n") + self.assertTrue(f.closed) + f = None + with self.assertRaises(ZeroDivisionError): + with open(tfn, "r") as f: + self.assertFalse(f.closed) + self.assertEqual(f.read(), "Booh\n") + 1 / 0 + self.assertTrue(f.closed) + finally: + support.unlink(tfn) + +@unittest.skipUnless(threading, 'Threading required for this test.') +class LockContextTestCase(unittest.TestCase): + + def boilerPlate(self, lock, locked): + self.assertFalse(locked()) + with lock: + self.assertTrue(locked()) + self.assertFalse(locked()) + with self.assertRaises(ZeroDivisionError): + with lock: + self.assertTrue(locked()) + 1 / 0 + self.assertFalse(locked()) + + def testWithLock(self): + lock = threading.Lock() + self.boilerPlate(lock, lock.locked) + + def testWithRLock(self): + lock = threading.RLock() + self.boilerPlate(lock, lock._is_owned) + + def testWithCondition(self): + lock = threading.Condition() + def locked(): + return lock._is_owned() + self.boilerPlate(lock, locked) + + def testWithSemaphore(self): + lock = threading.Semaphore() + def locked(): + if lock.acquire(False): + lock.release() + return False + else: + return True + self.boilerPlate(lock, locked) + + def testWithBoundedSemaphore(self): + lock = threading.BoundedSemaphore() + def locked(): + if lock.acquire(False): + lock.release() + return False + else: + return True + self.boilerPlate(lock, locked) + + +class mycontext(ContextDecorator): + """Example decoration-compatible context manager for testing""" + started = False + exc = None + catch = False + + def __enter__(self): + self.started = True + return self + + def __exit__(self, *exc): + self.exc = exc + return self.catch + + +class TestContextDecorator(unittest.TestCase): + + @support.requires_docstrings + def test_instance_docs(self): + # Issue 19330: ensure context manager instances have good docstrings + cm_docstring = mycontext.__doc__ + obj = mycontext() + self.assertEqual(obj.__doc__, cm_docstring) + + def test_contextdecorator(self): + context = mycontext() + with context as result: + self.assertIs(result, context) + self.assertTrue(context.started) + + self.assertEqual(context.exc, (None, None, None)) + + + def test_contextdecorator_with_exception(self): + context = mycontext() + + with self.assertRaisesRegex(NameError, 'foo'): + with context: + raise NameError('foo') + self.assertIsNotNone(context.exc) + self.assertIs(context.exc[0], NameError) + + context = mycontext() + context.catch = True + with context: + raise NameError('foo') + self.assertIsNotNone(context.exc) + self.assertIs(context.exc[0], NameError) + + + def test_decorator(self): + context = mycontext() + + @context + def test(): + self.assertIsNone(context.exc) + self.assertTrue(context.started) + test() + self.assertEqual(context.exc, (None, None, None)) + + + def test_decorator_with_exception(self): + context = mycontext() + + @context + def test(): + self.assertIsNone(context.exc) + self.assertTrue(context.started) + raise NameError('foo') + + with self.assertRaisesRegex(NameError, 'foo'): + test() + self.assertIsNotNone(context.exc) + self.assertIs(context.exc[0], NameError) + + + def test_decorating_method(self): + context = mycontext() + + class Test(object): + + @context + def method(self, a, b, c=None): + self.a = a + self.b = b + self.c = c + + # these tests are for argument passing when used as a decorator + test = Test() + test.method(1, 2) + self.assertEqual(test.a, 1) + self.assertEqual(test.b, 2) + self.assertEqual(test.c, None) + + test = Test() + test.method('a', 'b', 'c') + self.assertEqual(test.a, 'a') + self.assertEqual(test.b, 'b') + self.assertEqual(test.c, 'c') + + test = Test() + test.method(a=1, b=2) + self.assertEqual(test.a, 1) + self.assertEqual(test.b, 2) + + + def test_typo_enter(self): + class mycontext(ContextDecorator): + def __unter__(self): + pass + def __exit__(self, *exc): + pass + + with self.assertRaises(AttributeError): + with mycontext(): + pass + + + def test_typo_exit(self): + class mycontext(ContextDecorator): + def __enter__(self): + pass + def __uxit__(self, *exc): + pass + + with self.assertRaises(AttributeError): + with mycontext(): + pass + + + def test_contextdecorator_as_mixin(self): + class somecontext(object): + started = False + exc = None + + def __enter__(self): + self.started = True + return self + + def __exit__(self, *exc): + self.exc = exc + + class mycontext(somecontext, ContextDecorator): + pass + + context = mycontext() + @context + def test(): + self.assertIsNone(context.exc) + self.assertTrue(context.started) + test() + self.assertEqual(context.exc, (None, None, None)) + + + def test_contextmanager_as_decorator(self): + @contextmanager + def woohoo(y): + state.append(y) + yield + state.append(999) + + state = [] + @woohoo(1) + def test(x): + self.assertEqual(state, [1]) + state.append(x) + test('something') + self.assertEqual(state, [1, 'something', 999]) + + # Issue #11647: Ensure the decorated function is 'reusable' + state = [] + test('something else') + self.assertEqual(state, [1, 'something else', 999]) + + +class TestExitStack(unittest.TestCase): + + @support.requires_docstrings + def test_instance_docs(self): + # Issue 19330: ensure context manager instances have good docstrings + cm_docstring = ExitStack.__doc__ + obj = ExitStack() + self.assertEqual(obj.__doc__, cm_docstring) + + def test_no_resources(self): + with ExitStack(): + pass + + def test_callback(self): + expected = [ + ((), {}), + ((1,), {}), + ((1,2), {}), + ((), dict(example=1)), + ((1,), dict(example=1)), + ((1,2), dict(example=1)), + ] + result = [] + def _exit(*args, **kwds): + """Test metadata propagation""" + result.append((args, kwds)) + with ExitStack() as stack: + for args, kwds in reversed(expected): + if args and kwds: + f = stack.callback(_exit, *args, **kwds) + elif args: + f = stack.callback(_exit, *args) + elif kwds: + f = stack.callback(_exit, **kwds) + else: + f = stack.callback(_exit) + self.assertIs(f, _exit) + for wrapper in stack._exit_callbacks: + self.assertIs(wrapper.__wrapped__, _exit) + self.assertNotEqual(wrapper.__name__, _exit.__name__) + self.assertIsNone(wrapper.__doc__, _exit.__doc__) + self.assertEqual(result, expected) + + def test_push(self): + exc_raised = ZeroDivisionError + def _expect_exc(exc_type, exc, exc_tb): + self.assertIs(exc_type, exc_raised) + def _suppress_exc(*exc_details): + return True + def _expect_ok(exc_type, exc, exc_tb): + self.assertIsNone(exc_type) + self.assertIsNone(exc) + self.assertIsNone(exc_tb) + class ExitCM(object): + def __init__(self, check_exc): + self.check_exc = check_exc + def __enter__(self): + self.fail("Should not be called!") + def __exit__(self, *exc_details): + self.check_exc(*exc_details) + with ExitStack() as stack: + stack.push(_expect_ok) + self.assertIs(stack._exit_callbacks[-1], _expect_ok) + cm = ExitCM(_expect_ok) + stack.push(cm) + self.assertIs(stack._exit_callbacks[-1].__self__, cm) + stack.push(_suppress_exc) + self.assertIs(stack._exit_callbacks[-1], _suppress_exc) + cm = ExitCM(_expect_exc) + stack.push(cm) + self.assertIs(stack._exit_callbacks[-1].__self__, cm) + stack.push(_expect_exc) + self.assertIs(stack._exit_callbacks[-1], _expect_exc) + stack.push(_expect_exc) + self.assertIs(stack._exit_callbacks[-1], _expect_exc) + 1/0 + + def test_enter_context(self): + class TestCM(object): + def __enter__(self): + result.append(1) + def __exit__(self, *exc_details): + result.append(3) + + result = [] + cm = TestCM() + with ExitStack() as stack: + @stack.callback # Registered first => cleaned up last + def _exit(): + result.append(4) + self.assertIsNotNone(_exit) + stack.enter_context(cm) + self.assertIs(stack._exit_callbacks[-1].__self__, cm) + result.append(2) + self.assertEqual(result, [1, 2, 3, 4]) + + def test_close(self): + result = [] + with ExitStack() as stack: + @stack.callback + def _exit(): + result.append(1) + self.assertIsNotNone(_exit) + stack.close() + result.append(2) + self.assertEqual(result, [1, 2]) + + def test_pop_all(self): + result = [] + with ExitStack() as stack: + @stack.callback + def _exit(): + result.append(3) + self.assertIsNotNone(_exit) + new_stack = stack.pop_all() + result.append(1) + result.append(2) + new_stack.close() + self.assertEqual(result, [1, 2, 3]) + + def test_exit_raise(self): + with self.assertRaises(ZeroDivisionError): + with ExitStack() as stack: + stack.push(lambda *exc: False) + 1/0 + + def test_exit_suppress(self): + with ExitStack() as stack: + stack.push(lambda *exc: True) + 1/0 + + def test_exit_exception_chaining_reference(self): + # Sanity check to make sure that ExitStack chaining matches + # actual nested with statements + class RaiseExc: + def __init__(self, exc): + self.exc = exc + def __enter__(self): + return self + def __exit__(self, *exc_details): + raise self.exc + + class RaiseExcWithContext: + def __init__(self, outer, inner): + self.outer = outer + self.inner = inner + def __enter__(self): + return self + def __exit__(self, *exc_details): + try: + raise self.inner + except: + raise self.outer + + class SuppressExc: + def __enter__(self): + return self + def __exit__(self, *exc_details): + type(self).saved_details = exc_details + return True + + try: + with RaiseExc(IndexError): + with RaiseExcWithContext(KeyError, AttributeError): + with SuppressExc(): + with RaiseExc(ValueError): + 1 / 0 + except IndexError as exc: + self.assertIsInstance(exc.__context__, KeyError) + self.assertIsInstance(exc.__context__.__context__, AttributeError) + # Inner exceptions were suppressed + self.assertIsNone(exc.__context__.__context__.__context__) + else: + self.fail("Expected IndexError, but no exception was raised") + # Check the inner exceptions + inner_exc = SuppressExc.saved_details[1] + self.assertIsInstance(inner_exc, ValueError) + self.assertIsInstance(inner_exc.__context__, ZeroDivisionError) + + def test_exit_exception_chaining(self): + # Ensure exception chaining matches the reference behaviour + def raise_exc(exc): + raise exc + + saved_details = None + def suppress_exc(*exc_details): + nonlocal saved_details + saved_details = exc_details + return True + + try: + with ExitStack() as stack: + stack.callback(raise_exc, IndexError) + stack.callback(raise_exc, KeyError) + stack.callback(raise_exc, AttributeError) + stack.push(suppress_exc) + stack.callback(raise_exc, ValueError) + 1 / 0 + except IndexError as exc: + self.assertIsInstance(exc.__context__, KeyError) + self.assertIsInstance(exc.__context__.__context__, AttributeError) + # Inner exceptions were suppressed + self.assertIsNone(exc.__context__.__context__.__context__) + else: + self.fail("Expected IndexError, but no exception was raised") + # Check the inner exceptions + inner_exc = saved_details[1] + self.assertIsInstance(inner_exc, ValueError) + self.assertIsInstance(inner_exc.__context__, ZeroDivisionError) + + def test_exit_exception_non_suppressing(self): + # http://bugs.python.org/issue19092 + def raise_exc(exc): + raise exc + + def suppress_exc(*exc_details): + return True + + try: + with ExitStack() as stack: + stack.callback(lambda: None) + stack.callback(raise_exc, IndexError) + except Exception as exc: + self.assertIsInstance(exc, IndexError) + else: + self.fail("Expected IndexError, but no exception was raised") + + try: + with ExitStack() as stack: + stack.callback(raise_exc, KeyError) + stack.push(suppress_exc) + stack.callback(raise_exc, IndexError) + except Exception as exc: + self.assertIsInstance(exc, KeyError) + else: + self.fail("Expected KeyError, but no exception was raised") + + def test_exit_exception_with_correct_context(self): + # http://bugs.python.org/issue20317 + @contextmanager + def gets_the_context_right(exc): + try: + yield + finally: + raise exc + + exc1 = Exception(1) + exc2 = Exception(2) + exc3 = Exception(3) + exc4 = Exception(4) + + # The contextmanager already fixes the context, so prior to the + # fix, ExitStack would try to fix it *again* and get into an + # infinite self-referential loop + try: + with ExitStack() as stack: + stack.enter_context(gets_the_context_right(exc4)) + stack.enter_context(gets_the_context_right(exc3)) + stack.enter_context(gets_the_context_right(exc2)) + raise exc1 + except Exception as exc: + self.assertIs(exc, exc4) + self.assertIs(exc.__context__, exc3) + self.assertIs(exc.__context__.__context__, exc2) + self.assertIs(exc.__context__.__context__.__context__, exc1) + self.assertIsNone( + exc.__context__.__context__.__context__.__context__) + + def test_exit_exception_with_existing_context(self): + # Addresses a lack of test coverage discovered after checking in a + # fix for issue 20317 that still contained debugging code. + def raise_nested(inner_exc, outer_exc): + try: + raise inner_exc + finally: + raise outer_exc + exc1 = Exception(1) + exc2 = Exception(2) + exc3 = Exception(3) + exc4 = Exception(4) + exc5 = Exception(5) + try: + with ExitStack() as stack: + stack.callback(raise_nested, exc4, exc5) + stack.callback(raise_nested, exc2, exc3) + raise exc1 + except Exception as exc: + self.assertIs(exc, exc5) + self.assertIs(exc.__context__, exc4) + self.assertIs(exc.__context__.__context__, exc3) + self.assertIs(exc.__context__.__context__.__context__, exc2) + self.assertIs( + exc.__context__.__context__.__context__.__context__, exc1) + self.assertIsNone( + exc.__context__.__context__.__context__.__context__.__context__) + + + + def test_body_exception_suppress(self): + def suppress_exc(*exc_details): + return True + try: + with ExitStack() as stack: + stack.push(suppress_exc) + 1/0 + except IndexError as exc: + self.fail("Expected no exception, got IndexError") + + def test_exit_exception_chaining_suppress(self): + with ExitStack() as stack: + stack.push(lambda *exc: True) + stack.push(lambda *exc: 1/0) + stack.push(lambda *exc: {}[1]) + + def test_excessive_nesting(self): + # The original implementation would die with RecursionError here + with ExitStack() as stack: + for i in range(10000): + stack.callback(int) + + def test_instance_bypass(self): + class Example(object): pass + cm = Example() + cm.__exit__ = object() + stack = ExitStack() + self.assertRaises(AttributeError, stack.enter_context, cm) + stack.push(cm) + self.assertIs(stack._exit_callbacks[-1], cm) + +class TestRedirectStdout(unittest.TestCase): + + @support.requires_docstrings + def test_instance_docs(self): + # Issue 19330: ensure context manager instances have good docstrings + cm_docstring = redirect_stdout.__doc__ + obj = redirect_stdout(None) + self.assertEqual(obj.__doc__, cm_docstring) + + def test_no_redirect_in_init(self): + orig_stdout = sys.stdout + redirect_stdout(None) + self.assertIs(sys.stdout, orig_stdout) + + def test_redirect_to_string_io(self): + f = io.StringIO() + msg = "Consider an API like help(), which prints directly to stdout" + orig_stdout = sys.stdout + with redirect_stdout(f): + print(msg) + self.assertIs(sys.stdout, orig_stdout) + s = f.getvalue().strip() + self.assertEqual(s, msg) + + def test_enter_result_is_target(self): + f = io.StringIO() + with redirect_stdout(f) as enter_result: + self.assertIs(enter_result, f) + + def test_cm_is_reusable(self): + f = io.StringIO() + write_to_f = redirect_stdout(f) + orig_stdout = sys.stdout + with write_to_f: + print("Hello", end=" ") + with write_to_f: + print("World!") + self.assertIs(sys.stdout, orig_stdout) + s = f.getvalue() + self.assertEqual(s, "Hello World!\n") + + def test_cm_is_reentrant(self): + f = io.StringIO() + write_to_f = redirect_stdout(f) + orig_stdout = sys.stdout + with write_to_f: + print("Hello", end=" ") + with write_to_f: + print("World!") + self.assertIs(sys.stdout, orig_stdout) + s = f.getvalue() + self.assertEqual(s, "Hello World!\n") + + +class TestSuppress(unittest.TestCase): + + @support.requires_docstrings + def test_instance_docs(self): + # Issue 19330: ensure context manager instances have good docstrings + cm_docstring = suppress.__doc__ + obj = suppress() + self.assertEqual(obj.__doc__, cm_docstring) + + def test_no_result_from_enter(self): + with suppress(ValueError) as enter_result: + self.assertIsNone(enter_result) + + def test_no_exception(self): + with suppress(ValueError): + self.assertEqual(pow(2, 5), 32) + + def test_exact_exception(self): + with suppress(TypeError): + len(5) + + def test_exception_hierarchy(self): + with suppress(LookupError): + 'Hello'[50] + + def test_other_exception(self): + with self.assertRaises(ZeroDivisionError): + with suppress(TypeError): + 1/0 + + def test_no_args(self): + with self.assertRaises(ZeroDivisionError): + with suppress(): + 1/0 + + def test_multiple_exception_args(self): + with suppress(ZeroDivisionError, TypeError): + 1/0 + with suppress(ZeroDivisionError, TypeError): + len(5) + + def test_cm_is_reentrant(self): + ignore_exceptions = suppress(Exception) + with ignore_exceptions: + pass + with ignore_exceptions: + len(5) + with ignore_exceptions: + 1/0 + with ignore_exceptions: # Check nested usage + len(5) + +if __name__ == "__main__": + unittest.main() \ No newline at end of file