sshpeer: always read all stderr messages

Summary:
In the past I saw test breakages where the stderr output from the remote ssh
process was incomplete. This is hard to reproduce by running the tests
directly, but inserting a sleep in the background stderr thread exposes it
trivially:

```
# sshpeer.py:class threadedstderr
     def run(self):
         # type: () -> None
         while not self._stop:
             buf = self._stderr.readline()
+            import time
+            time.sleep(5)
             if len(buf) == 0:
                 break
```

Example test breakage:

```
 --- a/test-commitcloud-sync.t
+++ b/test-commitcloud-sync.t.err
@@ -167,8 +167,7 @@ Make a commit in the first client, and sync it
   $ hg cloud sync
   commitcloud: synchronizing 'server' with 'user/test/default'
   backing up stack rooted at fa5d62c46fd7
   remote: pushing 1 commit:
-  remote:     fa5d62c46fd7  commit1
   commitcloud: commits synchronized
   finished in * (glob)
....
```

Upon investigation, it's caused by two factors:
- The connection pool calls pipee.close() before pipeo.close(), to work around
  an issue that I suspect was solved by D19794281.
- The new threaded stderr (pipee)'s close() method does not actually close the
  pipe immediately. Instead, it limits further reading to at most one more
  line, which causes those incomplete messages (a standalone repro sketch
  follows this list).
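
The race is easy to demonstrate outside of Mercurial. Below is a minimal
standalone sketch (not part of this diff; the child program, the timings, and
the `reader` helper are all illustrative) of the old flag-plus-close design
losing buffered stderr lines:

```
import subprocess
import sys
import threading
import time

# Child process standing in for the remote ssh process: it writes five
# lines to stderr and exits immediately.
child = subprocess.Popen(
    [
        sys.executable,
        "-c",
        "import sys\n"
        "for i in range(5):\n"
        "    sys.stderr.write('remote: line %d\\n' % i)\n",
    ],
    stderr=subprocess.PIPE,
)

received = []
stop = False  # mirrors the old _stop flag, checked once per line


def reader():
    while not stop:
        try:
            buf = child.stderr.readline()
        except (ValueError, OSError):
            break  # the main thread closed the pipe under us
        time.sleep(0.1)  # the artificial delay from the snippet above
        if not buf:
            break
        received.append(buf)


t = threading.Thread(target=reader)
t.start()
time.sleep(0.25)      # main thread moves on before the pipe is drained
stop = True           # old close(): set the flag ...
child.stderr.close()  # ... and close the pipe right away
t.join()
child.wait()
print("received %d of 5 lines" % len(received))  # typically fewer than 5
```

With the 0.1s delay, the main thread usually wins the race and the tail of
stderr is silently dropped, mirroring the missing `remote:` line in the test
breakage above.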

This diff makes the following changes:
- Remove the `pipee.close` workaround in connectionpool.
- Remove `pipee.close`. Embed it in `pipee.join` to prevent misuse (a
  condensed sketch of the resulting design follows this list).
- Add detailed comments in sshpeer.py for the subtle behaviors.
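
Before the diffs, a condensed model of the reworked reader may help. This is a
sketch based on the sshpeer.py hunks below, not the verbatim code; `write_err`
stands in for the real `_writessherror` helper. The key property: the pipe is
closed only by the reader thread itself, after it hits EOF, and the main
thread's only cleanup entry point is `join`.

```
import threading


class ThreadedStderr(object):
    """Condensed model: only start() and join() are public; the pipe is
    closed by the reader thread itself, never by the main thread."""

    def __init__(self, ui, stderr):
        self._ui = ui
        self._stderr = stderr
        self._thread = None

    def start(self):
        self._thread = threading.Thread(target=self.run)
        self._thread.daemon = True
        self._thread.start()

    def run(self):
        while True:
            try:
                buf = self._stderr.readline()
            except (Exception, KeyboardInterrupt):
                break  # not fatal: treat as end of the stderr stream
            if not buf:
                break  # EOF: the remote process closed its stderr
            self._ui.write_err(buf)
        # Only the reader thread closes the pipe, after EOF, so no
        # message can be cut off by an early close from another thread.
        self._stderr.close()

    def join(self, timeout):
        # timeout=None in tests (wait for every message); a finite
        # timeout in production to tolerate misbehaving remotes.
        if self._thread:
            self._thread.join(timeout)
```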

Reviewed By: xavierd

Differential Revision: D19872610

fbshipit-source-id: 4b61ef8f9db81c6c347ac4a634e41dec544c05d0
Jun Wu 2020-02-26 17:05:43 -08:00 committed by Facebook Github Bot
parent 7f38170116
commit 251fe1b775
2 changed files with 46 additions and 26 deletions

connectionpool.py:

```
@@ -70,19 +70,7 @@ class connectionpool(object):
             pass
         if conn is None:
-
-            def _cleanup(orig):
-                # close pipee first so peer._cleanup reading it won't deadlock,
-                # if there are other processes with pipeo open (i.e. us).
-                peer = orig.__self__
-                if util.safehasattr(peer, "_pipee"):
-                    peer._pipee.close()
-                return orig()
-
             peer = hg.peer(self._repo.ui, {}, path)
-            if util.safehasattr(peer, "_cleanup"):
-                extensions.wrapfunction(peer, "_cleanup", _cleanup)
-
             conn = connection(self._repo.ui, pathpool, peer, path)
         else:
             self._repo.ui.debug("reusing connection from pool\n")
```

sshpeer.py:

```
@@ -99,7 +99,6 @@ class threadedstderr(object):
     def __init__(self, ui, stderr):
         self._ui = ui
         self._stderr = stderr
-        self._stop = False
         self._thread = None

     def start(self):
@@ -111,16 +110,21 @@ class threadedstderr(object):
     def run(self):
         # type: () -> None
-        while not self._stop:
-            buf = self._stderr.readline()
+        while True:
+            try:
+                buf = self._stderr.readline()
+            except (Exception, KeyboardInterrupt):
+                # Not fatal. Treat it as if the stderr stream has ended.
+                break
             if len(buf) == 0:
                 break

             _writessherror(self._ui, buf)

-    def close(self):
-        # type: () -> None
-        self._stop = True
+        # Close the pipe. It's likely already closed on the other end.
+        # Note: during readline(), close() will raise an IOError. So there is
+        # no "close" method that can be used by the main thread.
         self._stderr.close()

     def join(self, timeout):
         if self._thread:
@@ -271,8 +275,44 @@ class sshpeer(wireproto.wirepeer):
         global _totalbytessent, _totalbytesreceived
         if self._pipeo is None:
             return
+        # Close the pipe connected to the stdin of the remote ssh process.
+        # This means if the remote process tries to read its stdin, it will
+        # get an empty buffer that indicates EOF. The remote process should
+        # then exit, which will close its stdout and stderr, so the
+        # background stderr reader thread will notice that it reached EOF
+        # and become joinable.
         self._pipeo.close()
+
+        # Wait for the stderr thread to finish reading all stderr text from
+        # the remote ssh process (i.e. to hit EOF).
+        #
+        # This must happen after pipeo.close(). Otherwise the remote process
+        # might still wait for stdin and never close its stderr.
+        #
+        # This is better done before pipei.close(). Otherwise the remote
+        # process might nondeterministically get EPIPE when writing to its
+        # stdout, which can trigger different code paths nondeterministically
+        # that might affect stderr. In other words, moving this after
+        # pipei.close() can potentially increase test flakiness.
+        if util.istest():
+            # In the test environment, we control all remote processes. They
+            # are expected to exit after getting EOF from stdin. Wait
+            # indefinitely to make sure all stderr messages are received.
+            #
+            # If this line hangs forever, that indicates a bug in the remote
+            # process, not here.
+            self._pipee.join(None)
+        else:
+            # In a real-world environment, remote processes might misbehave.
+            # Therefore, be impatient when waiting.
+            self._pipee.join(1)
+
+        # Close the pipe connected to the stdout of the remote process.
+        # The remote end of the pipe is likely already closed, since we
+        # waited for the pipee thread. If not, the remote process will get
+        # EPIPE or SIGPIPE if it writes a bit more to its stdout.
         self._pipei.close()
         _totalbytessent += self._pipeo._totalbytes
         _totalbytesreceived += self._pipei._totalbytes
         self.ui.log(
@@ -281,14 +321,6 @@ class sshpeer(wireproto.wirepeer):
             sshbytessent=_totalbytessent,
             sshbytesreceived=_totalbytesreceived,
         )
-        self._pipee.close()
-
-        if util.istest():
-            # Let's give the thread a bit of time to complete; in the case
-            # where the pipe is somehow still open, the read call will block
-            # on it forever. In this case, there isn't anything to read
-            # anyway, so waiting more would just cause Mercurial to hang.
-            self._pipee.join(1)

     __del__ = _cleanup
```
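
Taken together, _cleanup now follows a strict shutdown order: close the
remote's stdin, drain and join the stderr reader, then close stdout. A
self-contained sketch of that same ordering with a plain subprocess (the
child program here is a hypothetical stand-in for the remote ssh process):

```
import subprocess
import sys
import threading

# Child echoes stdin to stdout and writes a goodbye message to stderr
# once its stdin hits EOF.
child = subprocess.Popen(
    [
        sys.executable,
        "-c",
        "import sys\n"
        "for line in sys.stdin:\n"
        "    sys.stdout.write(line)\n"
        "sys.stderr.write('remote: all done\\n')\n",
    ],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)


def drain_stderr():
    for line in child.stderr:
        sys.stderr.write(line.decode())


reader = threading.Thread(target=drain_stderr)
reader.start()

child.stdin.write(b"hello\n")
# 1. Close stdin first: the child sees EOF, exits, and its stdout/stderr
#    close, which lets the reader thread reach EOF.
child.stdin.close()
# 2. Join the stderr reader before touching stdout, so every stderr
#    message is received (use a finite timeout outside of tests).
reader.join()
# 3. Only now close stdout; the child has already exited, so no EPIPE.
child.stdout.close()
child.wait()
```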