wrapping up

This commit is contained in:
Charlie Curtsinger 2013-11-06 16:45:13 -05:00
parent 27d5343dfa
commit 1d2395b37d
10 changed files with 279 additions and 240 deletions

View File

@ -1,205 +0,0 @@
#if !defined(CAUSAL_LIB_RUNTIME_DARWIN_HOST_H)
#define CAUSAL_LIB_RUNTIME_DARWIN_HOST_H
#include <dlfcn.h>
#include <mach/mach.h>
#include <mach/mach_time.h>
#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <unistd.h>
#include <set>
#include <vector>
#include "arch.h"
#include "causal.h"
#include "debug.h"
using namespace std;
// Thread entry-point signature, as passed to pthread_create by createThread().
typedef void* (*pthread_fn_t)(void*);
// One-argument signal handler, as taken and returned by the signal() wrapper.
typedef void (*signal_handler_t)(int);
// Three-argument handler installed by setSignalHandler() with SA_SIGINFO.
typedef void (*sigaction_handler_t)(int, siginfo_t*, void*);
// Page granularity used by mprotectRange() when rounding an address range.
// NOTE(review): hard-coded to 4 KiB; confirm this matches the real page size of the target.
enum {
PageSize = 0x1000
};
/**
 * Pairs a Mach thread port with the pthread handle of the same thread.
 *
 * The pthread handle is the identity used for comparisons and for signal
 * delivery; the Mach port is used to read the thread's register state.
 */
class DarwinThread {
private:
  thread_act_t _thread;  // Mach port, used by thread_get_state
  pthread_t _pthread;    // pthread handle, used for identity and pthread_kill

public:
  DarwinThread(thread_act_t thread, pthread_t pthread) : _thread(thread), _pthread(pthread) {}

  // Equality is defined purely on the pthread handle.
  bool operator==(pthread_t t) const {
    return _pthread == t;
  }

  bool operator!=(pthread_t t) const {
    return _pthread != t;
  }

  bool operator==(const DarwinThread& t) const {
    return _pthread == t._pthread;
  }

  bool operator!=(const DarwinThread& t) const {
    return _pthread != t._pthread;
  }

  // Send signum to this thread. The pthread_kill result is not reported.
  void signal(int signum) {
    pthread_kill(_pthread, signum);
  }

  /**
   * Read the thread's current program counter from its Mach thread state.
   *
   * @return the instruction pointer, or 0 if thread_get_state fails.
   *         Aborts the process on an architecture other than x86/x86_64.
   */
  uintptr_t getPC() const {
    if(_IS_X86_64) {
      x86_thread_state64_t state;
      mach_msg_type_number_t count = x86_THREAD_STATE64_COUNT;
      kern_return_t krc = thread_get_state(_thread, x86_THREAD_STATE64, (thread_state_t)&state, &count);
      if(krc != KERN_SUCCESS) {
        DEBUG("Failed to get thread PC");
        return 0;
      }
      return state.__rip;
    } else if(_IS_X86) {
      x86_thread_state32_t state;
      mach_msg_type_number_t count = x86_THREAD_STATE32_COUNT;
      kern_return_t krc = thread_get_state(_thread, x86_THREAD_STATE32, (thread_state_t)&state, &count);
      if(krc != KERN_SUCCESS) {
        DEBUG("Failed to get thread PC");
        return 0;
      }
      return state.__eip;
    } else {
      DEBUG("Unsupported architecture");
      abort();
    }
  }

  /**
   * Print the thread's name (or its handle when no name is available) and
   * its current program counter to stdout.
   */
  void show() const {
    if(_pthread) {
      char buf[256];
      // pthread_getname_np returns 0 on SUCCESS. Fall back to the handle only
      // when the lookup fails or the thread has no name; previously the test
      // was inverted and every real name was overwritten.
      if(pthread_getname_np(_pthread, buf, sizeof(buf)) != 0 || buf[0] == '\0')
        snprintf(buf, sizeof(buf), "<%p>", (void*)_pthread);
      printf("%s: at %p\n", buf, (void*)getPC());
    } else {
      printf("mach thread: at %p\n", (void*)getPC());
    }
  }
};
/**
 * Host abstraction for Darwin: timing, thread enumeration, signal-handler
 * installation, and pass-through wrappers that reach the next definition of
 * several libc functions via dlsym(RTLD_NEXT, ...).
 */
