open-source-search-engine/Threads.cpp
2021-05-06 01:52:55 +10:00

3368 lines
106 KiB
C++

#include "gb-include.h"
#include "BigFile.h"
#include "Threads.h"
#include "Errno.h"
#include "Loop.h"
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/types.h> // getuid()/pid_t/getpid()
#include <sys/wait.h> // waitpid()
#include "Rdb.h" // g_mergeUrgent
#include <sched.h> // clone()
//#include "Msg16.h" // g_pid g_ticker
#include "XmlDoc.h" // g_pid g_ticker
#include "Profiler.h"
#include "Stats.h"
#include "Process.h"
// try using pthreads again
//#define PTHREADS
// use these stubs so libplotter.a works
#ifndef PTHREADS
// No-op stand-ins for the pthread mutex calls. They are only compiled when
// PTHREADS is not defined, so that libplotter.a still works. Both ignore
// their argument and always report success (0).
int pthread_mutex_lock ( pthread_mutex_t * /*unused*/ ) {
	return 0;
}
int pthread_mutex_unlock ( pthread_mutex_t * /*unused*/ ) {
	return 0;
}
#else
#include <pthread.h>
//pthread_attr_t s_attr;
#endif
// main process id (or thread id if using pthreads)
//static pid_t s_pid = (pid_t) -1;
// on 64-bit architectures pthread_t is 64 bit and pid_t is 32 bit:
static pthread_t s_pid = (pthread_t) -1;
//pid_t getpidtid() {
// on 64-bit architectures pthread_t is 64 bit and pid_t is 32 bit:
// Returns the id of the caller as a pthread_t. With PTHREADS this is the
// pthread id; otherwise it is the process id cast to pthread_t (pthread_t
// is the wider type on 64-bit architectures).
pthread_t getpidtid() {
#ifdef PTHREADS
	// gettid() is too new for our libc32, so use the "thread" id
	// instead of the process id. Threads::amThread() only compares
	// ids, and the main process has a unique thread id as well.
	const pthread_t self = pthread_self();
	return self;
#else
	const pid_t pid = getpid();
	return (pthread_t)pid;
#endif
}
// BIG PROBLEM:
// When doing pthread_join it doesn't always ensure a thread doesn't go
// zombie. It seems like when SIGIOs are generated by sigqueue() because
// of a full signal queue, threads start going zombie on me.
// PROBLEM #1: with using the "state" to identify a thread in the queue.
// Often a caller makes a disk access using a certain "state" var.
// He gets control back when cleanUp() calls t->m_callback. And he launches
// another read thread in there, which calls Threads::exit(state) before
// the original thread entry's m_isOccupied is set to false. So
// Threads::exit(state) mistakenly picks the thread entry from the former
// thread because it has the same state as its thread!
// PROBLEM #2: technically, a thread in Threads::exit() can set its m_done
// bit then send the signal. When the Loop sig handler sees the done bit is
// set it decrements the global thread count even though the thread may
// not have actually exited yet!! I spawned like 1,000 threads this way!!!!!
// a global class extern'd in .h file
Threads g_threads;
// . call this before calling a ThreadEntry's m_startRoutine
// . allows us to set the priority of the thread
int startUp ( void *state ) ;
void *startUp2 ( void *state ) ;
// JAB: warning abatement
//static void launchThreadsWrapper ( int fd , void *state );
static void killStalledFiltersWrapper ( int fd , void *state );
static void makeCallback ( ThreadEntry *t ) ;
// is the caller a thread?
bool Threads::amThread () {
	// setPid() has not stored the main id yet, so we can not tell
	const pthread_t unset = (pthread_t)-1;
	if ( s_pid == unset ) return false;
	// gettid() is too new for our libc32, so with PTHREADS compare
	// "thread" ids; the main process has a unique thread id as well.
#ifdef PTHREADS
	const pthread_t me = pthread_self();
#else
	const pthread_t me = getpidtid();
#endif
	// anything that is not the recorded main id counts as a thread
	return ( me != s_pid );
}
#ifndef PTHREADS
static int32_t s_bad = 0;
static int32_t s_badPid = -1;
#endif
#define MAX_PID 32767
#ifndef PTHREADS
static int s_errno ;
static int s_errnos [ MAX_PID + 1 ];
// this was improvised from linuxthreads/errno.c
//#define CURRENT_STACK_FRAME ({ char __csf; &__csf; })
// WARNING: you MUST compile with -DREENTRANT for this to work
// Replacement errno locator (only built without PTHREADS): every pid up to
// MAX_PID gets its own errno slot in s_errnos[].
int *__errno_location (void) {
	const int32_t me = (int32_t) getpid();
	if ( me > (int32_t)MAX_PID ) {
		// pid does not fit in the table: count the event, remember
		// the offending pid and fall back to the one shared slot
		s_bad++;
		s_badPid = me;
		return &s_errno;
	}
	return &s_errnos[me];
}
#endif
// stack must be page aligned for mprotect
#define THRPAGESIZE 8192
// how much of stack to use as guard space
#define GUARDSIZE (32*1024)
// . crashed in saving with 800k, so try 1M
// . must be multiple of THRPAGESIZE
//#define STACK_SIZE ((512+256) * 1024)
// pthread_create() cores in calloc() if we don't make STACK_SIZE bigger:
#define STACK_SIZE ((512+256+1024) * 1024)
// jeta was having some problems, but i don't think they were related to
// this stack size of 512k, but i will try boosting to 800k anyway.
//#define STACK_SIZE (512 * 1024)
// 256k was not enough, try 512k
//#define STACK_SIZE (256 * 1024)
// at 128k (and even 200k) some threads do not return! why?? there's
// obviously some stack overwriting going on!
//#define STACK_SIZE (128 * 1024)
static char *s_stackAlloc = NULL;
static int32_t s_stackAllocSize;
//#ifndef PTHREADS
static char *s_stack = NULL;
static int32_t s_stackSize;
static char *s_stackPtrs [ MAX_STACKS ];
//#endif
static int32_t s_next [ MAX_STACKS ];
static int32_t s_head ;
// . pops the first index off the free list (s_head/s_next) and returns it
// . returns -1 if none left
int32_t Threads::getStack ( ) {
	const int32_t top = s_head;
	if ( top == -1 ) return -1;
	// the successor becomes the new head of the free list
	s_head = s_next [ top ];
	return top;
}
// Pushes stack index "si" back onto the front of the free list.
void Threads::returnStack ( int32_t si ) {
	// when the list is empty s_head is -1, so "si" simply becomes the
	// only element and is terminated by that -1
	const int32_t oldHead = s_head;
	s_next[si] = oldHead;
	s_head     = si;
}
void Threads::reset ( ) {
if ( s_stackAlloc ) {
mprotect(s_stackAlloc, s_stackAllocSize, PROT_READ|PROT_WRITE);
mfree ( s_stackAlloc , s_stackAllocSize,
"ThreadStack");
}
s_stackAlloc = NULL;
for ( int32_t i = 0 ; i < MAX_THREAD_QUEUES ; i++ )
m_threadQueues[i].reset();
}
// Starts out with no registered queues; init() has not run yet.
Threads::Threads ( ) {
	m_initialized = false;
	m_numQueues   = 0;
}
// Records the id of the caller in s_pid; amThread() compares against it.
void Threads::setPid ( ) {
#ifndef PTHREADS
	// no pthreads: the main id is just the process id
	s_pid = getpid();
#else
	// on 64bit arch pid is 32 bit and pthread_t is 64 bit
	s_pid = (pthread_t)pthread_self();
	// scheduling parameters of this thread; they are fetched but not
	// used for anything (they used to be logged)
	const pthread_t self = pthread_self();
	sched_param schedParam;
	int schedPolicy;
	pthread_getschedparam ( self, &schedPolicy, &schedParam );
#endif
}
// . one-time setup of the thread system
// . records the main id, registers the 1-second stalled-filter timer,
//   registers every thread type with its launch/queue limits and then
//   allocates, aligns and mprotect()s the memory used for thread stacks
// . returns true on success (or if already initialized), otherwise the
//   value of the log() call that reports the failure
bool Threads::init ( ) {
if ( m_initialized ) return true;
// NOTE: this is set before the work below is done, so it stays true even
// if one of the later steps makes init() return early
m_initialized = true;
m_needsCleanup = false;
//m_needBottom = false;
// sanity check: s_pid is a pthread_t and must be able to hold a pid_t
//if ( sizeof(pthread_t) > sizeof(pid_t) ) { char *xx=NULL;*xx=0; }
if ( sizeof(pid_t) > sizeof(pthread_t) ) { char *xx=NULL;*xx=0; }
setPid();
#ifdef _STACK_GROWS_UP
return log("thread: Stack growing up not supported.");
#endif
//g_conf.m_logDebugThread = true;
// . damn, this only applies to fork() calls, i guess
// . a quick a dirty way to restrict # of threads so we don't explode
/*
struct rlimit lim;
lim.rlim_max = 100;
if ( setrlimit(RLIMIT_NPROC,&lim) )
log("thread::init: setrlimit: %s", mstrerror(errno) );
else
log("thread::init: set max number of processes to 100");
*/
// allow threads until disabled
m_disabled = false;
/*
// # of low priority threads launched and returned
m_hiLaunched = 0;
m_hiReturned = 0;
m_loLaunched = 0;
m_loReturned = 0;
*/
//int32_t m_queryMaxBigDiskThreads ; // > 1M read
//int32_t m_queryMaxMedDiskThreads ; // 100k - 1M read
//int32_t m_queryMaxSmaDiskThreads ; // < 100k per read
// categorize the disk read sizes by these here
// g_conf.m_bigReadSize = 0x7fffffff;
// g_conf.m_medReadSize = 1000000;
// g_conf.m_smaReadSize = 100000;
// . register a sleep wrapper to launch threads every 30ms
// . sometimes a bunch of threads mark themselves as done and the
// cleanUp() handler sees them as all still launched so it doesn't
// launch any new ones
//if ( ! g_loop.registerSleepCallback(30,NULL,launchThreadsWrapper))
// return log("thread: Failed to initialize timer callback.");
// have killStalledFiltersWrapper() called every 1000ms
if ( ! g_loop.registerSleepCallback(1000,NULL,
killStalledFiltersWrapper,0))
return log("thread: Failed to initialize timer callback2.");
// debug
//log("thread: main process has pid=%"INT32"",(int32_t)s_pid);
// . set priority of the main process to 0
// . setpriority() only applies to SCHED_OTHER threads
// . priority of threads with niceness 0 will be 0
// . priority of threads with niceness 1 will be 10
// . priority of threads with niceness 2 will be 20
// . see 'man sched_setscheduler' for detailed scheduling info
// . no need to call getpid(), 0 for pid means the current process
#ifndef PTHREADS
if ( setpriority ( PRIO_PROCESS, getpid() , 0 ) < 0 )
log("thread: Call to setpriority failed: %s.",
mstrerror(errno));
#endif
// make multiplier higher for raids, can do more seeks
//int32_t m = 1;
//#ifdef _LARS_
//m = 3;
//#endif
// register the types of threads here instead of in main.cpp
//if ( ! g_threads.registerType ( DISK_THREAD ,m*20/*maxThreads*/))
// try running blaster with 5 threads and you'll
// . see like a double degrade in performance for some reason!!
// . TODO: why?
// . well, this should be controlled g_conf.m_maxSpiderDiskThreads
// for niceness 1+ threads, and g_conf.m_maxPriorityDiskThreads for
// niceness 0 and below disk threads
// . 100 maxThreads out at a time, 32000 can be queued
if ( ! g_threads.registerType ( DISK_THREAD ,100/*maxThreads*/,32000))
return log("thread: Failed to register thread type." );
// . these are used by Msg5 to merge what it reads from disk
// . i raised it from 1 to 2 and got better response time from Msg10
// . i leave one open in case one is used for doing a big merge
// with high niceness cuz it would hold up high priority ones!
// . TODO: is there a better way? cancel it when UdpServer calls
// Threads::suspendLowPriorityThreads() ?
// . this used to be 2 but now defaults to 10 in Parms.cpp. i found
// i have less int32_t gray lines in the performance graph when i
// did that on trinity.
// . the configured value is clamped to at least 1
int32_t max2 = g_conf.m_maxCpuMergeThreads;
if ( max2 < 1 ) max2 = 1;
if ( ! g_threads.registerType ( MERGE_THREAD , max2,1000) )
return log("thread: Failed to register thread type." );
// will raising this from 1 to 2 make it faster too?
// i raised since global specs new servers have 2 (hyperthreaded?) cpus
// . the configured value is clamped to at least 1
int32_t max = g_conf.m_maxCpuThreads;
if ( max < 1 ) max = 1;
if ( ! g_threads.registerType ( INTERSECT_THREAD,max,10) )
return log("thread: Failed to register thread type." );
// filter thread spawned to call popen() to filter an http reply
if ( ! g_threads.registerType ( FILTER_THREAD, 2/*maxThreads*/,300) )
return log("thread: Failed to register thread type." );
// RdbTree uses this to save itself
if ( ! g_threads.registerType ( SAVETREE_THREAD,1/*maxThreads*/,100) )
return log("thread: Failed to register thread type." );
// . File.cpp spawns a rename thread for doing renames and unlinks
// . doing a tight merge on titldb can be ~250 unlinks
// . MDW up from 1 to 30 max, after doing a ddump on 3000+ collections
// it was taking forever to go one at a time through the unlink
// thread queue. seemed like a 1 second space between unlinks.
// 1/23/1014
// NOTE(review): the comment above says 30 but the code registers a max
// of 5 -- confirm which one is intended
if ( ! g_threads.registerType ( UNLINK_THREAD,5/*maxThreads*/,3000) )
return log("thread: Failed to register thread type." );
// generic multipurpose
if ( ! g_threads.registerType (GENERIC_THREAD,20/*maxThreads*/,100) )
return log("thread: Failed to register thread type." );
// for call SSL_accept() which blocks for 10ms even when socket
// is non-blocking...
//if (!g_threads.registerType (SSLACCEPT_THREAD,20/*maxThreads*/,100))
// return log("thread: Failed to register thread type." );
//#ifndef PTHREADS
// sanity check
if ( GUARDSIZE >= STACK_SIZE )
return log("thread: Stack guard size too big.");
// not more than this outstanding: sum of the launch limits of all queues
int32_t maxThreads = 0;
for ( int32_t i = 0 ; i < m_numQueues ; i++ )
maxThreads += m_threadQueues[i].m_maxLaunched;
// limit to stack we got
if ( maxThreads > MAX_STACKS ) maxThreads = MAX_STACKS;
// allocate the stack space, plus one THRPAGESIZE of slack so the start
// can be aligned below
s_stackAllocSize = STACK_SIZE * maxThreads + THRPAGESIZE ;
// clear stack to help check for overwrites
s_stackAlloc = (char *) mcalloc ( s_stackAllocSize , "ThreadStack" );
if ( ! s_stackAlloc )
return log("thread: Unable to allocate %"INT32" bytes for thread "
"stacks.", s_stackAllocSize);
log(LOG_INIT,"thread: Using %"INT32" bytes for %"INT32" thread stacks.",
s_stackAllocSize,maxThreads);
// align: round the start up to the next multiple of THRPAGESIZE
s_stack = (char *)(((uint64_t)s_stackAlloc+THRPAGESIZE-1)&~(THRPAGESIZE-1));
// new size: whatever is left of the allocation after the aligned start
s_stackSize = s_stackAllocSize - (s_stack - s_stackAlloc);
// protect the whole stack while not in use
if ( mprotect ( s_stack , s_stackSize , PROT_NONE ) )
log("thread: Call to mprotect failed: %s.",mstrerror(errno));
// test
//s_stack[0] = 1;
// init the linked list of free stack indexes: 0 -> 1 -> ... -> -1
// NOTE(review): the list and s_stackPtrs[] cover all MAX_STACKS slots
// while the allocation above is sized for maxThreads stacks only; slots
// at index >= maxThreads point past the allocation -- presumably never
// handed out because of the per-queue launch limits, confirm
for ( int32_t i = 0 ; i < MAX_STACKS ; i++ ) {
if ( i == MAX_STACKS - 1 ) s_next[i] = -1;
else s_next[i] = i + 1;
s_stackPtrs[i] = s_stack + STACK_SIZE * i;
}
s_head = 0;
// don't do real time stuff for now
return true;
//#else
// . keep stack size small, 128k
// . if we get problems, we'll increase this to 256k
// . seems like it grows dynamically from 4K to up to 2M as needed
//if ( pthread_attr_setstacksize ( &s_attr , (size_t)128*1024 ) )
// return log("thread: init: pthread_attr_setstacksize: %s",
// mstrerror(errno));
//pthread_attr_setschedparam ( &s_attr , PTHREAD_EXPLICIT_SCHED );
//pthread_attr_setscope ( &s_attr , PTHREAD_SCOPE_SYSTEM );
// return true;
//#endif
}
// all types should be registered in main.cpp before any threads launch
// . sets up the next free ThreadQueue for thread type "type"
// . maxThreads and maxEntries are passed straight to ThreadQueue::init()
// . returns true on success, false on failure
bool Threads::registerType ( char type , int32_t maxThreads , int32_t maxEntries ) {
	// all queue slots taken? set g_errno and complain
	if ( m_numQueues >= MAX_THREAD_QUEUES ) {
		g_errno = EBUFTOOSMALL;
		return log(LOG_LOGIC,"thread: registerType: Too many thread "
			   "queues");
	}
	// only count the queue once it initialized successfully
	if ( m_threadQueues[m_numQueues].init(type,maxThreads,maxEntries) ) {
		m_numQueues++;
		return true;
	}
	return false;
}
// Adds up the occupied entries of every queue except two:
//  - INTERSECT_THREAD (intersects termlists to resolve a query); these
//    have been seen stuck in an infinite loop sometimes
//  - FILTER_THREAD; those get stuck sometimes as well
// NOTE: the queue *index* is compared against the thread type constants.
int32_t Threads::getNumThreadsOutOrQueued() {
	int32_t total = 0;
	for ( int32_t q = 0 ; q < m_numQueues ; q++ ) {
		const bool skip = ( q == INTERSECT_THREAD ||
				    q == FILTER_THREAD );
		if ( skip ) continue;
		total += m_threadQueues[q].getNumThreadsOutOrQueued();
	}
	return total;
}
// Forwards to the queue at index DISK_THREAD.
int32_t Threads::getNumWriteThreadsOut() {
	const int32_t writesOut =
		m_threadQueues[DISK_THREAD].getNumWriteThreadsOut();
	return writesOut;
}
// Disk write threads plus unlink/rename threads that are still running.
// Neither count includes threads that are done and just awaiting a join.
int32_t Threads::getNumActiveWriteUnlinkRenameThreadsOut() {
	const int32_t writes =
		m_threadQueues[DISK_THREAD].getNumWriteThreadsOut();
	const int32_t unlinks =
		m_threadQueues[UNLINK_THREAD].getNumActiveThreadsOut();
	return writes + unlinks;
}
// . returns false (and may set errno) if failed to launch a thread
// . returns true if thread added to queue successfully
// . may be launched instantly or later depending on # of threads in the queue
bool Threads::call ( char   type     ,
		     int32_t niceness ,
		     void  *state    ,
		     void  (* callback    )(void *state,ThreadEntry *t) ,
		     void *(* startRoutine)(void *state,ThreadEntry *t) ) {
	// never use threads when built for valgrind
#ifdef _VALGRIND_
	return false;
#endif
	g_errno = 0;
	// threads may be switched off globally ...
	if ( m_disabled ) return false;
	if ( ! g_conf.m_useThreads ) return false;
	// ... or per kind of work
	const bool isIndexOp = ( type == MERGE_THREAD ||
				 type == INTERSECT_THREAD );
	if ( type == DISK_THREAD && ! g_conf.m_useThreadsForDisk )
		return false;
	if ( isIndexOp && ! g_conf.m_useThreadsForIndexOps )
		return false;
	if ( type == FILTER_THREAD && ! g_conf.m_useThreadsForSystemCalls )
		return false;
	// lazy init
	if ( ! m_initialized && ! init() )
		return log("db: Threads init failed." );
	// locate the queue that was registered for this type
	int32_t q = 0;
	while ( q < m_numQueues && m_threadQueues[q].m_threadType != type )
		q++;
	// bitch if type not added via registerType() call
	if ( q == m_numQueues ) {
		g_errno = EBADENGINEER;
		return log(LOG_LOGIC,"thread: addtoqueue: Unregistered "
			   "thread type %"INT32"",(int32_t)type);
	}
	// . queue the request
	// . addEntry() returns NULL and sets g_errno on error
	ThreadEntry *t = m_threadQueues[q].addEntry(niceness,state,
						    callback,startRoutine);
	if ( ! t ) return log("thread: Failed to add entry to thread pool: "
			      "%s.",mstrerror(g_errno));
	// . launch as many threads from this queue as we can
	// . a launch error may set g_errno; it is ignored on purpose
	//   because our entry is queued either way
	while ( m_threadQueues[q].launchThread2 ( ) )
		;
	g_errno = 0;
	// success
	return true;
}
// static void launchThreadsWrapper ( int fd , void *state ) {
// // debug msg
// //if ( g_conf.m_timingDebugEnabled )
// // log("thread: launchThreadsWrapper: entered");
// // clean up
// g_threads.cleanUp(NULL,1000);
// // and launch
// g_threads.launchThreads();
// }
// . sleep callback, registered in Threads::init() to run every second
// . counts seconds in g_ticker and, once it has reached the timeout,
//   sends signal 9 to the filter process whose pid is in g_pid
// . "fd" and "state" are not used
static void killStalledFiltersWrapper ( int fd , void *state ) {
	// bail if no pid
	if ( g_pid == -1 ) return;
	// . only kill after ticker reaches the timeout (30 if the
	//   configured g_filterTimeout is not positive)
	// . we are called once every second, so inc it each time
	int32_t timeout = g_filterTimeout;
	if ( timeout <= 0 ) timeout = 30;
	if ( g_ticker++ < timeout ) return;
	// debug
	log("threads: killing stalled filter process of age %"INT32" "
	    "seconds and pid=%"INT32".",g_ticker,(int32_t)g_pid);
	// . kill him
	// . kill() returns -1 on failure and puts the reason in errno, so
	//   grab errno right away before anything else can overwrite it
	int rc        = kill ( g_pid , 9 );
	int killErrno = errno;
	// don't kill again
	g_pid = -1;
	if ( rc != 0 )
		log("threads: kill filter: %s", mstrerror(killErrno) );
}
// . called by g_loop in Loop.cpp after getting a SI_QUEUE signal that it
// is from when a thread exited
// . we put that signal there using sigqueue() in Threads::exit()
// . this way another thread can be launched right away
// Launches as many queued threads as every queue allows and returns how
// many were launched. g_errno is always 0 on return.
int32_t Threads::launchThreads ( ) {
	// do not launch anything once the process is trying to exit
	if ( g_process.m_mode == EXIT_MODE )
		return 0;
	int32_t launchedCount = 0;
	// walk the queues from the last registered down to the first, so
	// queue #0 gets its turn last
	int32_t q = m_numQueues;
	while ( --q >= 0 ) {
		g_errno = 0;
		// drain queue #q as far as it lets us
		for ( ; m_threadQueues[q].launchThread2( ) ; launchedCount++ )
			;
		// bitch if the last attempt ended with an error
		if ( g_errno )
			log("thread: Failed to launch thread: %s.",
			    mstrerror(g_errno));
	}
	g_errno = 0;
	return launchedCount;
}
// . will cancel currently running low priority threads
// . will prevent any low priority threads from launching
// . will only cancel disk threads for now
// Asks every registered queue to suspend its low priority threads.
void Threads::suspendLowPriorityThreads() {
	const bool debug = g_conf.m_logDebugThread;
	if ( debug )
		log(LOG_DEBUG,"thread: SUSPENDING LOW-PRIORITY THREADS.");
	int32_t q = 0;
	while ( q < m_numQueues ) {
		m_threadQueues[q].suspendLowPriorityThreads();
		q++;
	}
}
// Asks every registered queue to resume its low priority threads.
void Threads::resumeLowPriorityThreads() {
	const bool debug = g_conf.m_logDebugThread;
	if ( debug )
		log(LOG_DEBUG,"thread: RESUMING LOW-PRIORITY THREADS.");
	int32_t q = 0;
	while ( q < m_numQueues ) {
		m_threadQueues[q].resumeLowPriorityThreads();
		q++;
	}
}
//void Threads::cancelLowPriorityThreads () {
// for ( int32_t i = 0 ; i < m_numQueues; i++ )
// m_threadQueues[i].cancelLowPriorityThreads();
//}
// . consistency check of a doubly linked list of ThreadEntries
// . tailPtr may be NULL for a head-only list
// . deliberately crashes (NULL write) on the first inconsistency found
void checkList ( ThreadEntry **headPtr , ThreadEntry **tailPtr ) {
	ThreadEntry *head = *headPtr;
	// head and tail must be either both set or both NULL
	if ( tailPtr ) {
		const bool haveHead = ( head != NULL );
		const bool haveTail = ( *tailPtr != NULL );
		if ( haveHead != haveTail ) { char *xx=NULL;*xx=0; }
	}
	// empty list, nothing more to verify
	if ( ! head ) return;
	// head can not have a prev
	if ( head->m_prevLink ) { char *xx=NULL;*xx=0; }
	// forward pass: each node must point back at the node we came from
	ThreadEntry *prev = NULL;
	for ( ThreadEntry *cur = head ; cur ;
	      prev = cur , cur = cur->m_nextLink ) {
		if ( cur->m_prevLink != prev ) { char *xx=NULL;*xx=0; }
		if ( prev && prev->m_nextLink != cur ) {
			char *xx=NULL;*xx=0; }
	}
	if ( ! tailPtr ) return;
	ThreadEntry *tail = *tailPtr;
	// tail can not have a next
	if ( tail->m_nextLink ) { char *xx=NULL;*xx=0; }
	// backward pass: the same check in the other direction
	ThreadEntry *next = NULL;
	for ( ThreadEntry *cur = tail ; cur ;
	      next = cur , cur = cur->m_prevLink ) {
		if ( cur->m_nextLink != next ) { char *xx=NULL;*xx=0; }
		if ( next && next->m_prevLink != cur ) {
			char *xx=NULL;*xx=0; }
	}
}
// remove from a head-only linked list
// Unlinks "t" from a list that only tracks its head and clears t's links.
void removeLink ( ThreadEntry **headPtr , ThreadEntry *t ) {
	// the list is only verified when thread debugging is on
	const bool verify = g_conf.m_logDebugThread;
	if ( verify ) checkList ( headPtr , NULL );
	ThreadEntry *before = t->m_prevLink;
	ThreadEntry *after  = t->m_nextLink;
	// bridge over t; if t was the head its successor takes over
	if ( before ) before->m_nextLink = after;
	else          *headPtr           = after;
	if ( after  ) after->m_prevLink  = before;
	// t is now detached
	t->m_nextLink = NULL;
	t->m_prevLink = NULL;
	if ( verify ) checkList ( headPtr , NULL );
}
// MDW: verify this::::: and the one above!!!!!!!!!!!!!!!!
// Unlinks "t" from a list that tracks both head and tail and clears t's
// links.
void removeLink2 ( ThreadEntry **headPtr ,
		   ThreadEntry **tailPtr ,
		   ThreadEntry  *t       ) {
	// the list is only verified when thread debugging is on
	const bool verify = g_conf.m_logDebugThread;
	if ( verify ) checkList ( headPtr , tailPtr );
	ThreadEntry *before = t->m_prevLink;
	ThreadEntry *after  = t->m_nextLink;
	// bridge over t, moving head and/or tail if t was at an end
	if ( before ) before->m_nextLink = after;
	else          *headPtr           = after;
	if ( after  ) after->m_prevLink  = before;
	else          *tailPtr           = before;
	// t is now detached
	t->m_nextLink = NULL;
	t->m_prevLink = NULL;
	if ( verify ) checkList ( headPtr , tailPtr );
}
// add to a head/tail linked list's tail
// Appends "t" to the end of a head/tail list.
void addLinkToTail ( ThreadEntry **headPtr ,
		     ThreadEntry **tailPtr ,
		     ThreadEntry  *t       ) {
	// the list is only verified when thread debugging is on
	const bool verify = g_conf.m_logDebugThread;
	if ( verify ) checkList ( headPtr , tailPtr );
	ThreadEntry *oldTail = *tailPtr;
	// hook t behind the old tail; in an empty list t is the head too
	if ( oldTail ) oldTail->m_nextLink = t;
	else           *headPtr            = t;
	t->m_nextLink = NULL;
	t->m_prevLink = oldTail;
	// t is the new tail
	*tailPtr = t;
	if ( verify ) checkList ( headPtr , tailPtr );
}
// add to a head/tail linked list's head
// Inserts "t" at the front of a head/tail list.
void addLinkToHead ( ThreadEntry **headPtr ,
		     ThreadEntry **tailPtr ,
		     ThreadEntry  *t       ) {
	// the list is only verified when thread debugging is on
	const bool verify = g_conf.m_logDebugThread;
	if ( verify ) checkList ( headPtr , tailPtr );
	ThreadEntry *oldHead = *headPtr;
	// hook t in front of the old head; in an empty list t is the tail
	// as well
	if ( oldHead ) oldHead->m_prevLink = t;
	else           *tailPtr            = t;
	t->m_prevLink = NULL;
	t->m_nextLink = oldHead;
	// t is the new head
	*headPtr = t;
	if ( verify ) checkList ( headPtr , tailPtr );
}
// add to a head-only linked list
// Inserts "t" at the front of a list that only tracks its head.
void addLink ( ThreadEntry **headPtr ,
	       ThreadEntry  *t       ) {
	// the list is only verified when thread debugging is on
	const bool verify = g_conf.m_logDebugThread;
	if ( verify ) checkList ( headPtr , NULL );
	ThreadEntry *oldHead = *headPtr;
	// the old head, if any, now has t in front of it
	if ( oldHead ) oldHead->m_prevLink = t;
	t->m_prevLink = NULL;
	t->m_nextLink = oldHead;
	// t is the new head
	*headPtr = t;
	if ( verify ) checkList ( headPtr , NULL );
}
///////////////////////////////////////////////////////////////////////
// functions for ThreadQueue
///////////////////////////////////////////////////////////////////////
// A fresh queue owns no entry array until init() allocates one.
ThreadQueue::ThreadQueue ( ) {
	m_entriesSize = 0;
	m_entries     = NULL;
}
void ThreadQueue::reset ( ) {
if ( m_entries ) mfree ( m_entries , m_entriesSize , "Threads" );
m_entries = NULL;
m_top = 0;
}
// . prepares this queue for threads of type "threadType"
// . maxThreads is stored as the launch limit (m_maxLaunched)
// . maxEntries is the number of ThreadEntry slots to allocate
// . returns true on success, or the value of the log() call if the entry
//   array could not be allocated
bool ThreadQueue::init ( char threadType, int32_t maxThreads, int32_t maxEntries ) {
	m_threadType  = threadType;
	m_maxLaunched = maxThreads;
	m_launched    = 0;
	m_returned    = 0;
	m_isLowPrioritySuspended = false;
	// alloc space for entries
	m_maxEntries  = maxEntries;
	m_entriesSize = sizeof(ThreadEntry)*m_maxEntries;
	m_entries     = (ThreadEntry *)mmalloc ( m_entriesSize , "Threads" );
	if ( ! m_entries ) return log("thread: Failed to allocate %"INT32" bytes "
				      "for thread queue.",m_entriesSize);
	// every list starts out empty
	m_emptyHead    = NULL;
	m_launchedHead = NULL;
	m_waitHead0 = NULL; m_waitTail0 = NULL;
	m_waitHead1 = NULL; m_waitTail1 = NULL;
	m_waitHead2 = NULL; m_waitTail2 = NULL;
	m_waitHead3 = NULL; m_waitTail3 = NULL;
	m_waitHead4 = NULL; m_waitTail4 = NULL;
	m_waitHead5 = NULL; m_waitTail5 = NULL;
	m_waitHead6 = NULL; m_waitTail6 = NULL;
	// addLink() verifies the whole list on every call when thread
	// debugging is on, so switch the flag off while the free list is
	// built and put it back afterwards
	char savedDebug = g_conf.m_logDebugThread;
	g_conf.m_logDebugThread = 0;
	for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) {
		ThreadEntry *t = &m_entries[i];
		// mark the slot as unused
		t->m_isOccupied = false;
		t->m_isLaunched = false;
		t->m_isDone     = true;
		t->m_qnum       = threadType;
		t->m_stack      = NULL;
		// and put it on the list of empty slots
		t->m_prevLink   = NULL;
		t->m_nextLink   = NULL;
		addLink ( &m_emptyHead , t );
	}
	g_conf.m_logDebugThread = savedDebug;
	return true;
}
int32_t ThreadQueue::getNumActiveThreadsOut() {
int32_t n = 0;
for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) {
ThreadEntry *e = &m_entries[i];
if ( ! e->m_isOccupied ) continue;
if ( ! e->m_isLaunched ) continue;
// if it is done and just waiting for a join, do not count
if ( e->m_isDone ) continue;
n++;
}
return n;
}
int32_t ThreadQueue::getNumThreadsOutOrQueued() {
// MDW: we also need to count threads that are returned but need their
// callback called so, in the case of RdbDump, the rdblist that was written
// to disk can update the rdbmap before it gets saved, so it doesn't get
// out of sync. Process.cpp calls .suspendMerge() to make sure that all
// merge operations are suspended as well.
int32_t n = 0;
for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) {
ThreadEntry *e = &m_entries[i];
if ( e->m_isOccupied ) n++;
}
return n;
/*
int32_t n = m_launched - m_returned;
for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) {
ThreadEntry *e = &m_entries[i];
if ( ! e->m_isOccupied ) continue;
if ( e->m_isLaunched ) continue;
if ( e->m_isDone ) continue;
// do not count "reads", only count writes
//if ( m_threadType == DISK_THREAD && e->m_state ) {
// FileState *fs = (FileState *)e->m_state;
// if ( fs->m_doWrite ) continue;
//}
}
return n;
*/
}
// . counts the disk write threads that are launched and not yet done
// . always 0 for queues that are not the disk queue
// . uses the m_doWrite flag that addEntry() copied into the entry rather
//   than dereferencing the caller's FileState through m_state: addEntry()
//   caches the flag precisely because the FileState may no longer be
//   intact later on. the cached flag is false when m_state was NULL, so
//   the old NULL check is covered too.
int32_t ThreadQueue::getNumWriteThreadsOut () {
	// only consider disk threads
	if ( m_threadType != DISK_THREAD ) return 0;
	int32_t n = 0;
	for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) {
		ThreadEntry *e = &m_entries[i];
		if ( ! e->m_isOccupied ) continue;
		if ( ! e->m_isLaunched ) continue;
		// done and just awaiting a join, do not count
		if ( e->m_isDone ) continue;
		if ( ! e->m_doWrite ) continue;
		n++;
	}
	return n;
}
// return NULL and set g_errno on error
// . takes a slot from the empty list, fills it in and appends it to the
//   wait list that matches the thread type, niceness and read/write kind
// . returns the entry, or NULL with g_errno set to ENOTHREADSLOTS when
//   every slot is in use
ThreadEntry *ThreadQueue::addEntry ( int32_t niceness ,
				     void   *state    ,
				     void  (* callback    )(void *state,
							    ThreadEntry *t) ,
				     void *(* startRoutine)(void *state,
							    ThreadEntry *t) ) {
	// the head of the empty list is the slot we are going to use
	ThreadEntry *t = m_emptyHead;
	if ( ! t ) {
		g_errno = ENOTHREADSLOTS;
		// complain only if the last complaint is more than 5
		// seconds old
		static time_t s_lastLog = 0;
		time_t now = getTime();
		if ( now - s_lastLog > 5 ) {
			log("thread: Could not add thread to queue. Already "
			    "have %"INT32" entries.",m_maxEntries);
			s_lastLog = now;
		}
		return NULL;
	}
	// sanity: a slot on the empty list must not be in use
	if ( t->m_isOccupied ) { char *xx=NULL;*xx=0; }
	// stick the request in
	t->m_niceness     = niceness;
	t->m_state        = state;
	t->m_callback     = callback;
	t->m_startRoutine = startRoutine;
	t->m_isOccupied   = true;
	t->m_isCancelled  = false;
	t->m_stack        = NULL;
	t->m_isDone       = false;
	t->m_isLaunched   = false;
	t->m_queuedTime   = gettimeofdayInMilliseconds();
	t->m_readyForBail = false;
	t->m_allocBuf     = NULL;
	t->m_allocSize    = 0;
	t->m_errno        = 0;
	t->m_bestHeadPtr  = NULL;
	t->m_bestTailPtr  = NULL;
	// . when the ohcrap callback gets called and the thread is cleaned
	//   up, the FileState read size and write flag are needed to see
	//   which launch counts to decrement
	// . the FileState will be corrupted by then, so keep a copy of
	//   both right here in the thread entry
	const bool isDisk = ( m_threadType == DISK_THREAD );
	if ( isDisk && t->m_state ) {
		FileState *fs = (FileState *)t->m_state;
		t->m_bytesToGo = fs->m_bytesToGo;
		t->m_doWrite   = fs->m_doWrite ;
	}
	else {
		t->m_bytesToGo = 0;
		t->m_doWrite   = false;
	}
	// the wait list this entry will be appended to
	ThreadEntry **bestHeadPtr = NULL;
	ThreadEntry **bestTailPtr = NULL;
	//
	// non-disk threads only wait in lists 0, 1 and 2, picked by niceness
	//
	if ( ! isDisk ) {
		if ( niceness <= 0 ) {
			bestHeadPtr = &m_waitHead0;
			bestTailPtr = &m_waitTail0;
		}
		// 'merge' threads from disk merge ops have niceness 1
		else if ( niceness == 1 ) {
			bestHeadPtr = &m_waitHead1;
			bestTailPtr = &m_waitTail1;
		}
		// niceness is 2? MAX_NICENESS
		else {
			bestHeadPtr = &m_waitHead2;
			bestTailPtr = &m_waitTail2;
		}
		// move it from the empty list to the end of the wait list
		removeLink ( &m_emptyHead , t );
		addLinkToTail ( bestHeadPtr , bestTailPtr , t );
		if ( g_conf.m_logDebugThread )
			log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] "
			    "queued %s thread for launch1. "
			    "niceness=%"INT32". ", (PTRTYPE)t,
			    getThreadType(), (int32_t)niceness );
		return t;
	}
	//
	// disk threads: the first matching rule wins
	//   niceness 0 (read or write)  -> list 0
	//   any other write             -> list 4
	//   niceness 1 read             -> list 5
	//   everything else             -> list 6
	//
	char nice = t->m_niceness;
	if ( nice == 0 ) {
		bestHeadPtr = &m_waitHead0;
		bestTailPtr = &m_waitTail0;
	}
	// low priority (merge or dump) disk WRITES
	else if ( t->m_doWrite ) {
		bestHeadPtr = &m_waitHead4;
		bestTailPtr = &m_waitTail4;
	}
	else if ( nice == 1 ) {
		bestHeadPtr = &m_waitHead5;
		bestTailPtr = &m_waitTail5;
	}
	else {
		bestHeadPtr = &m_waitHead6;
		bestTailPtr = &m_waitTail6;
	}
	if ( g_conf.m_logDebugThread )
		log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] "
		    "remove from empty list, add to wait list",
		    (PTRTYPE)t);
	// take it off the empty list
	removeLink ( &m_emptyHead , t );
	// sanity: it must not still be the head of the empty list
	if ( m_emptyHead == t ) { char *xx=NULL;*xx=0; }
	// and append it to the wait list picked above
	addLinkToTail ( bestHeadPtr , bestTailPtr , t );
	if ( g_conf.m_logDebugThread )
		log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] "
		    "queued %s thread for launch2. "
		    "niceness=%"INT32". ", (PTRTYPE)t,
		    getThreadType(), (int32_t)niceness );
	// success
	return t;
}
// . cleans up finished threads in every queue, one niceness tier at a time
//   starting with tier 0, and calls launchThreads() after each tier
// . "maxTime" is the time budget in milliseconds; a negative value means
//   there is no budget and every tier up to "niceness" is processed
// . "niceness" is the highest niceness tier we are allowed to clean up
// . if the budget is exceeded we stop early and set m_needsCleanup again so
//   the remaining tiers are picked up by a later call
// . returns the elapsed milliseconds measured at the last budget check, or 0
//   if we did nothing or if no budget was given
int32_t Threads::timedCleanUp (int32_t maxTime, int32_t niceness) {
	// skip it if exiting
	if ( g_process.m_mode == EXIT_MODE ) return 0;
	// nothing flagged for cleanup
	if ( ! m_needsCleanup ) return 0;
	int64_t startTime = gettimeofdayInMillisecondsLocal();
	int64_t took = 0;
	// only a pass that is allowed to visit every tier may clear the flag
	if ( niceness >= MAX_NICENESS ) m_needsCleanup = false;
	for ( int32_t i = 0 ; i <= niceness ; i++ ) {
		// clean up tier "i" (and anything more urgent) in each queue.
		// this used to pass "niceness", which made every pass identical
		// and defeated the point of walking the tiers in order.
		for ( int32_t j = 0 ; j < m_numQueues ; j++ )
			m_threadQueues[j].timedCleanUp ( i );
		launchThreads();
		// no time budget? then just keep going
		if ( maxTime < 0 ) continue;
		// elapsed = now - start. the operands used to be swapped, which
		// made "took" non-positive so the budget could never be exceeded
		took = gettimeofdayInMillisecondsLocal() - startTime;
		if ( took <= maxTime ) continue;
		// ok, we have to cut it short...
		m_needsCleanup = true;
		break;
	}
	return took;
}
// asks the disk thread queue whether any of its entries is working on "bf"
bool Threads::isHittingFile ( BigFile *bf ) {
	ThreadQueue *diskQueue = &m_threadQueues[DISK_THREAD];
	return diskQueue->isHittingFile ( bf );
}
// . returns true if some occupied, not-yet-done entry in this queue runs
//   readwriteWrapper_r against the BigFile "bf"
// . scans the first m_top slots of m_entries[], so entries that are still
//   waiting to be launched are considered as well
bool ThreadQueue::isHittingFile ( BigFile *bf ) {
	for ( int32_t i = 0 ; i < m_top ; i++ ) {
		ThreadEntry *t = &m_entries[i];
		// must be occupied
		if ( ! t->m_isOccupied ) continue;
		// must not be done
		if ( t->m_isDone ) continue;
		// must be a disk read/write
		if ( t->m_startRoutine != readwriteWrapper_r ) continue;
		// shortcut
		FileState *fs = (FileState *)t->m_state;
		// bailOnReads() sets m_state to NULL on an entry whose callback
		// it called early, yet leaves the entry occupied and not done.
		// we can no longer tell which file such an entry is on, so
		// skip it rather than dereference NULL.
		if ( ! fs ) continue;
		// is it on our file?
		if ( fs->m_this == bf ) return true;
	}
	return false;
}
void Threads::bailOnReads ( ) {
m_threadQueues[DISK_THREAD].bailOnReads();
}
// . calls the callbacks of niceness 0 disk READ threads before the threads
//   have finished, with the error set to EDISKSTUCK
// . per the original note this is driven from Process.cpp
// . walks the m_launchedHead list only
// . an entry whose callback was called here gets m_callback replaced by
//   ohcrap() and m_state set to NULL, so the callback is never repeated
// . ownership of the read buffer moves from the FileState to the
//   ThreadEntry so the caller cannot free a buffer the read may still fill
void ThreadQueue::bailOnReads ( ) {
// note it
log("threads: bypassing read threads");
ThreadEntry *t = m_launchedHead;
ThreadEntry *nextLink = NULL;
// loop through candidates
//for ( int32_t i = 0 ; i < m_top; i++ ) {
for ( ; t ; t = nextLink ) {
// grab the successor first since 't' may be unlinked from the list below
nextLink = t->m_nextLink;
// must be occupied (sanity check)
if ( ! t->m_isOccupied ) continue;
// skip if not launched yet
//if ( ! t->m_isLaunched ) continue;
// must be niceness 0
if ( t->m_niceness != 0 ) continue;
// must not be done
if ( t->m_isDone ) continue;
// must not have already called callback. this check also keeps us from
// using the NULL m_state of an entry we bailed on earlier.
if ( t->m_callback == ohcrap ) continue;
// must be a disk read/write thread
if ( t->m_startRoutine != readwriteWrapper_r ) continue;
// shortcut
FileState *fs = (FileState *)t->m_state;
// do not stop writes
if ( fs->m_doWrite ) continue;
// must be niceness 0 too!
if ( fs->m_niceness != 0 ) continue;
// what is this? unlaunched...
//if ( t->m_pid == 0 ) continue;
// can only bail on a thread after it copies its FileState
// class into its stack so we can bypass it and free the
// original FileState without causing a core. if thread
// is not yet launched we have to call the callback here too
// otherwise it never gets launched until the disk is unstuck!
if ( ! t->m_readyForBail && t->m_isLaunched ) continue;
// set error on the entry
t->m_errno = EDISKSTUCK;
// set the global error too so the callback sees it
// NOTE(review): g_errno is not cleared after the callback returns -- confirm
// callers of bailOnReads() do not depend on its value
g_errno = EDISKSTUCK;
// do not allow caller to free the alloc'd buf in case
// its read finally comes through! the entry owns the buffer now and
// ohcrap() frees it.
t->m_allocBuf = fs->m_allocBuf;
t->m_allocSize = fs->m_allocSize;
fs->m_allocBuf = NULL;
fs->m_allocSize = 0;
// call the real callback now, ahead of the thread finishing
t->m_callback ( t->m_state , t );
// do not re-call it...
t->m_callback = ohcrap;
// invalidate state (FileState usually)
t->m_state = NULL;
// a launched thread stays in the launched list until it gets cleaned up
if ( t->m_isLaunched ) continue;
// delete him if not yet launched, otherwise we try to
// launch it later with a corrupted/unstable FileState...
// and that causes our launch counts to get out of whack i
// think...
t->m_isOccupied = false;
// note it
log("threads: bailing unlaunched thread");
// move the entry from the launched list to the empty list
removeLink ( &m_launchedHead , t );
addLink ( &m_emptyHead , t );
// do we have to decrement top
// if ( m_top == i + 1 )
// while (m_top > 0 && ! m_entries[m_top-1].m_isOccupied)
// m_top--;
}
}
// . stand-in callback that bailOnReads() installs as ThreadEntry::m_callback
//   after it has already called the real callback early because the read
//   was taking too long
// . since doneWrapper() in BigFile.cpp never runs in that case, the disk read
//   buffer that was parked on the ThreadEntry has to be released here
// . "state" is not used
void ohcrap ( void *state , ThreadEntry *t ) {
	// was a read buffer handed over to this entry?
	bool hasReadBuf = t->m_allocBuf ? true : false;
	// then free it, nobody else will
	if ( hasReadBuf ) {
		mfree ( t->m_allocBuf , t->m_allocSize , "RdbScan" );
	}
	log("threads: got one");
}
// . cleans up any threads that have exited
// . their m_isDone should be set to true
// . don't process threads whose niceness is > maxNiceness
// . return true if we cleaned one up
bool ThreadQueue::timedCleanUp ( int32_t maxNiceness ) {
// top:
int32_t numCallbacks = 0;
ThreadEntry *t = m_launchedHead;
ThreadEntry *nextLink = NULL;
// loop through candidates
for ( ; t ; t = nextLink ) {
// get it here in case we remove 't' from the linked list below
nextLink = t->m_nextLink;
// point to it
//ThreadEntry *t = &m_entries[i];
// skip if not qualified
if ( t->m_niceness > maxNiceness ) continue;
// must be occupied to be done (sanity check)
if ( ! t->m_isOccupied ) continue;
// skip if not launched yet
if ( ! t->m_isLaunched ) continue;
// . we were segfaulting right here before because the thread
// was setting t->m_pid and at this point it was not
// set so t->m_pid was a bogus value
// . thread may have seg faulted, in which case sigbadhandler()
// in Loop.cpp will get it and set errno to 0x7fffffff
#ifndef PTHREADS
// MDW: i hafta take this out because the errno_location thing
// is not working on the newer gcc
if ( ! t->m_isDone && t->m_pid >= 0 &&
s_errnos [t->m_pid] == 0x7fffffff ) {
log("thread: Got abnormal thread termination. Seems "
"like the thread might have cored.");
s_errnos[t->m_pid] = 0;
goto again;
}
#endif
// skip if not done yet
if ( ! t->m_isDone ) continue;
#ifdef PTHREADS
// if pthread_create() failed it returns the errno and we
// needsJoin is false, so do not try to join
// to a thread if we did not create it, lest pthread_join()
// cores
if ( t->m_needsJoin ) {
// . join up with that thread
// . damn, sometimes he can block forever on his
// call to sigqueue(),
int64_t startTime = gettimeofdayInMillisecondsLocal();
int64_t took;
int32_t status = pthread_join ( t->m_joinTid , NULL );
took = startTime - gettimeofdayInMillisecondsLocal();
if ( took > 50 ) {
log("threads: pthread_join took %i ms",
(int)took);
}
if ( status != 0 ) {
log("threads: pthread_join %"INT64" = %s (%"INT32")",
(int64_t)t->m_joinTid,mstrerror(status),
status);
}
// debug msg
if ( g_conf.m_logDebugThread )
log(LOG_DEBUG,"thread: joined1 with "
"t=0x%"PTRFMT" "
"jointid=0x%"XINT64".",
(PTRTYPE)t,(int64_t)t->m_joinTid);
// re-protect this stack
mprotect ( t->m_stack + GUARDSIZE ,
STACK_SIZE - GUARDSIZE,
PROT_NONE );
g_threads.returnStack ( t->m_si );
t->m_stack = NULL;
}
#else
again:
int status ;
pid_t pid = waitpid ( t->m_pid , &status , 0 );
int err = errno;
// debug the waitpid
if ( g_conf.m_logDebugThread || g_process.m_exiting )
log(LOG_DEBUG,"thread: Waiting for t=0x%"PTRFMT" "
"pid=%"INT32".",
(PTRTYPE)t,(int32_t)t->m_pid);
// bitch and continue if join failed
if ( pid != t->m_pid ) {
// waitpid() gets interrupted by various signals so
// we need to repeat (SIGCHLD?)
if ( err == EINTR ) goto again;
log("thread: Call to waitpid(%"INT32") returned %"INT32": %s.",
(int32_t)t->m_pid,(int32_t)pid,mstrerror(err));
continue;
}
// if status not 0 then process got abnormal termination
if ( ! WIFEXITED(status) ) {
if ( WIFSIGNALED(status) )
log("thread: Child process (pid=%i) exited "
"from unhandled signal number %"INT32".",
pid,(int32_t)WTERMSIG(status));
else
log("thread: Child process (pid=%i) exited "
"for unknown reason." , pid );
}
//mfree ( t->m_stack , STACK_SIZE , "Threads" );
// re-protect this stack
mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE,
PROT_NONE );
g_threads.returnStack ( t->m_si );
t->m_stack = NULL;
// debug msg
if ( g_conf.m_logDebugThread )
log(LOG_DEBUG,"thread: joined with pid=%"INT32" pid=%"INT32".",
(int32_t)t->m_pid,(int32_t)t->m_pid);
#endif
// we may get cleaned up and re-used and our niceness reassignd
// right after set m_isDone to true, so save niceness
//int32_t niceness = t->m_niceness;
char qnum = t->m_qnum;
ThreadQueue *tq = &g_threads.m_threadQueues[(int)qnum];
if ( tq != this ) { char *xx = NULL; *xx = 0; }
// get read size before cleaning it up -- it could get nuked
//int32_t rs = 0;
bool isWrite = false;
if ( tq->m_threadType == DISK_THREAD ) { // && t->m_state ) {
//FileState *fs = (FileState *)t->m_state;
//rs = t->m_bytesToGo;
isWrite = t->m_doWrite ;
}
/*
if ( niceness <= 0) tq->m_hiReturned++;
else if ( niceness == 1) tq->m_mdReturned++;
else if ( niceness >= 2) tq->m_loReturned++;
// deal with the tiers for disk threads based on read sizes
if ( tq->m_threadType == DISK_THREAD ) {
// writes are special cases
if ( isWrite ) m_writesReturned++;
if ( rs >= 0 && niceness >= 2 ) {
if ( rs > g_conf.m_medReadSize )
tq->m_loReturnedBig++;
else if ( rs > g_conf.m_smaReadSize )
tq->m_loReturnedMed++;
else
tq->m_loReturnedSma++;
}
else if ( rs >= 0 && niceness >= 1 ) {
if ( rs > g_conf.m_medReadSize )
tq->m_mdReturnedBig++;
else if ( rs > g_conf.m_smaReadSize )
tq->m_mdReturnedMed++;
else
tq->m_mdReturnedSma++;
}
else if ( rs >= 0 ) {
if ( rs > g_conf.m_medReadSize )
tq->m_hiReturnedBig++;
else if ( rs > g_conf.m_smaReadSize )
tq->m_hiReturnedMed++;
else
tq->m_hiReturnedSma++;
}
}
*/
// now count him as returned
m_returned++;
// prepare for relaunch if we were cancelled
if ( t->m_isCancelled ) {
t->m_isCancelled = false;
t->m_isLaunched = false;
t->m_isDone = false;
// take out of launched linked list
removeLink ( &m_launchedHead , t );
// get the waiting linked list from whence we came
ThreadEntry **bestHeadPtr = t->m_bestHeadPtr;
ThreadEntry **bestTailPtr = t->m_bestTailPtr;
// put BACK into the waiting linked list at the head
addLinkToHead ( bestHeadPtr , bestTailPtr , t );
log("thread: Thread cancelled. Preparing thread "
"for relaunch");
continue;
}
numCallbacks++;
// not running any more
t->m_isLaunched = false;
// not occupied any more
t->m_isOccupied = false;
// do we have to decrement top
// if ( m_top == i + 1 )
// while (m_top > 0 && ! m_entries[m_top-1].m_isOccupied)
// m_top--;
if ( g_conf.m_logDebugThread )
log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] "
"remove from launched list, add to empty list",
(PTRTYPE)t);
// now move it to the empty list
removeLink ( &m_launchedHead , t );
addLink ( &m_emptyHead , t );
// send a cancel sig to the thread in case it's still there
//int err = pthread_cancel ( t->m_tid );
//if ( err != 0 ) log("thread: cleanUp: pthread_cancel: %s",
// mstrerror(err) );
// one less entry occupied
//m_entriesUsed--;
// debug msg
//log("m_entriesUsed now %"INT32"",m_entriesUsed);
// one more returned
//m_returned++;
// clear the g_errno in case set by a previous callback
//g_errno = 0;
// launch as many threads as we can before calling the
// callback since this may hog the CPU like Msg20 does
//g_threads.launchThreads();
g_errno = 0;
//g_loop.startBlockedCpuTimer();
//only allow a quickpoll if we are nice.
//g_loop.canQuickPoll(t->m_niceness);
makeCallback ( t );
//int64_t took = gettimeofdayInMilliseconds()-startTime;
//if(took > 8 && maxNiceness > 0) {
// if(g_conf.m_sequentialProfiling)
// log(LOG_TIMING,
// "admin: Threads spent %"INT64" ms to callback "
// "%"INT32" callbacks, nice: %"INT32"",
// took, numCallbacks, maxNiceness);
// g_threads.m_needBottom = true;
// maxNiceness = 0;
//}
// clear errno again
g_errno = 0;
if ( g_conf.m_logDebugThread ) {
int64_t now = gettimeofdayInMilliseconds();
log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] %s done1. "
"active=%"INT32" "
"time since queued = %"UINT64" ms "
"time since launch = %"UINT64" ms "
"time since pre-exit = %"UINT64" ms "
"time since exit = %"UINT64" ms",
(PTRTYPE)t,
getThreadType() ,
(int32_t)(m_launched - m_returned) ,
(uint64_t)(now - t->m_queuedTime),
(uint64_t)(now - t->m_launchedTime),
(uint64_t)(now - t->m_preExitTime) ,
(uint64_t)(now - t->m_exitTime) );
}
}
//since we need finer grained control in loop, we no longer collect
//the callbacks, sort, then call them. we now call them right away
//that way we can break out if we start taking too long and
//give control back to udpserver.
return numCallbacks != 0;
}
// . calls t->m_callback ( t->m_state , t ) with g_niceness set to 0 or 1
//   according to the thread's niceness, then restores g_niceness
// . logs entry and exit when loop or thread debugging is on
// . if g_conf.m_maxCallbackDelay is >= 0, logs any callback that runs for
//   at least that many milliseconds
// . the cleanup routines put the entry on the empty list before calling
//   us, so the callback may launch a thread that takes over "t". that is
//   why everything we need from "t" after the callback is copied beforehand.
void makeCallback ( ThreadEntry *t ) {
	// save the global niceness so we can restore it
	int32_t saved = g_niceness;
	// snapshot: t->m_niceness may belong to someone else after the callback
	int32_t entryNiceness = t->m_niceness;
	// log it now
	if ( g_conf.m_logDebugLoop || g_conf.m_logDebugThread )
		log(LOG_DEBUG,"thread: enter thread callback t=0x%"PTRFMT" "
		    "state=0x%"PTRFMT" "
		    "nice=%"INT32"",
		    (PTRTYPE)t,
		    (PTRTYPE)t->m_state,
		    entryNiceness);
	// time it? decide once, so "start" can never be read uninitialized if
	// the setting changes while the callback is running
	bool timeIt = ( g_conf.m_maxCallbackDelay >= 0 );
	int64_t start = 0;
	if ( timeIt ) start = gettimeofdayInMillisecondsLocal();
	// callbacks run at niceness 0 or 1 only
	if ( entryNiceness >= 1 ) g_niceness = 1;
	else                      g_niceness = 0;
	t->m_callback ( t->m_state , t );
	// time it?
	if ( timeIt && g_conf.m_maxCallbackDelay >= 0 ) {
		int64_t elapsed = gettimeofdayInMillisecondsLocal() - start;
		if ( elapsed >= g_conf.m_maxCallbackDelay )
			log("threads: Took %"INT64" ms to call "
			    "thread callback niceness=%"INT32"",
			    elapsed,(int32_t)saved);
	}
	// log it now
	if ( g_conf.m_logDebugLoop || g_conf.m_logDebugThread )
		log(LOG_DEBUG,"loop: exit thread callback t=0x%"PTRFMT" "
		    "nice=%"INT32"",
		    (PTRTYPE)t,
		    entryNiceness);
	// restore global niceness
	g_niceness = saved;
}
// . runs ThreadQueue::cleanUp() on every queue
// . repeats the whole sweep for as long as m_needsCleanup gets set again
//   while we are sweeping (a thread sets it when it is about to exit)
// . returns true if any queue reported that it did something
bool Threads::cleanUp ( ThreadEntry *t , int32_t maxNiceness ) {
	bool didSomething = false;
	do {
		// assume this sweep takes care of everything
		m_needsCleanup = false;
		for ( int32_t i = 0 ; i < m_numQueues ; i++ ) {
			ThreadQueue *tq = &m_threadQueues[i];
			if ( tq->cleanUp ( t , maxNiceness ) )
				didSomething = true;
		}
		// waitpid() may be interrupted by a SIGCHLD and not get his
		// pid, so go around again if somebody asked for it
	} while ( m_needsCleanup );
	return didSomething;
}
// . cleans up any launched threads in this queue that have exited
//   (their m_isDone is true)
// . does not process threads whose niceness is > maxNiceness
// . for each finished thread: joins/waits on it, returns its stack, moves
//   the entry to the empty list and then calls its callback right away
// . a thread that was cancelled is not called back; it is put back at the
//   head of the waiting list it came from so it can be relaunched
// . stops after 64 callbacks per call; Threads::cleanUp() comes back if
//   m_needsCleanup gets set again
// . "tt" is not used
// . returns true if at least one callback was called
// . this used to collect callbacks into 64-slot arrays, sort them by queued
//   time and call them at the end. the arrays were still being filled but
//   never read, so they are gone; only the per-pass limit remains.
bool ThreadQueue::cleanUp ( ThreadEntry *tt , int32_t maxNiceness ) {
	// most callbacks we make in one pass
	const int32_t maxCallbacksPerPass = 64;
	int64_t startTime = gettimeofdayInMilliseconds();
	ThreadEntry *t = m_launchedHead;
	ThreadEntry *nextLink = NULL;
	int32_t numCallbacks = 0;
	// loop through the launched threads
	for ( ; t && numCallbacks < maxCallbacksPerPass ; t = nextLink ) {
		// do it here in case we modify the linked list below
		nextLink = t->m_nextLink;
		// skip if not qualified
		if ( t->m_niceness > maxNiceness ) continue;
		// must be occupied to be done (sanity check)
		if ( ! t->m_isOccupied ) continue;
		// skip if not launched yet
		if ( ! t->m_isLaunched ) continue;
		// . we were segfaulting right here before because the thread
		//   was setting t->m_pid and at this point it was not
		//   set so t->m_pid was a bogus value
		// . thread may have seg faulted, in which case sigbadhandler()
		//   in Loop.cpp will get it and set errno to 0x7fffffff
#ifndef PTHREADS
		if ( ! t->m_isDone && t->m_pid >= 0 &&
		     s_errnos [t->m_pid] == 0x7fffffff ) {
			log("thread: Got abnormal thread termination. Seems "
			    "like the thread might have cored.");
			s_errnos[t->m_pid] = 0;
			// reap it even though it never set m_isDone
			goto again;
		}
#endif
		// skip if not done yet
		if ( ! t->m_isDone ) continue;
#ifdef PTHREADS
		if ( t->m_needsJoin ) {
			// . join up with that thread
			// . damn, sometimes he can block forever on his
			//   call to sigqueue(),
			int32_t status = pthread_join ( t->m_joinTid , NULL );
			if ( status != 0 ) {
				log("threads: "
				    "pthread_join2 %"INT64" = %s (%"INT32")",
				    (int64_t)t->m_joinTid,mstrerror(status),
				    status);
			}
			// debug msg
			if ( g_conf.m_logDebugThread )
				log(LOG_DEBUG,"thread: joined2 with "
				    "t=0x%"PTRFMT" "
				    "jointid=0x%"XINT64".",
				    (PTRTYPE)t,(int64_t)t->m_joinTid);
			// re-protect this stack
			mprotect ( t->m_stack + GUARDSIZE ,
				   STACK_SIZE - GUARDSIZE,
				   PROT_NONE );
			g_threads.returnStack ( t->m_si );
			t->m_stack = NULL;
		}
#else
	again:
		int status ;
		pid_t pid = waitpid ( t->m_pid , &status , 0 );
		int err = errno;
		// debug the waitpid
		if ( g_conf.m_logDebugThread )
			log(LOG_DEBUG,"thread: Waiting for "
			    "t=0x%"PTRFMT" pid=%"INT32".",
			    (PTRTYPE)t,(int32_t)t->m_pid);
		// bitch and continue if join failed
		if ( pid != t->m_pid ) {
			// waitpid() gets interrupted by various signals so
			// we need to repeat (SIGCHLD?)
			if ( err == EINTR ) goto again;
			log("thread: Call to waitpid(%"INT32") returned %"INT32": %s.",
			    (int32_t)t->m_pid,(int32_t)pid,mstrerror(err));
			continue;
		}
		// if status not 0 then process got abnormal termination
		if ( ! WIFEXITED(status) ) {
			if ( WIFSIGNALED(status) )
				log("thread: Child process (pid=%i) exited "
				    "from unhandled signal number %"INT32".",
				    pid,(int32_t)WTERMSIG(status));
			else
				log("thread: Child process (pid=%i) exited "
				    "for unknown reason." , pid );
		}
		// re-protect this stack
		mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE,
			   PROT_NONE );
		g_threads.returnStack ( t->m_si );
		t->m_stack = NULL;
#endif
		// debug msg
		if ( g_conf.m_logDebugThread )
			log(LOG_DEBUG,"thread: joined with pid=%"INT32" pid=%"INT32".",
			    (int32_t)t->m_pid,(int32_t)t->m_pid);
		// sanity check: the entry must belong to this very queue.
		// deliberately core if it does not.
		char qnum = t->m_qnum;
		ThreadQueue *tq = &g_threads.m_threadQueues[(int)qnum];
		if ( tq != this ) { char *xx = NULL; *xx = 0; }
		// now count him as returned
		m_returned++;
		// prepare for relaunch if we were cancelled
		if ( t->m_isCancelled ) {
			t->m_isCancelled = false;
			t->m_isLaunched  = false;
			t->m_isDone      = false;
			// take out of the linked list of launched threads
			removeLink ( &m_launchedHead , t );
			// get the waiting linked list from whence we came
			ThreadEntry **bestHeadPtr = t->m_bestHeadPtr;
			ThreadEntry **bestTailPtr = t->m_bestTailPtr;
			// and put it back at the head
			addLinkToHead ( bestHeadPtr , bestTailPtr , t );
			log("thread: Thread cancelled. Preparing thread "
			    "for relaunch");
			continue;
		}
		// . save important stuff before freeing up the ThreadEntry
		//   for possible take over.
		// . calling the callback may launch a thread which may
		//   claim THIS thread entry, t
		int64_t queuedTime   = t->m_queuedTime;
		int64_t launchedTime = t->m_launchedTime;
		int64_t preExitTime  = t->m_preExitTime;
		int64_t exitTime     = t->m_exitTime;
		numCallbacks++;
		// SOLUTION: before calling the callback which may launch
		// another thread with this same tid, thus causing an error,
		// we should set these to false first:
		// not running any more
		t->m_isLaunched = false;
		// not occupied any more
		t->m_isOccupied = false;
		if ( g_conf.m_logDebugThread )
			log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] "
			    "remove from launched list, add to empty list",
			    (PTRTYPE)t);
		// now move it to the empty list
		removeLink ( &m_launchedHead , t );
		addLink ( &m_emptyHead , t );
		// clear the g_errno in case set by a previous callback
		g_errno = 0;
		makeCallback ( t );
		// clear errno again
		g_errno = 0;
		if ( g_conf.m_logDebugThread ) {
			int64_t now = gettimeofdayInMilliseconds();
			log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] %s done2. "
			    "active=%"INT32" "
			    "time since queued = %"UINT64" ms "
			    "time since launch = %"UINT64" ms "
			    "time since pre-exit = %"UINT64" ms "
			    "time since exit = %"UINT64" ms",
			    (PTRTYPE)t,
			    getThreadType() ,
			    (int32_t)(m_launched - m_returned) ,
			    (uint64_t)(now - queuedTime),
			    (uint64_t)(now - launchedTime),
			    (uint64_t)(now - preExitTime) ,
			    (uint64_t)(now - exitTime) );
		}
	}
	int64_t took2 = gettimeofdayInMilliseconds()-startTime;
	if(numCallbacks > 0 && took2 > 5)
		log(LOG_DEBUG, "threads: took %"INT64" ms to callback %"INT32" "
		    "callbacks, nice: %"INT32"", took2, numCallbacks, maxNiceness);
	// since we need finer grained control in loop, we no longer collect
	// the callbacks, sort, then call them. we now call them right away
	// that way we can break out if we start taking too long and
	// give control back to udpserver.
	return numCallbacks != 0;
}
// . used by UdpServer to see if it should call a low priority callback
// . returns true if the INTERSECT_THREAD or MERGE_THREAD queue has a
//   niceness 0 entry in its launched list
bool Threads::hasHighPriorityCpuThreads() {
	// the queues we consider to be cpu-bound
	const int cpuQueueTypes[2] = { INTERSECT_THREAD , MERGE_THREAD };
	for ( int k = 0 ; k < 2 ; k++ ) {
		ThreadQueue *q = &g_threads.m_threadQueues[cpuQueueTypes[k]];
		ThreadEntry *t = q->m_launchedHead;
		while ( t ) {
			// one high priority thread is all it takes
			if ( t->m_niceness == 0 ) return true;
			t = t->m_nextLink;
		}
	}
	return false;
}
// . used by UdpServer to see if it should call a low priority callback
// . counts the niceness 0 entries in the launched lists of the
//   DISK_THREAD, INTERSECT_THREAD and MERGE_THREAD queues
int32_t Threads::getNumActiveHighPriorityThreads() {
	const int queueTypes[3] = { DISK_THREAD     ,
				    INTERSECT_THREAD ,
				    MERGE_THREAD     };
	int32_t hiActive = 0;
	for ( int k = 0 ; k < 3 ; k++ ) {
		ThreadQueue *q = &g_threads.m_threadQueues[queueTypes[k]];
		ThreadEntry *t = q->m_launchedHead;
		while ( t ) {
			if ( t->m_niceness == 0 ) hiActive++;
			t = t->m_nextLink;
		}
	}
	return hiActive;
}
// . returns false if no thread launched
// . returns true if thread was launched
// . sets g_errno on error
// . don't launch a low priority thread if a high priority thread is running
// . i.e. don't launch a high niceness thread if a low niceness is running
bool ThreadQueue::launchThread2 ( ) {
// or if no stacks left, don't even try
if ( s_head == -1 ) return false;
// . how many threads are active now?
// . NOTE: not perfectly thread safe here
int64_t active = m_launched - m_returned ;
// debug msg
if ( g_conf.m_logDebugThread && m_threadType == DISK_THREAD )
log(LOG_DEBUG,"thread: q=%s launchThread: active=%"INT64" "
"max=%"INT32".",getThreadType(), active,m_maxLaunched);
// return if the max is already launched for this thread queue
if ( active >= m_maxLaunched ) return false;
if ( m_threadType != DISK_THREAD ) {
// if one thread of this type is already out, forget it
// then we can't have 100 GENERIC THREADS!!! with this...
//if ( m_launchedHead ) return false;
// first try niceness 0 queue
ThreadEntry **bestHeadPtr = &m_waitHead0;
ThreadEntry **bestTailPtr = &m_waitTail0;
// if empty, try niceness 1
if ( ! *bestHeadPtr ) {
bestHeadPtr = &m_waitHead1;
bestTailPtr = &m_waitTail1;
}
// then niceness 2
if ( ! *bestHeadPtr ) {
bestHeadPtr = &m_waitHead2;
bestTailPtr = &m_waitTail2;
}
// if all three are empty there is nothing to launch
if ( ! *bestHeadPtr ) return false;
// do not launch a low priority merge, intersect or filter
// thread if we/ have high priority cpu threads already going
// on. this way a low priority spider thread will not launch
// if a high priority cpu-based thread of any kind (right now
// just MERGE or INTERSECT) is already running.
if ( (*bestHeadPtr)->m_niceness > 0 &&
g_threads.hasHighPriorityCpuThreads ( ) )
return false;
// otherwise launch the next one
// MERGE_THREAD, INTERSECT, FILTER, UNLINK, SAVETREE
return launchThreadForReals ( bestHeadPtr , bestTailPtr );
}
// disk thread is much more complicated
// use several queues
// debug msg
//log("trying to launch for type=%"INT32"",(int32_t)m_threadType);
// clean up any threads that have exited
//cleanUp ();
// if no entries then nothing to launch
//if ( m_top <= 0 ) return false;
// return log("MAX. %"INT32" are launched. %"INT32" now in queue.",
// active , m_entriesUsed );
// . sanity check
// . a thread can NOT call this
//if ( getpid() != s_pid ) {
// fprintf(stderr,"thread: launchThread: bad engineer\n");
// ::exit(-1);
//}
//int64_t now = gettimeofdayInMilliseconds();
/*
int64_t now = -1LL;
// pick thread with lowest niceness first
int32_t minNiceness = 0x7fffffff;
int64_t maxWait = -1;
//int32_t mini = -1;
//bool minIsWrite = false;
int32_t lowest = 0x7fffffff;
int32_t highest = 0;
// . now base our active thread counts on niceness AND read sizes
// . this is only used for DISK_THREADs
// . loActive* includes niceness >= 1
int32_t loActiveBig = m_loLaunchedBig - m_loReturnedBig;
int32_t loActiveMed = m_loLaunchedMed - m_loReturnedMed;
int32_t loActiveSma = m_loLaunchedSma - m_loReturnedSma;
int32_t mdActiveBig = m_mdLaunchedBig - m_mdReturnedBig;
int32_t mdActiveMed = m_mdLaunchedMed - m_mdReturnedMed;
int32_t mdActiveSma = m_mdLaunchedSma - m_mdReturnedSma;
int32_t hiActiveBig = m_hiLaunchedBig - m_hiReturnedBig;
int32_t hiActiveMed = m_hiLaunchedMed - m_hiReturnedMed;
int32_t hiActiveSma = m_hiLaunchedSma - m_hiReturnedSma;
int32_t activeWrites = m_writesLaunched - m_writesReturned;
// how many niceness=2 threads are currently running now?
int64_t loActive = m_loLaunched - m_loReturned;
int64_t mdActive = m_mdLaunched - m_mdReturned;
//int64_t hiActive = m_hiLaunched - m_hiReturned;
int32_t total = loActive + mdActive;
*/
// hi priority max
// JAB: warning abatement
//int64_t hiActive = m_hiLaunched - m_hiReturned;
// get lowest niceness level of launched threads
ThreadEntry *t = m_launchedHead;
bool hasLowNicenessOut = false;
int32_t activeCount = 0;
int32_t spiderCount = 0;
// int32_t launchedBig0 = 0 , launchedMed0 = 0 , launchedSma0 = 0;
// int32_t launchedBig1 = 0 , launchedMed1 = 0 , launchedSma1 = 0;
for ( ; t ; t = t->m_nextLink ) {
// if he's done, skip him. maybe he hasn't been cleaned up yet
if ( t->m_isDone ) continue;
// count active out
activeCount++;
// get the highest niceness for all that are launched
//if ( niceness > highest ) highest = niceness;
//int32_t rs = t->m_bytesToGo;
// is the launched thread niceness 0? (i.e. high priority)
if ( t->m_niceness == 0 ) {
// set a flag
hasLowNicenessOut = true;
// count read sizes
// if (rs > g_conf.m_medReadSize ) launchedBig0++;
// else if (rs > g_conf.m_smaReadSize ) launchedMed0++;
// else launchedSma0++;
}
else {
spiderCount++;
}
// if ( rs > g_conf.m_medReadSize ) launchedBig1++;
// else if ( rs > g_conf.m_smaReadSize ) launchedMed1++;
// else launchedSma1++;
}
// int32_t max = g_conf.m_spiderMaxDiskThreads;
// if ( max <= 0 ) max = 1;
// if ( activeCount >= max )
// return false;
// get best thread candidate from best linked list of candidates
ThreadEntry **bestHeadPtr = NULL;
ThreadEntry **bestTailPtr = NULL;
// short/med/long high priority (niceness 0) disk reads in head0/1/2
// but we can't launch one more if already at our quota.
if ( ! bestHeadPtr && m_waitHead0 ) {
//launchedSma0 < g_conf.m_queryMaxSmaDiskThreads ) {
bestHeadPtr = &m_waitHead0;
bestTailPtr = &m_waitTail0;
}
// if ( ! bestHeadPtr &&
// m_waitHead1 &&
// launchedMed0 < g_conf.m_queryMaxMedDiskThreads ) {
// bestHeadPtr = &m_waitHead1;
// bestTailPtr = &m_waitTail1;
// }
// if ( ! bestHeadPtr &&
// m_waitHead2 &&
// launchedBig0 < g_conf.m_queryMaxBigDiskThreads ) {
// bestHeadPtr = &m_waitHead2;
// bestTailPtr = &m_waitTail2;
// }
// if we have a niceness 0 disk read/write outstandind and we are
// 1 or 2, do not launch! we do not want low priority disk reads
// having to contend with high priority ones.
// now we only do this if the 'separate disk reads' parms is true.
if ( g_conf.m_separateDiskReads && hasLowNicenessOut && ! bestHeadPtr )
return false;
// threads to save conf and tree/bucket files to disk go in waitHead3
// no these use SAVETREE Thread type
// if ( ! bestHead ) {
// bestHead = m_waitHead3;
// bestTail = m_waitTail3;
// }
// do not allow too high niceness read threads out
if ( spiderCount >= g_conf.m_spiderMaxDiskThreads )
return false;
// low priority (merge or dump) disk WRITES go in waithead4
if ( ! bestHeadPtr && m_waitHead4 ) {
bestHeadPtr = &m_waitHead4;
bestTailPtr = &m_waitTail4;
}
// niceness 1. for merge reads so they superscede regular spider reads
// niceness 1 read threads:
if ( ! bestHeadPtr && m_waitHead5 ) {
bestHeadPtr = &m_waitHead5;
bestTailPtr = &m_waitTail5;
}
// niceness 2 read threads:
if ( ! bestHeadPtr && m_waitHead6 ) {
bestHeadPtr = &m_waitHead6;
bestTailPtr = &m_waitTail6;
}
// if nobody waiting, return false
if ( ! bestHeadPtr ) return false;
// i dunno what the point of this was... so i commented it out
//int32_t max2 = g_conf.m_queryMaxDiskThreads ;
//if ( max2 <= 0 ) max2 = 1;
// only do this check if we're a addlists/instersect thread queue
//if (m_threadType == INTERSECT_THREAD&& hiActive >= max2)return false;
// point to entry in the best linked list to launch from
return launchThreadForReals ( bestHeadPtr , bestTailPtr );
/*
// loop through candidates
for ( int32_t i = 0 ; i < m_top ; i++ ) {
// skip if not occupied
if ( ! m_entries[i].m_isOccupied ) continue;
int32_t niceness = m_entries[i].m_niceness;
// get lowest niceness level of launched threads
if ( m_entries[i].m_isLaunched ) {
// if he's done, skip him
if ( m_entries[i].m_isDone ) continue;
// get the highest niceness for all that are launched
if ( niceness > highest ) highest = niceness;
// get the lowest niceness for all that are launched
if ( niceness < lowest ) lowest = niceness;
// continue now since it's already launched
continue;
}
// . these filters really make it so the spider does not
// impact query response time
//if ( niceness >= 1 && hiActive > 0 ) continue;
// don't consider any lows if one already running
//if ( niceness >= 2 && loActive > 0 ) continue;
// don't consider any lows if a hi already running
//if ( niceness >= 2 && hiActive > 0 ) continue;
// don't consider any mediums if one already running
//if ( niceness == 1 && mdActive > 0 ) continue;
// don't consider any mediums if a hi already running
//if ( niceness == 1 && hiActive > 0 ) continue;
//if ( m_threadType == DISK_THREAD ) {
// if ( niceness >= 1 && hiActive > 0 ) continue;
// if ( niceness >= 2 && loActive >= max ) continue;
// if ( niceness == 1 && mdActive >= max ) continue;
//}
// treat niceness 1 as niceness 2 for ranking purposes
// IFF we're not too backlogged with file merges. i.e.
// IFF we're merging faster than we're dumping.
// only merges and dumps have niceness 1 really.
// spider disk reads are all niceness 2.
// Now Rdb::addList just refuses to add data if we have too
// many unmerged files on disk!
// now we use niceness 1 for "real merges" so those reads take
// priority over spider build reads. (8/14/12)
//if(niceness == 1 ) niceness = 2;
// if he doesn't beat or tie us, skip him
if ( niceness > minNiceness ) continue;
// no more than "max" medium and low priority threads should
// be active/launched at any one time
if ( niceness >= 1 && total >= max ) continue;
// int16_tcut
ThreadEntry *t = &m_entries[i];
// what is this guy's read size?
// the filestate provided could have been
//FileState *fs ;
int32_t readSize = 0 ;
bool isWrite = false;
if ( m_threadType == DISK_THREAD ){//&&m_entries[i].m_state ) {
//fs = (FileState *)m_entries[i].m_state;
readSize = t->m_bytesToGo;
isWrite = t->m_doWrite ;
}
if ( isWrite && activeWrites > g_conf.m_maxWriteThreads )
continue;
if ( m_threadType == MERGE_THREAD ||
m_threadType == INTERSECT_THREAD ||
m_threadType == FILTER_THREAD )
if ( niceness > 0 && hiActive2 > 0 )
continue;
// how many threads can be launched for this readSize/niceness?
if ( niceness >= 1 && m_threadType == DISK_THREAD ) {
if ( readSize > g_conf.m_medReadSize ) {
if ( loActiveBig + mdActiveBig >=
g_conf.m_spiderMaxBigDiskThreads )
continue;
}
else if ( readSize > g_conf.m_smaReadSize ) {
if ( loActiveMed + mdActiveMed >=
g_conf.m_spiderMaxMedDiskThreads )
continue;
}
else if ( loActiveSma + mdActiveSma >=
g_conf.m_spiderMaxSmaDiskThreads )
continue;
}
else if ( niceness < 1 && m_threadType == DISK_THREAD ) {
if ( readSize > g_conf.m_medReadSize ) {
if ( hiActiveBig >=
g_conf.m_queryMaxBigDiskThreads )
continue;
}
else if ( readSize > g_conf.m_smaReadSize ) {
if ( hiActiveMed >=
g_conf.m_queryMaxMedDiskThreads )
continue;
}
else if ( hiActiveSma >=
g_conf.m_queryMaxSmaDiskThreads )
continue;
}
// be lazy with this since it uses a significant amount of cpu
if ( now == -1LL ) now = gettimeofdayInMilliseconds();
// how long has this entry been waiting in the queue to launch?
int64_t waited = now - m_entries[i].m_queuedTime ;
// adjust "waited" if it originally had a niceness of 1
if ( m_entries[i].m_niceness >= 1 ) {
// save threads gain precedence
if ( m_threadType == SAVETREE_THREAD )
waited += 49999999;
else if ( m_threadType == UNLINK_THREAD )
waited += 39999999;
else if ( m_threadType == MERGE_THREAD )
waited += 29999999;
else if ( m_threadType == INTERSECT_THREAD )
waited += 29999999;
else if ( m_threadType == FILTER_THREAD )
waited += 9999999;
// if its a write thread... do it quick
else if ( isWrite )
waited += 19999999;
// . if it has waited more than 500 ms it needs
// to launch now...
// . these values seem to be VERY well tuned on
// my production machines, but they may have to
// be tuned for other machines? TODO.
// . they should auto-tune so when merge is more
// important it should starve the other spider reads
else if ( waited >= 500 )
waited += 9999999;
// it hurts for these guys to wait that int32_t
else waited *= 4;
}
// is real merge?
if ( m_entries[i].m_niceness == 1 )
waited += 19999999;
// watch out for clock skew
if ( waited < 0 ) waited = 0;
// if tied, the lowest time wins
if ( niceness == minNiceness && waited <= maxWait ) continue;
// we got a new winner
mini = i;
minNiceness = niceness;
maxWait = waited;
minIsWrite = isWrite;
}
// if no candidate, bail honorably
if ( mini == -1 ) return false;
// . if we've got an urgent thread (niceness=1) going on, do not allow
// niceness=2 threads to launch
// . actually, don't launch *ANY* if doing an urgent merge even if
// no niceness=1 thread currently launched
// . CAUTION! when titledb is merging it dumps tfndb and if tfndb
// goes urgent,make sure it dumps out with niceness 1, otherwise
// this will freeze us!! since titledb won't be able to continue
// merging until tfndb finishes dumping...
// . Now Rdb::addList just refuses to add data if we have too
// many unmerged files on disk! Let others still read from us,
// just not add to us...
//if ( minNiceness >= 2 && g_numUrgentMerges > 0 ) // && lowest <= 1 )
// return false;
// if we're urgent, don't launch a niceness 2 unless he's waited
// at least 200ms in the queue
//if ( minNiceness >= 2 && g_numUrgentMerges > 0 && maxWait < 200 )
// return false;
// . if the thread to launch has niceness > lowest launched then bail
// . i.e. don't launch a low-priority thread if we have highs running
// . we no longer let a niceness of 1 prevent a niceness of 2 from
// launching, this way we can launch merge threads at a niceness
// of 1 w/o hurting the spidering too much, but still giving the
// merge some preferential treatment over the disk so we don't
// RdbDump data faster than we can finish the merge.
if ( minNiceness > lowest && lowest < 1 ) return false;
// . don't launch a low priority thread if there's a high launched
// from any ThreadQueue
// . hi priority is niceness <= 0, med/lo priority is niceness >= 1
//int64_t hiActive = g_threads.m_hiLaunched - g_threads.m_hiReturned;
// now just limit it to this queue... so you can launch a low
// merge thread even if there's a high disk read going on
//if ( minNiceness >= 1 && hiActive > 0 ) return false;
// if we're low priority and suspended is true then bail
//if ( minNiceness > 0 && m_isLowPrioritySuspended ) return false;
// if we're going to launch a high priority thread then CANCEL
// any low priority disk-read threads already in progress
//if ( minNiceness <= 0 && highest > 0 ) cancelLowPriorityThreads ();
// . let's cancel ALL low priority threads if we're launching a high
// . hi priority is niceness <= 0, low priority is niceness >= 1
//int64_t loActive = g_threads.m_loLaunched - g_threads.m_loReturned;
int32_t realNiceness = m_entries[mini].m_niceness;
//if ( realNiceness <= 0 && (loActive + mdActive) > 0 )
// // this actually cancels medium priority threads, too
// g_threads.cancelLowPriorityThreads();
// point to winning entry
ThreadEntry *t = &m_entries[mini];
*/
}
// . launch the thread entry sitting at the head of the given wait list
// . "headPtr"/"tailPtr" identify the wait list that entry currently lives in
// . for DISK_THREAD reads this also allocates the read buffer (when the
//   FileState has none) and looks up the real fds before spawning
// . returns true if the thread was spawned
// . returns false if it could not be spawned. in that case the entry is left
//   at (or put back at) the head of its wait list so a later call can retry
// . g_errno is set by the clone()/pthread_create() attempt, it is NOT set
//   when we merely fail to get a stack
bool ThreadQueue::launchThreadForReals ( ThreadEntry **headPtr ,
					 ThreadEntry **tailPtr ) {
	ThreadEntry *t = *headPtr;
	// if descriptor was closed, just return error now, we
	// cannot try to re-open because the file might have been
	// unlinked. Sync.cpp does a DISK_THREAD but does not pass in a valid
	// FileState ptr because it does its own saving, so check for NULLs.
	FileState *fs = (FileState *)t->m_state;
	// did WE allocate the read buffer? if so we free it on error
	bool allocated = false;
	// did we take a stack from g_threads on this call? only then may the
	// error path re-protect and return it
	bool gotStack = false;
	// did we move "t" from its wait list to the launched list? only then
	// may the error path move it back
	bool movedToLaunched = false;
	if ( m_threadType == DISK_THREAD && fs && ! fs->m_doWrite ) {
		// allocate the read buffer here!
		if ( ! fs->m_buf && t->m_bytesToGo > 0 ) {
			int32_t need = t->m_bytesToGo + fs->m_allocOff;
			char *p = (char *) mmalloc ( need , "ThreadReadBuf" );
			if ( p ) {
				fs->m_buf       = p + fs->m_allocOff;
				fs->m_allocBuf  = p;
				fs->m_allocSize = need;
				allocated       = true;
			}
			else
				log("thread: read buf alloc failed "
				    "for %"INT32" bytes.",need);
			// just let the BigFile::readWrite_r() handle the
			// error for the NULL read buf
		}
		// . otherwise, they are intact, so get the real fds
		// . we know the stored File is still around because of that
		bool doWrite = fs->m_doWrite;
		BigFile *bb = fs->m_this;
		fs->m_fd1 = bb->getfd (fs->m_filenum1,!doWrite);
		fs->m_fd2 = bb->getfd (fs->m_filenum2,!doWrite);
		// is this bad?
		if ( fs->m_fd1 < 0 ) log("disk: fd1 is %i for %s",
					 fs->m_fd1,bb->getFilename());
		if ( fs->m_fd2 < 0 ) log("disk: fd2 is %i for %s.",
					 fs->m_fd2,bb->getFilename());
		fs->m_closeCount1 = getCloseCount_r ( fs->m_fd1 );
		fs->m_closeCount2 = getCloseCount_r ( fs->m_fd2 );
	}
	// count it as launched now, before we actually launch it
	m_launched++;
	// and set the flag
	t->m_isLaunched = true;
	// . launch it
	// . we come back here if the spawn call was interrupted (EINTR)
 loop:
	// debug msg
	if ( g_conf.m_logDebugThread ) {
		int32_t active = m_launched - m_returned ;
		int64_t now = gettimeofdayInMilliseconds();
		log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] launched %s thread. "
		    "active=%"INT32" "
		    "niceness=%"INT32". waited %"UINT64" ms in queue.",
		    (PTRTYPE)t, getThreadType(), active, t->m_niceness ,
		    now - t->m_queuedTime);
	}
	int64_t now = gettimeofdayInMilliseconds();
	t->m_launchedTime = now;
	// NOTE: "count" is reset on every pass through "loop" and the EAGAIN
	// case below bails to hadError, so it is always 0 on success
	int32_t count = 0;
	// only assigned by the clone() path. initialize it so the debug log
	// below never prints an indeterminate value in the pthread build
	pid_t pid = (pid_t)-1;
