From b55e4dbb58bc57784bfb214625fecfbe609716d3 Mon Sep 17 00:00:00 2001 From: Chad Austin Date: Wed, 5 Sep 2018 15:05:54 -0700 Subject: [PATCH] introduce a ProcessAccessLog Summary: Add a ProcessAccessLog that supports cheaply tracking pids passed into FUSE. It's a write-many, read-maybe access pattern, so the code is careful to add as little latency as possible on the hot path. Reviewed By: strager Differential Revision: D9477839 fbshipit-source-id: 6928c1d09d55c2b0c0958ac2cb0dc91ec21b370c --- eden/fs/utils/ProcessAccessLog.cpp | 189 ++++++++++++++++++ eden/fs/utils/ProcessAccessLog.h | 88 ++++++++ eden/fs/utils/test/BenchmarkMain.cpp | 16 ++ .../utils/test/ProcessAccessLogBenchmark.cpp | 64 ++++++ .../utils/test/ProcessNameCacheBenchmark.cpp | 11 +- 5 files changed, 360 insertions(+), 8 deletions(-) create mode 100644 eden/fs/utils/ProcessAccessLog.cpp create mode 100644 eden/fs/utils/ProcessAccessLog.h create mode 100644 eden/fs/utils/test/BenchmarkMain.cpp create mode 100644 eden/fs/utils/test/ProcessAccessLogBenchmark.cpp diff --git a/eden/fs/utils/ProcessAccessLog.cpp b/eden/fs/utils/ProcessAccessLog.cpp new file mode 100644 index 0000000000..5c3d1674ea --- /dev/null +++ b/eden/fs/utils/ProcessAccessLog.cpp @@ -0,0 +1,189 @@ +/* + * Copyright (c) 2018-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + * + */ +#include "ProcessAccessLog.h" + +#include +#include +#include +#include +#include "eden/fs/utils/ProcessNameCache.h" + +namespace facebook { +namespace eden { + +struct ThreadLocalBucket { + explicit ThreadLocalBucket(ProcessAccessLog* processAccessLog) + : state_{folly::in_place, processAccessLog} {} + + ~ThreadLocalBucket() { + // This thread is going away, so merge our data into the parent. + mergeUpstream(); + } + + /** + * Returns whether the pid was newly-recorded in this thread-second or not. + */ + bool add(uint64_t secondsSinceStart, pid_t pid) { + auto state = state_.lock(); + + // isNewPid must be initialized because BucketedLog::add will not call + // Bucket::add if secondsSinceStart is too old and the sample is dropped. + // (In that case, it's unnecessary to record the process name.) + bool isNewPid = false; + state->buckets.add(secondsSinceStart, pid, isNewPid); + return isNewPid; + } + + void mergeUpstream() { + auto state = state_.lock(); + if (!state->owner) { + return; + } + state->owner->state_.withWLock( + [&](auto& ownerState) { ownerState.buckets.merge(state->buckets); }); + state->buckets.clear(); + } + + void clearOwnerIfMe(ProcessAccessLog* owner) { + auto state = state_.lock(); + if (state->owner == owner) { + state->owner = nullptr; + } + } + + private: + /** + * Sadly, because getAllAccesses needs to access all of the buckets, it + * needs a mechanism to stop writers for the duration of the read. + * + * Reading the data (merging up-stream from all of the threads) is + * exceptionally rare, so this lock should largely stay uncontended. I + * considered using folly::SpinLock, but the documentation strongly suggests + * not. I am hoping that acquiring an uncontended MicroLock + * boils down to a single CAS, even though lock xchg can be painful by itself. + * + * This lock must always be acquired before the owner's buckets lock. + */ + struct State { + explicit State(ProcessAccessLog* pal) : owner{pal} {} + ProcessAccessLog::Buckets buckets; + ProcessAccessLog* owner; + }; + + struct InitedMicroLock : folly::MicroLock { + InitedMicroLock() { + init(); + } + }; + folly::Synchronized state_; +}; + +namespace { +struct BucketTag; +folly::ThreadLocalPtr threadLocalBucketPtr; +} // namespace + +void ProcessAccessLog::Bucket::clear() { + accessCounts.clear(); +} + +void ProcessAccessLog::Bucket::add(pid_t pid, bool& isNew) { + auto [iter, inserted] = accessCounts.emplace(pid, 1); + if (!inserted) { + ++iter->second; + } + isNew = inserted; +} + +void ProcessAccessLog::Bucket::merge(const Bucket& other) { + for (auto& [pid, otherCount] : other.accessCounts) { + accessCounts[pid] += otherCount; + } +} + +ProcessAccessLog::ProcessAccessLog( + std::shared_ptr processNameCache) + : processNameCache_{std::move(processNameCache)} {} + +ProcessAccessLog::~ProcessAccessLog() { + for (auto& tlb : threadLocalBucketPtr.accessAllThreads()) { + tlb.clearOwnerIfMe(this); + } +} + +void ProcessAccessLog::recordAccess(pid_t pid) { + // This function is called very frequently from different threads. It's a + // write-often, read-rarely use case, so, to avoid synchronization overhead, + // record to thread-local storage and only merge into the access log when the + // calling thread dies or when the data must be read. + + auto tlb = threadLocalBucketPtr.get(); + if (!tlb) { + threadLocalBucketPtr.reset(std::make_unique(this)); + tlb = threadLocalBucketPtr.get(); + } + + uint64_t secondsSinceEpoch = + std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + bool isNewPid = tlb->add(secondsSinceEpoch, pid); + + // Many processes are short-lived, so grab the executable name during the + // access. We could potentially get away with grabbing executable names a bit + // later on another thread, but we'll only readlink() once per pid. + + // Sometimes we receive requests from pid 0. Record the access, + // but don't try to look up a name. + if (pid) { + // Since recordAccess is called a lot by latency- and throughput-sensitive + // code, only try to lookup and cache the process name if we haven't seen it + // this thread-second. + if (isNewPid) { + // It's a bit unfortunate that ProcessNameCache maintains its own + // SharedMutex, but it will be shared with thrift counters. + processNameCache_->add(pid); + } + } +} + +std::unordered_map ProcessAccessLog::getAllAccesses( + std::chrono::seconds lastNSeconds) { + auto secondCount = lastNSeconds.count(); + // First, merge all the thread-local buckets into their owners, including us. + for (auto& tlb : threadLocalBucketPtr.accessAllThreads()) { + // This must be done outside of acquiring our own state_ lock. + tlb.mergeUpstream(); + } + + uint64_t secondsSinceEpoch = + std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + auto state = state_.wlock(); + auto allBuckets = state->buckets.getAll(secondsSinceEpoch); + + if (secondCount < 0) { + return {}; + } + + Bucket bucket; + uint64_t count = + std::min(allBuckets.size(), static_cast(secondCount)); + for (auto iter = allBuckets.end() - count; iter != allBuckets.end(); ++iter) { + bucket.merge(*iter); + } + return bucket.accessCounts; +} + +} // namespace eden +} // namespace facebook diff --git a/eden/fs/utils/ProcessAccessLog.h b/eden/fs/utils/ProcessAccessLog.h new file mode 100644 index 0000000000..c89e613fe1 --- /dev/null +++ b/eden/fs/utils/ProcessAccessLog.h @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2018-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + * + */ +#pragma once + +#include +#include +#include +#include "eden/fs/utils/BucketedLog.h" + +namespace facebook { +namespace eden { + +class ProcessNameCache; + +/** + * An inexpensive mechanism for counting accesses by pids. Intended for counting + * FUSE and Thrift calls from external processes. + * + * The first time a thread calls recordAccess, that thread is then coupled to + * this particular ProcessAccessLog, even if it calls recordAccess on another + * ProcessAccessLog instance. Thus, use one ProcessAccessLog per pool of + * threads. + */ +class ProcessAccessLog { + public: + explicit ProcessAccessLog(std::shared_ptr processNameCache); + ~ProcessAccessLog(); + + /** + * Records an access by a process ID. The first call to recordAccess by a + * particular thread binds that thread to this access log. Future recordAccess + * calls on that thread will accumulate within this access log. + * + * Process IDs passed to recordAccess are also inserted into the + * ProcessNameCache. + */ + void recordAccess(pid_t pid); + + /** + * Returns the number of times each pid was passed to recordAccess() in + * `lastNSeconds`. + * + * Note: ProcessAccessLog buckets by whole seconds, so this number should be + * considered an approximation. + */ + std::unordered_map getAllAccesses( + std::chrono::seconds lastNSeconds); + + private: + // Data for one second. + struct Bucket { + void clear(); + + /** + * Returns whether the added pid is newly observed or not in the `isNew` out + * parameter. + */ + void add(pid_t pid, bool& isNew); + + void merge(const Bucket& other); + + std::unordered_map accessCounts; + }; + + // Keep up to ten seconds of data, but use a power of two so BucketedLog + // generates smaller, faster code. + static constexpr uint64_t kBucketCount = 16; + using Buckets = BucketedLog; + + struct State { + Buckets buckets; + }; + + const std::shared_ptr processNameCache_; + folly::Synchronized state_; + + friend struct ThreadLocalBucket; +}; + +} // namespace eden +} // namespace facebook diff --git a/eden/fs/utils/test/BenchmarkMain.cpp b/eden/fs/utils/test/BenchmarkMain.cpp new file mode 100644 index 0000000000..6c34ec2e24 --- /dev/null +++ b/eden/fs/utils/test/BenchmarkMain.cpp @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2018-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + * + */ +#include +#include + +int main(int argc, char** argv) { + folly::init(&argc, &argv); + folly::runBenchmarks(); +} diff --git a/eden/fs/utils/test/ProcessAccessLogBenchmark.cpp b/eden/fs/utils/test/ProcessAccessLogBenchmark.cpp new file mode 100644 index 0000000000..b5263b4ec3 --- /dev/null +++ b/eden/fs/utils/test/ProcessAccessLogBenchmark.cpp @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2018-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + * + */ +#include "eden/fs/utils/ProcessAccessLog.h" + +#include +#include +#include +#include "eden/fs/utils/ProcessNameCache.h" + +using namespace facebook::eden; + +/** + * A high but realistic amount of contention. + */ +constexpr size_t kThreadCount = 4; + +BENCHMARK(ProcessAccessLog_repeatedly_add_self, iters) { + folly::BenchmarkSuspender suspender; + + auto processNameCache = std::make_shared(); + ProcessAccessLog processAccessLog{processNameCache}; + + std::vector threads; + std::array, kThreadCount> batons; + + size_t remainingIterations = iters; + size_t totalIterations = 0; + for (size_t i = 0; i < kThreadCount; ++i) { + size_t remainingThreads = kThreadCount - i; + size_t assignedIterations = remainingIterations / remainingThreads; + remainingIterations -= assignedIterations; + totalIterations += assignedIterations; + threads.emplace_back([&processAccessLog, + baton = &batons[i], + assignedIterations, + myPid = getpid()] { + baton->wait(); + for (size_t j = 0; j < assignedIterations; ++j) { + processAccessLog.recordAccess(myPid); + } + }); + } + + CHECK_EQ(totalIterations, iters); + + suspender.dismiss(); + + // Now wake the threads. + for (auto& baton : batons) { + baton.post(); + } + + // Wait until they're done. + for (auto& thread : threads) { + thread.join(); + } +} diff --git a/eden/fs/utils/test/ProcessNameCacheBenchmark.cpp b/eden/fs/utils/test/ProcessNameCacheBenchmark.cpp index 54e52e22b0..44e2539e46 100644 --- a/eden/fs/utils/test/ProcessNameCacheBenchmark.cpp +++ b/eden/fs/utils/test/ProcessNameCacheBenchmark.cpp @@ -7,11 +7,11 @@ * of patent rights can be found in the PATENTS file in the same directory. * */ -#include -#include -#include #include "eden/fs/utils/ProcessNameCache.h" +#include +#include + using namespace facebook::eden; /** @@ -58,8 +58,3 @@ BENCHMARK(ProcessNameCache_repeatedly_add_self, iters) { thread.join(); } } - -int main(int argc, char** argv) { - folly::init(&argc, &argv); - folly::runBenchmarks(); -}