class DarwinHost {
private:
  // Threads started through createThread(); getThreads() leaves these out.
  set<pthread_t> _ignored_threads;

public:
  // Time unit multipliers, expressed in nanoseconds.
  enum Time : uint64_t {
    Nanosecond = 1,
    Millisecond = 1000 * 1000,
    Second = 1000 * Millisecond
  };

  typedef set<DarwinThread> ThreadsType;
  typedef DarwinThread ThreadType;

  // Look up sym in the default symbol scope; returns NULL when not found.
  static void* findSymbol(const char* sym) {
    return dlsym(RTLD_DEFAULT, sym);
  }

  // Install handler as an SA_SIGINFO handler for signum with an empty mask.
  static void setSignalHandler(int signum, sigaction_handler_t handler) {
    struct sigaction sa;
    sa.sa_sigaction = handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = SA_SIGINFO;
    ::sigaction(signum, &sa, NULL);
  }

  /**
   * Block the calling thread until the deadline has passed, retrying
   * mach_wait_until whenever it returns without success.
   *
   * NOTE(review): nanos is added directly to mach_absolute_time(), which is
   * only correct where the Mach timebase is 1:1 with nanoseconds - confirm.
   */
  static void wait(uint64_t nanos) {
    uint64_t end_time = mach_absolute_time() + nanos;
    while(mach_wait_until(end_time) != KERN_SUCCESS) {
      // interrupted or failed: wait again for the same deadline
    }
  }

  // Current value of mach_absolute_time().
  static size_t getTime() {
    return mach_absolute_time();
  }

  /**
   * Start a thread running fn(arg) and record it as ignored so getThreads()
   * will not report it. Aborts the process if the thread cannot be created.
   */
  pthread_t createThread(pthread_fn_t fn, void* arg = NULL) {
    pthread_t t;
    if(::pthread_create(&t, NULL, fn, arg)) {
      DEBUG("Failed to create thread");
      abort();
    }
    _ignored_threads.insert(t);
    return t;
  }

  /**
   * Enumerate the task's threads that have a pthread handle and were not
   * created through createThread(). Aborts if the kernel query fails.
   */
  vector<DarwinThread> getThreads() {
    thread_act_array_t threads;
    mach_msg_type_number_t count;
    kern_return_t krc = task_threads(mach_task_self(), &threads, &count);
    if(krc != KERN_SUCCESS) {
      DEBUG("Failed to get task threads");
      abort();
    }

    vector<DarwinThread> result;
    result.reserve(count);
    for(size_t i=0; i<count; i++) {
      pthread_t pthread = pthread_from_mach_thread_np(threads[i]);
      if(pthread != NULL && _ignored_threads.find(pthread) == _ignored_threads.end()) {
        // The port right travels with the DarwinThread, which needs it for getPC().
        result.push_back(DarwinThread(threads[i], pthread));
      } else {
        // Not handed to any caller: drop the send right task_threads gave us.
        mach_port_deallocate(mach_task_self(), threads[i]);
      }
    }

    // task_threads allocates the array in our address space; release it.
    vm_deallocate(mach_task_self(), (vm_address_t)threads, count * sizeof(thread_act_t));
    return result;
  }

  // Apply prot to every page overlapping [base, limit): base is rounded down
  // and limit rounded up to a PageSize boundary. Returns mprotect's result.
  static int mprotectRange(uintptr_t base, uintptr_t limit, int prot) {
    base -= base % PageSize;
    limit += PageSize - 1;
    limit -= limit % PageSize;
    return mprotect((void*)base, limit - base, prot);
  }

  // The wrappers below resolve the next definition of the named function once
  // (dlsym with RTLD_NEXT) and forward to it. The lookup result is not checked.
  static int fork() {
    static auto real_fork = (int (*)())dlsym(RTLD_NEXT, "fork");
    return real_fork();
  }