#ifndef PTHREADS
	t->m_stackSize = STACK_SIZE;
	// take a stack only once, even if we loop back on EINTR, otherwise
	// the first stack would never be returned
	if ( ! gotStack ) {
		int32_t si = g_threads.getStack ( );
		if ( si < 0 ) {
			log(LOG_LOGIC,"thread: Unable to get stack. "
			    "Bad engineer.");
			goto hadError;
		}
		t->m_si    = si;
		t->m_stack = s_stackPtrs [ si ];
		gotStack   = true;
	}
	// UNprotect the whole stack so we can use it
	mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE ,
		   PROT_READ | PROT_WRITE );
	// clear g_errno
	g_errno = 0;
	// . make another process
	// . do not use sig handlers, so if a child process gets any unhandled
	//   signal (like SEGV) it will just exit
	pid = clone ( startUp , t->m_stack + t->m_stackSize ,
		      CLONE_FS | CLONE_FILES | CLONE_VM | //CLONE_SIGHAND |
		      SIGCHLD ,
		      (void *)t );
	// . we set the pid because we are the one that checks it!
	// . if we just let him do it, when we check in cleanup routine
	//   we can get an uninitialized pid
	t->m_pid = pid;
	// might as well bitch if we should here
	if ( s_bad ) {
		log(LOG_LOGIC,"thread: PID received: %"INT32" > %"INT32". Bad.",
		    s_badPid, (int32_t)MAX_PID);
	}
	// error?
	if ( pid == (pid_t)-1 ) g_errno = errno;
	// NOTE(review): this build path never moves "t" from its wait list
	// to the launched list (only the pthread path below does) -- confirm
	// whether that is intended before enabling the clone() build
	//
	// now use pthreads again... are they stable yet?
	//
