mirror of
https://github.com/facebook/sapling.git
synced 2024-12-25 05:53:24 +03:00
win: make EdenDispatcher::read asynchronous
Summary: This merely moves Windows bits outside of the EdenDispatcher and into the PrjfsChannel, making the former less dependant on Windows, and paving the way to handling this callback fully asynchronous. One caveat currently is that while the callback supports specifying an offset and length, the underlying backing store only allows reading the entire file at once, thus these arguments are simply ignored in the dispatcher. Reviewed By: fanzeyi Differential Revision: D23745753 fbshipit-source-id: 266e1f448f9db536d746da1462a2a590ffad19a6
This commit is contained in:
parent
c5328d9d0e
commit
495cc257b4
@ -31,11 +31,6 @@ namespace facebook {
|
||||
namespace eden {
|
||||
|
||||
namespace {
|
||||
struct PrjAlignedBufferDeleter {
|
||||
void operator()(void* buffer) noexcept {
|
||||
::PrjFreeAlignedBuffer(buffer);
|
||||
}
|
||||
};
|
||||
|
||||
const RelativePath kDotEdenConfigPath{".eden/config"};
|
||||
const std::string kConfigRootPath{"root"};
|
||||
@ -62,9 +57,6 @@ std::string makeDotEdenConfig(EdenMount& mount) {
|
||||
|
||||
} // namespace
|
||||
|
||||
constexpr uint32_t kMinChunkSize = 512 * 1024; // 512 KiB
|
||||
constexpr uint32_t kMaxChunkSize = 5 * 1024 * 1024; // 5 MiB
|
||||
|
||||
EdenDispatcher::EdenDispatcher(EdenMount* mount)
|
||||
: mount_{mount}, dotEdenConfig_{makeDotEdenConfig(*mount)} {}
|
||||
|
||||
@ -225,171 +217,33 @@ folly::Future<bool> EdenDispatcher::access(
|
||||
});
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
HRESULT readMultipleFileChunks(
|
||||
PRJ_NAMESPACE_VIRTUALIZATION_CONTEXT namespaceVirtualizationContext,
|
||||
const GUID& dataStreamId,
|
||||
const std::string& content,
|
||||
uint64_t startOffset,
|
||||
uint64_t length,
|
||||
uint64_t chunkSize) {
|
||||
HRESULT result;
|
||||
std::unique_ptr<void, PrjAlignedBufferDeleter> writeBuffer{
|
||||
PrjAllocateAlignedBuffer(namespaceVirtualizationContext, chunkSize)};
|
||||
|
||||
if (writeBuffer.get() == nullptr) {
|
||||
return E_OUTOFMEMORY;
|
||||
}
|
||||
|
||||
uint64_t remainingLength = length;
|
||||
|
||||
while (remainingLength > 0) {
|
||||
uint64_t copySize = std::min(remainingLength, chunkSize);
|
||||
|
||||
//
|
||||
// TODO(puneetk): Once backing store has the support for chunking the file
|
||||
// contents, we can read the chunks of large files here and then write
|
||||
// them to FS.
|
||||
//
|
||||
// TODO(puneetk): Build an interface to backing store so that we can pass
|
||||
// the aligned buffer to avoid coping here.
|
||||
//
|
||||
RtlCopyMemory(writeBuffer.get(), content.data() + startOffset, copySize);
|
||||
|
||||
// Write the data to the file in the local file system.
|
||||
result = PrjWriteFileData(
|
||||
namespaceVirtualizationContext,
|
||||
&dataStreamId,
|
||||
writeBuffer.get(),
|
||||
startOffset,
|
||||
folly::to_narrow(copySize));
|
||||
|
||||
if (FAILED(result)) {
|
||||
return result;
|
||||
}
|
||||
|
||||
remainingLength -= copySize;
|
||||
startOffset += copySize;
|
||||
}
|
||||
|
||||
return S_OK;
|
||||
}
|
||||
|
||||
HRESULT readSingleFileChunk(
|
||||
PRJ_NAMESPACE_VIRTUALIZATION_CONTEXT namespaceVirtualizationContext,
|
||||
const GUID& dataStreamId,
|
||||
const std::string& content,
|
||||
uint64_t startOffset,
|
||||
uint64_t length) {
|
||||
return readMultipleFileChunks(
|
||||
namespaceVirtualizationContext,
|
||||
dataStreamId,
|
||||
content,
|
||||
/*startOffset=*/startOffset,
|
||||
/*length=*/length,
|
||||
/*writeLength=*/length);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
static uint64_t BlockAlignTruncate(uint64_t ptr, uint32_t alignment) {
|
||||
return ((ptr) & (0 - (static_cast<uint64_t>(alignment))));
|
||||
}
|
||||
|
||||
HRESULT
|
||||
EdenDispatcher::getFileData(
|
||||
const PRJ_CALLBACK_DATA& callbackData,
|
||||
folly::Future<std::string> EdenDispatcher::read(
|
||||
RelativePath path,
|
||||
uint64_t byteOffset,
|
||||
uint32_t length) noexcept {
|
||||
try {
|
||||
auto relPath = RelativePath(callbackData.FilePathName);
|
||||
FB_LOGF(
|
||||
mount_->getStraceLogger(),
|
||||
DBG7,
|
||||
"read({}, off={}, len={})",
|
||||
relPath,
|
||||
byteOffset,
|
||||
length);
|
||||
uint32_t length,
|
||||
ObjectFetchContext& context) {
|
||||
FB_LOGF(
|
||||
mount_->getStraceLogger(),
|
||||
DBG7,
|
||||
"read({}, off={}, len={})",
|
||||
path,
|
||||
byteOffset,
|
||||
length);
|
||||
|
||||
static auto context = ObjectFetchContext::getNullContextWithCauseDetail(
|
||||
"win::EdenDispatcher::getFileData");
|
||||
auto content =
|
||||
mount_->getInode(relPath)
|
||||
.thenValue([](const InodePtr inode) {
|
||||
auto fileInode = inode.asFilePtr();
|
||||
return fileInode->readAll(*context);
|
||||
})
|
||||
.thenError(
|
||||
folly::tag_t<std::system_error>{},
|
||||
[relPath = std::move(relPath),
|
||||
this](const std::system_error& ex) {
|
||||
if (isEnoent(ex) && relPath == kDotEdenConfigPath) {
|
||||
return folly::makeFuture<std::string>(
|
||||
std::string(dotEdenConfig_));
|
||||
}
|
||||
return folly::makeFuture<std::string>(ex);
|
||||
})
|
||||
.get();
|
||||
|
||||
//
|
||||
// We should return file data which is smaller than
|
||||
// our kMaxChunkSize and meets the memory alignment requirements
|
||||
// of the virtualization instance's storage device.
|
||||
//
|
||||
|
||||
if (content.length() <= kMinChunkSize) {
|
||||
//
|
||||
// If the file is small - copy the whole file in one shot.
|
||||
//
|
||||
return readSingleFileChunk(
|
||||
callbackData.NamespaceVirtualizationContext,
|
||||
callbackData.DataStreamId,
|
||||
content,
|
||||
/*startOffset=*/0,
|
||||
/*writeLength=*/content.length());
|
||||
|
||||
} else if (length <= kMaxChunkSize) {
|
||||
//
|
||||
// If the request is with in our kMaxChunkSize - copy the entire request.
|
||||
//
|
||||
return readSingleFileChunk(
|
||||
callbackData.NamespaceVirtualizationContext,
|
||||
callbackData.DataStreamId,
|
||||
content,
|
||||
/*startOffset=*/byteOffset,
|
||||
/*writeLength=*/length);
|
||||
} else {
|
||||
//
|
||||
// When the request is larger than kMaxChunkSize we split the
|
||||
// request into multiple chunks.
|
||||
//
|
||||
PRJ_VIRTUALIZATION_INSTANCE_INFO instanceInfo;
|
||||
HRESULT result = PrjGetVirtualizationInstanceInfo(
|
||||
callbackData.NamespaceVirtualizationContext, &instanceInfo);
|
||||
|
||||
if (FAILED(result)) {
|
||||
return result;
|
||||
}
|
||||
|
||||
uint64_t startOffset = byteOffset;
|
||||
uint64_t endOffset = BlockAlignTruncate(
|
||||
startOffset + kMaxChunkSize, instanceInfo.WriteAlignment);
|
||||
DCHECK(endOffset > 0);
|
||||
DCHECK(endOffset > startOffset);
|
||||
|
||||
uint64_t chunkSize = endOffset - startOffset;
|
||||
return readMultipleFileChunks(
|
||||
callbackData.NamespaceVirtualizationContext,
|
||||
callbackData.DataStreamId,
|
||||
content,
|
||||
/*startOffset=*/startOffset,
|
||||
/*length=*/length,
|
||||
/*chunkSize=*/chunkSize);
|
||||
}
|
||||
} catch (const std::exception& ex) {
|
||||
return exceptionToHResult(ex);
|
||||
}
|
||||
return mount_->getInode(path)
|
||||
.thenValue([&context](const InodePtr inode) {
|
||||
auto fileInode = inode.asFilePtr();
|
||||
return fileInode->readAll(context);
|
||||
})
|
||||
.thenError(
|
||||
folly::tag_t<std::system_error>{},
|
||||
[path = std::move(path), this](const std::system_error& ex) {
|
||||
if (isEnoent(ex) && path == kDotEdenConfigPath) {
|
||||
return folly::makeFuture<std::string>(
|
||||
std::string(dotEdenConfig_));
|
||||
}
|
||||
return folly::makeFuture<std::string>(ex);
|
||||
});
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
@ -58,11 +58,16 @@ class EdenDispatcher {
|
||||
|
||||
folly::Future<bool> access(RelativePath path, ObjectFetchContext& context);
|
||||
|
||||
HRESULT
|
||||
getFileData(
|
||||
const PRJ_CALLBACK_DATA& callbackData,
|
||||
uint64_t byteOffset,
|
||||
uint32_t length) noexcept;
|
||||
/** Returns the entire content of the file at path.
|
||||
*
|
||||
* In the future, this will return only what's in between offset and
|
||||
* offset+length.
|
||||
*/
|
||||
folly::Future<std::string> read(
|
||||
RelativePath path,
|
||||
uint64_t offset,
|
||||
uint32_t length,
|
||||
ObjectFetchContext& context);
|
||||
|
||||
folly::Future<folly::Unit> newFileCreated(
|
||||
RelativePathPiece relPath,
|
||||
|
@ -223,14 +223,158 @@ HRESULT queryFileName(const PRJ_CALLBACK_DATA* callbackData) noexcept {
|
||||
}
|
||||
}
|
||||
|
||||
struct PrjAlignedBufferDeleter {
|
||||
void operator()(void* buffer) noexcept {
|
||||
::PrjFreeAlignedBuffer(buffer);
|
||||
}
|
||||
};
|
||||
|
||||
HRESULT readMultipleFileChunks(
|
||||
PRJ_NAMESPACE_VIRTUALIZATION_CONTEXT namespaceVirtualizationContext,
|
||||
const GUID& dataStreamId,
|
||||
const std::string& content,
|
||||
uint64_t startOffset,
|
||||
uint64_t length,
|
||||
uint64_t chunkSize) {
|
||||
HRESULT result;
|
||||
std::unique_ptr<void, PrjAlignedBufferDeleter> writeBuffer{
|
||||
PrjAllocateAlignedBuffer(namespaceVirtualizationContext, chunkSize)};
|
||||
|
||||
if (writeBuffer.get() == nullptr) {
|
||||
return E_OUTOFMEMORY;
|
||||
}
|
||||
|
||||
uint64_t remainingLength = length;
|
||||
|
||||
while (remainingLength > 0) {
|
||||
uint64_t copySize = std::min(remainingLength, chunkSize);
|
||||
|
||||
//
|
||||
// TODO(puneetk): Once backing store has the support for chunking the file
|
||||
// contents, we can read the chunks of large files here and then write
|
||||
// them to FS.
|
||||
//
|
||||
// TODO(puneetk): Build an interface to backing store so that we can pass
|
||||
// the aligned buffer to avoid coping here.
|
||||
//
|
||||
RtlCopyMemory(writeBuffer.get(), content.data() + startOffset, copySize);
|
||||
|
||||
// Write the data to the file in the local file system.
|
||||
result = PrjWriteFileData(
|
||||
namespaceVirtualizationContext,
|
||||
&dataStreamId,
|
||||
writeBuffer.get(),
|
||||
startOffset,
|
||||
folly::to_narrow(copySize));
|
||||
|
||||
if (FAILED(result)) {
|
||||
return result;
|
||||
}
|
||||
|
||||
remainingLength -= copySize;
|
||||
startOffset += copySize;
|
||||
}
|
||||
|
||||
return S_OK;
|
||||
}
|
||||
|
||||
HRESULT readSingleFileChunk(
|
||||
PRJ_NAMESPACE_VIRTUALIZATION_CONTEXT namespaceVirtualizationContext,
|
||||
const GUID& dataStreamId,
|
||||
const std::string& content,
|
||||
uint64_t startOffset,
|
||||
uint64_t length) {
|
||||
return readMultipleFileChunks(
|
||||
namespaceVirtualizationContext,
|
||||
dataStreamId,
|
||||
content,
|
||||
/*startOffset=*/startOffset,
|
||||
/*length=*/length,
|
||||
/*writeLength=*/length);
|
||||
}
|
||||
|
||||
uint64_t BlockAlignTruncate(uint64_t ptr, uint32_t alignment) {
|
||||
return ((ptr) & (0 - (static_cast<uint64_t>(alignment))));
|
||||
}
|
||||
|
||||
constexpr uint32_t kMinChunkSize = 512 * 1024; // 512 KiB
|
||||
constexpr uint32_t kMaxChunkSize = 5 * 1024 * 1024; // 5 MiB
|
||||
|
||||
HRESULT getFileData(
|
||||
const PRJ_CALLBACK_DATA* callbackData,
|
||||
UINT64 byteOffset,
|
||||
UINT32 length) noexcept {
|
||||
BAIL_ON_RECURSIVE_CALL(callbackData);
|
||||
return getChannel(callbackData)
|
||||
->getDispatcher()
|
||||
->getFileData(*callbackData, byteOffset, length);
|
||||
|
||||
try {
|
||||
auto channel = getChannel(callbackData);
|
||||
auto dispatcher = channel->getDispatcher();
|
||||
auto context =
|
||||
std::make_unique<PrjfsRequestContext>(channel, *callbackData);
|
||||
|
||||
auto path = RelativePath(callbackData->FilePathName);
|
||||
|
||||
auto content =
|
||||
dispatcher->read(std::move(path), byteOffset, length, *context).get();
|
||||
|
||||
//
|
||||
// We should return file data which is smaller than
|
||||
// our kMaxChunkSize and meets the memory alignment requirements
|
||||
// of the virtualization instance's storage device.
|
||||
//
|
||||
|
||||
if (content.length() <= kMinChunkSize) {
|
||||
//
|
||||
// If the file is small - copy the whole file in one shot.
|
||||
//
|
||||
return readSingleFileChunk(
|
||||
callbackData->NamespaceVirtualizationContext,
|
||||
callbackData->DataStreamId,
|
||||
content,
|
||||
/*startOffset=*/0,
|
||||
/*writeLength=*/content.length());
|
||||
|
||||
} else if (length <= kMaxChunkSize) {
|
||||
//
|
||||
// If the request is with in our kMaxChunkSize - copy the entire request.
|
||||
//
|
||||
return readSingleFileChunk(
|
||||
callbackData->NamespaceVirtualizationContext,
|
||||
callbackData->DataStreamId,
|
||||
content,
|
||||
/*startOffset=*/byteOffset,
|
||||
/*writeLength=*/length);
|
||||
} else {
|
||||
//
|
||||
// When the request is larger than kMaxChunkSize we split the
|
||||
// request into multiple chunks.
|
||||
//
|
||||
PRJ_VIRTUALIZATION_INSTANCE_INFO instanceInfo;
|
||||
HRESULT result = PrjGetVirtualizationInstanceInfo(
|
||||
callbackData->NamespaceVirtualizationContext, &instanceInfo);
|
||||
|
||||
if (FAILED(result)) {
|
||||
return result;
|
||||
}
|
||||
|
||||
uint64_t startOffset = byteOffset;
|
||||
uint64_t endOffset = BlockAlignTruncate(
|
||||
startOffset + kMaxChunkSize, instanceInfo.WriteAlignment);
|
||||
DCHECK(endOffset > 0);
|
||||
DCHECK(endOffset > startOffset);
|
||||
|
||||
uint64_t chunkSize = endOffset - startOffset;
|
||||
return readMultipleFileChunks(
|
||||
callbackData->NamespaceVirtualizationContext,
|
||||
callbackData->DataStreamId,
|
||||
content,
|
||||
/*startOffset=*/startOffset,
|
||||
/*length=*/length,
|
||||
/*chunkSize=*/chunkSize);
|
||||
}
|
||||
} catch (const std::exception& ex) {
|
||||
return exceptionToHResult(ex);
|
||||
}
|
||||
}
|
||||
|
||||
void cancelCommand(const PRJ_CALLBACK_DATA* callbackData) noexcept {
|
||||
|
Loading…
Reference in New Issue
Block a user