barrier/lib/mt/CThreadRep.cpp
2002-08-02 19:57:46 +00:00

749 lines
14 KiB
C++

/*
* synergy -- mouse and keyboard sharing utility
* Copyright (C) 2002 Chris Schoeneman
*
* This package is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* found in the file COPYING that should have accompanied this file.
*
* This package is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*/
#include "CThreadRep.h"
#include "CLock.h"
#include "CMutex.h"
#include "CThread.h"
#include "XThread.h"
#include "CLog.h"
#include "IJob.h"
#if HAVE_PTHREAD
# include <signal.h>
# define SIGWAKEUP SIGUSR1
#elif WINDOWS_LIKE
# if !defined(_MT)
# error multithreading compile option is required
# endif
# include <process.h>
#else
#error unsupported platform for multithreading
#endif
//
// CThreadRep
//
CMutex* CThreadRep::s_mutex = NULL;
CThreadRep* CThreadRep::s_head = NULL;
#if HAVE_PTHREAD
pthread_t CThreadRep::s_signalThread;
#endif
CThreadRep::CThreadRep() :
m_prev(NULL),
m_next(NULL),
m_refCount(1),
m_job(NULL),
m_userData(NULL)
{
// note -- s_mutex must be locked on entry
assert(s_mutex != NULL);
// initialize stuff
init();
#if HAVE_PTHREAD
// get main thread id
m_thread = pthread_self();
#elif WINDOWS_LIKE
// get main thread id
m_thread = NULL;
m_id = GetCurrentThreadId();
#endif
// insert ourself into linked list
if (s_head != NULL) {
s_head->m_prev = this;
m_next = s_head;
}
s_head = this;
}
CThreadRep::CThreadRep(IJob* job, void* userData) :
m_prev(NULL),
m_next(NULL),
m_refCount(2), // 1 for us, 1 for thread
m_job(job),
m_userData(userData)
{
assert(m_job != NULL);
assert(s_mutex != NULL);
// create a thread rep for the main thread if the current thread
// is unknown. note that this might cause multiple "main" threads
// if threads are created external to this library.
getCurrentThreadRep()->unref();
// initialize
init();
// hold mutex while we create the thread
CLock lock(s_mutex);
// start the thread. throw if it doesn't start.
#if HAVE_PTHREAD
// mask some signals in all threads except the main thread
sigset_t sigset, oldsigset;
sigemptyset(&sigset);
sigaddset(&sigset, SIGINT);
sigaddset(&sigset, SIGTERM);
pthread_sigmask(SIG_BLOCK, &sigset, &oldsigset);
int status = pthread_create(&m_thread, NULL, threadFunc, (void*)this);
pthread_sigmask(SIG_SETMASK, &oldsigset, NULL);
if (status != 0) {
throw XThreadUnavailable();
}
#elif WINDOWS_LIKE
unsigned int id;
m_thread = reinterpret_cast<HANDLE>(_beginthreadex(NULL, 0,
threadFunc, (void*)this, 0, &id));
m_id = static_cast<DWORD>(id);
if (m_thread == 0) {
throw XThreadUnavailable();
}
#endif
// insert ourself into linked list
if (s_head != NULL) {
s_head->m_prev = this;
m_next = s_head;
}
s_head = this;
// returning releases the locks, allowing the child thread to run
}
CThreadRep::~CThreadRep()
{
// note -- s_mutex must be locked on entry
// remove ourself from linked list
if (m_prev != NULL) {
m_prev->m_next = m_next;
}
if (m_next != NULL) {
m_next->m_prev = m_prev;
}
if (s_head == this) {
s_head = m_next;
}
// clean up
fini();
}
void
CThreadRep::initThreads()
{
if (s_mutex == NULL) {
s_mutex = new CMutex;
#if HAVE_PTHREAD
// install SIGWAKEUP handler
struct sigaction act;
sigemptyset(&act.sa_mask);
# if defined(SA_INTERRUPT)
act.sa_flags = SA_INTERRUPT;
# else
act.sa_flags = 0;
# endif
act.sa_handler = &threadCancel;
sigaction(SIGWAKEUP, &act, NULL);
// set signal mask
sigset_t sigset;
sigemptyset(&sigset);
sigaddset(&sigset, SIGWAKEUP);
pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
sigemptyset(&sigset);
sigaddset(&sigset, SIGPIPE);
sigaddset(&sigset, SIGINT);
sigaddset(&sigset, SIGTERM);
pthread_sigmask(SIG_BLOCK, &sigset, NULL);
// fire up the INT and TERM signal handler thread. we could
// instead arrange to catch and handle these signals but
// we'd be unable to cancel the main thread since no pthread
// calls are allowed in a signal handler.
int status = pthread_create(&s_signalThread, NULL,
&CThreadRep::threadSignalHandler,
getCurrentThreadRep());
if (status != 0) {
// can't create thread to wait for signal so don't block
// the signals.
sigemptyset(&sigset);
sigaddset(&sigset, SIGINT);
sigaddset(&sigset, SIGTERM);
pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
}
#endif // HAVE_PTHREAD
}
}
void
CThreadRep::ref()
{
CLock lock(s_mutex);
++m_refCount;
}
void
CThreadRep::unref()
{
CLock lock(s_mutex);
if (--m_refCount == 0) {
delete this;
}
}
bool
CThreadRep::enableCancel(bool enable)
{
CLock lock(s_mutex);
const bool old = m_cancellable;
m_cancellable = enable;
return old;
}
bool
CThreadRep::isCancellable() const
{
CLock lock(s_mutex);
return (m_cancellable && !m_cancelling);
}
void*
CThreadRep::getResult() const
{
// no lock necessary since thread isn't running
return m_result;
}
void*
CThreadRep::getUserData() const
{
// no lock necessary because the value never changes
return m_userData;
}
CThreadRep*
CThreadRep::getCurrentThreadRep()
{
assert(s_mutex != NULL);
#if HAVE_PTHREAD
const pthread_t thread = pthread_self();
#elif WINDOWS_LIKE
const DWORD id = GetCurrentThreadId();
#endif
// lock list while we search
CLock lock(s_mutex);
// search
CThreadRep* scan = s_head;
while (scan != NULL) {
#if HAVE_PTHREAD
if (scan->m_thread == thread) {
break;
}
#elif WINDOWS_LIKE
if (scan->m_id == id) {
break;
}
#endif
scan = scan->m_next;
}
// create and use main thread rep if thread not found
if (scan == NULL) {
scan = new CThreadRep();
}
// ref for caller
++scan->m_refCount;
return scan;
}
void
CThreadRep::doThreadFunc()
{
// default priority is slightly below normal
setPriority(1);
// wait for parent to initialize this object
{ CLock lock(s_mutex); }
void* result = NULL;
try {
// go
log((CLOG_DEBUG1 "thread %p entry", this));
m_job->run();
log((CLOG_DEBUG1 "thread %p exit", this));
}
catch (XThreadCancel&) {
// client called cancel()
log((CLOG_DEBUG1 "caught cancel on thread %p", this));
}
catch (XThreadExit& e) {
// client called exit()
result = e.m_result;
log((CLOG_DEBUG1 "caught exit on thread %p", this));
}
catch (...) {
log((CLOG_DEBUG1 "exception on thread %p", this));
// note -- don't catch (...) to avoid masking bugs
delete m_job;
throw;
}
// done with job
delete m_job;
// store exit result (no lock necessary because the result will
// not be accessed until m_exit is set)
m_result = result;
}
#if HAVE_PTHREAD
#include "CStopwatch.h"
#if TIME_WITH_SYS_TIME
# include <sys/time.h>
# include <time.h>
#else
# if HAVE_SYS_TIME_H
# include <sys/time.h>
# else
# include <time.h>
# endif
#endif
void
CThreadRep::init()
{
m_result = NULL;
m_cancellable = true;
m_cancelling = false;
m_cancel = false;
m_exit = false;
}
void
CThreadRep::fini()
{
// main thread has NULL job
if (m_job != NULL) {
pthread_detach(m_thread);
}
}
void
CThreadRep::sleep(
double timeout)
{
if (timeout < 0.0) {
return;
}
struct timespec t;
t.tv_sec = (long)timeout;
t.tv_nsec = (long)(1000000000.0 * (timeout - (double)t.tv_sec));
while (nanosleep(&t, &t) < 0)
testCancel();
}
void
CThreadRep::cancel()
{
CLock lock(s_mutex);
if (m_cancellable && !m_cancelling) {
m_cancel = true;
}
else {
return;
}
// break out of system calls
log((CLOG_DEBUG1 "cancel thread %p", this));
pthread_kill(m_thread, SIGWAKEUP);
}
void
CThreadRep::testCancel()
{
{
CLock lock(s_mutex);
// done if not cancelled, not cancellable, or already cancelling
if (!m_cancel || !m_cancellable || m_cancelling) {
return;
}
// update state for cancel
m_cancel = false;
m_cancelling = true;
}
// start cancel
log((CLOG_DEBUG1 "throw cancel on thread %p", this));
throw XThreadCancel();
}
bool
CThreadRep::wait(CThreadRep* target, double timeout)
{
if (target == this) {
return false;
}
testCancel();
if (target->isExited()) {
return true;
}
if (timeout != 0.0) {
CStopwatch timer;
do {
sleep(0.05);
testCancel();
if (target->isExited()) {
return true;
}
} while (timeout < 0.0 || timer.getTime() <= timeout);
}
return false;
}
void
CThreadRep::setPriority(int)
{
// FIXME
}
bool
CThreadRep::isExited() const
{
CLock lock(s_mutex);
return m_exit;
}
void*
CThreadRep::threadFunc(void* arg)
{
CThreadRep* rep = (CThreadRep*)arg;
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
// run thread
rep->doThreadFunc();
{
// mark as terminated
CLock lock(s_mutex);
rep->m_exit = true;
}
// unref thread
rep->unref();
// terminate the thread
return NULL;
}
void
CThreadRep::threadCancel(int)
{
// do nothing
}
void*
CThreadRep::threadSignalHandler(void* vrep)
{
CThreadRep* mainThreadRep = reinterpret_cast<CThreadRep*>(vrep);
// detach
pthread_detach(pthread_self());
// add signal to mask
sigset_t sigset;
sigemptyset(&sigset);
sigaddset(&sigset, SIGINT);
sigaddset(&sigset, SIGTERM);
// also wait on SIGABRT. on linux (others?) this thread (process)
// will persist after all the other threads evaporate due to an
// assert unless we wait on SIGABRT. that means our resources (like
// the socket we're listening on) are not released and never will be
// until the lingering thread is killed. i don't know why sigwait()
// should protect the thread from being killed. note that sigwait()
// doesn't actually return if we receive SIGABRT and, for some
// reason, we don't have to block SIGABRT.
sigaddset(&sigset, SIGABRT);
// we exit the loop via thread cancellation in sigwait()
for (;;) {
// wait
int signal;
sigwait(&sigset, &signal);
// if we get here then the signal was raised. cancel the thread.
mainThreadRep->cancel();
}
}
#endif // HAVE_PTHREAD
#if WINDOWS_LIKE
void
CThreadRep::init()
{
m_result = NULL;
m_cancellable = true;
m_cancelling = false;
m_exit = CreateEvent(NULL, TRUE, FALSE, NULL);
m_cancel = CreateEvent(NULL, TRUE, FALSE, NULL);
}
void
CThreadRep::fini()
{
// destroy the events
CloseHandle(m_cancel);
CloseHandle(m_exit);
// close the handle (main thread has a NULL handle)
if (m_thread != NULL) {
CloseHandle(m_thread);
}
}
void
CThreadRep::sleep(
double timeout)
{
if (isCancellable()) {
WaitForSingleObject(m_cancel, (DWORD)(1000.0 * timeout));
}
else {
Sleep((DWORD)(1000.0 * timeout));
}
}
void
CThreadRep::cancel()
{
log((CLOG_DEBUG1 "cancel thread %p", this));
SetEvent(m_cancel);
}
void
CThreadRep::testCancel()
{
// poll cancel event. return if not set.
const DWORD result = WaitForSingleObject(getCancelEvent(), 0);
if (result != WAIT_OBJECT_0) {
return;
}
{
// ignore if disabled or already cancelling
CLock lock(s_mutex);
if (!m_cancellable || m_cancelling)
return;
// update state for cancel
m_cancelling = true;
ResetEvent(m_cancel);
}
// start cancel
log((CLOG_DEBUG1 "throw cancel on thread %p", this));
throw XThreadCancel();
}
bool
CThreadRep::wait(CThreadRep* target, double timeout)
{
// get the current thread. if it's the same as the target thread
// then the thread is waiting on itself.
CThreadPtr currentRep(CThreadRep::getCurrentThreadRep());
if (target == this) {
return false;
}
// is cancellation enabled?
const DWORD n = (isCancellable() ? 2 : 1);
// convert timeout
DWORD t;
if (timeout < 0.0) {
t = INFINITE;
}
else {
t = (DWORD)(1000.0 * timeout);
}
// wait for this thread to be cancelled or for the target thread to
// terminate.
HANDLE handles[2];
handles[0] = target->getExitEvent();
handles[1] = m_cancel;
DWORD result = WaitForMultipleObjects(n, handles, FALSE, t);
// cancel takes priority
if (n == 2 && result != WAIT_OBJECT_0 + 1 &&
WaitForSingleObject(handles[1], 0) == WAIT_OBJECT_0) {
result = WAIT_OBJECT_0 + 1;
}
// handle result
switch (result) {
case WAIT_OBJECT_0 + 0:
// target thread terminated
return true;
case WAIT_OBJECT_0 + 1:
// this thread was cancelled. does not return.
testCancel();
default:
// timeout or error
return false;
}
}
bool
CThreadRep::waitForEvent(double timeout)
{
// check if messages are available first. if we don't do this then
// MsgWaitForMultipleObjects() will block even if the queue isn't
// empty if the messages in the queue were there before the last
// call to GetMessage()/PeekMessage().
if (HIWORD(GetQueueStatus(QS_ALLINPUT)) != 0) {
return true;
}
// is cancellation enabled?
const DWORD n = (isCancellable() ? 1 : 0);
// convert timeout
DWORD t;
if (timeout < 0.0) {
t = INFINITE;
}
else {
t = (DWORD)(1000.0 * timeout);
}
// wait for this thread to be cancelled or for the target thread to
// terminate.
HANDLE handles[1];
handles[0] = m_cancel;
DWORD result = MsgWaitForMultipleObjects(n, handles, FALSE, t, QS_ALLINPUT);
// handle result
switch (result) {
case WAIT_OBJECT_0 + 1:
// message is available
return true;
case WAIT_OBJECT_0 + 0:
// this thread was cancelled. does not return.
testCancel();
default:
// timeout or error
return false;
}
}
void
CThreadRep::setPriority(int n)
{
DWORD pClass = NORMAL_PRIORITY_CLASS;
if (n < 0) {
switch (-n) {
case 1: n = THREAD_PRIORITY_ABOVE_NORMAL; break;
case 2: n = THREAD_PRIORITY_HIGHEST; break;
default:
pClass = HIGH_PRIORITY_CLASS;
switch (-n - 3) {
case 0: n = THREAD_PRIORITY_LOWEST; break;
case 1: n = THREAD_PRIORITY_BELOW_NORMAL; break;
case 2: n = THREAD_PRIORITY_NORMAL; break;
case 3: n = THREAD_PRIORITY_ABOVE_NORMAL; break;
default: n = THREAD_PRIORITY_HIGHEST; break;
}
break;
}
}
else {
switch (n) {
case 0: n = THREAD_PRIORITY_NORMAL; break;
case 1: n = THREAD_PRIORITY_BELOW_NORMAL; break;
case 2: n = THREAD_PRIORITY_LOWEST; break;
default: n = THREAD_PRIORITY_IDLE; break;
}
}
SetPriorityClass(m_thread, pClass);
SetThreadPriority(m_thread, n);
}
HANDLE
CThreadRep::getExitEvent() const
{
// no lock necessary because the value never changes
return m_exit;
}
HANDLE
CThreadRep::getCancelEvent() const
{
// no lock necessary because the value never changes
return m_cancel;
}
unsigned int __stdcall
CThreadRep::threadFunc(void* arg)
{
CThreadRep* rep = (CThreadRep*)arg;
// run thread
rep->doThreadFunc();
// signal termination
SetEvent(rep->m_exit);
// unref thread
rep->unref();
// terminate the thread
return 0;
}
#endif // WINDOWS_LIKE