#else
	// assume it does not go through
	t->m_needsJoin = false;
	// supply our own stack to make pthread_create() fast otherwise
	// it has slowness issues with mmap()
	// http://www.gossamer-threads.com/lists/linux/kernel/960227
	t->m_stackSize = STACK_SIZE;
	// take a stack only once, even if we loop back on EINTR, otherwise
	// the first stack would never be returned
	if ( ! gotStack ) {
		int32_t si = g_threads.getStack ( );
		if ( si < 0 ) {
			log(LOG_LOGIC,"thread: Unable to get stack. "
			    "Bad engineer.");
			goto hadError;
		}
		t->m_si    = si;
		t->m_stack = s_stackPtrs [ si ];
		gotStack   = true;
	}
	// check it's aligned
	if ( (uint64_t)(t->m_stack) & (THRPAGESIZE-1) ) {
		char *xx=NULL;*xx=0; }
	// UNprotect the whole stack so we can use it
	mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE ,
		   PROT_READ | PROT_WRITE );
	pthread_attr_t attr;
	pthread_attr_init ( &attr );
	pthread_attr_setstack ( &attr , t->m_stack , t->m_stackSize );
	// move the entry between lists only once, even on an EINTR retry
	if ( ! movedToLaunched ) {
		if ( g_conf.m_logDebugThread )
			log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] "
			    "remove from wait list, add to launch list",
			    (PTRTYPE)t);
		// remove from waiting linked list, whichever one it was in
		removeLink2 ( headPtr , tailPtr , t );
		// add to 'launched' linked list
		addLink ( &m_launchedHead , t );
		movedToLaunched = true;
	}
	// save the waiting linked list we came from in case we get cancelled
	// and we have to put it back into it
	t->m_bestHeadPtr = headPtr;
	t->m_bestTailPtr = tailPtr;
	// debug
	if ( g_conf.m_logDebugThread )
		log("threads: pthread_create: "
		    "stack=%"PTRFMT" stacksize=%"INT64""
		    , (PTRTYPE)t->m_stack
		    , (int64_t)t->m_stackSize );
	// this returns 0 on success, or the errno otherwise
	g_errno = pthread_create ( &t->m_joinTid , &attr, startUp2 , t) ;
	// the attr object is no longer needed once the create call returns
	pthread_attr_destroy ( &attr );
