diff --git a/eden/scm/edenscm/mercurial/connectionpool.py b/eden/scm/edenscm/mercurial/connectionpool.py index c7ab3a0115..594ff1c165 100644 --- a/eden/scm/edenscm/mercurial/connectionpool.py +++ b/eden/scm/edenscm/mercurial/connectionpool.py @@ -70,19 +70,7 @@ class connectionpool(object): pass if conn is None: - - def _cleanup(orig): - # close pipee first so peer._cleanup reading it won't deadlock, - # if there are other processes with pipeo open (i.e. us). - peer = orig.__self__ - if util.safehasattr(peer, "_pipee"): - peer._pipee.close() - return orig() - peer = hg.peer(self._repo.ui, {}, path) - if util.safehasattr(peer, "_cleanup"): - extensions.wrapfunction(peer, "_cleanup", _cleanup) - conn = connection(self._repo.ui, pathpool, peer, path) else: self._repo.ui.debug("reusing connection from pool\n") diff --git a/eden/scm/edenscm/mercurial/sshpeer.py b/eden/scm/edenscm/mercurial/sshpeer.py index 7949b2ca29..64b4c86307 100644 --- a/eden/scm/edenscm/mercurial/sshpeer.py +++ b/eden/scm/edenscm/mercurial/sshpeer.py @@ -99,7 +99,6 @@ class threadedstderr(object): def __init__(self, ui, stderr): self._ui = ui self._stderr = stderr - self._stop = False self._thread = None def start(self): @@ -111,16 +110,21 @@ class threadedstderr(object): def run(self): # type: () -> None - while not self._stop: - buf = self._stderr.readline() + while True: + try: + buf = self._stderr.readline() + except (Exception, KeyboardInterrupt): + # Not fatal. Treat it as if the stderr stream has ended. + break if len(buf) == 0: break _writessherror(self._ui, buf) - def close(self): - # type: () -> None - self._stop = True + # Close the pipe. It's likely already closed on the other end. + # Note: during readline(), close() will raise an IOError. So there is + # no "close" method that can be used by the main thread. + self._stderr.close() def join(self, timeout): if self._thread: @@ -271,8 +275,44 @@ class sshpeer(wireproto.wirepeer): global _totalbytessent, _totalbytesreceived if self._pipeo is None: return + + # Close the pipe connecting to the stdin of the remote ssh process. + # This means if the remote process tries to read its stdin, it will get + # an empty buffer that indicates EOF. The remote process should then + # exit, which will close its stdout and stderr so the background stderr + # reader thread will notice that it reaches EOF and becomes joinable. self._pipeo.close() + + # Wait for the stderr thread to complete reading all stderr text from + # the remote ssh process (i.e. hitting EOF). + # + # This must be after pipeo.close(). Otherwise the remote process might + # still wait for stdin and does not close its stderr. + # + # This is better before pipei.close(). Otherwise the remote process + # might nondeterministically get EPIPE when writing to its stdout, + # which can trigger different code paths nondeterministically that + # might affect stderr. In other words, moving this after pipei.close() + # can potentially increase test flakiness. + if util.istest(): + # In the test environment, we control all remote processes. They + # are expected to exit after getting EOF from stdin. Wait + # indefinitely to make sure all stderr messages are received. + # + # If this line hangs forever, that indicates a bug in the remote + # process, not here. + self._pipee.join(None) + else: + # In real world environment, remote processes might mis-behave. + # Therefore be inpatient on waiting. + self._pipee.join(1) + + # Close the pipe connected to the stdout of the remote process. + # The remote end of the pipe is likely already closed since we waited + # the pipee thread. If not, the remote process will get EPIPE or + # SIGPIPE if it writes a bit more to its stdout. self._pipei.close() + _totalbytessent += self._pipeo._totalbytes _totalbytesreceived += self._pipei._totalbytes self.ui.log( @@ -281,14 +321,6 @@ class sshpeer(wireproto.wirepeer): sshbytessent=_totalbytessent, sshbytesreceived=_totalbytesreceived, ) - self._pipee.close() - - if util.istest(): - # Let's give the thread a bit of time to complete, in the case - # where the pipe is somehow still open, the read call will block on it - # forever. In this case, there isn't anything to read anyway, so - # waiting more would just cause Mercurial to hang. - self._pipee.join(1) __del__ = _cleanup