  static signal_handler_t signal(int signum, signal_handler_t handler) {
    static auto real_signal = (signal_handler_t (*)(int, signal_handler_t))dlsym(RTLD_NEXT, "signal");
    return real_signal(signum, handler);
  }

  static int sigaction(int signum, const struct sigaction* act, struct sigaction* oldact) {
    static auto real_sigaction = (int (*)(int, const struct sigaction*, struct sigaction*))dlsym(RTLD_NEXT, "sigaction");
    return real_sigaction(signum, act, oldact);
  }

  static int sigprocmask(int how, const sigset_t* set, sigset_t* oldset) {
    static auto real_sigprocmask = (int (*)(int, const sigset_t*, sigset_t*))dlsym(RTLD_NEXT, "sigprocmask");
    return real_sigprocmask(how, set, oldset);
  }

  static int sigsuspend(const sigset_t* mask) {
    static auto real_sigsuspend = (int (*)(const sigset_t*))dlsym(RTLD_NEXT, "sigsuspend");
    return real_sigsuspend(mask);
  }

  static int pthread_sigmask(int how, const sigset_t* set, sigset_t* oldset) {
    static auto real_pthread_sigmask = (int (*)(int, const sigset_t*, sigset_t*))dlsym(RTLD_NEXT, "pthread_sigmask");
    return real_pthread_sigmask(how, set, oldset);
  }
};
// Expose the Darwin implementation under the platform-neutral name Host.
typedef DarwinHost Host;
#endif

View File

@ -1,6 +1,6 @@
LEVEL = ../../../..
LIBRARYNAME = causal_rt
SOURCES = libcausal_rt.cpp# $(shell uname -s)_host.cpp
SOURCES = libcausal_rt.cpp
CXXFLAGS += -std=c++11 -stdlib=libc++ -nostdinc++ -I/usr/src/libcxx/include -L/usr/src/libcxx/lib

View File