#endif
	// we're back from pthread_create
	if ( g_conf.m_logDebugThread )
		log(LOG_DEBUG,"thread: Back from clone "
		    "t=0x%"PTRFMT" pid=%"INT32".",
		    (PTRTYPE)t,(int32_t)pid);
	// return true on successful creation of the thread
	if ( g_errno == 0 ) {
		// good stuff, the thread needs a join now
		t->m_needsJoin = true;
		if ( count > 0 )
			log("thread: Call to clone looped %"INT32" times.",
			    count);
		return true;
	}
	// out of resources: do not spin, leave it queued for a later try
	if ( g_errno == EAGAIN ) {
		if ( count++ == 0 )
			log("thread: Call to clone had error: %s.",
			    mstrerror(g_errno));
		goto hadError;
	}
	// do again if the call was interrupted
	if ( g_errno == EINTR ) {
		log("thread: Call to clone was interrupted. Trying again.");
		goto loop;
	}
 hadError:
	if ( g_errno )
		log("thread: pthread_create had error = %s",
		    mstrerror(g_errno));
	// it didn't launch, did it? dec the count.
	m_launched--;
	// . re-protect and return the stack, but ONLY if we took one above
	// . if getStack() failed then t->m_stack/t->m_si are left over from
	//   some earlier use and must not be touched
	if ( gotStack ) {
		mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE,
			   PROT_NONE );
		g_threads.returnStack ( t->m_si );
	}
	t->m_stack = NULL;
	// unset the flag
	t->m_isLaunched = false;
	// bail on other errors
	log("thread: Call to clone had error: %s.", mstrerror(g_errno));
	// correction on this error
	log("thread: Try not using so much memory. "
	    "memused now =%"INT64".",g_mem.getUsedMem());
	// free the read buffer we allocated above and forget about it so
	// nobody is left holding a dangling pointer. a later launch attempt
	// will allocate it again because m_buf is NULL.
	if ( allocated ) {
		mfree ( fs->m_allocBuf , fs->m_allocSize , "ThreadReadBuf" );
		fs->m_buf       = NULL;
		fs->m_allocBuf  = NULL;
		fs->m_allocSize = 0;
	}
	// . put the entry back on its wait list, but ONLY if we had moved it
	//   to the launched list
	// . otherwise it is still at the head of the wait list and unlinking
	//   it from a list it is not in / re-adding it would corrupt both
	if ( movedToLaunched ) {
		if ( g_conf.m_logDebugThread )
			log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] "
			    "remove from launched list, RE-add to wait list",
			    (PTRTYPE)t);
		// remove from launched linked list
		removeLink ( &m_launchedHead , t );
		// back into the queue waiting to launch
		addLinkToHead ( headPtr , tailPtr , t );
	}
	// i'm not sure return value matters at this point? the thread
	// is queued and hopefully will launch at some point
	return false;
}
#ifndef PTHREADS
// so we only log the first setpriority() failure seen in a thread
static bool s_firstTime = true;
#endif
// . entry point of every spawned thread. "state" is the ThreadEntry to run
// . blocks the signals the main process must handle, sets the niceness
//   (clone build only), runs t->m_startRoutine, stamps the exit times,
//   marks the entry done and then signals the main process with SIGCHLD
// . always returns 0
// . threads start up with cancellation deferred until pthread_testcancel()
//   is called, but we never call that
int startUp ( void *state ) {
	// get thread entry
	ThreadEntry *t = (ThreadEntry *)state;
	// the parent sets t->m_pid, not us, since he is the one that needs
	// to check it in the cleanup routine
	// . sanity check
	// . a thread can NOT call this
#ifndef PTHREADS
	// s_pid is a pthread_t, which is wider than int on 64-bit, so cast
	// it down explicitly to match the %i conversion
	if ( getpid() == s_pid )
		log("thread: Thread has same pid %i as main process.",
		    (int)(int64_t)s_pid);
#endif
	// our signal set
	sigset_t set;
	sigemptyset(&set);
	// we need this here so if we break the gb process with gdb it
	// does not kill the child processes when it sends out the SIGINT.
	sigaddset(&set, SIGINT);
#ifndef PTHREADS
	sigprocmask(SIG_BLOCK, &set, NULL);
#else
	// turn these off in the thread
	sigaddset ( &set , SIGALRM   );
	sigaddset ( &set , SIGVTALRM );
	pthread_sigmask(SIG_BLOCK,&set,NULL);
#endif
	// . what this lwp's priority be?
	// . can range from -20 to +20
	// . the lower p, the more cpu time it gets
	// . this is really the niceness, not the priority
	// . only used by the setpriority() call in the clone build below
	int p ;
	// currently our niceness ranges from -1 to 2 for us
	if      ( t->m_niceness == 2 ) p = 19 ;
	else if ( t->m_niceness == 1 ) p = 10 ;
	else                           p =  0 ;
	// debug
	if ( g_conf.m_logDebugThread )
		log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] "
		    "in startup pid=%"INT64" pppid=%"INT32"",
		    (PTRTYPE)t,(int64_t)getpidtid(),(int32_t)getppid());
	// . set this process's priority
	// . setpriority() is only used for SCHED_OTHER threads
