sapling/eden/fs/service/EdenServer.cpp

371 lines
12 KiB
C++
Raw Normal View History

/*
* Copyright (c) 2016, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#include "EdenServer.h"
#include <boost/filesystem/operations.hpp>
#include <boost/filesystem/path.hpp>
#include <folly/SocketAddress.h>
#include <folly/String.h>
#include <gflags/gflags.h>
#include <thrift/lib/cpp2/server/ThriftServer.h>
#include <wangle/concurrent/CPUThreadPoolExecutor.h>
#include <wangle/concurrent/GlobalExecutor.h>
#include "EdenServiceHandler.h"
#include "eden/fs/config/ClientConfig.h"
#include "eden/fs/inodes/Dirstate.h"
#include "eden/fs/inodes/EdenMount.h"
#include "eden/fs/store/LocalStore.h"
#include "eden/fs/store/NullBackingStore.h"
#include "eden/fs/store/git/GitBackingStore.h"
#include "eden/fs/store/hg/HgBackingStore.h"
#include "eden/fuse/MountPoint.h"
#include "eden/fuse/privhelper/PrivHelper.h"
DEFINE_bool(debug, false, "run fuse in debug mode");
DEFINE_int32(num_eden_threads, 12, "the number of eden CPU worker threads");
DEFINE_string(thrift_address, "", "The address for the thrift server socket");
DEFINE_int32(thrift_num_workers, 2, "The number of thrift worker threads");
DEFINE_int32(thrift_max_conns, 100, "Maximum number of thrift connections");
DEFINE_int32(
thrift_max_requests,
1000,
"Maximum number of active thrift requests");
DEFINE_bool(thrift_enable_codel, true, "Enable Codel queuing timeout");
DEFINE_int32(thrift_queue_len, 100, "Maximum number of unprocessed messages");
DEFINE_int32(
thrift_min_compress_bytes,
200,
"Minimum response compression size");
using apache::thrift::ThriftServer;
using folly::StringPiece;
using std::make_shared;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
namespace {
folly::SocketAddress getThriftAddress(
StringPiece argument,
StringPiece edenDir);
std::string getPathToUnixDomainSocket(StringPiece edenDir);
}
namespace facebook {
namespace eden {
EdenServer::EdenServer(
StringPiece edenDir,
StringPiece systemConfigDir,
StringPiece configPath,
StringPiece rocksPath)
: edenDir_(AbsolutePath(edenDir)),
systemConfigDir_(systemConfigDir),
configPath_(configPath),
rocksPath_(rocksPath.str()) {}
EdenServer::~EdenServer() {
// Stop all of the mount points.
// They will each call mountFinished() as they exit.
{
std::lock_guard<std::mutex> guard(mountPointsMutex_);
for (const auto& mountPoint : mountPoints_) {
fusell::privilegedFuseUnmount(mountPoint.first);
}
}
{
// Wait for all the mounts to stop, and for mountPoints_ to become empty.
std::unique_lock<std::mutex> lock(mountPointsMutex_);
while (!mountPoints_.empty()) {
mountPointsCV_.wait(lock);
}
}
}
void EdenServer::run() {
acquireEdenLock();
createThriftServer();
localStore_ = make_shared<LocalStore>(rocksPath_);
auto pool =
make_shared<wangle::CPUThreadPoolExecutor>(FLAGS_num_eden_threads);
wangle::setCPUExecutor(pool);
reloadConfig();
// Remount existing mount points
folly::dynamic dirs = folly::dynamic::object();
try {
dirs = ClientConfig::loadClientDirectoryMap(edenDir_);
} catch (const std::exception& ex) {
LOG(ERROR) << "Could not parse config.json file: " << ex.what()
<< " Skipping remount step.";
}
for (auto& client : dirs.items()) {
auto mountInfo = std::make_unique<MountInfo>();
mountInfo->mountPoint = client.first.c_str();
auto edenClientPath = edenDir_ + PathComponent("clients") +
PathComponent(client.second.c_str());
mountInfo->edenClientPath = edenClientPath.stringPiece().str();
try {
handler_->mount(std::move(mountInfo));
} catch (const std::exception& ex) {
LOG(ERROR) << "Failed to perform remount for " << client.first.c_str()
<< ": " << ex.what();
}
}
prepareThriftAddress();
runThriftServer();
}
void EdenServer::mount(shared_ptr<EdenMount> edenMount) {
// Add the mount point to mountPoints_.
// This also makes sure we don't have this path mounted already
auto mountPath = edenMount->getPath().stringPiece();
{
std::lock_guard<std::mutex> guard(mountPointsMutex_);
auto ret = mountPoints_.emplace(mountPath, edenMount);
if (!ret.second) {
// This mount point already exists.
throw EdenError(folly::to<string>(
"mount point \"", mountPath, "\" is already mounted"));
}
}
auto onFinish = [this, edenMount]() { this->mountFinished(edenMount.get()); };
try {
edenMount->getMountPoint()->start(FLAGS_debug, onFinish);
} catch (...) {
// If we fail to start the mount point, call mountFinished()
// to make sure it gets removed from mountPoints_.
//
// Note that we can't perform this clean-up using SCOPE_FAIL() for now, due
// to a bug in some versions of gcc:
// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=62258
this->mountFinished(edenMount.get());
throw;
}
// Perform all of the bind mounts associated with the client.
for (auto& bindMount : edenMount->getBindMounts()) {
auto pathInMountDir = bindMount.pathInMountDir;
try {
// If pathInMountDir does not exist, then it must be created before the
// bind mount is performed.
boost::system::error_code errorCode;
boost::filesystem::path mountDir = pathInMountDir.c_str();
boost::filesystem::create_directories(mountDir, errorCode);
fusell::privilegedBindMount(
bindMount.pathInClientDir.c_str(), pathInMountDir.c_str());
} catch (...) {
// Consider recording all failed bind mounts in a way that can be
// communicated back to the caller in a structured way.
LOG(ERROR) << "Failed to perform bind mount for "
<< pathInMountDir.stringPiece() << ".";
}
}
}
void EdenServer::unmount(StringPiece mountPath) {
try {
fusell::privilegedFuseUnmount(mountPath);
} catch (const std::exception& ex) {
LOG(ERROR) << "Failed to perform unmount for \"" << mountPath
<< "\": " << folly::exceptionStr(ex);
throw ex;
}
}
void EdenServer::mountFinished(EdenMount* edenMount) {
auto mountPath = edenMount->getPath().stringPiece();
LOG(INFO) << "mount point \"" << mountPath << "\" stopped";
{
std::lock_guard<std::mutex> guard(mountPointsMutex_);
auto numErased = mountPoints_.erase(mountPath);
CHECK_EQ(numErased, 1);
eden: fix data race on shutdown Summary: We can't allow ~EdenServer to delete the memory until we're sure that the other threads are done. To ensure that, we need to notify the condition variable while the aux thread still holds the lock. This makes sure that the thread destroying the EdenServer waits for the aux thread to release the lock before we check the predicate and proceed to deleting the memory. ``` SUMMARY ThreadSanitizer: data race / /common/concurrency/Event.cpp:107 in facebook::common::concurrency::Event::set() const ================== I0909 14:51:18.543072 4147554 main.cpp:173] edenfs performing orderly shutdown I0909 14:51:18.555794 4148654 Channel.cpp:177] session completed I0909 14:51:18.556011 4148654 EdenServer.cpp:192] mount point "/tmp/eden_test.0ostuc90/mounts/main" stopped ================== WARNING: ThreadSanitizer: data race (pid=4147554) Write of size 8 at 0x7fff9e182d90 by main thread: #0 pthread_cond_destroy <null> (edenfs+0x00000007671a) #1 facebook::eden::EdenServer::~EdenServer() / /eden/fs/service/EdenServer.cpp:93 (libeden_fs_service_server.so+0x0000000b96cd) #2 main / /eden/fs/service/main.cpp:176 (edenfs+0x000000018515) Previous read of size 8 at 0x7fff9e182d90 by thread T73: #0 pthread_cond_broadcast <null> (edenfs+0x0000000765b7) #1 __gthread_cond_broadcast /home/engshare/third-party2/libgcc/4.9.x/src/gcc-4_9/x86_64-facebook-linux/libstdc++-v3/include/x86_64-facebook-linux/bits/gthr-default.h:852 (libstdc++.so.6+0x0000000e14f8) #2 std::condition_variable::notify_all() /home/engshare/third-party2/libgcc/4.9.x/src/gcc-4_9/x86_64-facebook-linux/libstdc++-v3/src/c++11/../../../.././libstdc++-v3/src/c++11/condition_variable.cc:72 (libstdc ++.so.6+0x0000000e14f8) #3 facebook::eden::EdenServer::mount(std::shared_ptr<facebook::eden::EdenMount>, std::unique_ptr<facebook::eden::ClientConfig, std::default_delete<facebook::eden::ClientConfig> >)::$_0::operator()() const / / /eden/fs/service/EdenServer.cpp:145 (libeden_fs_service_server.so+0x0000000bcdb5) #4 std::_Function_handler<void (), facebook::eden::EdenServer::mount(std::shared_ptr<facebook::eden::EdenMount>, std::unique_ptr<facebook::eden::ClientConfig, std::default_delete<facebook::eden::ClientConfig> >)::$_0>::_M_invoke(std::_Any_data const&) / /third-party-buck/gcc-4.9-glibc-2.20-fb/build/libgcc/include/c++/trunk/functional:2039 (libeden_fs_service_server.so+0x0000000bcab0) #5 std::function<void ()>::operator()() const / /third-party-buck/gcc-4.9-glibc-2.20-fb/build/libgcc/include/c++/trunk/functional:2439 (libeden_fuse_fusell.so+0x00000020fbb9) #6 facebook::eden::fusell::MountPoint::start(bool, std::function<void ()> const&)::$_0::operator()() const / /eden/fuse/MountPoint.cpp:69 (libeden_fuse_fusell.so+0x000000237447 ) #7 void std::_Bind_simple<facebook::eden::fusell::MountPoint::start(bool, std::function<void ()> const&)::$_0 ()>::_M_invoke<>(std::_Index_tuple<>) / /third-party-buck/gcc-4.9- glibc-2.20-fb/build/libgcc/include/c++/trunk/functional:1699 (libeden_fuse_fusell.so+0x000000237048) #8 std::_Bind_simple<facebook::eden::fusell::MountPoint::start(bool, std::function<void ()> const&)::$_0 ()>::operator()() / /third-party-buck/gcc-4.9-glibc-2.20-fb/build/libgc c/include/c++/trunk/functional:1688 (libeden_fuse_fusell.so+0x000000236ff8) #9 std::thread::_Impl<std::_Bind_simple<facebook::eden::fusell::MountPoint::start(bool, std::function<void ()> const&)::$_0 ()> >::_M_run() / /third-party-buck/gcc-4.9-glibc-2. 20-fb/build/libgcc/include/c++/trunk/thread:115 (libeden_fuse_fusell.so+0x000000236d8c) #10 execute_native_thread_routine /home/engshare/third-party2/libgcc/4.9.x/src/gcc-4_9/x86_64-facebook-linux/libstdc++-v3/src/c++11/../../../.././libstdc++-v3/src/c++11/thread.cc:84 (libstdc++.so.6+0x0000000e6 ec0) ``` Reviewed By: simpkins Differential Revision: D3844846 fbshipit-source-id: 545474bc1aff8621dbeb487dcd6b54c82828ff3b
2016-09-10 02:55:59 +03:00
// This notify and the erase above MUST happen while holding
// mountPointsMutex_ otherwise we can have a race in the case that there
// are two threads in mountFinished(). If the erasure and the notify are
// not done under the lock, the predicate check in ~EdenServer will see
// that mountPoints_ is empty and proceed to delete mountPointsCV_ out
// from under the other thread(s) as they attempt to call notify_all() on
// it.
mountPointsCV_.notify_all();
}
}
EdenServer::MountList EdenServer::getMountPoints() const {
MountList results;
{
std::lock_guard<std::mutex> guard(mountPointsMutex_);
for (const auto& entry : mountPoints_) {
results.emplace_back(entry.second);
}
}
return results;
}
shared_ptr<EdenMount> EdenServer::getMount(StringPiece mountPath) const {
std::lock_guard<std::mutex> guard(mountPointsMutex_);
auto it = mountPoints_.find(mountPath);
if (it == mountPoints_.end()) {
return nullptr;
}
return it->second;
}
void EdenServer::reloadConfig() {
*configData_.wlock() = make_shared<ConfigData>(ClientConfig::loadConfigData(
systemConfigDir_.piece(), configPath_.piece()));
}
shared_ptr<EdenServer::ConfigData> EdenServer::getConfig() {
return *configData_.rlock();
}
shared_ptr<BackingStore> EdenServer::getBackingStore(
StringPiece type,
StringPiece name) {
BackingStoreKey key{type.str(), name.str()};
SYNCHRONIZED(lockedStores, backingStores_) {
auto it = lockedStores.find(key);
if (it != lockedStores.end()) {
return it->second;
}
auto store = createBackingStore(type, name);
lockedStores.emplace(key, store);
return store;
}
// Ugh. The SYNCHRONIZED() macro is super lame.
// We have to return something here, since the compiler can't figure out
// that we always return inside SYNCHRONIZED.
LOG(FATAL) << "unreached";
abort();
}
shared_ptr<BackingStore> EdenServer::createBackingStore(
StringPiece type,
StringPiece name) {
if (type == "null") {
return make_shared<NullBackingStore>();
} else if (type == "hg") {
return make_shared<HgBackingStore>(name, localStore_.get());
} else if (type == "git") {
return make_shared<GitBackingStore>(name, localStore_.get());
} else {
throw std::domain_error(
folly::to<string>("unsupported backing store type: ", type));
}
}
void EdenServer::createThriftServer() {
auto address = getThriftAddress(FLAGS_thrift_address, edenDir_.stringPiece());
server_ = make_shared<ThriftServer>();
server_->setMaxConnections(FLAGS_thrift_max_conns);
server_->setMaxRequests(FLAGS_thrift_max_requests);
server_->setNumIOWorkerThreads(FLAGS_thrift_num_workers);
server_->setEnableCodel(FLAGS_thrift_enable_codel);
server_->setMaxNumPendingConnectionsPerWorker(FLAGS_thrift_queue_len);
server_->setMinCompressBytes(FLAGS_thrift_min_compress_bytes);
handler_ = make_shared<EdenServiceHandler>(this);
server_->setInterface(handler_);
server_->setAddress(address);
}
void EdenServer::acquireEdenLock() {
boost::filesystem::path edenPath{edenDir_.stringPiece().str()};
boost::filesystem::path lockPath = edenPath / "lock";
lockFile_ = folly::File(lockPath.string(), O_WRONLY | O_CREAT);
if (!lockFile_.try_lock()) {
throw std::runtime_error(
"another instance of Eden appears to be running for " +
edenDir_.stringPiece().str());
}
}
void EdenServer::prepareThriftAddress() {
// If we are serving on a local Unix socket, remove any old socket file
// that may be left over from a previous instance.
// We have already acquired the mount point lock at this time, so we know
// that any existing socket is unused and safe to remove.
const auto& addr = server_->getAddress();
if (addr.getFamily() != AF_UNIX) {
return;
}
int rc = unlink(addr.getPath().c_str());
if (rc != 0 && errno != ENOENT) {
// This might happen if we don't have permission to remove the file.
folly::throwSystemError(
"unable to remove old Eden thrift socket ", addr.getPath());
}
}
void EdenServer::stop() const {
server_->stop();
}
}
} // facebook::eden
namespace {
/*
* Parse the --thrift_address argument, and return a SocketAddress object
*/
folly::SocketAddress getThriftAddress(
StringPiece argument,
StringPiece edenDir) {
folly::SocketAddress addr;
// If the argument is empty, default to a Unix socket placed next
// to the mount point
if (argument.empty()) {
auto socketPath = getPathToUnixDomainSocket(edenDir);
addr.setFromPath(socketPath);
return addr;
}
// Check to see if the argument looks like a port number
uint16_t port;
bool validPort{false};
try {
port = folly::to<uint16_t>(argument);
validPort = true;
} catch (const std::range_error& ex) {
// validPort = false
}
if (validPort) {
addr.setFromLocalPort(port);
return addr;
}
// TODO: also support IPv4:PORT or [IPv6]:PORT
// Otherwise assume the address refers to a local unix socket path
addr.setFromPath(argument);
return addr;
}
std::string getPathToUnixDomainSocket(StringPiece edenDir) {
boost::filesystem::path edenPath{edenDir.str()};
boost::filesystem::path socketPath = edenPath / "socket";
return socketPath.string();
}
} // unnamed namespace