@ -1,6 +1,7 @@
#if !defined(CAUSAL_LIB_RUNTIME_CAUSAL_H)
#define CAUSAL_LIB_RUNTIME_CAUSAL_H
#include <cxxabi.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
@ -18,16 +19,22 @@ using namespace std;
enum class Mode {
Idle,
Slowdown,
Speedup
Speedup,
};
struct Causal : public SigThief<Host, SIGUSR1, SIGUSR2> {
enum {
DelaySignal = SIGUSR1,
PauseSignal = SIGUSR2
};
struct Causal : public SigThief<Host, DelaySignal, PauseSignal> {
private:
atomic<bool> _initialized = ATOMIC_VAR_INIT(false);
Mode _mode = Mode::Idle;
uintptr_t _perturb_point;
size_t _delay_size;
atomic<Mode> _mode = ATOMIC_VAR_INIT(Mode::Idle);
atomic<uintptr_t> _perturb_point;
atomic<size_t> _delay_size;
atomic<size_t> _progress_visits;
atomic<size_t> _perturb_visits;
@ -39,47 +46,86 @@ private:
vector<Probe*> _blocks;
pthread_t profiler_thread;
mutex _thread_blocker;
atomic<size_t> _thread_arrivals;
Causal() {}
void slowdownExperiment(Probe* p, size_t delay, size_t duration, size_t min_trips = 10, size_t max_retries = 10) {
// Pause the application threads, record the perturbation point, restore the
// perturbed probe and every progress-point probe, then let the threads go.
void insertProbes(Probe* perturb) {
  pauseThreads();

  // Publish the address that identifies the perturbed probe.
  _perturb_point.store(perturb->getRet());
  perturb->restore();

  for(Probe* progress_probe : _progress_points) {
    progress_probe->restore();
  }

  resumeThreads();
}
void slowdownExperiment(Probe* perturb, size_t delay, size_t max_duration, size_t min_trips = 50) {
// Set up the mode and delay so probes are not removed, but no delay is added
_delay_size.store(0);
_mode.store(Mode::Slowdown);
// Insert probes at the perturbed point, and all progress points
insertProbes(perturb);
// Measure the baseline progress rate
size_t duration = 0;
size_t wait_size = Host::Time::Millisecond;
// Clear counters, and start to measure the baseline
_progress_visits.store(0);
Host::wait(duration);
_perturb_visits.store(0);
// Repeat the measurement until we have enough samples, or the maximum time has elapsed
do {
Host::wait(wait_size);
duration += wait_size;
wait_size *= 2;
} while((_progress_visits.load() < min_trips || _perturb_visits.load() < min_trips)
&& duration + wait_size < max_duration / 2);
// Read the results. If we didn't get enough samples, abort
size_t control_count = _progress_visits.load();
if(control_count < min_trips || _perturb_visits.load() < min_trips) {
//DEBUG("aborting experiment at %p (progress: %zu, perturb: %zu)", (void*)perturb->getBase(),
// control_count, _perturb_visits.load());
return;
}
// Measure the perturbed progress rate
_delay_size = delay;
_perturb_point = p->getRet();
p->restore();
_delay_size.store(delay);
_progress_visits.store(0);
_perturb_visits.store(0);
Host::wait(duration);
_perturb_point = 0;
//p->remove();
size_t treatment_count = _progress_visits.load();
size_t num_perturbs = _perturb_visits.load();
if(num_perturbs < min_trips || control_count < min_trips || treatment_count < min_trips) {
if(num_perturbs > 0 && max_retries > 0) {
slowdownExperiment(p, delay, duration * 2, min_trips, max_retries - 1);
}
// Return to idle mode. Probes will be removed as they are encountered
_mode.store(Mode::Idle);
float control_progress_period = (float)duration / (float)control_count;
float treatment_progress_period = (float)duration / (float)treatment_count;
Dl_info info;
char* name;
if(dladdr((void*)perturb->getBase(), &info) == 0) {
name = "unknown";
} else {
float control_progress_period = (float)duration / (float)control_count;
float treatment_progress_period = (float)duration / (float)treatment_count;
printf("%p: %f %f\n", (void*)p->getBase(), (float)delay, treatment_progress_period - control_progress_period);
name = abi::__cxa_demangle(info.dli_sname, NULL, NULL, NULL);
}
DEBUG("%s+%zu: impact = %f (progress: %zu, perturb: %zu)", name, perturb->getBase() - (uintptr_t)info.dli_saddr,
(treatment_progress_period - control_progress_period) / delay,
treatment_count, num_perturbs);
}
void profilerThread() {
while(true) {
// Sleep for 10ms
Host::wait(500 * Host::Time::Millisecond);
//Host::wait(500 * Host::Time::Millisecond);
if(_blocks.size() > 0) {
Probe* p = _blocks[rand() % _blocks.size()];
slowdownExperiment(p, 1000 * Host::Time::Nanosecond, 100 * Host::Time::Millisecond);
slowdownExperiment(p, Host::Time::Millisecond, 500 * Host::Time::Millisecond);
}
}
}
@ -88,6 +134,45 @@ private:
getInstance().profilerThread();
return NULL;
}
/**
 * Stop every thread reported by Host::getThreads().
 *
 * Takes _thread_blocker, sends SIGUSR2 (the value of PauseSignal) to each
 * thread, and spins until as many threads have checked in through onPause()
 * as were successfully signalled. The lock stays held on return; it is
 * released by resumeThreads().
 */
void pauseThreads() {
  _thread_blocker.lock();

  // Reset the arrival counter with a real atomic store. ATOMIC_VAR_INIT is
  // only meant for initializing an atomic, not for assigning to one.
  _thread_arrivals.store(0);

  // Count only the threads the signal was actually delivered to, so the spin
  // below cannot wait for a thread that will never arrive.
  size_t count = 0;
  for(pthread_t thread : Host::getThreads()) {
    if(pthread_kill(thread, SIGUSR2) == 0) {
      count++;
    }
  }

  while(_thread_arrivals.load() < count) {
    // NOTE(review): "pause" is an x86 instruction; other targets need a different spin hint.
    __asm__("pause");
  }
}
// Release the lock taken by pauseThreads(), so threads blocked on it in
// onPause() can acquire it and continue.
void resumeThreads() {
_thread_blocker.unlock();
}
// Runs when the pause signal is handled: report arrival, then block until
// _thread_blocker can be acquired, and release it again straight away.
void onPause() {
  _thread_arrivals.fetch_add(1);
  {
    // Acquire-and-release only; the guard exists purely to wait on the lock.
    std::lock_guard<std::mutex> gate(_thread_blocker);
  }
}
// Called by startSignalHandler for DelaySignal. Not implemented yet: it only
// emits a debug message.
void onDelay() {
DEBUG("TODO!");
}
// Signal entry point: routes PauseSignal and DelaySignal to the singleton's
// handlers. Any other signal number is reported and aborts the process.
static void startSignalHandler(int signum, siginfo_t* info, void* p) {
  switch(signum) {
    case PauseSignal:
      getInstance().onPause();
      break;
    case DelaySignal:
      getInstance().onDelay();
      break;
    default:
      DEBUG("Unexpected signal received!");
      abort();
  }
}
public:
static Causal& getInstance() {
@ -99,6 +184,8 @@ public:
// One-time setup: install the SIGUSR1/SIGUSR2 handlers and start the profiler
// thread. Only the first caller does the work; later calls return at once.
void initialize() {
  // exchange() hands back the previous flag value, so true means already initialized.
  const bool already_initialized = _initialized.exchange(true);
  if(already_initialized) {
    return;
  }

  DEBUG("Initializing");
  Host::setSignalHandler(SIGUSR1, startSignalHandler);
  Host::setSignalHandler(SIGUSR2, startSignalHandler);
  profiler_thread = Host::createThread(startProfilerThread);
}

View File

@ -2,9 +2,9 @@
#define CAUSAL_LIB_RUNTIME_HOST_H
#if defined(__APPLE__)
# include "Darwin/host.h"
# include "host/Darwin.h"
#elif defined(__linux__)
# include "Linux/host.h"
# include "host/Linux.h"
#else
# error "Unsupported host platform"
#endif

94
lib/runtime/host/Darwin.h Normal file
View File

@ -0,0 +1,94 @@
#if !defined(CAUSAL_LIB_RUNTIME_HOST_DARWIN_H)
#define CAUSAL_LIB_RUNTIME_HOST_DARWIN_H
#include <dlfcn.h>
#include <mach/mach.h>
#include <mach/mach_time.h>
#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <unistd.h>
#include <set>
#include <vector>
#include "arch.h"
#include "causal.h"
#include "debug.h"
#include "host/common.h"
using namespace std;
/**
 * Darwin-specific host layer: symbol lookup, signal-handler installation,
 * Mach-based timing, and thread creation/enumeration. The libc pass-through
 * wrappers are inherited from CommonHost.
 */
class DarwinHost : public CommonHost {
private:
  // Threads started through createThread(); getThreads() leaves these out.
  set<pthread_t> _ignored_threads;

public:
  // Time unit multipliers, expressed in nanoseconds.
  enum Time : uint64_t {
    Nanosecond = 1,
    Millisecond = 1000 * 1000,
    Second = 1000 * Millisecond
  };

  // Look up sym in the default symbol scope; returns NULL when not found.
  static void* findSymbol(const char* sym) {
    return dlsym(RTLD_DEFAULT, sym);
  }

  // Install handler as an SA_SIGINFO handler for signum with an empty mask,
  // going through CommonHost::sigaction.
  static void setSignalHandler(int signum, sigaction_handler_t handler) {
    struct sigaction sa;
    sa.sa_sigaction = handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = SA_SIGINFO;
    CommonHost::sigaction(signum, &sa, NULL);
  }

  /**
   * Block the calling thread until the deadline has passed, retrying
   * mach_wait_until whenever it returns without success.
   *
   * NOTE(review): nanos is added directly to mach_absolute_time(), which is
   * only correct where the Mach timebase is 1:1 with nanoseconds - confirm.
   */
  static void wait(uint64_t nanos) {
    uint64_t end_time = mach_absolute_time() + nanos;
    while(mach_wait_until(end_time) != KERN_SUCCESS) {
      // interrupted or failed: wait again for the same deadline
    }
  }

  // Current value of mach_absolute_time().
  static size_t getTime() {
    return mach_absolute_time();
  }

  /**
   * Start a thread running fn(arg) and record it as ignored so getThreads()
   * will not report it. Aborts the process if the thread cannot be created.
   */
  pthread_t createThread(pthread_fn_t fn, void* arg = NULL) {
    pthread_t t;
    if(pthread_create(&t, NULL, fn, arg)) {
      DEBUG("Failed to create thread");
      abort();
    }
    _ignored_threads.insert(t);
    return t;
  }

  /**
   * List the pthread handles of the task's threads, excluding threads with
   * no pthread handle and threads created through createThread().
   * Aborts if the kernel query fails.
   */
  vector<pthread_t> getThreads() {
    // Get thread list from the kernel
    thread_act_array_t threads;
    mach_msg_type_number_t count;
    kern_return_t krc = task_threads(mach_task_self(), &threads, &count);
    if(krc != KERN_SUCCESS) {
      DEBUG("Failed to get task threads");
      abort();
    }

    // Build a vector to return. Start EMPTY and only reserve capacity:
    // constructing it with `count` elements would put `count` null handles in
    // front of the real ones, and callers pass every entry to pthread_kill.
    vector<pthread_t> result;
    result.reserve(count);
    for(size_t i=0; i<count; i++) {
      pthread_t pthread = pthread_from_mach_thread_np(threads[i]);
      if(pthread != NULL && _ignored_threads.find(pthread) == _ignored_threads.end()) {
        result.push_back(pthread);
      }
      // Only the pthread handle is returned, so the port right is no longer needed.
      mach_port_deallocate(mach_task_self(), threads[i]);
    }

    // task_threads allocates the array in our address space; release it.
    vm_deallocate(mach_task_self(), (vm_address_t)threads, count * sizeof(thread_act_t));
    return result;
  }
};
// Expose the Darwin implementation under the platform-neutral name Host.
typedef DarwinHost Host;
#endif

54
lib/runtime/host/common.h Normal file
View File

@ -0,0 +1,54 @@
#if !defined(CAUSAL_LIB_RUNTIME_HOST_COMMON_H)
#define CAUSAL_LIB_RUNTIME_HOST_COMMON_H
#include <dlfcn.h>
// Page granularity used by CommonHost::mprotectRange when rounding a range.
enum {
  PageSize = 0x1000
};

// Function-pointer shapes shared by the host layers: a thread entry point, a
// one-argument signal handler, and a three-argument sigaction-style handler.
using pthread_fn_t = void* (*)(void*);
using signal_handler_t = void (*)(int);
using sigaction_handler_t = void (*)(int, siginfo_t*, void*);

/**
 * Platform-independent host helpers.
 *
 * Apart from mprotectRange, every member resolves the next definition of the
 * same-named function with dlsym(RTLD_NEXT, ...) on first use, caches the
 * pointer in a function-local static, and forwards its arguments unchanged.
 * The lookup result is not checked.
 */
class CommonHost {
public:
  // Apply prot to every page overlapping [base, limit): the start is rounded
  // down and the end rounded up to a PageSize boundary. Returns mprotect's result.
  static int mprotectRange(uintptr_t base, uintptr_t limit, int prot) {
    const uintptr_t first_page = base - base % PageSize;
    const uintptr_t padded_limit = limit + (PageSize - 1);
    const uintptr_t end_page = padded_limit - padded_limit % PageSize;
    return mprotect(reinterpret_cast<void*>(first_page), end_page - first_page, prot);
  }

  static int fork() {
    using fork_fn = int (*)();
    static fork_fn next_fork = reinterpret_cast<fork_fn>(dlsym(RTLD_NEXT, "fork"));
    return next_fork();
  }

  static signal_handler_t signal(int signum, signal_handler_t handler) {
    using signal_fn = signal_handler_t (*)(int, signal_handler_t);
    static signal_fn next_signal = reinterpret_cast<signal_fn>(dlsym(RTLD_NEXT, "signal"));
    return next_signal(signum, handler);
  }

  static int sigaction(int signum, const struct sigaction* act, struct sigaction* oldact) {
    using sigaction_fn = int (*)(int, const struct sigaction*, struct sigaction*);
    static sigaction_fn next_sigaction = reinterpret_cast<sigaction_fn>(dlsym(RTLD_NEXT, "sigaction"));
    return next_sigaction(signum, act, oldact);
  }

  static int sigprocmask(int how, const sigset_t* set, sigset_t* oldset) {
    using sigprocmask_fn = int (*)(int, const sigset_t*, sigset_t*);
    static sigprocmask_fn next_sigprocmask = reinterpret_cast<sigprocmask_fn>(dlsym(RTLD_NEXT, "sigprocmask"));
    return next_sigprocmask(how, set, oldset);
  }

  static int sigsuspend(const sigset_t* mask) {
    using sigsuspend_fn = int (*)(const sigset_t*);
    static sigsuspend_fn next_sigsuspend = reinterpret_cast<sigsuspend_fn>(dlsym(RTLD_NEXT, "sigsuspend"));
    return next_sigsuspend(mask);
  }

  static int pthread_sigmask(int how, const sigset_t* set, sigset_t* oldset) {
    using pthread_sigmask_fn = int (*)(int, const sigset_t*, sigset_t*);
    static pthread_sigmask_fn next_pthread_sigmask = reinterpret_cast<pthread_sigmask_fn>(dlsym(RTLD_NEXT, "pthread_sigmask"));
    return next_pthread_sigmask(how, set, oldset);
  }
};
#endif

View File

@ -8,6 +8,7 @@
#include <sys/mman.h>
#include <atomic>
#include <map>
#include <mutex>
#include "util.h"
@ -46,10 +47,19 @@ private:
public:
static Probe* get(uintptr_t ret, uintptr_t target) {
uintptr_t call = find_call(ret, target);
if(call == 0)
return NULL;
return new Probe(call, ret);
static map<uintptr_t, Probe*> probes;
static mutex m;
m.lock();
if(probes.find(ret) == probes.end()) {
uintptr_t call = find_call(ret, target);
if(call == 0)
probes[ret] = NULL;
else
probes[ret] = new Probe(call, ret);
}
Probe* p = probes[ret];
m.unlock();
return p;
}
uintptr_t getBase() {
@ -74,7 +84,6 @@ public:
void restore() {
if(!_in_place) {
CallInst* p = (CallInst*)_base;
p->opcode = 0xCC;
p->offset = _saved.offset;
p->opcode = _saved.opcode;
_in_place = true;

View File

@ -1,6 +1,6 @@
CAUSAL_ROOT = ../..
CXX = clang++
CXXFLAGS = -std=c++11 -stdlib=libc++ -nostdinc++ -I/usr/src/libcxx/include -L/usr/src/libcxx/lib
CXXFLAGS = -g -O0 -std=c++11 -stdlib=libc++ -nostdinc++ -I/usr/src/libcxx/include -L/usr/src/libcxx/lib
CAUSAL_CXXFLAGS = -Xclang -load -Xclang LLVMCausal.dylib $(CXXFLAGS)
default: work_queue

View File

@ -11,8 +11,8 @@
using namespace std;
enum {
WorkerCount = 5,
WorkItemCount = 40000,
WorkerCount = 4,
WorkItemCount = 100000,
WeightA = 2,
WeightB = 3,
WeightC = 1
@ -36,7 +36,7 @@ void* worker(void* arg) {
}
// Take an item off the queue and unlock it
work_item_t item = work_queue.back();
work_item_t item = work_queue.front();
work_queue.pop();
pthread_mutex_unlock(&work_queue_lock);
// Do work