#ifndef PTHREADS
	if ( setpriority ( PRIO_PROCESS, getpidtid() , p ) < 0 ) {
		// do we even support logging from a thread?
		if ( s_firstTime ) {
			log("thread: Call to "
			    "setpriority(%"UINT32",%i) in thread "
			    "failed: %s. This "
			    "message will not be repeated.",
			    (uint32_t)getpidtid(),p,mstrerror(errno));
			s_firstTime = false;
		}
		errno = 0;
	}
#endif
	// . call the startRoutine
	// . IMPORTANT: this can NEVER do non-blocking stuff
	t->m_startRoutine ( t->m_state , t );
	// . payload for the signal we queue below (clone build only)
	// . just use 1 to tell Loop to call g_threads.cleanUp()
	// . sival_int is only 4 bytes on a 64 bit arch so we cannot pass "t"
	sigval_t svt;
	svt.sival_int = 1;
	// set exit time
	int64_t now = gettimeofdayInMilliseconds();
	t->m_preExitTime = now;
	t->m_exitTime    = now;
	if ( g_conf.m_logDebugThread ) {
		log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] "
		    "done with startup pid=%"INT64"",
		    (PTRTYPE)t,(int64_t)getpidtid());
	}
	// . now mark thread as ready for removal
	// . do this BEFORE queueing the signal since we're still a thread!!!
	// . cleanUp() will take care of the rest
	// . cleanUp() will call pthread_join on us!
	t->m_isDone = true;
	// let Loop.cpp's sigHandler_r call g_threads.cleanUp()
	g_threads.m_needsCleanup = true;
	// . send the signal
	// . it does not get sent to the main process automatically, so we
	//   must do it! i noticed during the linkdb rebuild we were not
	//   getting the signal
	// . i verified this breaks select() in Loop.cpp out of its sleep
	// . on 64bit arch pthread_t is 64bit and pid_t is 32bit, hence the
	//   casts on s_pid
