barrier/mt/CThreadRep.cpp
crs 0759cbc104 changed waitForEvent() to handle a peculiar feature of
MsgWaitForMultipleObjects():  it will not return immediately
if an event already in the queue when it's called was already
in the queue during the last call to GetMessage()/PeekMessage().
also now discarding screen saver events if there are any other
screen saver events in the queue already.  this prevents these
events from piling up in the queue, which they'd do because we
sleep for 250ms when handling each one.
2002-07-18 16:58:08 +00:00

738 lines
14 KiB
C++

#include "CThreadRep.h"
#include "CLock.h"
#include "CMutex.h"
#include "CThread.h"
#include "XThread.h"
#include "CLog.h"
#include "IJob.h"
#if HAVE_PTHREAD
# include <signal.h>
# define SIGWAKEUP SIGUSR1
#elif WINDOWS_LIKE
# if !defined(_MT)
# error multithreading compile option is required
# endif
# include <process.h>
#else
#error unsupported platform for multithreading
#endif
// FIXME -- temporary exception type
class XThreadUnavailable { };
//
// CThreadRep
//
CMutex* CThreadRep::s_mutex = NULL;
CThreadRep* CThreadRep::s_head = NULL;
#if HAVE_PTHREAD
pthread_t CThreadRep::s_signalThread;
#endif
CThreadRep::CThreadRep() :
m_prev(NULL),
m_next(NULL),
m_refCount(1),
m_job(NULL),
m_userData(NULL)
{
// note -- s_mutex must be locked on entry
assert(s_mutex != NULL);
// initialize stuff
init();
#if HAVE_PTHREAD
// get main thread id
m_thread = pthread_self();
#elif WINDOWS_LIKE
// get main thread id
m_thread = NULL;
m_id = GetCurrentThreadId();
#endif
// insert ourself into linked list
if (s_head != NULL) {
s_head->m_prev = this;
m_next = s_head;
}
s_head = this;
}
CThreadRep::CThreadRep(IJob* job, void* userData) :
m_prev(NULL),
m_next(NULL),
m_refCount(2), // 1 for us, 1 for thread
m_job(job),
m_userData(userData)
{
assert(m_job != NULL);
assert(s_mutex != NULL);
// create a thread rep for the main thread if the current thread
// is unknown. note that this might cause multiple "main" threads
// if threads are created external to this library.
getCurrentThreadRep()->unref();
// initialize
init();
// hold mutex while we create the thread
CLock lock(s_mutex);
// start the thread. throw if it doesn't start.
#if HAVE_PTHREAD
// mask some signals in all threads except the main thread
sigset_t sigset, oldsigset;
sigemptyset(&sigset);
sigaddset(&sigset, SIGINT);
sigaddset(&sigset, SIGTERM);
pthread_sigmask(SIG_BLOCK, &sigset, &oldsigset);
int status = pthread_create(&m_thread, NULL, threadFunc, (void*)this);
pthread_sigmask(SIG_SETMASK, &oldsigset, NULL);
if (status != 0) {
throw XThreadUnavailable();
}
#elif WINDOWS_LIKE
unsigned int id;
m_thread = reinterpret_cast<HANDLE>(_beginthreadex(NULL, 0,
threadFunc, (void*)this, 0, &id));
m_id = static_cast<DWORD>(id);
if (m_thread == 0) {
throw XThreadUnavailable();
}
#endif
// insert ourself into linked list
if (s_head != NULL) {
s_head->m_prev = this;
m_next = s_head;
}
s_head = this;
// returning releases the locks, allowing the child thread to run
}
CThreadRep::~CThreadRep()
{
// note -- s_mutex must be locked on entry
// remove ourself from linked list
if (m_prev != NULL) {
m_prev->m_next = m_next;
}
if (m_next != NULL) {
m_next->m_prev = m_prev;
}
if (s_head == this) {
s_head = m_next;
}
// clean up
fini();
}
void
CThreadRep::initThreads()
{
if (s_mutex == NULL) {
s_mutex = new CMutex;
#if HAVE_PTHREAD
// install SIGWAKEUP handler
struct sigaction act;
sigemptyset(&act.sa_mask);
# if defined(SA_INTERRUPT)
act.sa_flags = SA_INTERRUPT;
# else
act.sa_flags = 0;
# endif
act.sa_handler = &threadCancel;
sigaction(SIGWAKEUP, &act, NULL);
// set signal mask
sigset_t sigset;
sigemptyset(&sigset);
sigaddset(&sigset, SIGWAKEUP);
pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
sigemptyset(&sigset);
sigaddset(&sigset, SIGPIPE);
sigaddset(&sigset, SIGINT);
sigaddset(&sigset, SIGTERM);
pthread_sigmask(SIG_BLOCK, &sigset, NULL);
// fire up the INT and TERM signal handler thread. we could
// instead arrange to catch and handle these signals but
// we'd be unable to cancel the main thread since no pthread
// calls are allowed in a signal handler.
int status = pthread_create(&s_signalThread, NULL,
&CThreadRep::threadSignalHandler,
getCurrentThreadRep());
if (status != 0) {
// can't create thread to wait for signal so don't block
// the signals.
sigemptyset(&sigset);
sigaddset(&sigset, SIGINT);
sigaddset(&sigset, SIGTERM);
pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
}
#endif // HAVE_PTHREAD
}
}
void
CThreadRep::ref()
{
CLock lock(s_mutex);
++m_refCount;
}
void
CThreadRep::unref()
{
CLock lock(s_mutex);
if (--m_refCount == 0) {
delete this;
}
}
bool
CThreadRep::enableCancel(bool enable)
{
CLock lock(s_mutex);
const bool old = m_cancellable;
m_cancellable = enable;
return old;
}
bool
CThreadRep::isCancellable() const
{
CLock lock(s_mutex);
return (m_cancellable && !m_cancelling);
}
void*
CThreadRep::getResult() const
{
// no lock necessary since thread isn't running
return m_result;
}
void*
CThreadRep::getUserData() const
{
// no lock necessary because the value never changes
return m_userData;
}
CThreadRep*
CThreadRep::getCurrentThreadRep()
{
assert(s_mutex != NULL);
#if HAVE_PTHREAD
const pthread_t thread = pthread_self();
#elif WINDOWS_LIKE
const DWORD id = GetCurrentThreadId();
#endif
// lock list while we search
CLock lock(s_mutex);
// search
CThreadRep* scan = s_head;
while (scan != NULL) {
#if HAVE_PTHREAD
if (scan->m_thread == thread) {
break;
}
#elif WINDOWS_LIKE
if (scan->m_id == id) {
break;
}
#endif
scan = scan->m_next;
}
// create and use main thread rep if thread not found
if (scan == NULL) {
scan = new CThreadRep();
}
// ref for caller
++scan->m_refCount;
return scan;
}
void
CThreadRep::doThreadFunc()
{
// default priority is slightly below normal
setPriority(1);
// wait for parent to initialize this object
{ CLock lock(s_mutex); }
void* result = NULL;
try {
// go
log((CLOG_DEBUG1 "thread %p entry", this));
m_job->run();
log((CLOG_DEBUG1 "thread %p exit", this));
}
catch (XThreadCancel&) {
// client called cancel()
log((CLOG_DEBUG1 "caught cancel on thread %p", this));
}
catch (XThreadExit& e) {
// client called exit()
result = e.m_result;
log((CLOG_DEBUG1 "caught exit on thread %p", this));
}
catch (...) {
log((CLOG_DEBUG1 "exception on thread %p", this));
// note -- don't catch (...) to avoid masking bugs
delete m_job;
throw;
}
// done with job
delete m_job;
// store exit result (no lock necessary because the result will
// not be accessed until m_exit is set)
m_result = result;
}
#if HAVE_PTHREAD
#include "CStopwatch.h"
#if TIME_WITH_SYS_TIME
# include <sys/time.h>
# include <time.h>
#else
# if HAVE_SYS_TIME_H
# include <sys/time.h>
# else
# include <time.h>
# endif
#endif
void
CThreadRep::init()
{
m_result = NULL;
m_cancellable = true;
m_cancelling = false;
m_cancel = false;
m_exit = false;
}
void
CThreadRep::fini()
{
// main thread has NULL job
if (m_job != NULL) {
pthread_detach(m_thread);
}
}
void
CThreadRep::sleep(
double timeout)
{
if (timeout < 0.0) {
return;
}
struct timespec t;
t.tv_sec = (long)timeout;
t.tv_nsec = (long)(1000000000.0 * (timeout - (double)t.tv_sec));
while (nanosleep(&t, &t) < 0)
testCancel();
}
void
CThreadRep::cancel()
{
CLock lock(s_mutex);
if (m_cancellable && !m_cancelling) {
m_cancel = true;
}
else {
return;
}
// break out of system calls
log((CLOG_DEBUG1 "cancel thread %p", this));
pthread_kill(m_thread, SIGWAKEUP);
}
void
CThreadRep::testCancel()
{
{
CLock lock(s_mutex);
// done if not cancelled, not cancellable, or already cancelling
if (!m_cancel || !m_cancellable || m_cancelling) {
return;
}
// update state for cancel
m_cancel = false;
m_cancelling = true;
}
// start cancel
log((CLOG_DEBUG1 "throw cancel on thread %p", this));
throw XThreadCancel();
}
bool
CThreadRep::wait(CThreadRep* target, double timeout)
{
if (target == this) {
return false;
}
testCancel();
if (target->isExited()) {
return true;
}
if (timeout != 0.0) {
CStopwatch timer;
do {
sleep(0.05);
testCancel();
if (target->isExited()) {
return true;
}
} while (timeout < 0.0 || timer.getTime() <= timeout);
}
return false;
}
void
CThreadRep::setPriority(int)
{
// FIXME
}
bool
CThreadRep::isExited() const
{
CLock lock(s_mutex);
return m_exit;
}
void*
CThreadRep::threadFunc(void* arg)
{
CThreadRep* rep = (CThreadRep*)arg;
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
// run thread
rep->doThreadFunc();
{
// mark as terminated
CLock lock(s_mutex);
rep->m_exit = true;
}
// unref thread
rep->unref();
// terminate the thread
return NULL;
}
void
CThreadRep::threadCancel(int)
{
// do nothing
}
void*
CThreadRep::threadSignalHandler(void* vrep)
{
CThreadRep* mainThreadRep = reinterpret_cast<CThreadRep*>(vrep);
// detach
pthread_detach(pthread_self());
// add signal to mask
sigset_t sigset;
sigemptyset(&sigset);
sigaddset(&sigset, SIGINT);
sigaddset(&sigset, SIGTERM);
// also wait on SIGABRT. on linux (others?) this thread (process)
// will persist after all the other threads evaporate due to an
// assert unless we wait on SIGABRT. that means our resources (like
// the socket we're listening on) are not released and never will be
// until the lingering thread is killed. i don't know why sigwait()
// should protect the thread from being killed. note that sigwait()
// doesn't actually return if we receive SIGABRT and, for some
// reason, we don't have to block SIGABRT.
sigaddset(&sigset, SIGABRT);
// we exit the loop via thread cancellation in sigwait()
for (;;) {
// wait
int signal;
sigwait(&sigset, &signal);
// if we get here then the signal was raised. cancel the thread.
mainThreadRep->cancel();
}
}
#endif // HAVE_PTHREAD
#if WINDOWS_LIKE
void
CThreadRep::init()
{
m_result = NULL;
m_cancellable = true;
m_cancelling = false;
m_exit = CreateEvent(NULL, TRUE, FALSE, NULL);
m_cancel = CreateEvent(NULL, TRUE, FALSE, NULL);
}
void
CThreadRep::fini()
{
// destroy the events
CloseHandle(m_cancel);
CloseHandle(m_exit);
// close the handle (main thread has a NULL handle)
if (m_thread != NULL) {
CloseHandle(m_thread);
}
}
void
CThreadRep::sleep(
double timeout)
{
if (isCancellable()) {
WaitForSingleObject(m_cancel, (DWORD)(1000.0 * timeout));
}
else {
Sleep((DWORD)(1000.0 * timeout));
}
}
void
CThreadRep::cancel()
{
log((CLOG_DEBUG1 "cancel thread %p", this));
SetEvent(m_cancel);
}
void
CThreadRep::testCancel()
{
// poll cancel event. return if not set.
const DWORD result = WaitForSingleObject(getCancelEvent(), 0);
if (result != WAIT_OBJECT_0) {
return;
}
{
// ignore if disabled or already cancelling
CLock lock(s_mutex);
if (!m_cancellable || m_cancelling)
return;
// update state for cancel
m_cancelling = true;
ResetEvent(m_cancel);
}
// start cancel
log((CLOG_DEBUG1 "throw cancel on thread %p", this));
throw XThreadCancel();
}
bool
CThreadRep::wait(CThreadRep* target, double timeout)
{
// get the current thread. if it's the same as the target thread
// then the thread is waiting on itself.
CThreadPtr currentRep(CThreadRep::getCurrentThreadRep());
if (target == this) {
return false;
}
// is cancellation enabled?
const DWORD n = (isCancellable() ? 2 : 1);
// convert timeout
DWORD t;
if (timeout < 0.0) {
t = INFINITE;
}
else {
t = (DWORD)(1000.0 * timeout);
}
// wait for this thread to be cancelled or for the target thread to
// terminate.
HANDLE handles[2];
handles[0] = target->getExitEvent();
handles[1] = m_cancel;
DWORD result = WaitForMultipleObjects(n, handles, FALSE, t);
// cancel takes priority
if (n == 2 && result != WAIT_OBJECT_0 + 1 &&
WaitForSingleObject(handles[1], 0) == WAIT_OBJECT_0) {
result = WAIT_OBJECT_0 + 1;
}
// handle result
switch (result) {
case WAIT_OBJECT_0 + 0:
// target thread terminated
return true;
case WAIT_OBJECT_0 + 1:
// this thread was cancelled. does not return.
testCancel();
default:
// timeout or error
return false;
}
}
bool
CThreadRep::waitForEvent(double timeout)
{
// check if messages are available first. if we don't do this then
// MsgWaitForMultipleObjects() will block even if the queue isn't
// empty if the messages in the queue were there before the last
// call to GetMessage()/PeekMessage().
if (HIWORD(GetQueueStatus(QS_ALLINPUT)) != 0) {
return true;
}
// is cancellation enabled?
const DWORD n = (isCancellable() ? 1 : 0);
// convert timeout
DWORD t;
if (timeout < 0.0) {
t = INFINITE;
}
else {
t = (DWORD)(1000.0 * timeout);
}
// wait for this thread to be cancelled or for the target thread to
// terminate.
HANDLE handles[1];
handles[0] = m_cancel;
DWORD result = MsgWaitForMultipleObjects(n, handles, FALSE, t, QS_ALLINPUT);
// handle result
switch (result) {
case WAIT_OBJECT_0 + 1:
// message is available
return true;
case WAIT_OBJECT_0 + 0:
// this thread was cancelled. does not return.
testCancel();
default:
// timeout or error
return false;
}
}
void
CThreadRep::setPriority(int n)
{
DWORD pClass = NORMAL_PRIORITY_CLASS;
if (n < 0) {
switch (-n) {
case 1: n = THREAD_PRIORITY_ABOVE_NORMAL; break;
case 2: n = THREAD_PRIORITY_HIGHEST; break;
default:
pClass = HIGH_PRIORITY_CLASS;
switch (-n - 3) {
case 0: n = THREAD_PRIORITY_LOWEST; break;
case 1: n = THREAD_PRIORITY_BELOW_NORMAL; break;
case 2: n = THREAD_PRIORITY_NORMAL; break;
case 3: n = THREAD_PRIORITY_ABOVE_NORMAL; break;
default: n = THREAD_PRIORITY_HIGHEST; break;
}
break;
}
}
else {
switch (n) {
case 0: n = THREAD_PRIORITY_NORMAL; break;
case 1: n = THREAD_PRIORITY_BELOW_NORMAL; break;
case 2: n = THREAD_PRIORITY_LOWEST; break;
default: n = THREAD_PRIORITY_IDLE; break;
}
}
SetPriorityClass(m_thread, pClass);
SetThreadPriority(m_thread, n);
}
HANDLE
CThreadRep::getExitEvent() const
{
// no lock necessary because the value never changes
return m_exit;
}
HANDLE
CThreadRep::getCancelEvent() const
{
// no lock necessary because the value never changes
return m_cancel;
}
unsigned int __stdcall
CThreadRep::threadFunc(void* arg)
{
CThreadRep* rep = (CThreadRep*)arg;
// run thread
rep->doThreadFunc();
// signal termination
SetEvent(rep->m_exit);
// unref thread
rep->unref();
// terminate the thread
return 0;
}
#endif // WINDOWS_LIKE