mirror of
https://github.com/debauchee/barrier.git
synced 2024-12-26 20:53:22 +03:00
0759cbc104
MsgWaitForMultipleObjects(): it will not return immediately if an event already in the queue when it's called was already in the queue during the last call to GetMessage()/PeekMessage(). also now discarding screen saver events if there are any other screen saver events in the queue already. this prevents these events from piling up in the queue, which they'd do because we sleep for 250ms when handling each one.
738 lines
14 KiB
C++
738 lines
14 KiB
C++
#include "CThreadRep.h"
|
|
#include "CLock.h"
|
|
#include "CMutex.h"
|
|
#include "CThread.h"
|
|
#include "XThread.h"
|
|
#include "CLog.h"
|
|
#include "IJob.h"
|
|
|
|
#if HAVE_PTHREAD
|
|
|
|
# include <signal.h>
|
|
# define SIGWAKEUP SIGUSR1
|
|
|
|
#elif WINDOWS_LIKE
|
|
|
|
# if !defined(_MT)
|
|
# error multithreading compile option is required
|
|
# endif
|
|
# include <process.h>
|
|
|
|
#else
|
|
|
|
#error unsupported platform for multithreading
|
|
|
|
#endif
|
|
|
|
// FIXME -- temporary exception type
|
|
class XThreadUnavailable { };
|
|
|
|
//
|
|
// CThreadRep
|
|
//
|
|
|
|
CMutex* CThreadRep::s_mutex = NULL;
|
|
CThreadRep* CThreadRep::s_head = NULL;
|
|
#if HAVE_PTHREAD
|
|
pthread_t CThreadRep::s_signalThread;
|
|
#endif
|
|
|
|
CThreadRep::CThreadRep() :
|
|
m_prev(NULL),
|
|
m_next(NULL),
|
|
m_refCount(1),
|
|
m_job(NULL),
|
|
m_userData(NULL)
|
|
{
|
|
// note -- s_mutex must be locked on entry
|
|
assert(s_mutex != NULL);
|
|
|
|
// initialize stuff
|
|
init();
|
|
#if HAVE_PTHREAD
|
|
// get main thread id
|
|
m_thread = pthread_self();
|
|
#elif WINDOWS_LIKE
|
|
// get main thread id
|
|
m_thread = NULL;
|
|
m_id = GetCurrentThreadId();
|
|
#endif
|
|
|
|
// insert ourself into linked list
|
|
if (s_head != NULL) {
|
|
s_head->m_prev = this;
|
|
m_next = s_head;
|
|
}
|
|
s_head = this;
|
|
}
|
|
|
|
CThreadRep::CThreadRep(IJob* job, void* userData) :
|
|
m_prev(NULL),
|
|
m_next(NULL),
|
|
m_refCount(2), // 1 for us, 1 for thread
|
|
m_job(job),
|
|
m_userData(userData)
|
|
{
|
|
assert(m_job != NULL);
|
|
assert(s_mutex != NULL);
|
|
|
|
// create a thread rep for the main thread if the current thread
|
|
// is unknown. note that this might cause multiple "main" threads
|
|
// if threads are created external to this library.
|
|
getCurrentThreadRep()->unref();
|
|
|
|
// initialize
|
|
init();
|
|
|
|
// hold mutex while we create the thread
|
|
CLock lock(s_mutex);
|
|
|
|
// start the thread. throw if it doesn't start.
|
|
#if HAVE_PTHREAD
|
|
// mask some signals in all threads except the main thread
|
|
sigset_t sigset, oldsigset;
|
|
sigemptyset(&sigset);
|
|
sigaddset(&sigset, SIGINT);
|
|
sigaddset(&sigset, SIGTERM);
|
|
pthread_sigmask(SIG_BLOCK, &sigset, &oldsigset);
|
|
int status = pthread_create(&m_thread, NULL, threadFunc, (void*)this);
|
|
pthread_sigmask(SIG_SETMASK, &oldsigset, NULL);
|
|
if (status != 0) {
|
|
throw XThreadUnavailable();
|
|
}
|
|
#elif WINDOWS_LIKE
|
|
unsigned int id;
|
|
m_thread = reinterpret_cast<HANDLE>(_beginthreadex(NULL, 0,
|
|
threadFunc, (void*)this, 0, &id));
|
|
m_id = static_cast<DWORD>(id);
|
|
if (m_thread == 0) {
|
|
throw XThreadUnavailable();
|
|
}
|
|
#endif
|
|
|
|
// insert ourself into linked list
|
|
if (s_head != NULL) {
|
|
s_head->m_prev = this;
|
|
m_next = s_head;
|
|
}
|
|
s_head = this;
|
|
|
|
// returning releases the locks, allowing the child thread to run
|
|
}
|
|
|
|
CThreadRep::~CThreadRep()
|
|
{
|
|
// note -- s_mutex must be locked on entry
|
|
|
|
// remove ourself from linked list
|
|
if (m_prev != NULL) {
|
|
m_prev->m_next = m_next;
|
|
}
|
|
if (m_next != NULL) {
|
|
m_next->m_prev = m_prev;
|
|
}
|
|
if (s_head == this) {
|
|
s_head = m_next;
|
|
}
|
|
|
|
// clean up
|
|
fini();
|
|
}
|
|
|
|
void
|
|
CThreadRep::initThreads()
|
|
{
|
|
if (s_mutex == NULL) {
|
|
s_mutex = new CMutex;
|
|
|
|
#if HAVE_PTHREAD
|
|
// install SIGWAKEUP handler
|
|
struct sigaction act;
|
|
sigemptyset(&act.sa_mask);
|
|
# if defined(SA_INTERRUPT)
|
|
act.sa_flags = SA_INTERRUPT;
|
|
# else
|
|
act.sa_flags = 0;
|
|
# endif
|
|
act.sa_handler = &threadCancel;
|
|
sigaction(SIGWAKEUP, &act, NULL);
|
|
|
|
// set signal mask
|
|
sigset_t sigset;
|
|
sigemptyset(&sigset);
|
|
sigaddset(&sigset, SIGWAKEUP);
|
|
pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
|
|
sigemptyset(&sigset);
|
|
sigaddset(&sigset, SIGPIPE);
|
|
sigaddset(&sigset, SIGINT);
|
|
sigaddset(&sigset, SIGTERM);
|
|
pthread_sigmask(SIG_BLOCK, &sigset, NULL);
|
|
|
|
// fire up the INT and TERM signal handler thread. we could
|
|
// instead arrange to catch and handle these signals but
|
|
// we'd be unable to cancel the main thread since no pthread
|
|
// calls are allowed in a signal handler.
|
|
int status = pthread_create(&s_signalThread, NULL,
|
|
&CThreadRep::threadSignalHandler,
|
|
getCurrentThreadRep());
|
|
if (status != 0) {
|
|
// can't create thread to wait for signal so don't block
|
|
// the signals.
|
|
sigemptyset(&sigset);
|
|
sigaddset(&sigset, SIGINT);
|
|
sigaddset(&sigset, SIGTERM);
|
|
pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
|
|
}
|
|
#endif // HAVE_PTHREAD
|
|
}
|
|
}
|
|
|
|
void
|
|
CThreadRep::ref()
|
|
{
|
|
CLock lock(s_mutex);
|
|
++m_refCount;
|
|
}
|
|
|
|
void
|
|
CThreadRep::unref()
|
|
{
|
|
CLock lock(s_mutex);
|
|
if (--m_refCount == 0) {
|
|
delete this;
|
|
}
|
|
}
|
|
|
|
bool
|
|
CThreadRep::enableCancel(bool enable)
|
|
{
|
|
CLock lock(s_mutex);
|
|
const bool old = m_cancellable;
|
|
m_cancellable = enable;
|
|
return old;
|
|
}
|
|
|
|
bool
|
|
CThreadRep::isCancellable() const
|
|
{
|
|
CLock lock(s_mutex);
|
|
return (m_cancellable && !m_cancelling);
|
|
}
|
|
|
|
void*
|
|
CThreadRep::getResult() const
|
|
{
|
|
// no lock necessary since thread isn't running
|
|
return m_result;
|
|
}
|
|
|
|
void*
|
|
CThreadRep::getUserData() const
|
|
{
|
|
// no lock necessary because the value never changes
|
|
return m_userData;
|
|
}
|
|
|
|
CThreadRep*
|
|
CThreadRep::getCurrentThreadRep()
|
|
{
|
|
assert(s_mutex != NULL);
|
|
|
|
#if HAVE_PTHREAD
|
|
const pthread_t thread = pthread_self();
|
|
#elif WINDOWS_LIKE
|
|
const DWORD id = GetCurrentThreadId();
|
|
#endif
|
|
|
|
// lock list while we search
|
|
CLock lock(s_mutex);
|
|
|
|
// search
|
|
CThreadRep* scan = s_head;
|
|
while (scan != NULL) {
|
|
#if HAVE_PTHREAD
|
|
if (scan->m_thread == thread) {
|
|
break;
|
|
}
|
|
#elif WINDOWS_LIKE
|
|
if (scan->m_id == id) {
|
|
break;
|
|
}
|
|
#endif
|
|
scan = scan->m_next;
|
|
}
|
|
|
|
// create and use main thread rep if thread not found
|
|
if (scan == NULL) {
|
|
scan = new CThreadRep();
|
|
}
|
|
|
|
// ref for caller
|
|
++scan->m_refCount;
|
|
|
|
return scan;
|
|
}
|
|
|
|
void
|
|
CThreadRep::doThreadFunc()
|
|
{
|
|
// default priority is slightly below normal
|
|
setPriority(1);
|
|
|
|
// wait for parent to initialize this object
|
|
{ CLock lock(s_mutex); }
|
|
|
|
void* result = NULL;
|
|
try {
|
|
// go
|
|
log((CLOG_DEBUG1 "thread %p entry", this));
|
|
m_job->run();
|
|
log((CLOG_DEBUG1 "thread %p exit", this));
|
|
}
|
|
|
|
catch (XThreadCancel&) {
|
|
// client called cancel()
|
|
log((CLOG_DEBUG1 "caught cancel on thread %p", this));
|
|
}
|
|
|
|
catch (XThreadExit& e) {
|
|
// client called exit()
|
|
result = e.m_result;
|
|
log((CLOG_DEBUG1 "caught exit on thread %p", this));
|
|
}
|
|
catch (...) {
|
|
log((CLOG_DEBUG1 "exception on thread %p", this));
|
|
// note -- don't catch (...) to avoid masking bugs
|
|
delete m_job;
|
|
throw;
|
|
}
|
|
|
|
// done with job
|
|
delete m_job;
|
|
|
|
// store exit result (no lock necessary because the result will
|
|
// not be accessed until m_exit is set)
|
|
m_result = result;
|
|
}
|
|
|
|
#if HAVE_PTHREAD
|
|
|
|
#include "CStopwatch.h"
|
|
#if TIME_WITH_SYS_TIME
|
|
# include <sys/time.h>
|
|
# include <time.h>
|
|
#else
|
|
# if HAVE_SYS_TIME_H
|
|
# include <sys/time.h>
|
|
# else
|
|
# include <time.h>
|
|
# endif
|
|
#endif
|
|
|
|
void
|
|
CThreadRep::init()
|
|
{
|
|
m_result = NULL;
|
|
m_cancellable = true;
|
|
m_cancelling = false;
|
|
m_cancel = false;
|
|
m_exit = false;
|
|
}
|
|
|
|
void
|
|
CThreadRep::fini()
|
|
{
|
|
// main thread has NULL job
|
|
if (m_job != NULL) {
|
|
pthread_detach(m_thread);
|
|
}
|
|
}
|
|
|
|
void
|
|
CThreadRep::sleep(
|
|
double timeout)
|
|
{
|
|
if (timeout < 0.0) {
|
|
return;
|
|
}
|
|
struct timespec t;
|
|
t.tv_sec = (long)timeout;
|
|
t.tv_nsec = (long)(1000000000.0 * (timeout - (double)t.tv_sec));
|
|
while (nanosleep(&t, &t) < 0)
|
|
testCancel();
|
|
}
|
|
|
|
void
|
|
CThreadRep::cancel()
|
|
{
|
|
CLock lock(s_mutex);
|
|
if (m_cancellable && !m_cancelling) {
|
|
m_cancel = true;
|
|
}
|
|
else {
|
|
return;
|
|
}
|
|
|
|
// break out of system calls
|
|
log((CLOG_DEBUG1 "cancel thread %p", this));
|
|
pthread_kill(m_thread, SIGWAKEUP);
|
|
}
|
|
|
|
void
|
|
CThreadRep::testCancel()
|
|
{
|
|
{
|
|
CLock lock(s_mutex);
|
|
|
|
// done if not cancelled, not cancellable, or already cancelling
|
|
if (!m_cancel || !m_cancellable || m_cancelling) {
|
|
return;
|
|
}
|
|
|
|
// update state for cancel
|
|
m_cancel = false;
|
|
m_cancelling = true;
|
|
}
|
|
|
|
// start cancel
|
|
log((CLOG_DEBUG1 "throw cancel on thread %p", this));
|
|
throw XThreadCancel();
|
|
}
|
|
|
|
bool
|
|
CThreadRep::wait(CThreadRep* target, double timeout)
|
|
{
|
|
if (target == this) {
|
|
return false;
|
|
}
|
|
|
|
testCancel();
|
|
if (target->isExited()) {
|
|
return true;
|
|
}
|
|
|
|
if (timeout != 0.0) {
|
|
CStopwatch timer;
|
|
do {
|
|
sleep(0.05);
|
|
testCancel();
|
|
if (target->isExited()) {
|
|
return true;
|
|
}
|
|
} while (timeout < 0.0 || timer.getTime() <= timeout);
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
void
|
|
CThreadRep::setPriority(int)
|
|
{
|
|
// FIXME
|
|
}
|
|
|
|
bool
|
|
CThreadRep::isExited() const
|
|
{
|
|
CLock lock(s_mutex);
|
|
return m_exit;
|
|
}
|
|
|
|
void*
|
|
CThreadRep::threadFunc(void* arg)
|
|
{
|
|
CThreadRep* rep = (CThreadRep*)arg;
|
|
|
|
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
|
|
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
|
|
|
|
// run thread
|
|
rep->doThreadFunc();
|
|
|
|
{
|
|
// mark as terminated
|
|
CLock lock(s_mutex);
|
|
rep->m_exit = true;
|
|
}
|
|
|
|
// unref thread
|
|
rep->unref();
|
|
|
|
// terminate the thread
|
|
return NULL;
|
|
}
|
|
|
|
void
|
|
CThreadRep::threadCancel(int)
|
|
{
|
|
// do nothing
|
|
}
|
|
|
|
void*
|
|
CThreadRep::threadSignalHandler(void* vrep)
|
|
{
|
|
CThreadRep* mainThreadRep = reinterpret_cast<CThreadRep*>(vrep);
|
|
|
|
// detach
|
|
pthread_detach(pthread_self());
|
|
|
|
// add signal to mask
|
|
sigset_t sigset;
|
|
sigemptyset(&sigset);
|
|
sigaddset(&sigset, SIGINT);
|
|
sigaddset(&sigset, SIGTERM);
|
|
|
|
// also wait on SIGABRT. on linux (others?) this thread (process)
|
|
// will persist after all the other threads evaporate due to an
|
|
// assert unless we wait on SIGABRT. that means our resources (like
|
|
// the socket we're listening on) are not released and never will be
|
|
// until the lingering thread is killed. i don't know why sigwait()
|
|
// should protect the thread from being killed. note that sigwait()
|
|
// doesn't actually return if we receive SIGABRT and, for some
|
|
// reason, we don't have to block SIGABRT.
|
|
sigaddset(&sigset, SIGABRT);
|
|
|
|
// we exit the loop via thread cancellation in sigwait()
|
|
for (;;) {
|
|
// wait
|
|
int signal;
|
|
sigwait(&sigset, &signal);
|
|
|
|
// if we get here then the signal was raised. cancel the thread.
|
|
mainThreadRep->cancel();
|
|
}
|
|
}
|
|
|
|
#endif // HAVE_PTHREAD
|
|
|
|
#if WINDOWS_LIKE
|
|
|
|
void
|
|
CThreadRep::init()
|
|
{
|
|
m_result = NULL;
|
|
m_cancellable = true;
|
|
m_cancelling = false;
|
|
m_exit = CreateEvent(NULL, TRUE, FALSE, NULL);
|
|
m_cancel = CreateEvent(NULL, TRUE, FALSE, NULL);
|
|
}
|
|
|
|
void
|
|
CThreadRep::fini()
|
|
{
|
|
// destroy the events
|
|
CloseHandle(m_cancel);
|
|
CloseHandle(m_exit);
|
|
|
|
// close the handle (main thread has a NULL handle)
|
|
if (m_thread != NULL) {
|
|
CloseHandle(m_thread);
|
|
}
|
|
}
|
|
|
|
void
|
|
CThreadRep::sleep(
|
|
double timeout)
|
|
{
|
|
if (isCancellable()) {
|
|
WaitForSingleObject(m_cancel, (DWORD)(1000.0 * timeout));
|
|
}
|
|
else {
|
|
Sleep((DWORD)(1000.0 * timeout));
|
|
}
|
|
}
|
|
|
|
void
|
|
CThreadRep::cancel()
|
|
{
|
|
log((CLOG_DEBUG1 "cancel thread %p", this));
|
|
SetEvent(m_cancel);
|
|
}
|
|
|
|
void
|
|
CThreadRep::testCancel()
|
|
{
|
|
// poll cancel event. return if not set.
|
|
const DWORD result = WaitForSingleObject(getCancelEvent(), 0);
|
|
if (result != WAIT_OBJECT_0) {
|
|
return;
|
|
}
|
|
|
|
{
|
|
// ignore if disabled or already cancelling
|
|
CLock lock(s_mutex);
|
|
if (!m_cancellable || m_cancelling)
|
|
return;
|
|
|
|
// update state for cancel
|
|
m_cancelling = true;
|
|
ResetEvent(m_cancel);
|
|
}
|
|
|
|
// start cancel
|
|
log((CLOG_DEBUG1 "throw cancel on thread %p", this));
|
|
throw XThreadCancel();
|
|
}
|
|
|
|
bool
|
|
CThreadRep::wait(CThreadRep* target, double timeout)
|
|
{
|
|
// get the current thread. if it's the same as the target thread
|
|
// then the thread is waiting on itself.
|
|
CThreadPtr currentRep(CThreadRep::getCurrentThreadRep());
|
|
if (target == this) {
|
|
return false;
|
|
}
|
|
|
|
// is cancellation enabled?
|
|
const DWORD n = (isCancellable() ? 2 : 1);
|
|
|
|
// convert timeout
|
|
DWORD t;
|
|
if (timeout < 0.0) {
|
|
t = INFINITE;
|
|
}
|
|
else {
|
|
t = (DWORD)(1000.0 * timeout);
|
|
}
|
|
|
|
// wait for this thread to be cancelled or for the target thread to
|
|
// terminate.
|
|
HANDLE handles[2];
|
|
handles[0] = target->getExitEvent();
|
|
handles[1] = m_cancel;
|
|
DWORD result = WaitForMultipleObjects(n, handles, FALSE, t);
|
|
|
|
// cancel takes priority
|
|
if (n == 2 && result != WAIT_OBJECT_0 + 1 &&
|
|
WaitForSingleObject(handles[1], 0) == WAIT_OBJECT_0) {
|
|
result = WAIT_OBJECT_0 + 1;
|
|
}
|
|
|
|
// handle result
|
|
switch (result) {
|
|
case WAIT_OBJECT_0 + 0:
|
|
// target thread terminated
|
|
return true;
|
|
|
|
case WAIT_OBJECT_0 + 1:
|
|
// this thread was cancelled. does not return.
|
|
testCancel();
|
|
|
|
default:
|
|
// timeout or error
|
|
return false;
|
|
}
|
|
}
|
|
|
|
bool
|
|
CThreadRep::waitForEvent(double timeout)
|
|
{
|
|
// check if messages are available first. if we don't do this then
|
|
// MsgWaitForMultipleObjects() will block even if the queue isn't
|
|
// empty if the messages in the queue were there before the last
|
|
// call to GetMessage()/PeekMessage().
|
|
if (HIWORD(GetQueueStatus(QS_ALLINPUT)) != 0) {
|
|
return true;
|
|
}
|
|
|
|
// is cancellation enabled?
|
|
const DWORD n = (isCancellable() ? 1 : 0);
|
|
|
|
// convert timeout
|
|
DWORD t;
|
|
if (timeout < 0.0) {
|
|
t = INFINITE;
|
|
}
|
|
else {
|
|
t = (DWORD)(1000.0 * timeout);
|
|
}
|
|
|
|
// wait for this thread to be cancelled or for the target thread to
|
|
// terminate.
|
|
HANDLE handles[1];
|
|
handles[0] = m_cancel;
|
|
DWORD result = MsgWaitForMultipleObjects(n, handles, FALSE, t, QS_ALLINPUT);
|
|
|
|
// handle result
|
|
switch (result) {
|
|
case WAIT_OBJECT_0 + 1:
|
|
// message is available
|
|
return true;
|
|
|
|
case WAIT_OBJECT_0 + 0:
|
|
// this thread was cancelled. does not return.
|
|
testCancel();
|
|
|
|
default:
|
|
// timeout or error
|
|
return false;
|
|
}
|
|
}
|
|
|
|
void
|
|
CThreadRep::setPriority(int n)
|
|
{
|
|
DWORD pClass = NORMAL_PRIORITY_CLASS;
|
|
if (n < 0) {
|
|
switch (-n) {
|
|
case 1: n = THREAD_PRIORITY_ABOVE_NORMAL; break;
|
|
case 2: n = THREAD_PRIORITY_HIGHEST; break;
|
|
default:
|
|
pClass = HIGH_PRIORITY_CLASS;
|
|
switch (-n - 3) {
|
|
case 0: n = THREAD_PRIORITY_LOWEST; break;
|
|
case 1: n = THREAD_PRIORITY_BELOW_NORMAL; break;
|
|
case 2: n = THREAD_PRIORITY_NORMAL; break;
|
|
case 3: n = THREAD_PRIORITY_ABOVE_NORMAL; break;
|
|
default: n = THREAD_PRIORITY_HIGHEST; break;
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
else {
|
|
switch (n) {
|
|
case 0: n = THREAD_PRIORITY_NORMAL; break;
|
|
case 1: n = THREAD_PRIORITY_BELOW_NORMAL; break;
|
|
case 2: n = THREAD_PRIORITY_LOWEST; break;
|
|
default: n = THREAD_PRIORITY_IDLE; break;
|
|
}
|
|
}
|
|
SetPriorityClass(m_thread, pClass);
|
|
SetThreadPriority(m_thread, n);
|
|
}
|
|
|
|
HANDLE
|
|
CThreadRep::getExitEvent() const
|
|
{
|
|
// no lock necessary because the value never changes
|
|
return m_exit;
|
|
}
|
|
|
|
HANDLE
|
|
CThreadRep::getCancelEvent() const
|
|
{
|
|
// no lock necessary because the value never changes
|
|
return m_cancel;
|
|
}
|
|
|
|
unsigned int __stdcall
|
|
CThreadRep::threadFunc(void* arg)
|
|
{
|
|
CThreadRep* rep = (CThreadRep*)arg;
|
|
|
|
// run thread
|
|
rep->doThreadFunc();
|
|
|
|
// signal termination
|
|
SetEvent(rep->m_exit);
|
|
|
|
// unref thread
|
|
rep->unref();
|
|
|
|
// terminate the thread
|
|
return 0;
|
|
}
|
|
|
|
#endif // WINDOWS_LIKE
|