#ifndef PTHREADS
	sigqueue ( (pid_t)(int64_t)s_pid, SIGCHLD, svt ) ;
#else
	pthread_kill ( s_pid , SIGCHLD );
#endif
	return 0;
}
// . pthread_create() wants a start routine that returns void *
// . this just forwards to startUp() and drops its int return value
void *startUp2 ( void *state ) {
	(void) startUp ( state );
	return NULL;
}
// . raise the flag marking low priority threads of this queue as suspended
// . does nothing if the flag is already raised
// . does NOT cancel low priority threads that are already running
// . watch out, UdpServer::getEmptySlot() calls UdpServer::suspend() and we
//   could be in a signal handler here
void ThreadQueue::suspendLowPriorityThreads() {
	if ( ! m_isLowPrioritySuspended )
		m_isLowPrioritySuspended = true;
}
// . lower the "low priority suspended" flag and try to launch threads again
// . does nothing if we were not suspended
// . this is called by UdpServer::destroySlot() and should NOT be in a sig
//   handler
void ThreadQueue::resumeLowPriorityThreads() {
	if ( m_isLowPrioritySuspended ) {
		// turn em back on
		m_isLowPrioritySuspended = false;
		// and give whatever was held back a chance to start
		g_threads.launchThreads();
	}
}
void ThreadQueue::print ( ) {
// loop through candidates
for ( int32_t i = 0 ; i < m_top ; i++ ) {
ThreadEntry *t = &m_entries[i];
// print it
log(LOG_INIT,"thread: address=%"PTRFMT" "
"pid=%u state=%"PTRFMT" "
"occ=%i done=%i lnch=%i",
(PTRTYPE)t , t->m_pid ,
(PTRTYPE)t->m_state , t->m_isOccupied , t->m_isDone ,
t->m_isLaunched );
}
}
// . map m_threadType to a short static name, used for logging
// . returns "unknown" if m_threadType matches none of the known types
// . checked in this order so the result is the same as testing every
//   type in turn and keeping the last match
const char *ThreadQueue::getThreadType ( ) {
	if ( m_threadType == GENERIC_THREAD   ) return "generic";
	if ( m_threadType == UNLINK_THREAD    ) return "unlink";
	if ( m_threadType == SAVETREE_THREAD  ) return "savetree";
	if ( m_threadType == FILTER_THREAD    ) return "filter";
	if ( m_threadType == INTERSECT_THREAD ) return "intersectlists";
	if ( m_threadType == MERGE_THREAD     ) return "merge";
	if ( m_threadType == DISK_THREAD      ) return "disk";
	return "unknown";
}
#include "BigFile.h" // FileState class
/*
MDW: this is unused
int32_t Threads::getDiskThreadLoad ( int32_t maxNiceness , int32_t *totalToRead ) {
ThreadQueue *q = &m_threadQueues[DISK_THREAD];
ThreadEntry *e = q->m_entries;
int32_t top = q->m_top;
*totalToRead = 0;
int32_t n = 0;
// we really can't suspend threads cuz they might have the
// mutex lock so we just cancel the disk threads here then
for ( int32_t i = 0 ; i < top ; i++ ) {
// get entry
ThreadEntry *t = &e[i];
// skip if not occupied
if ( ! t->m_isOccupied ) continue;
// skip if it's nicer than what we want
if (t->m_niceness > maxNiceness && ! t->m_isLaunched) continue;
// skip if already done
if ( t->m_isDone ) continue;
// cast state data
FileState *fs = (FileState *) t->m_state;
// sometimes NULL, like from Sync.cpp's call
if ( ! fs ) continue;
// only remove read operations, since write operations get
// the fd up front
if ( t->m_doWrite ) continue;
// how many byte to do
//int32_t todo = fs->m_bytesToGo - fs->m_bytesDone;
int32_t todo = t->m_bytesToGo;
// multiply by 2 if a write
if ( t->m_doWrite ) todo *= 2;
// add to total bytes to read
*totalToRead += todo;
// count the thread
n++;
}
return n;
}
*/
// . when a BigFile is removed, much like we remove its pages from
//   DiskPageCache, we also deal with the operations queued on it
// . hands each of the seven wait lists (0 through 6, in that order) to
//   removeThreads2() along with "bf"
void ThreadQueue::removeThreads ( BigFile *bf ) {
	ThreadEntry **heads[] = { &m_waitHead0 , &m_waitHead1 , &m_waitHead2 ,
				  &m_waitHead3 , &m_waitHead4 , &m_waitHead5 ,
				  &m_waitHead6 };
	ThreadEntry **tails[] = { &m_waitTail0 , &m_waitTail1 , &m_waitTail2 ,
				  &m_waitTail3 , &m_waitTail4 , &m_waitTail5 ,
				  &m_waitTail6 };
	const int32_t numLists = (int32_t)( sizeof(heads) / sizeof(heads[0]) );
	for ( int32_t k = 0 ; k < numLists ; k++ )
		removeThreads2 ( heads[k] , tails[k] , bf );
}
// . scan one waiting list for operations on "bf" and pull out the ones that
//   have not been launched yet
// . headPtr/tailPtr point to the head and tail pointers of the list to scan
// . entries with a NULL m_state, entries whose FileState::m_this is not
//   "bf", and write operations (fs->m_doWrite) are left alone
// . every other match gets fs->m_errno2 set to EFILECLOSED, even if it is
//   already done or launched
// . a match that is neither done nor launched is additionally unlinked from
//   the list, put on the empty list, and has its callback made with
//   fs->m_errno and g_errno both set to EFILECLOSED
// . g_errno is restored to the value it had on entry before we return
void ThreadQueue::removeThreads2 ( ThreadEntry **headPtr ,
				   ThreadEntry **tailPtr ,
				   BigFile *bf ) {
	// remember the caller's g_errno, we clobber it for the callbacks
	int32_t saved = g_errno;
	ThreadEntry *t = *headPtr;
	ThreadEntry *nextLink = NULL;
	for ( ; t ; t = nextLink ) {
		// do it here in case we modify the linked list below
		nextLink = t->m_nextLink;
		// get the filestate
		FileState *fs = (FileState *)t->m_state;
		// skip if NULL
		if ( ! fs ) continue;
		// skip if not match
		if ( fs->m_this != (void *)bf ) continue;
		// . let it finish writing if it is a write thread
		// . otherwise, if we are exiting, we could free the
		//   buffer being written and cause the thread to core...
		if ( fs->m_doWrite ) {
			log(LOG_INFO,"disk: Not removing write thread.");
			continue;
		}
		// . should we really? if we renamed the file to another,
		//   we need to recompute the offsets to read, etc.. so we
		//   should fail up to Msg5 with EFILECLOSED or something...
		// . i think we did a rename and it got the same fd, and since
		//   we did not remove the launched or done threads after the
		//   rename, we're not sure if they read from the newly renamed
		//   file or not, and our read offset was for the old file...
		// . at least set the error flag for doneWrapper()
		fs->m_errno2 = EFILECLOSED;
		// log it
		logf(LOG_INFO,"disk: Removing/flagging operation in thread "
		     "queue. fs=0x%"PTRFMT"", (PTRTYPE)fs);
		// skip if already done
		if ( t->m_isDone ) continue;
		// skip if launched
		if ( t->m_isLaunched ) continue;
		// . getting here means the entry is neither done nor launched
		// . this used to log "Thread is launched." which stated the
		//   opposite of what the two checks above guarantee
		log(LOG_INFO,"disk: Removing operation from thread queue.");
		// tell donewrapper what happened
		fs->m_errno = EFILECLOSED;
		g_errno     = EFILECLOSED;
		// remove it from the thread queue
		t->m_isDone     = true;
		t->m_isLaunched = false;
		t->m_isOccupied = false;
		// remove from waiting linked list, whichever one it was in
		removeLink2 ( headPtr , tailPtr , t );
		// add to 'empty' linked list
		addLink ( &m_emptyHead , t );
		// g_errno is still EFILECLOSED for the callback
		makeCallback ( t );
	}
	// . do not just zero g_errno here
	// . that was causing us to lose a g_errno value when
	//   XmlDoc::~XmlDoc() called BigFile::~BigFile() called
	//   removeThreads() called removeThreads2()
	g_errno = saved;
}
void Threads::printState() {
int64_t now = gettimeofdayInMilliseconds();
for ( int32_t i = 0 ; i < m_numQueues; i++ ) {
ThreadQueue *q = &m_threadQueues[i];
// int32_t loActive = q->m_loLaunched - q->m_loReturned;
// int32_t mdActive = q->m_mdLaunched - q->m_mdReturned;
// int32_t hiActive = q->m_hiLaunched - q->m_hiReturned;
// int32_t total = loActive + mdActive + hiActive;
//if( total == 0) continue;
// log(LOG_TIMING,
// "admin: Thread counts: type:%s "
// "%"INT32":low %"INT32":med%"INT32":high %"INT32":total",
// q->getThreadType(),loActive, mdActive, hiActive, total);
for ( int32_t j = 0 ; j < q->m_top ; j++ ) {
ThreadEntry *t = &q->m_entries[j];
if(!t->m_isOccupied) continue;
if(t->m_isDone) {
log(LOG_TIMING,
"admin: Thread -done- "
"nice: %"INT32" "
"totalTime: %"INT64" (ms) "
"queuedTime: %"INT64"(ms) "
"runTime: %"INT64"(ms) "
"cleanup: %"INT64"(ms) "
"callback:%s",
t->m_niceness,
now - t->m_queuedTime,
t->m_launchedTime - t->m_queuedTime,
t->m_exitTime - t->m_launchedTime,
now - t->m_exitTime,
g_profiler.
getFnName((PTRTYPE)t->m_callback));
continue;
}
if(t->m_isLaunched) {
log(LOG_TIMING,
"admin: Thread -launched- "
"nice: %"INT32" "
"totalTime: %"INT64"(ms) "
"queuedTime: %"INT64"(ms) "
"runTime: %"INT64"(ms) "
"callback:%s",
t->m_niceness,
now - t->m_queuedTime,
t->m_launchedTime - t->m_queuedTime,
now - t->m_launchedTime,
g_profiler.
getFnName((PTRTYPE)t->m_callback));
continue;
}
log(LOG_TIMING,
"admin: Thread -queued- "
"nice: %"INT32" "
"queueTime: %"INT64"(ms) "
"callback:%s",
t->m_niceness,
now - t->m_queuedTime,
g_profiler.getFnName((PTRTYPE)t->m_callback));
}
}
}
void ThreadQueue::killAllThreads ( ) {
for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) {
ThreadEntry *e = &m_entries[i];
if ( ! e->m_isOccupied ) continue;
if ( ! e->m_isLaunched ) continue;
log("threads: killling thread id %i",(int)e->m_joinTid);
pthread_kill ( e->m_joinTid , SIGKILL );
log("threads: joining with thread id %i",(int)e->m_joinTid);
pthread_join ( e->m_joinTid , NULL );
}
}
// . logs that we are killing all threads, then calls
//   ThreadQueue::killAllThreads() on each of the m_numQueues queues in
//   index order
void Threads::killAllThreads ( ) {
	log("threads: killing all threads");
	for ( int32_t qi = 0 ; qi < m_numQueues ; qi++ )
		m_threadQueues[qi].killAllThreads();
}