From 251fe1b775fab74c00d6fbfde754b07a2207fb86 Mon Sep 17 00:00:00 2001 From: Jun Wu Date: Wed, 26 Feb 2020 17:05:43 -0800 Subject: [PATCH] sshpeer: always read all stderr messages Summary: In the past I saw test breakages where the stderr from the remote ssh process becomes incomplete. It's hard to reproduce by running the tests directly. But inserting a sleep in the background stderr thread exposes it trivially: ``` # sshpeer.py:class threadedstderr def run(self): # type: () -> None while not self._stop: buf = self._stderr.readline() + import time + time.sleep(5) if len(buf) == 0: break ``` Example test breakage: ``` --- a/test-commitcloud-sync.t +++ b/test-commitcloud-sync.t.err @@ -167,8 +167,7 @@ Make a commit in the first client, and sync it $ hg cloud sync commitcloud: synchronizing 'server' with 'user/test/default' backing up stack rooted at fa5d62c46fd7 remote: pushing 1 commit: - remote: fa5d62c46fd7 commit1 commitcloud: commits synchronized finished in * (glob) .... ``` Upon investigation it's caused by 2 factors: - The connection pool calls pipee.close() before pipeo.close(), to work around an issue that I suspect is solved by D19794281. - The new threaded stderr (pipee)'s close() method does not actually close the pipe immediately. Instead, it limits the text to read to one more line at most, which causes those incomplete messages. This diff made the following changes: - Remove the `pipee.close` workaround in connectionpool. - Remove `pipee.close`. Embed it in `pipee.join` to prevent misuse. - Add detailed comments in sshpeer.py for the subtle behaviors. 
Reviewed By: xavierd Differential Revision: D19872610 fbshipit-source-id: 4b61ef8f9db81c6c347ac4a634e41dec544c05d0 --- eden/scm/edenscm/mercurial/connectionpool.py | 12 ---- eden/scm/edenscm/mercurial/sshpeer.py | 60 +++++++++++++++----- 2 files changed, 46 insertions(+), 26 deletions(-) diff --git a/eden/scm/edenscm/mercurial/connectionpool.py b/eden/scm/edenscm/mercurial/connectionpool.py index c7ab3a0115..594ff1c165 100644 --- a/eden/scm/edenscm/mercurial/connectionpool.py +++ b/eden/scm/edenscm/mercurial/connectionpool.py @@ -70,19 +70,7 @@ class connectionpool(object): pass if conn is None: - - def _cleanup(orig): - # close pipee first so peer._cleanup reading it won't deadlock, - # if there are other processes with pipeo open (i.e. us). - peer = orig.__self__ - if util.safehasattr(peer, "_pipee"): - peer._pipee.close() - return orig() - peer = hg.peer(self._repo.ui, {}, path) - if util.safehasattr(peer, "_cleanup"): - extensions.wrapfunction(peer, "_cleanup", _cleanup) - conn = connection(self._repo.ui, pathpool, peer, path) else: self._repo.ui.debug("reusing connection from pool\n") diff --git a/eden/scm/edenscm/mercurial/sshpeer.py b/eden/scm/edenscm/mercurial/sshpeer.py index 7949b2ca29..64b4c86307 100644 --- a/eden/scm/edenscm/mercurial/sshpeer.py +++ b/eden/scm/edenscm/mercurial/sshpeer.py @@ -99,7 +99,6 @@ class threadedstderr(object): def __init__(self, ui, stderr): self._ui = ui self._stderr = stderr - self._stop = False self._thread = None def start(self): @@ -111,16 +110,21 @@ class threadedstderr(object): def run(self): # type: () -> None - while not self._stop: - buf = self._stderr.readline() + while True: + try: + buf = self._stderr.readline() + except (Exception, KeyboardInterrupt): + # Not fatal. Treat it as if the stderr stream has ended. + break if len(buf) == 0: break _writessherror(self._ui, buf) - def close(self): - # type: () -> None - self._stop = True + # Close the pipe. It's likely already closed on the other end. 
+ # Note: during readline(), close() will raise an IOError. So there is + # no "close" method that can be used by the main thread. + self._stderr.close() def join(self, timeout): if self._thread: @@ -271,8 +275,44 @@ class sshpeer(wireproto.wirepeer): global _totalbytessent, _totalbytesreceived if self._pipeo is None: return + + # Close the pipe connecting to the stdin of the remote ssh process. + # This means if the remote process tries to read its stdin, it will get + # an empty buffer that indicates EOF. The remote process should then + # exit, which will close its stdout and stderr so the background stderr + # reader thread will notice that it reaches EOF and becomes joinable. self._pipeo.close() + + # Wait for the stderr thread to complete reading all stderr text from + # the remote ssh process (i.e. hitting EOF). + # + # This must be after pipeo.close(). Otherwise the remote process might + # still wait for stdin and does not close its stderr. + # + # This is better before pipei.close(). Otherwise the remote process + # might nondeterministically get EPIPE when writing to its stdout, + # which can trigger different code paths nondeterministically that + # might affect stderr. In other words, moving this after pipei.close() + # can potentially increase test flakiness. + if util.istest(): + # In the test environment, we control all remote processes. They + # are expected to exit after getting EOF from stdin. Wait + # indefinitely to make sure all stderr messages are received. + # + # If this line hangs forever, that indicates a bug in the remote + # process, not here. + self._pipee.join(None) + else: + # In a real world environment, remote processes might misbehave. + # Therefore be impatient when waiting. + self._pipee.join(1) + + # Close the pipe connected to the stdout of the remote process. + # The remote end of the pipe is likely already closed since we waited + # for the pipee thread. 
If not, the remote process will get EPIPE or + # SIGPIPE if it writes a bit more to its stdout. self._pipei.close() + _totalbytessent += self._pipeo._totalbytes _totalbytesreceived += self._pipei._totalbytes self.ui.log( @@ -281,14 +321,6 @@ class sshpeer(wireproto.wirepeer): sshbytessent=_totalbytessent, sshbytesreceived=_totalbytesreceived, ) - self._pipee.close() - - if util.istest(): - # Let's give the thread a bit of time to complete, in the case - # where the pipe is somehow still open, the read call will block on it - # forever. In this case, there isn't anything to read anyway, so - # waiting more would just cause Mercurial to hang. - self._pipee.join(1) __del__ = _cleanup