Removed function template wrapper, since it allocates memory (not safe in a signal handler). Counter backoff is in progress.

--HG--
rename : lib/support/libsuport.cpp => lib/support/libsupport.cpp
This commit is contained in:
Charlie Curtsinger 2014-07-17 10:54:41 -04:00
parent c1a2039028
commit 71e049fe8c
14 changed files with 234 additions and 87 deletions

View File

@ -1,6 +1,6 @@
ROOT = ..
DIRS = histogram kmeans linear_regression matrix_multiply \
pbzip2 pca producer_consumer string_match word_count
RECURSIVE_TARGETS = debug release test bench
RECURSIVE_TARGETS = debug test bench
include $(ROOT)/common.mk

View File

@ -35,7 +35,7 @@ INCLUDE_DIRS += $(ROOT)/include
RECURSIVE_TARGETS ?= clean debug release
# Build by default
all:: debug
all:: debug release
.PHONY: all debug release clean

View File

@ -11,21 +11,33 @@
extern "C" {
#endif
static void __init_counter(int kind, size_t* ctr, const char* name) {
void (*reg)(int, size_t*, const char*) = (void (*)(int, size_t*, const char*))dlsym(RTLD_DEFAULT, "__causal_register_counter");
if(reg != NULL) reg(kind, ctr, name);
/* Signature of the registration entry point looked up by _causal_init_counter. */
typedef void (*causal_reg_ctr_t)(int, unsigned long*, unsigned long*, const char*);

/*
 * Look up "__causal_register_counter" with dlsym and, when the symbol is
 * present, call it with the counter kind, the counter and backoff addresses,
 * and the counter name. When the symbol cannot be found this does nothing.
 */
static void _causal_init_counter(int kind,
                                 unsigned long* ctr,
                                 unsigned long* backoff,
                                 const char* name) {
  void* sym = dlsym(RTLD_DEFAULT, "__causal_register_counter");

  /* No registration function is available: leave the counter unregistered */
  if(!sym)
    return;

  ((causal_reg_ctr_t)sym)(kind, ctr, backoff, name);
}
#define CAUSAL_INCREMENT_COUNTER(kind, name) \
if(1) { \
static unsigned int __causal_counter_initialized; \
static size_t __causal_counter; \
if(__causal_counter_initialized != 0xDEADBEEF && \
__atomic_exchange_n(&__causal_counter_initialized, 0xDEADBEEF, __ATOMIC_SEQ_CST) != 0xDEADBEEF) { \
__causal_counter = 0; \
__init_counter(kind, &__causal_counter, name); \
static unsigned char _initialized = 0; \
static unsigned long _global_counter = 0; \
static __thread unsigned long _local_counter; \
static unsigned long _backoff = 0; \
\
if(!_initialized) { \
_initialized = 1; \
_causal_init_counter(kind, &_global_counter, &_backoff, name); \
} \
\
++_local_counter; \
if(__builtin_ctz(_local_counter) >= __atomic_load_n(&_backoff, __ATOMIC_ACQUIRE)) { \
__atomic_add_fetch(&_global_counter, 1, __ATOMIC_RELAXED); \
} \
__atomic_fetch_add(&__causal_counter, 1, __ATOMIC_SEQ_CST); \
}
#define PROGRESS_COUNTER 1

View File

@ -18,10 +18,8 @@ public:
void startup(size_t sample_period);
void shutdown();
void baseline_start();
void baseline_end();
void speedup_start(std::shared_ptr<causal_support::line> line);
void speedup_end(size_t num_delays, size_t delay_size);
void start_round(causal_support::line* line);
void end_round(size_t num_delays, size_t delay_size);
private:
void write_counters();
@ -33,6 +31,7 @@ private:
FILE* _f = nullptr;
std::unordered_set<Counter*> _counters;
spinlock _counters_lock;
spinlock _output_lock;
};
#endif

View File

@ -46,7 +46,7 @@ public:
void set_ready_signal(int sig);
/// Apply a function to all available records in the mmapped ring buffer
void process(std::function<void(const record&)> handler);
void process(void (*handler)(const record&));
/// An enum class with all the available sampling data
enum class sample : uint64_t {

View File

@ -4,6 +4,7 @@
#include <atomic>
#include <cstdint>
#include <memory>
#include <random>
#include <string>
#include <vector>
@ -15,9 +16,10 @@
enum {
SampleSignal = SIGPROF,
SamplePeriod = 21000000, // 10ms
SamplePeriod = 100000, // 100us
SampleWakeupCount = 10,
MinRoundSamples = 1000
MinRoundSamples = 200,
SpeedupDivisions = 20
};
class profiler {
@ -49,10 +51,24 @@ public:
}
private:
profiler() {}
profiler() : _generator(get_time()), _delay_dist(0, SpeedupDivisions) {}
profiler(const profiler&) = delete;
void operator=(const profiler&) = delete;
/// Process all available samples and insert delays. This operation will return false if the sampler is not immediately available.
bool process_samples();
/// Process all available samples and insert delays. This operation will block until it succeeds.
void must_process_samples();
/// Static wrapper for the sample processing function
static void call_process_one_sample(const perf_event::record& r);
/// Process a single sample (callback from perf_event)
void process_one_sample(const perf_event::record& r);
static void samples_ready(int signum, siginfo_t* info, void* p);
/// Handle to the profiler's output
output* _out;
@ -66,19 +82,34 @@ private:
causal_support::memory_map _map;
/// The current round number
std::atomic<size_t> _global_round;
std::atomic<size_t> _global_round = ATOMIC_VAR_INIT(0);
/// The total number of delays inserted this round
std::atomic<size_t> _global_delays;
std::atomic<size_t> _global_delays = ATOMIC_VAR_INIT(0);
/// The number of samples collected this round
std::atomic<size_t> _round_samples;
std::atomic<size_t> _round_samples = ATOMIC_VAR_INIT(0);
/// The currently selected line for "speedup"
std::shared_ptr<causal_support::line> _selected_line;
/**
* The currently selected line for "speedup". This should never actually be read.
* It exists only to keep an accurate reference count. Use _selected_line instead.
*/
std::shared_ptr<causal_support::line> _sentinel_selected_line;
/**
* The currently selected line for "speedup". Any thread that clears this line must
* also clear the _sentinel_selected_line to decrement the reference count.
*/
std::atomic<causal_support::line*> _selected_line = ATOMIC_VAR_INIT(nullptr);
/// The current delay size
size_t _delay_size;
std::atomic<size_t> _delay_size;
/// Random number source
std::default_random_engine _generator;
/// Distribution for random delays
std::uniform_int_distribution<size_t> _delay_dist;
};
#endif

View File

@ -29,12 +29,15 @@ static size_t get_time() {
#endif
}
static inline void wait(size_t ns) {
/**
 * Sleep for the requested number of nanoseconds, restarting nanosleep with
 * the remaining time whenever it returns a non-zero result.
 * Returns the difference between get_time() after and before the sleep.
 */
static inline size_t wait(size_t ns) {
  const size_t ns_per_second = 1000 * 1000 * 1000;

  // Split the request into whole seconds and leftover nanoseconds
  struct timespec remaining;
  remaining.tv_nsec = ns % ns_per_second;
  remaining.tv_sec = ns / ns_per_second;

  const size_t begin = get_time();

  // nanosleep writes the unslept time back into its second argument,
  // so each retry only waits for what is left
  for(;;) {
    if(nanosleep(&remaining, &remaining) == 0) break;
  }

  return get_time() - begin;
}
static inline int rt_tgsigqueueinfo(pid_t tgid, pid_t tid, int sig, siginfo_t *uinfo) {

View File

@ -24,7 +24,10 @@ main_fn_t real_main;
/**
* Called by the application to register a progress counter
*/
extern "C" void __causal_register_counter(CounterType kind, size_t* counter, const char* name) {
extern "C" void __causal_register_counter(CounterType kind,
                                          size_t* counter,
                                          size_t* backoff,
                                          const char* name) {
  // NOTE: the backoff pointer is accepted but not used in this function
  profiler& prof = profiler::get_instance();

  // Wrap the application's counter in a newly allocated SourceCounter and hand it to the profiler
  prof.register_counter(new SourceCounter(kind, counter, name));
}
@ -85,8 +88,15 @@ int wrapped_main(int argc, char** argv, char** env) {
args["progress"].as<vector<string>>(),
args["fixed"].as<string>());
// Start the profiler on the main thread (round and delays are zero)
profiler::get_instance().thread_startup(0, 0);
// Run the real main function
int result = real_main(argc - causal_argc - 1, &argv[causal_argc + 1], env);
// Stop the profiler on the main thread
profiler::get_instance().thread_shutdown();
// Shut down the profiler
profiler::get_instance().shutdown();

View File

@ -32,10 +32,12 @@ void output::add_counter(Counter* c) {
* Log the start of a profile run, along with instrumentation calibration info
*/
void output::startup(size_t sample_period) {
_output_lock.lock();
fprintf(_f, "startup\ttime=%lu\n", get_time());
fprintf(_f, "info\tsample-period=%lu\n", sample_period);
//fprintf(_f, "info\tsource-counter-overhead=%lu\n", SourceCounter::calibrate());
fprintf(_f, "info\tperf-counter-overhead=%lu\n", PerfCounter::calibrate());
_output_lock.unlock();
// Drop all counters, so we don't use any calibration counters during the real execution
//counters.clear();
@ -45,7 +47,9 @@ void output::startup(size_t sample_period) {
* Log profiler shutdown
*/
void output::shutdown() {
  // Hold the output lock for the duration of the write
  _output_lock.lock();

  // Record the time of the shutdown event
  const auto now = get_time();
  fprintf(_f, "shutdown\ttime=%lu\n", now);

  _output_lock.unlock();
}
/**
@ -58,40 +62,26 @@ void output::write_counters() {
}
}
/**
* Log the beginning of a baseline profiling round
*/
void output::baseline_start() {
// Write out time and progress counter values
fprintf(_f, "start-baseline\ttime=%lu\n", get_time());
write_counters();
}
/**
* Log the end of a baseline profiling round
*/
void output::baseline_end() {
// Write out time and progress counter values
fprintf(_f, "end-baseline\ttime=%lu\n", get_time());
write_counters();
}
/**
* Log the beginning of a speedup profiling round
*/
void output::speedup_start(shared_ptr<line> selected) {
void output::start_round(line* selected) {
_output_lock.lock();
// Write out time, selected line, and progress counter values
fprintf(_f, "start-speedup\tline=%s:%lu\ttime=%lu\n",
fprintf(_f, "start-round\tline=%s:%lu\ttime=%lu\n",
selected->get_file()->get_name().c_str(), selected->get_line(), get_time());
write_counters();
_output_lock.unlock();
}
/**
* Log the end of a speedup profiling round
*/
void output::speedup_end(size_t num_delays, size_t delay_size) {
void output::end_round(size_t num_delays, size_t delay_size) {
_output_lock.lock();
// Write out time, progress counter values, delay count, and total delay
fprintf(_f, "end-speedup\tdelays=%lu\tdelay-size=%lu\ttime=%lu\n",
fprintf(_f, "end-round\tdelays=%lu\tdelay-size=%lu\ttime=%lu\n",
num_delays, delay_size, get_time());
write_counters();
_output_lock.unlock();
}

View File

@ -150,7 +150,7 @@ void perf_event::set_ready_signal(int sig) {
<< "failed to set the owner of the perf_event file";
}
void perf_event::process(function<void(const record&)> handler) {
void perf_event::process(void (*handler)(const record&)) {
// If this isn't a sampling event, just return
if(_mapping == nullptr)
return;

View File

@ -59,7 +59,7 @@ void profiler::startup(const string& output_filename,
// Set up the sampling signal handler
struct sigaction sa = {
.sa_sigaction = samples_ready,
.sa_sigaction = profiler::samples_ready,
.sa_flags = SA_SIGINFO | SA_ONSTACK
};
real::sigaction()(SampleSignal, &sa, nullptr);
@ -92,6 +92,9 @@ void profiler::startup(const string& output_filename,
WARNING << "Progress line \"" << line_name << "\" was not found.";
}
}
// Log the start of this execution
_out->startup(SamplePeriod);
}
/**
@ -99,6 +102,8 @@ void profiler::startup(const string& output_filename,
*/
void profiler::shutdown() {
  // test_and_set returns the previous flag value: if it was already set,
  // shutdown has run before and there is nothing left to do
  if(_shutdown_run.test_and_set())
    return;

  // Log the end of this execution, then release the output object
  _out->shutdown();
  delete _out;
}
@ -108,8 +113,8 @@ void profiler::thread_startup(size_t parent_round, size_t parent_delays) {
local_delays = parent_delays;
struct perf_event_attr pe = {
.type = PERF_TYPE_HARDWARE,
.config = PERF_COUNT_HW_CPU_CYCLES,
.type = PERF_TYPE_SOFTWARE,
.config = PERF_COUNT_SW_TASK_CLOCK,
.sample_type = PERF_SAMPLE_IP | PERF_SAMPLE_CALLCHAIN,
.sample_period = SamplePeriod,
.wakeup_events = SampleWakeupCount, // This is ignored on linux 3.13 (why?)
@ -128,7 +133,8 @@ void profiler::thread_startup(size_t parent_round, size_t parent_delays) {
}
void profiler::thread_shutdown() {
// TODO: catch up on delays before exiting
// Catch up on delays before exiting
must_process_samples();
// Claim the sampler object and free it
perf_event* s = sampler.exchange(nullptr);
@ -143,28 +149,130 @@ size_t profiler::get_local_delays() {
return local_delays;
}
void samples_ready(int signum, siginfo_t* info, void* p) {
void profiler::must_process_samples() {
  // Keep retrying until process_samples() reports success
  for(;;) {
    if(process_samples())
      return;

    // Spin-wait hint between attempts
    __asm__("pause");
  }
}
void profiler::call_process_one_sample(const perf_event::record& r) {
  // Plain-function trampoline: forward the record to the profiler instance's member function
  auto& self = profiler::get_instance();
  self.process_one_sample(r);
}
/**
 * Process a single record from the sampler.
 *
 * Records that are not samples are ignored. If no line is currently selected
 * and the sample falls in a known line, this attempts to select that line and
 * start a new round. While a round is active, samples in the selected line
 * increment local_delays, and the sample that brings the round's sample count
 * to MinRoundSamples ends the round.
 */
void profiler::process_one_sample(const perf_event::record& r) {
  // Only sample records are handled here
  if(!r.is_sample())
    return;

  // Find the line that contains this sample
  shared_ptr<line> l = _map.find_line(r.get_ip());

  // Load the current round number and selected line
  size_t current_round = _global_round.load();
  line* current_line = _selected_line.load();

  // If there isn't a currently selected line, try to start a new round
  if(!current_line) {
    // Sample is in some out-of-scope code. Nothing can be done with this sample.
    if(!l)
      return;

    // Try to set the selected line and start a new round. The strong form is
    // used because this is a single attempt, not a retry loop: a spurious
    // failure of the weak form would leave no line selected and drop the sample.
    if(_selected_line.compare_exchange_strong(current_line, l.get())) {
      // The swap succeeded! Also update the sentinel to keep reference counts accurate
      _sentinel_selected_line = l;

      // Update the round number (and the local copy of it)
      current_round = ++_global_round;

      // Clear the count of samples this round
      _round_samples.store(0);

      // Update the local round counter
      local_round = current_round;

      // Update the local copy of the current line
      current_line = l.get();

      // Clear the global and local delay counts
      _global_delays = 0;
      local_delays = 0;

      // Generate a new random delay size
      _delay_size.store(_delay_dist(_generator) * SamplePeriod / SpeedupDivisions);

      // Log the start of a new speedup round
      _out->start_round(current_line);
    } else {
      // Another thread must have changed the round and selected line. Reload them and continue.
      current_round = _global_round.load();
      current_line = _selected_line.load();
    }
  }

  // Without a selected line there is no active round to account this sample to
  if(!current_line)
    return;

  // Does this thread's round number match the global round?
  if(current_round != local_round) {
    // If not, clear the local delay count and advance to the next round
    local_round = current_round;
    local_delays = 0;
  }

  // Is this sample in the selected line?
  if(l.get() == current_line) {
    // This thread can skip a delay (possibly one it adds to global_delays below)
    local_delays++;
  }

  // Is this the final sample in the round?
  if(++_round_samples == MinRoundSamples) {
    // Log the end of the speedup round
    _out->end_round(_global_delays.load(), _delay_size.load());

    // Clear the selected line
    _selected_line.store(nullptr);

    // Also clear the sentinel to keep an accurate reference count
    _sentinel_selected_line.reset();
  }
}
bool profiler::process_samples() {
// Attempt to claim the sampler
perf_event* s = sampler.exchange(nullptr);
// If the sampler was not available to be claimed, try again later
if(!s)
return;
// If the sampler is unavailable, give up
if(!s) return false;
// Stop sampling
s->stop();
s->process([](const perf_event::record& r) {
if(r.is_sample()) {
//fprintf(stderr, "Sample at %p\n", (void*)r.get_ip());
}
});
s->process(profiler::call_process_one_sample);
// Take a snapshot of the global delay count
size_t global_delays = _global_delays.load();
// If this thread has more delays + visits than the global delay count, update the global count
if(local_delays > global_delays) {
_global_delays += (local_delays - global_delays);
} else if(local_delays < global_delays) {
wait(_delay_size.load() * (global_delays - local_delays));
}
// Resume sampling
s->start();
// Return the sampler to the shared atomic pointer
// Release the sampler
sampler.exchange(s);
return true;
}
void profiler::samples_ready(int signum, siginfo_t* info, void* p) {
// Process all available samples
profiler::get_instance().process_samples();
}
void on_error(int signum, siginfo_t* info, void* p) {

View File

@ -34,29 +34,13 @@ def main(filename):
elif command == 'info':
if 'sample-period' in data:
period = int(data['sample-period'])
elif command == 'start-baseline':
phase_start_time = int(data['time'])
(i, counter_start_values) = readCounters(lines, i)
elif command == 'end-baseline':
phase_time = int(data['time']) - phase_start_time
(i, counter_end_values) = readCounters(lines, i)
for counter in counter_start_values:
if counter in counter_end_values:
difference = counter_end_values[counter] - counter_start_values[counter]
if difference > 0:
if counter not in baseline_rates:
baseline_rates[counter] = []
baseline_rates[counter].append((difference, phase_time))
elif command == 'start-speedup':
elif command == 'start-round':
phase_start_time = int(data['time'])
speedup_line = data['line']
(i, counter_start_values) = readCounters(lines, i)
elif command == 'end-speedup':
elif command == 'end-round':
phase_time = int(data['time']) - phase_start_time
delay_count = int(data['delays'])
delay_size = int(data['delay-size'])
@ -78,6 +62,16 @@ def main(filename):
if counter not in speedup_rates[speedup_line][delay_size]:
speedup_rates[speedup_line][delay_size][counter] = []
speedup_rates[speedup_line][delay_size][counter].append((difference, phase_time))
# When delay size is 0, this is also a baseline measurement
if delay_size == 0:
for counter in counter_start_values:
if counter in counter_end_values:
difference = counter_end_values[counter] - counter_start_values[counter]
if difference > 0:
if counter not in baseline_rates:
baseline_rates[counter] = []
baseline_rates[counter].append((difference, phase_time))
print "line\tline_speedup\tcounter\tcounter_speedup\tbaseline_period\tspeedup_period\tsamples"

View File

@ -1,5 +1,5 @@
ROOT = ..
DIRS = loopy
RECURSIVE_TARGETS = debug release test
RECURSIVE_TARGETS = debug test
include $(ROOT)/common.mk