/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * This software may be used and distributed according to the terms of the
 * GNU General Public License version 2.
 */

#include "ProcessAccessLog.h"

#include <folly/Exception.h>
#include <folly/MapUtil.h>
#include <folly/MicroLock.h>
#include <folly/ThreadLocal.h>

#include "eden/fs/utils/ProcessNameCache.h"

namespace facebook {
namespace eden {
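
// Writers record accesses into a per-thread ThreadLocalBucket; the data is
// merged into the owning ProcessAccessLog only when a thread dies or when a
// reader calls getAccessCounts(). This keeps the hot recordAccess() path free
// of cross-thread synchronization.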
struct ThreadLocalBucket {
  explicit ThreadLocalBucket(ProcessAccessLog* processAccessLog)
      : state_{folly::in_place, processAccessLog} {}

  ~ThreadLocalBucket() {
    // This thread is going away, so merge our data into the parent.
    mergeUpstream();
  }

  /**
   * Returns whether the pid was newly-recorded in this thread-second or not.
   */
  bool add(
      uint64_t secondsSinceStart,
      pid_t pid,
      ProcessAccessLog::AccessType type) {
    auto state = state_.lock();

    // isNewPid must be initialized because BucketedLog::add will not call
    // Bucket::add if secondsSinceStart is too old and the sample is dropped.
    // (In that case, it's unnecessary to record the process name.)
    bool isNewPid = false;
    state->buckets.add(secondsSinceStart, pid, isNewPid, type);
    return isNewPid;
  }

  bool add(
      uint64_t secondsSinceStart,
      pid_t pid,
      std::chrono::nanoseconds duration) {
    auto state = state_.lock();

    bool isNewPid = false;
    state->buckets.add(secondsSinceStart, pid, isNewPid, duration);
    return isNewPid;
  }

  void mergeUpstream() {
    auto state = state_.lock();
    if (!state->owner) {
      return;
    }
    state->owner->state_.withWLock(
        [&](auto& ownerState) { ownerState.buckets.merge(state->buckets); });
    state->buckets.clear();
  }

  void clearOwnerIfMe(ProcessAccessLog* owner) {
    auto state = state_.lock();
    if (state->owner == owner) {
      state->owner = nullptr;
    }
  }

 private:
  /**
   * Sadly, because getAccessCounts needs to read all of the buckets, it
   * needs a mechanism to stop writers for the duration of the read.
   *
   * Reading the data (merging upstream from all of the threads) is
   * exceptionally rare, so this lock should largely stay uncontended. I
   * considered using folly::SpinLock, but the documentation strongly
   * recommends against it. I am hoping that acquiring an uncontended
   * MicroLock boils down to a single CAS, even though lock xchg can be
   * painful by itself.
   *
   * This lock must always be acquired before the owner's buckets lock.
   */
  struct State {
    explicit State(ProcessAccessLog* pal) : owner{pal} {}

    ProcessAccessLog::Buckets buckets;
    ProcessAccessLog* owner;
  };

  struct InitedMicroLock : folly::MicroLock {
    InitedMicroLock() {
      init();
    }
  };
  folly::Synchronized<State, InitedMicroLock> state_;
};

namespace {
struct BucketTag;
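// BucketTag gives threadLocalBucketPtr its own thread-local storage class;
// folly requires a non-default tag in order to use accessAllThreads() below.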
folly::ThreadLocalPtr<ThreadLocalBucket, BucketTag> threadLocalBucketPtr;
} // namespace

void ProcessAccessLog::Bucket::clear() {
  accessCountsByPid.clear();
}

void ProcessAccessLog::Bucket::add(
    pid_t pid,
    bool& isNewPid,
    ProcessAccessLog::AccessType type) {
  // The bool from emplace() is true when the pid was newly inserted into
  // this bucket.
  auto [it, inserted] = accessCountsByPid.emplace(pid, PerBucketAccessCounts{});
  it->second[type]++;
  isNewPid = inserted;
}

void ProcessAccessLog::Bucket::add(
    pid_t pid,
    bool& isNewPid,
    std::chrono::nanoseconds duration) {
  auto [it, inserted] = accessCountsByPid.emplace(pid, PerBucketAccessCounts{});
  it->second.duration += duration;
  isNewPid = inserted;
}

void ProcessAccessLog::Bucket::merge(const Bucket& other) {
  for (const auto& [pid, otherAccessCounts] : other.accessCountsByPid) {
    for (int type = 0; type != static_cast<int>(AccessType::Last); type++) {
      accessCountsByPid[pid].counts[type] += otherAccessCounts.counts[type];
    }
    accessCountsByPid[pid].duration += otherAccessCounts.duration;
  }
}

ProcessAccessLog::ProcessAccessLog(
    std::shared_ptr<ProcessNameCache> processNameCache)
    : processNameCache_{std::move(processNameCache)} {
  CHECK(processNameCache_) << "Process name cache is mandatory";
}
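
// Example usage (a sketch; assumes ProcessNameCache can be constructed with
// its defaults and that somePid is a real pid_t):
//
//   ProcessAccessLog log{std::make_shared<ProcessNameCache>()};
//   log.recordAccess(somePid, ProcessAccessLog::AccessType::FuseRead);
//   auto countsByPid = log.getAccessCounts(std::chrono::seconds{10});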

ProcessAccessLog::~ProcessAccessLog() {
  // Thread-local buckets can outlive this ProcessAccessLog, so clear their
  // back-pointers to us before we go away.
  for (auto& tlb : threadLocalBucketPtr.accessAllThreads()) {
    tlb.clearOwnerIfMe(this);
  }
}

ThreadLocalBucket* ProcessAccessLog::getTlb() {
  auto tlb = threadLocalBucketPtr.get();
  if (!tlb) {
    // Lazily allocate this thread's bucket on first access.
    threadLocalBucketPtr.reset(std::make_unique<ThreadLocalBucket>(this));
    tlb = threadLocalBucketPtr.get();
  }
  return tlb;
}

uint64_t ProcessAccessLog::getSecondsSinceEpoch() {
  // Note: steady_clock's epoch is unspecified (often boot time), so these
  // values are only meaningful relative to each other, which is all the
  // bucketing needs.
  return std::chrono::duration_cast<std::chrono::seconds>(
             std::chrono::steady_clock::now().time_since_epoch())
      .count();
}

void ProcessAccessLog::recordAccess(
    pid_t pid,
    ProcessAccessLog::AccessType type) {
  // This function is called very frequently from different threads. It's a
  // write-often, read-rarely use case, so, to avoid synchronization overhead,
  // record to thread-local storage and only merge into the access log when
  // the calling thread dies or when the data must be read.
  bool isNewPid = getTlb()->add(getSecondsSinceEpoch(), pid, type);

  // Many processes are short-lived, so grab the executable name during the
  // access. We could potentially get away with grabbing executable names a
  // bit later on another thread, but this way we only readlink() once per
  // pid.

  // Sometimes we receive requests from pid 0. Record the access, but don't
  // try to look up a name.
  if (pid != 0) {
    // Since recordAccess is called a lot by latency- and throughput-sensitive
    // code, only try to look up and cache the process name if we haven't seen
    // it this thread-second.
    if (isNewPid) {
      // It's a bit unfortunate that ProcessNameCache maintains its own
      // SharedMutex, but it will be shared with thrift counters.
      processNameCache_->add(pid);
    }
  }
}

void ProcessAccessLog::recordDuration(
    pid_t pid,
    std::chrono::nanoseconds duration) {
  bool isNewPid = getTlb()->add(getSecondsSinceEpoch(), pid, duration);
  if (pid != 0 && isNewPid) {
    processNameCache_->add(pid);
  }
}

std::unordered_map<pid_t, AccessCounts> ProcessAccessLog::getAccessCounts(
    std::chrono::seconds lastNSeconds) {
  auto secondCount = lastNSeconds.count();

  // First, merge all the thread-local buckets into their owners, including
  // us.
  for (auto& tlb : threadLocalBucketPtr.accessAllThreads()) {
    // This must be done outside of acquiring our own state_ lock: each
    // thread's bucket lock must be taken before the owner's, per the lock
    // ordering documented on ThreadLocalBucket::State.
    tlb.mergeUpstream();
  }

  auto state = state_.wlock();
  auto allBuckets = state->buckets.getAll(getSecondsSinceEpoch());

  if (secondCount < 0) {
    return {};
  }

  // Merge the most recent secondCount buckets into one.
  Bucket bucket;
  uint64_t count = std::min(
      static_cast<uint64_t>(allBuckets.size()),
      static_cast<uint64_t>(secondCount));
  for (auto iter = allBuckets.end() - count; iter != allBuckets.end(); ++iter) {
    bucket.merge(*iter);
  }

  // Transfer the merged counts into a Thrift map.
  std::unordered_map<pid_t, AccessCounts> accessCountsByPid;
  for (auto& [pid, accessCounts] : bucket.accessCountsByPid) {
    accessCountsByPid[pid].fuseReads = accessCounts[AccessType::FuseRead];
    accessCountsByPid[pid].fuseWrites = accessCounts[AccessType::FuseWrite];
    accessCountsByPid[pid].fuseTotal = accessCounts[AccessType::FuseRead] +
        accessCounts[AccessType::FuseWrite] +
        accessCounts[AccessType::FuseOther];
    accessCountsByPid[pid].fuseBackingStoreImports =
        accessCounts[AccessType::FuseBackingStoreImport];
    accessCountsByPid[pid].fuseDurationNs = accessCounts.duration.count();
  }
  return accessCountsByPid;
}

} // namespace eden
} // namespace facebook