mirror of
https://github.com/facebook/sapling.git
synced 2024-10-09 00:14:35 +03:00
e1c353db12
This is similar in spirit to contextlib.nested in Python <= 2.6, but uses an extra level of indirection to avoid its inability to clean up if an __enter__ method raises an exception. Why add this mechanism? It greatly simplifies scoped resource management, and lets us eliminate several hundred lines of try/finally blocks. In many of these cases the "finally" is separated from the "try" by hundreds of lines of code, which makes the connection between resource acquisition and disposal difficult to follow. (The preferred mechanism would be the "multi-with" syntax of 2.7+, but Mercurial can't move to 2.7 for a while.) Intended use: >>> with ctxmanager(lambda: file('foo'), lambda: file('bar')) as c: >>> f1, f2 = c() This will open both foo and bar when c() is invoked, and will close both upon exit from the block. If the attempt to open bar raises an exception, the block will not be entered - but foo will still be closed.
78 lines
2.4 KiB
Python
78 lines
2.4 KiB
Python
from __future__ import absolute_import
|
|
|
|
import silenttestrunner
|
|
import unittest
|
|
|
|
from mercurial.util import ctxmanager
|
|
|
|
class contextmanager(object):
|
|
def __init__(self, name, trace):
|
|
self.name = name
|
|
self.entered = False
|
|
self.exited = False
|
|
self.trace = trace
|
|
|
|
def __enter__(self):
|
|
self.entered = True
|
|
self.trace(('enter', self.name))
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
self.exited = exc_type, exc_val, exc_tb
|
|
self.trace(('exit', self.name))
|
|
|
|
def __repr__(self):
|
|
return '<ctx %r>' % self.name
|
|
|
|
class ctxerror(Exception):
|
|
pass
|
|
|
|
class raise_on_enter(contextmanager):
|
|
def __enter__(self):
|
|
self.trace(('raise', self.name))
|
|
raise ctxerror(self.name)
|
|
|
|
class raise_on_exit(contextmanager):
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
self.trace(('raise', self.name))
|
|
raise ctxerror(self.name)
|
|
|
|
def ctxmgr(name, trace):
|
|
return lambda: contextmanager(name, trace)
|
|
|
|
class test_ctxmanager(unittest.TestCase):
|
|
def test_basics(self):
|
|
trace = []
|
|
addtrace = trace.append
|
|
with ctxmanager(ctxmgr('a', addtrace), ctxmgr('b', addtrace)) as c:
|
|
a, b = c()
|
|
c.atexit(addtrace, ('atexit', 'x'))
|
|
c.atexit(addtrace, ('atexit', 'y'))
|
|
self.assertEqual(trace, [('enter', 'a'), ('enter', 'b'),
|
|
('atexit', 'y'), ('atexit', 'x'),
|
|
('exit', 'b'), ('exit', 'a')])
|
|
|
|
def test_raise_on_enter(self):
|
|
trace = []
|
|
addtrace = trace.append
|
|
with self.assertRaises(ctxerror):
|
|
with ctxmanager(ctxmgr('a', addtrace),
|
|
lambda: raise_on_enter('b', addtrace)) as c:
|
|
c()
|
|
addtrace('unreachable')
|
|
self.assertEqual(trace, [('enter', 'a'), ('raise', 'b'), ('exit', 'a')])
|
|
|
|
def test_raise_on_exit(self):
|
|
trace = []
|
|
addtrace = trace.append
|
|
with self.assertRaises(ctxerror):
|
|
with ctxmanager(ctxmgr('a', addtrace),
|
|
lambda: raise_on_exit('b', addtrace)) as c:
|
|
c()
|
|
addtrace('running')
|
|
self.assertEqual(trace, [('enter', 'a'), ('enter', 'b'), 'running',
|
|
('raise', 'b'), ('exit', 'a')])
|
|
|
|
if __name__ == '__main__':
|
|
silenttestrunner.main(__name__)
|