Change the Windows pipe functions to not return until all the bytes are read/written or error.

Summary: Change the Windows pipe read and write functions to read/write in a loop. Plus changed the functions prototype to match the POSIX version.

Reviewed By: strager

Differential Revision: D13091785

fbshipit-source-id: 375b22bd9e62d371a78d410f42068945b966a743
This commit is contained in:
Puneet Kaushik 2018-11-26 08:16:38 -08:00 committed by Facebook Github Bot
parent 060d62f3c6
commit f257be1f9a
6 changed files with 101 additions and 99 deletions

View File

@ -29,6 +29,7 @@
#else
#include "eden/win/fs/utils/Pipe.h" // @manual
#include "eden/win/fs/utils/Subprocess.h" // @manual
#include "eden/win/fs/utils/WinError.h" // @manual
#endif
#include <mutex>
@ -55,8 +56,8 @@ using folly::StringPiece;
#ifndef EDEN_WIN
using folly::Subprocess;
#else
using facebook::edenwin::Pipe;
using facebook::edenwin::Subprocess;
using facebook::eden::Pipe;
using facebook::eden::Subprocess;
#endif
using folly::io::Appender;
using folly::io::Cursor;
@ -746,13 +747,12 @@ HgImporter::TransactionID HgImporter::sendFetchTreeRequest(
void HgImporter::readFromHelper(void* buf, size_t size, StringPiece context) {
size_t bytesRead;
#ifdef EDEN_WIN
DWORD winBytesRead;
int result = 0;
try {
facebook::edenwin::Pipe::read(helperOut_, buf, size, &winBytesRead);
bytesRead = Pipe::read(helperOut_, buf, size);
} catch (const std::exception& ex) {
// The Pipe::read() code can throw std::system_error. Translate this to
// The Pipe::read() code can throw std::system_error. Translate this to
// HgImporterError so that the higher-level code will retry on this error.
HgImporterError importErr(
"error reading ",
@ -762,7 +762,6 @@ void HgImporter::readFromHelper(void* buf, size_t size, StringPiece context) {
XLOG(ERR) << importErr.what();
throw importErr;
}
bytesRead = winBytesRead;
#else
auto result = folly::readFull(helperOut_, buf, size);
if (result < 0) {
@ -781,7 +780,7 @@ void HgImporter::readFromHelper(void* buf, size_t size, StringPiece context) {
// This generally means that it exited.
HgImporterError err(
"received unexpected EOF from hg_import_helper.py after ",
result,
bytesRead,
" bytes while reading ",
context);
XLOG(ERR) << err.what();
@ -795,7 +794,7 @@ void HgImporter::writeToHelper(
StringPiece context) {
#ifdef EDEN_WIN
try {
facebook::edenwin::Pipe::writeiov(helperIn_, iov, numIov);
auto result = Pipe::writeiov(helperIn_, iov, numIov);
} catch (const std::exception& ex) {
// The Pipe::read() code can throw std::system_error. Translate this to
// HgImporterError so that the higher-level code will retry on this error.

View File

@ -291,7 +291,7 @@ class HgImporter : public Importer {
#ifndef EDEN_WIN
folly::Subprocess helper_;
#else
facebook::edenwin::Subprocess helper_;
facebook::eden::Subprocess helper_;
#endif
const AbsolutePath repoPath_;
LocalStore* const store_{nullptr};

View File

@ -16,10 +16,11 @@
#include <iostream>
#include <memory>
#include <vector>
#include "eden/win/fs/utils/WinError.h"
#include "folly/logging/xlog.h"
namespace facebook {
namespace edenwin {
namespace eden {
// Pipe constructor will either use security attr or the inherit flag.
// If the security attribute is nullptr it will create one and will use the
@ -35,7 +36,7 @@ Pipe::Pipe(PSECURITY_ATTRIBUTES securityAttr, bool inherit) {
if (!CreatePipe(&readHandle, &writeHandle, securityAttr, NULL)) {
throw std::system_error(
GetLastError(), std::system_category(), "CreatePipe failed\n");
GetLastError(), Win32ErrorCategory(), "Failed to create a pipe");
}
XLOG(DBG5) << "Handle Created: Read: " << readHandle
<< " Write: " << writeHandle << std::endl;
@ -50,89 +51,90 @@ Pipe::~Pipe() {
}
}
void Pipe::read(void* buffer, DWORD bytesToRead, LPDWORD bytesRead) {
DWORD localBytesRead;
if (!ReadFile(
readHandle,
buffer,
bytesToRead,
bytesRead ? bytesRead : &localBytesRead,
nullptr)) {
throw std::system_error(
GetLastError(), std::system_category(), "ReadFile failed");
}
}
size_t Pipe::read(HANDLE handle, void* buffer, DWORD bytesToRead) {
size_t bytesRead = 0;
DWORD read = 0;
DWORD remainingBytes = bytesToRead;
void Pipe::write(void* buffer, DWORD bytesToWrite, LPDWORD bytesWritten) {
DWORD localBytesWritten;
if (!WriteFile(
writeHandle,
buffer,
bytesToWrite,
bytesWritten ? bytesWritten : &localBytesWritten,
nullptr)) {
throw std::system_error(
GetLastError(), std::system_category(), "WriteFile failed");
}
}
while (remainingBytes > 0) {
if (!ReadFile(
handle,
((char*)buffer + bytesRead),
remainingBytes,
&read,
nullptr)) {
DWORD error = GetLastError();
XLOGF(
ERR,
"Error while reading from the pipe : bytesRead {}, Error: {} : {}",
bytesRead,
error,
win32ErrorToString(error));
void Pipe::read(
HANDLE handle,
void* buffer,
DWORD bytesToRead,
LPDWORD bytesRead) {
DWORD localBytesRead;
if (!ReadFile(handle, buffer, bytesToRead, &localBytesRead, nullptr)) {
throw std::system_error(
GetLastError(), std::system_category(), "ReadFile failed");
throw std::system_error(
error,
Win32ErrorCategory::get(),
"Error while reading from the pipe");
}
bytesRead += read;
remainingBytes -= read;
}
if (bytesRead) {
*bytesRead = localBytesRead;
}
XLOG(DBG5) << "Pipe::read-- bytesToRead:" << bytesToRead << "bytesRead"
<< localBytesRead << std::endl;
<< bytesRead << std::endl;
return bytesRead;
}
size_t Pipe::write(HANDLE handle, void* buffer, DWORD bytesToWrite) {
size_t bytesWritten = 0;
DWORD written = 0;
DWORD remainingBytes = bytesToWrite;
while (remainingBytes > 0) {
if (!WriteFile(
handle,
(void*)((char*)buffer + bytesWritten),
remainingBytes,
&written,
nullptr)) {
DWORD error = GetLastError();
XLOGF(
ERR,
"Error while writing to the pipe : bytesWritten {}, {} : {}",
bytesWritten,
error,
win32ErrorToString(error));
throw std::system_error(
error, Win32ErrorCategory::get(), "Error while writing to the pipe");
}
bytesWritten += written;
remainingBytes -= written;
}
XLOG(DBG5) << "Pipe::Write-- bytesToWrite:" << bytesToWrite << "bytesWritten"
<< bytesWritten << std::endl;
return bytesWritten;
}
size_t Pipe::writeiov(HANDLE handle, iovec* iov, int count) {
DWORD localBytesWritten;
size_t bytesWritten = 0;
DWORD written = 0;
for (int i = 0; i < count; i++) {
if (!WriteFile(
handle,
iov[i].iov_base,
iov[i].iov_len,
&localBytesWritten,
nullptr)) {
throw std::system_error(
GetLastError(), std::system_category(), "WriteFile failed");
}
written = write(handle, iov[i].iov_base, iov[i].iov_len);
bytesWritten += written;
}
// TODO: localBytesWritten - it should be sum of all the write ops
return localBytesWritten;
return bytesWritten;
}
void Pipe::write(
HANDLE handle,
void* buffer,
DWORD bytesToWrite,
LPDWORD bytesWritten) {
DWORD localBytesWritten;
if (!WriteFile(handle, buffer, bytesToWrite, &localBytesWritten, nullptr)) {
throw std::system_error(
GetLastError(), std::system_category(), "WriteFile failed");
}
FlushFileBuffers(handle);
if (bytesWritten) {
*bytesWritten = localBytesWritten;
}
XLOG(DBG5) << "Pipe::write-- bytesToWrite" << bytesToWrite << "bytesWritten"
<< localBytesWritten << std::endl;
size_t Pipe::read(void* buffer, DWORD bytesToRead) {
return read(readHandle, buffer, bytesToRead);
}
} // namespace edenwin
size_t Pipe::write(void* buffer, DWORD bytesToWrite) {
return write(writeHandle, buffer, bytesToWrite);
}
} // namespace eden
} // namespace facebook

View File

@ -12,7 +12,7 @@
#include <folly/portability/Windows.h>
namespace facebook {
namespace edenwin {
namespace eden {
class Pipe {
public:
@ -22,20 +22,18 @@ class Pipe {
Pipe(PSECURITY_ATTRIBUTES securityAttr, bool inherit);
~Pipe();
void read(void* buffer, DWORD BytesToRead, LPDWORD BytesRead = nullptr);
void write(void* buffer, DWORD BytesToWrite, LPDWORD BytesWritten = nullptr);
//
// Following read/write pipe functions return the number of bytes read or <0
// on error
//
static void read(
HANDLE handle,
void* buffer,
DWORD BytesToRead,
LPDWORD BytesRead = nullptr);
static void write(
HANDLE handle,
void* buffer,
DWORD BytesToWrite,
LPDWORD BytesWritten = nullptr);
size_t read(void* buffer, DWORD BytesToRead);
size_t write(void* buffer, DWORD BytesToWrite);
static size_t read(HANDLE handle, void* buffer, DWORD BytesToRead);
static size_t write(HANDLE handle, void* buffer, DWORD BytesToWrite);
static size_t writeiov(HANDLE handle, iovec* iov, int count);
};
} // namespace edenwin
} // namespace eden
} // namespace facebook

View File

@ -18,7 +18,8 @@
#include "Pipe.h"
#include "eden/win/fs/Edenwin.h"
using namespace facebook::edenwin;
namespace facebook {
namespace eden {
using namespace std;
Subprocess::Subprocess() {}
@ -71,3 +72,5 @@ void Subprocess::createSubprocess(
CloseHandle(procInfo.hThread);
}
}
} // namespace eden
} // namespace facebook

View File

@ -14,7 +14,7 @@
#include <vector>
namespace facebook {
namespace edenwin {
namespace eden {
class Pipe;
@ -36,5 +36,5 @@ class Subprocess {
const int bufferSize_ = 4096;
};
} // namespace edenwin
} // namespace eden
} // namespace facebook