barrier/mt/CThreadRep.cpp

715 lines
13 KiB
C++
Raw Normal View History

2001-10-06 18:13:28 +04:00
#include "CThreadRep.h"
#include "CLock.h"
#include "CMutex.h"
#include "CThread.h"
#include "XThread.h"
2001-10-14 23:16:54 +04:00
#include "CLog.h"
2001-10-06 18:13:28 +04:00
#include "IJob.h"
#if HAVE_PTHREAD
# include <signal.h>
# define SIGWAKEUP SIGUSR1
#elif WINDOWS_LIKE
# if !defined(_MT)
# error multithreading compile option is required
# endif
# include <process.h>
#else
#error unsupported platform for multithreading
2001-10-06 18:13:28 +04:00
#endif
2001-10-06 18:13:28 +04:00
// FIXME -- temporary exception type
class XThreadUnavailable { };
//
// CThreadRep
//
CMutex* CThreadRep::s_mutex = NULL;
2001-10-06 18:13:28 +04:00
CThreadRep* CThreadRep::s_head = NULL;
#if HAVE_PTHREAD
pthread_t CThreadRep::s_signalThread;
#endif
2001-10-06 18:13:28 +04:00
CThreadRep::CThreadRep() :
m_prev(NULL),
m_next(NULL),
m_refCount(1),
m_job(NULL),
m_userData(NULL)
2001-10-06 18:13:28 +04:00
{
// note -- s_mutex must be locked on entry
assert(s_mutex != NULL);
2001-10-06 18:13:28 +04:00
// initialize stuff
init();
#if HAVE_PTHREAD
2001-10-06 18:13:28 +04:00
// get main thread id
m_thread = pthread_self();
#elif WINDOWS_LIKE
2001-10-06 18:13:28 +04:00
// get main thread id
m_thread = NULL;
m_id = GetCurrentThreadId();
#endif
// insert ourself into linked list
if (s_head != NULL) {
s_head->m_prev = this;
m_next = s_head;
}
s_head = this;
}
CThreadRep::CThreadRep(IJob* job, void* userData) :
m_prev(NULL),
m_next(NULL),
m_refCount(2), // 1 for us, 1 for thread
m_job(job),
m_userData(userData)
2001-10-06 18:13:28 +04:00
{
assert(m_job != NULL);
assert(s_mutex != NULL);
2001-10-06 18:13:28 +04:00
// create a thread rep for the main thread if the current thread
// is unknown. note that this might cause multiple "main" threads
// if threads are created external to this library.
getCurrentThreadRep()->unref();
// initialize
init();
// hold mutex while we create the thread
CLock lock(s_mutex);
2001-10-06 18:13:28 +04:00
// start the thread. throw if it doesn't start.
#if HAVE_PTHREAD
// mask some signals in all threads except the main thread
sigset_t sigset, oldsigset;
sigemptyset(&sigset);
sigaddset(&sigset, SIGINT);
sigaddset(&sigset, SIGTERM);
pthread_sigmask(SIG_BLOCK, &sigset, &oldsigset);
2001-10-06 18:13:28 +04:00
int status = pthread_create(&m_thread, NULL, threadFunc, (void*)this);
pthread_sigmask(SIG_SETMASK, &oldsigset, NULL);
if (status != 0) {
2001-10-06 18:13:28 +04:00
throw XThreadUnavailable();
}
#elif WINDOWS_LIKE
2001-10-06 18:13:28 +04:00
unsigned int id;
m_thread = reinterpret_cast<HANDLE>(_beginthreadex(NULL, 0,
threadFunc, (void*)this, 0, &id));
m_id = static_cast<DWORD>(id);
if (m_thread == 0) {
2001-10-06 18:13:28 +04:00
throw XThreadUnavailable();
}
2001-10-06 18:13:28 +04:00
#endif
// insert ourself into linked list
if (s_head != NULL) {
s_head->m_prev = this;
m_next = s_head;
}
s_head = this;
// returning releases the locks, allowing the child thread to run
}
CThreadRep::~CThreadRep()
{
// note -- s_mutex must be locked on entry
// remove ourself from linked list
if (m_prev != NULL) {
m_prev->m_next = m_next;
}
if (m_next != NULL) {
m_next->m_prev = m_prev;
}
if (s_head == this) {
s_head = m_next;
}
// clean up
fini();
}
void
CThreadRep::initThreads()
{
if (s_mutex == NULL) {
s_mutex = new CMutex;
#if HAVE_PTHREAD
// install SIGWAKEUP handler
struct sigaction act;
2001-10-21 04:21:02 +04:00
sigemptyset(&act.sa_mask);
# if defined(SA_INTERRUPT)
act.sa_flags = SA_INTERRUPT;
# else
act.sa_flags = 0;
# endif
2001-10-21 04:21:02 +04:00
act.sa_handler = &threadCancel;
sigaction(SIGWAKEUP, &act, NULL);
// set signal mask
sigset_t sigset;
sigemptyset(&sigset);
sigaddset(&sigset, SIGWAKEUP);
pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
2001-10-21 04:21:02 +04:00
sigemptyset(&sigset);
sigaddset(&sigset, SIGPIPE);
sigaddset(&sigset, SIGINT);
sigaddset(&sigset, SIGTERM);
2001-10-21 04:21:02 +04:00
pthread_sigmask(SIG_BLOCK, &sigset, NULL);
// fire up the INT and TERM signal handler thread
2002-06-09 21:35:28 +04:00
// FIXME -- i've seen this thread hanging around after the app
// asserted. should figure out how it stays alive and prevent
// it from happening.
int status = pthread_create(&s_signalThread, NULL,
&CThreadRep::threadSignalHandler,
getCurrentThreadRep());
if (status != 0) {
// can't create thread to wait for signal so don't block
// the signals.
sigemptyset(&sigset);
sigaddset(&sigset, SIGINT);
sigaddset(&sigset, SIGTERM);
pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
}
#endif // HAVE_PTHREAD
}
}
void
CThreadRep::ref()
2001-10-06 18:13:28 +04:00
{
CLock lock(s_mutex);
2001-10-06 18:13:28 +04:00
++m_refCount;
}
void
CThreadRep::unref()
2001-10-06 18:13:28 +04:00
{
CLock lock(s_mutex);
2001-10-06 18:13:28 +04:00
if (--m_refCount == 0) {
delete this;
}
}
bool
CThreadRep::enableCancel(bool enable)
2001-10-06 18:13:28 +04:00
{
CLock lock(s_mutex);
2001-10-06 18:13:28 +04:00
const bool old = m_cancellable;
m_cancellable = enable;
return old;
}
bool
CThreadRep::isCancellable() const
2001-10-06 18:13:28 +04:00
{
CLock lock(s_mutex);
2001-10-06 18:13:28 +04:00
return (m_cancellable && !m_cancelling);
}
void*
CThreadRep::getResult() const
2001-10-06 18:13:28 +04:00
{
// no lock necessary since thread isn't running
return m_result;
}
void*
CThreadRep::getUserData() const
2001-10-06 18:13:28 +04:00
{
// no lock necessary because the value never changes
return m_userData;
}
CThreadRep*
CThreadRep::getCurrentThreadRep()
2001-10-06 18:13:28 +04:00
{
assert(s_mutex != NULL);
#if HAVE_PTHREAD
2001-10-06 18:13:28 +04:00
const pthread_t thread = pthread_self();
#elif WINDOWS_LIKE
2001-10-06 18:13:28 +04:00
const DWORD id = GetCurrentThreadId();
#endif
// lock list while we search
CLock lock(s_mutex);
2001-10-06 18:13:28 +04:00
// search
CThreadRep* scan = s_head;
while (scan != NULL) {
#if HAVE_PTHREAD
2001-10-06 18:13:28 +04:00
if (scan->m_thread == thread) {
break;
}
#elif WINDOWS_LIKE
2001-10-06 18:13:28 +04:00
if (scan->m_id == id) {
break;
}
#endif
scan = scan->m_next;
}
// create and use main thread rep if thread not found
if (scan == NULL) {
scan = new CThreadRep();
}
// ref for caller
++scan->m_refCount;
return scan;
}
void
CThreadRep::doThreadFunc()
2001-10-06 18:13:28 +04:00
{
// default priority is slightly below normal
setPriority(1);
// wait for parent to initialize this object
{ CLock lock(s_mutex); }
2001-10-06 18:13:28 +04:00
void* result = NULL;
try {
// go
m_job->run();
}
catch (XThreadCancel&) {
2001-10-06 18:13:28 +04:00
// client called cancel()
log((CLOG_DEBUG1 "caught cancel on thread %p", this));
2001-10-06 18:13:28 +04:00
}
catch (XThreadExit& e) {
// client called exit()
result = e.m_result;
log((CLOG_DEBUG1 "caught exit on thread %p", this));
2001-10-14 23:16:54 +04:00
}
catch (...) {
log((CLOG_DEBUG1 "exception on thread %p", this));
2001-10-14 23:16:54 +04:00
// note -- don't catch (...) to avoid masking bugs
delete m_job;
2001-10-14 23:16:54 +04:00
throw;
2001-10-06 18:13:28 +04:00
}
// done with job
delete m_job;
// store exit result (no lock necessary because the result will
// not be accessed until m_exit is set)
m_result = result;
}
#if HAVE_PTHREAD
2001-10-06 18:13:28 +04:00
#include "CStopwatch.h"
#if TIME_WITH_SYS_TIME
# include <sys/time.h>
# include <time.h>
#else
# if HAVE_SYS_TIME_H
# include <sys/time.h>
# else
# include <time.h>
# endif
#endif
2001-10-06 18:13:28 +04:00
void
CThreadRep::init()
2001-10-06 18:13:28 +04:00
{
m_result = NULL;
m_cancellable = true;
m_cancelling = false;
m_cancel = false;
m_exit = false;
}
void
CThreadRep::fini()
2001-10-06 18:13:28 +04:00
{
// main thread has NULL job
if (m_job != NULL) {
pthread_detach(m_thread);
}
}
void
CThreadRep::sleep(
double timeout)
2001-10-06 18:13:28 +04:00
{
if (timeout < 0.0) {
2001-10-06 18:13:28 +04:00
return;
}
2001-10-06 18:13:28 +04:00
struct timespec t;
t.tv_sec = (long)timeout;
t.tv_nsec = (long)(1000000000.0 * (timeout - (double)t.tv_sec));
while (nanosleep(&t, &t) < 0)
testCancel();
2001-10-06 18:13:28 +04:00
}
void
CThreadRep::cancel()
2001-10-06 18:13:28 +04:00
{
CLock lock(s_mutex);
2001-10-06 18:13:28 +04:00
if (m_cancellable && !m_cancelling) {
m_cancel = true;
}
else {
return;
}
// break out of system calls
log((CLOG_DEBUG1 "cancel thread %p", this));
pthread_kill(m_thread, SIGWAKEUP);
2001-10-06 18:13:28 +04:00
}
void
CThreadRep::testCancel()
2001-10-06 18:13:28 +04:00
{
{
CLock lock(s_mutex);
// done if not cancelled, not cancellable, or already cancelling
if (!m_cancel || !m_cancellable || m_cancelling) {
2001-10-06 18:13:28 +04:00
return;
}
2001-10-06 18:13:28 +04:00
// update state for cancel
m_cancel = false;
m_cancelling = true;
}
// start cancel
log((CLOG_DEBUG1 "throw cancel on thread %p", this));
2001-10-06 18:13:28 +04:00
throw XThreadCancel();
}
bool
2002-06-17 17:31:21 +04:00
CThreadRep::wait(CThreadRep* target, double timeout)
2001-10-06 18:13:28 +04:00
{
if (target == this) {
2001-10-06 18:13:28 +04:00
return false;
}
2001-10-06 18:13:28 +04:00
testCancel();
if (target->isExited()) {
2001-10-06 18:13:28 +04:00
return true;
}
2001-10-06 18:13:28 +04:00
if (timeout != 0.0) {
2001-10-06 18:13:28 +04:00
CStopwatch timer;
do {
sleep(0.05);
testCancel();
if (target->isExited()) {
2001-10-06 18:13:28 +04:00
return true;
}
} while (timeout < 0.0 || timer.getTime() <= timeout);
2001-10-06 18:13:28 +04:00
}
return false;
}
void
2002-06-17 17:31:21 +04:00
CThreadRep::setPriority(int)
2001-10-06 18:13:28 +04:00
{
// FIXME
}
bool
CThreadRep::isExited() const
2001-10-06 18:13:28 +04:00
{
CLock lock(s_mutex);
2001-10-06 18:13:28 +04:00
return m_exit;
}
void*
CThreadRep::threadFunc(void* arg)
2001-10-06 18:13:28 +04:00
{
CThreadRep* rep = (CThreadRep*)arg;
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
// run thread
rep->doThreadFunc();
{
// mark as terminated
CLock lock(s_mutex);
rep->m_exit = true;
}
2001-10-06 18:13:28 +04:00
// unref thread
rep->unref();
// terminate the thread
return NULL;
}
void
2002-06-17 17:31:21 +04:00
CThreadRep::threadCancel(int)
2001-10-06 18:13:28 +04:00
{
// do nothing
}
void*
2002-06-17 17:31:21 +04:00
CThreadRep::threadSignalHandler(void* vrep)
{
CThreadRep* mainThreadRep = reinterpret_cast<CThreadRep*>(vrep);
// add signal to mask
sigset_t sigset;
sigemptyset(&sigset);
sigaddset(&sigset, SIGINT);
sigaddset(&sigset, SIGTERM);
// we exit the loop via thread cancellation in sigwait()
for (;;) {
// wait
int signal;
sigwait(&sigset, &signal);
// if we get here then the signal was raised. cancel the thread.
mainThreadRep->cancel();
}
}
#endif // HAVE_PTHREAD
#if WINDOWS_LIKE
2001-10-06 18:13:28 +04:00
void
CThreadRep::init()
2001-10-06 18:13:28 +04:00
{
m_result = NULL;
m_cancellable = true;
m_cancelling = false;
m_exit = CreateEvent(NULL, TRUE, FALSE, NULL);
m_cancel = CreateEvent(NULL, TRUE, FALSE, NULL);
}
void
CThreadRep::fini()
2001-10-06 18:13:28 +04:00
{
// destroy the events
CloseHandle(m_cancel);
CloseHandle(m_exit);
// close the handle (main thread has a NULL handle)
if (m_thread != NULL) {
CloseHandle(m_thread);
}
}
void
CThreadRep::sleep(
double timeout)
2001-10-06 18:13:28 +04:00
{
if (isCancellable()) {
2001-10-06 18:13:28 +04:00
WaitForSingleObject(m_cancel, (DWORD)(1000.0 * timeout));
}
else {
Sleep((DWORD)(1000.0 * timeout));
}
2001-10-06 18:13:28 +04:00
}
void
CThreadRep::cancel()
2001-10-06 18:13:28 +04:00
{
log((CLOG_DEBUG1 "cancel thread %p", this));
2001-10-06 18:13:28 +04:00
SetEvent(m_cancel);
}
void
CThreadRep::testCancel()
2001-10-06 18:13:28 +04:00
{
// poll cancel event. return if not set.
const DWORD result = WaitForSingleObject(getCancelEvent(), 0);
if (result != WAIT_OBJECT_0) {
2001-10-06 18:13:28 +04:00
return;
}
2001-10-06 18:13:28 +04:00
{
// ignore if disabled or already cancelling
CLock lock(s_mutex);
2001-10-06 18:13:28 +04:00
if (!m_cancellable || m_cancelling)
return;
// update state for cancel
m_cancelling = true;
ResetEvent(m_cancel);
}
// start cancel
log((CLOG_DEBUG1 "throw cancel on thread %p", this));
2001-10-06 18:13:28 +04:00
throw XThreadCancel();
}
bool
2002-06-17 17:31:21 +04:00
CThreadRep::wait(CThreadRep* target, double timeout)
2001-10-06 18:13:28 +04:00
{
// get the current thread. if it's the same as the target thread
// then the thread is waiting on itself.
CThreadPtr currentRep(CThreadRep::getCurrentThreadRep());
if (target == this) {
2001-10-06 18:13:28 +04:00
return false;
}
2001-10-06 18:13:28 +04:00
// is cancellation enabled?
const DWORD n = (isCancellable() ? 2 : 1);
// convert timeout
DWORD t;
if (timeout < 0.0) {
2001-10-06 18:13:28 +04:00
t = INFINITE;
}
else {
2001-10-06 18:13:28 +04:00
t = (DWORD)(1000.0 * timeout);
}
2001-10-06 18:13:28 +04:00
// wait for this thread to be cancelled or for the target thread to
// terminate.
HANDLE handles[2];
handles[0] = target->getExitEvent();
handles[1] = m_cancel;
DWORD result = WaitForMultipleObjects(n, handles, FALSE, t);
2001-10-06 18:13:28 +04:00
// cancel takes priority
if (n == 2 && result != WAIT_OBJECT_0 + 1 &&
WaitForSingleObject(handles[1], 0) == WAIT_OBJECT_0) {
2001-10-06 18:13:28 +04:00
result = WAIT_OBJECT_0 + 1;
}
2001-10-06 18:13:28 +04:00
// handle result
switch (result) {
2002-04-29 18:40:01 +04:00
case WAIT_OBJECT_0 + 0:
2001-10-06 18:13:28 +04:00
// target thread terminated
return true;
2002-04-29 18:40:01 +04:00
case WAIT_OBJECT_0 + 1:
2001-10-06 18:13:28 +04:00
// this thread was cancelled. does not return.
testCancel();
2002-04-29 18:40:01 +04:00
default:
// timeout or error
return false;
}
}
bool
2002-06-17 17:31:21 +04:00
CThreadRep::waitForEvent(double timeout)
{
// is cancellation enabled?
const DWORD n = (isCancellable() ? 1 : 0);
// convert timeout
DWORD t;
if (timeout < 0.0) {
t = INFINITE;
}
else {
t = (DWORD)(1000.0 * timeout);
}
// wait for this thread to be cancelled or for the target thread to
// terminate.
HANDLE handles[1];
handles[0] = m_cancel;
DWORD result = MsgWaitForMultipleObjects(n, handles, FALSE, t, QS_ALLINPUT);
// handle result
switch (result) {
case WAIT_OBJECT_0 + 1:
// message is available
return true;
case WAIT_OBJECT_0 + 0:
// this thread was cancelled. does not return.
testCancel();
default:
// timeout or error
2001-10-06 18:13:28 +04:00
return false;
}
}
void
2002-06-17 17:31:21 +04:00
CThreadRep::setPriority(int n)
2001-10-06 18:13:28 +04:00
{
DWORD pClass = NORMAL_PRIORITY_CLASS;
2001-10-06 18:13:28 +04:00
if (n < 0) {
switch (-n) {
2002-04-29 18:40:01 +04:00
case 1: n = THREAD_PRIORITY_ABOVE_NORMAL; break;
case 2: n = THREAD_PRIORITY_HIGHEST; break;
default:
pClass = HIGH_PRIORITY_CLASS;
switch (-n - 3) {
case 0: n = THREAD_PRIORITY_LOWEST; break;
case 1: n = THREAD_PRIORITY_BELOW_NORMAL; break;
case 2: n = THREAD_PRIORITY_NORMAL; break;
case 3: n = THREAD_PRIORITY_ABOVE_NORMAL; break;
default: n = THREAD_PRIORITY_HIGHEST; break;
}
break;
2001-10-06 18:13:28 +04:00
}
}
else {
switch (n) {
2002-04-29 18:40:01 +04:00
case 0: n = THREAD_PRIORITY_NORMAL; break;
case 1: n = THREAD_PRIORITY_BELOW_NORMAL; break;
case 2: n = THREAD_PRIORITY_LOWEST; break;
default: n = THREAD_PRIORITY_IDLE; break;
2001-10-06 18:13:28 +04:00
}
}
SetPriorityClass(m_thread, pClass);
2001-10-06 18:13:28 +04:00
SetThreadPriority(m_thread, n);
}
HANDLE
CThreadRep::getExitEvent() const
2001-10-06 18:13:28 +04:00
{
// no lock necessary because the value never changes
return m_exit;
}
HANDLE
CThreadRep::getCancelEvent() const
2001-10-06 18:13:28 +04:00
{
// no lock necessary because the value never changes
return m_cancel;
}
unsigned int __stdcall
2002-06-17 17:31:21 +04:00
CThreadRep::threadFunc(void* arg)
2001-10-06 18:13:28 +04:00
{
CThreadRep* rep = (CThreadRep*)arg;
// run thread
rep->doThreadFunc();
// signal termination
SetEvent(rep->m_exit);
// unref thread
rep->unref();
// terminate the thread
return 0;
}
#endif // WINDOWS_LIKE