#include "gb-include.h"

#include "BigFile.h"
#include "Threads.h"
#include "Errno.h"
#include "Loop.h"
// NOTE(review): in the copy under review the system header names on the
// following #include lines had been stripped (empty "#include"). They are
// reconstructed from the trailing comments and from the calls this file
// makes (setpriority(), getpid(), waitpid(), pthread_*). Confirm against
// version control.
#include <sys/time.h>
#include <sys/resource.h>  // setpriority()
#include <unistd.h>        // getuid()/pid_t/getpid()
#include <sys/wait.h>      // waitpid()
#include "Rdb.h"           // g_mergeUrgent
#include <sched.h>         // clone()
//#include "Msg16.h"       // g_pid g_ticker
#include "XmlDoc.h"        // g_pid g_ticker
#include "Profiler.h"
#include "Stats.h"
#include "Process.h"

// try using pthreads again
//#define PTHREADS

// use these stubs so libplotter.a works
#ifndef PTHREADS
int pthread_mutex_lock   (pthread_mutex_t *t ) { return 0; }
int pthread_mutex_unlock (pthread_mutex_t *t ) { return 0; }
#else
#include <pthread.h>
//pthread_attr_t s_attr;
#endif

// . id of the main process (or of the main thread when using pthreads),
//   filled in by Threads::setPid(); -1 until then
// . stored as a pthread_t because on 64-bit architectures pthread_t is
//   64 bits while pid_t is only 32 bits
static pthread_t s_pid = (pthread_t) -1;

// . returns the id of the caller
// . with PTHREADS this is the caller's pthread id, otherwise it is the
//   caller's process id widened to a pthread_t
pthread_t getpidtid() {
#ifdef PTHREADS
	// gettid() is a bit newer so not in our libc32, so fake it: return
	// the "thread" id, not the process id. Threads::amThread() still
	// works on thread ids because the main process has a unique thread
	// id as well.
	return pthread_self();
#else
	return (pthread_t)getpid();
#endif
}

// BIG PROBLEM:
// When doing pthread_join it doesn't always ensure a thread doesn't go
// zombie. It seems like when SIGIOs are generated by sigqueue() because
// of a full signal queue, threads start going zombie on me.

// PROBLEM #1: using the "state" to identify a thread in the queue.
// Often a caller makes a disk access using a certain "state" var.
// He gets control back when cleanUp() calls t->m_callback, and he launches
// another read thread in there, which calls Threads::exit(state) before
// the original thread entry's m_isOccupied is set to false. So
// Threads::exit(state) mistakenly picks the thread entry of the former
// thread because it has the same state as its thread!
// PROBLEM #2: technically, a thread in Threads::exit() can set its m_done // bit then send the signal. When the Loop sig handler sees the done bit is // set it decrements the global thread count even though the thread may // not have actually exited yet!! I spawned like 1,000 threads this way!!!!! // a global class extern'd in .h file Threads g_threads; // . call this before calling a ThreadEntry's m_startRoutine // . allows us to set the priority of the thread int startUp ( void *state ) ; void *startUp2 ( void *state ) ; // JAB: warning abatement //static void launchThreadsWrapper ( int fd , void *state ); static void killStalledFiltersWrapper ( int fd , void *state ); static void makeCallback ( ThreadEntry *t ) ; // is the caller a thread? bool Threads::amThread () { if ( s_pid == (pthread_t)-1 ) return false; #ifdef PTHREADS // gettid() is a bit newer so not in our libc32... // so kinda fake it. return the "thread" id, not process id. // Threads::amThread() should still work based on thread ids because // the main process has a unique thread id as well. return (pthread_self() != s_pid); #else return ( getpidtid() != s_pid ); #endif } #ifndef PTHREADS static int32_t s_bad = 0; static int32_t s_badPid = -1; #endif #define MAX_PID 32767 #ifndef PTHREADS static int s_errno ; static int s_errnos [ MAX_PID + 1 ]; // this was improvised from linuxthreads/errno.c //#define CURRENT_STACK_FRAME ({ char __csf; &__csf; }) // WARNING: you MUST compile with -DREENTRANT for this to work int *__errno_location (void) { int32_t pid = (int32_t) getpid(); //if ( pid == s_pid ) return &g_errno; if ( pid <= (int32_t)MAX_PID ) return &s_errnos[pid]; s_bad++; s_badPid = pid; return &s_errno; } #endif // stack must be page aligned for mprotect #define THRPAGESIZE 8192 // how much of stack to use as guard space #define GUARDSIZE (32*1024) // . crashed in saving with 800k, so try 1M // . 
must be multiple of THRPAGESIZE //#define STACK_SIZE ((512+256) * 1024) // pthread_create() cores in calloc() if we don't make STACK_SIZE bigger: #define STACK_SIZE ((512+256+1024) * 1024) // jeta was having some problems, but i don't think they were related to // this stack size of 512k, but i will try boosting to 800k anyway. //#define STACK_SIZE (512 * 1024) // 256k was not enough, try 512k //#define STACK_SIZE (256 * 1024) // at 128k (and even 200k) some threads do not return! why?? there's // obviously some stack overwriting going on! //#define STACK_SIZE (128 * 1024) static char *s_stackAlloc = NULL; static int32_t s_stackAllocSize; //#ifndef PTHREADS static char *s_stack = NULL; static int32_t s_stackSize; static char *s_stackPtrs [ MAX_STACKS ]; //#endif static int32_t s_next [ MAX_STACKS ]; static int32_t s_head ; // returns NULL if none left int32_t Threads::getStack ( ) { if ( s_head == -1 ) return -1; int32_t i = s_head; s_head = s_next [ s_head ]; return i; } void Threads::returnStack ( int32_t si ) { if ( s_head == -1 ) { s_head = si; s_next[si] = -1; return; } s_next[si] = s_head; s_head = si; } void Threads::reset ( ) { if ( s_stackAlloc ) { mprotect(s_stackAlloc, s_stackAllocSize, PROT_READ|PROT_WRITE); mfree ( s_stackAlloc , s_stackAllocSize, "ThreadStack"); } s_stackAlloc = NULL; for ( int32_t i = 0 ; i < MAX_THREAD_QUEUES ; i++ ) m_threadQueues[i].reset(); } Threads::Threads ( ) { m_numQueues = 0; m_initialized = false; } void Threads::setPid ( ) { // set s_pid to the main process id #ifdef PTHREADS // on 64bit arch pid is 32 bit and pthread_t is 64 bit s_pid = (pthread_t)pthread_self(); //log(LOG_INFO, // "threads: main process THREAD id = %"UINT32"",(int32_t unsigned)s_pid); pthread_t tid = pthread_self(); sched_param param; int policy; // scheduling parameters of target thread pthread_getschedparam ( tid, &policy, ¶m); //log(LOG_INFO, // "threads: min/max thread priority settings = %"INT32"/%"INT32" (policy=%"INT32")", // 
(int32_t)sched_get_priority_min(policy), // (int32_t)sched_get_priority_max(policy), // (int32_t)policy); #else s_pid = getpid(); #endif } bool Threads::init ( ) { if ( m_initialized ) return true; m_initialized = true; m_needsCleanup = false; //m_needBottom = false; // sanity check //if ( sizeof(pthread_t) > sizeof(pid_t) ) { char *xx=NULL;*xx=0; } if ( sizeof(pid_t) > sizeof(pthread_t) ) { char *xx=NULL;*xx=0; } setPid(); #ifdef _STACK_GROWS_UP return log("thread: Stack growing up not supported."); #endif //g_conf.m_logDebugThread = true; // . damn, this only applies to fork() calls, i guess // . a quick a dirty way to restrict # of threads so we don't explode /* struct rlimit lim; lim.rlim_max = 100; if ( setrlimit(RLIMIT_NPROC,&lim) ) log("thread::init: setrlimit: %s", mstrerror(errno) ); else log("thread::init: set max number of processes to 100"); */ // allow threads until disabled m_disabled = false; /* // # of low priority threads launched and returned m_hiLaunched = 0; m_hiReturned = 0; m_loLaunched = 0; m_loReturned = 0; */ //int32_t m_queryMaxBigDiskThreads ; // > 1M read //int32_t m_queryMaxMedDiskThreads ; // 100k - 1M read //int32_t m_queryMaxSmaDiskThreads ; // < 100k per read // categorize the disk read sizes by these here // g_conf.m_bigReadSize = 0x7fffffff; // g_conf.m_medReadSize = 1000000; // g_conf.m_smaReadSize = 100000; // . register a sleep wrapper to launch threads every 30ms // . sometimes a bunch of threads mark themselves as done and the // cleanUp() handler sees them as all still launched so it doesn't // launch any new ones //if ( ! g_loop.registerSleepCallback(30,NULL,launchThreadsWrapper)) // return log("thread: Failed to initialize timer callback."); if ( ! g_loop.registerSleepCallback(1000,NULL, killStalledFiltersWrapper,0)) return log("thread: Failed to initialize timer callback2."); // debug //log("thread: main process has pid=%"INT32"",(int32_t)s_pid); // . set priority of the main process to 0 // . 
setpriority() only applies to SCHED_OTHER threads // . priority of threads with niceness 0 will be 0 // . priority of threads with niceness 1 will be 10 // . priority of threads with niceness 2 will be 20 // . see 'man sched_setscheduler' for detail scheduling info // . no need to call getpid(), 0 for pid means the current process #ifndef PTHREADS if ( setpriority ( PRIO_PROCESS, getpid() , 0 ) < 0 ) log("thread: Call to setpriority failed: %s.", mstrerror(errno)); #endif // make multiplier higher for raids, can do more seeks //int32_t m = 1; //#ifdef _LARS_ //m = 3; //#endif // register the types of threads here instead of in main.cpp //if ( ! g_threads.registerType ( DISK_THREAD ,m*20/*maxThreads*/)) // try running blaster with 5 threads and you'll // . see like a double degrade in performance for some reason!! // . TODO: why? // . well, this should be controlled g_conf.m_maxSpiderDiskThreads // for niceness 1+ threads, and g_conf.m_maxPriorityDiskThreads for // niceness 0 and below disk threads // . 100 maxThreads out at a time, 32000 can be queued if ( ! g_threads.registerType ( DISK_THREAD ,100/*maxThreads*/,32000)) return log("thread: Failed to register thread type." ); // . these are used by Msg5 to merge what it reads from disk // . i raised it from 1 to 2 and got better response time from Msg10 // . i leave one open in case one is used for doing a big merge // with high niceness cuz it would hold up high priority ones! // . TODO: is there a better way? cancel it when UdpServer calls // Threads::suspendLowPriorityThreads() ? // . this used to be 2 but now defaults to 10 in Parms.cpp. i found // i have less int32_t gray lines in the performance graph when i // did that on trinity. int32_t max2 = g_conf.m_maxCpuMergeThreads; if ( max2 < 1 ) max2 = 1; if ( ! g_threads.registerType ( MERGE_THREAD , max2,1000) ) return log("thread: Failed to register thread type." ); // will raising this from 1 to 2 make it faster too? 
// i raised since global specs new servers have 2 (hyperthreaded?) cpus int32_t max = g_conf.m_maxCpuThreads; if ( max < 1 ) max = 1; if ( ! g_threads.registerType ( INTERSECT_THREAD,max,10) ) return log("thread: Failed to register thread type." ); // filter thread spawned to call popen() to filter an http reply if ( ! g_threads.registerType ( FILTER_THREAD, 2/*maxThreads*/,300) ) return log("thread: Failed to register thread type." ); // RdbTree uses this to save itself if ( ! g_threads.registerType ( SAVETREE_THREAD,1/*maxThreads*/,100) ) return log("thread: Failed to register thread type." ); // . File.cpp spawns a rename thread for doing renames and unlinks // . doing a tight merge on titldb can be ~250 unlinks // . MDW up from 1 to 30 max, after doing a ddump on 3000+ collections // it was taking forever to go one at a time through the unlink // thread queue. seemed like a 1 second space between unlinks. // 1/23/1014 if ( ! g_threads.registerType ( UNLINK_THREAD,5/*maxThreads*/,3000) ) return log("thread: Failed to register thread type." ); // generic multipurpose if ( ! g_threads.registerType (GENERIC_THREAD,20/*maxThreads*/,100) ) return log("thread: Failed to register thread type." ); // for call SSL_accept() which blocks for 10ms even when socket // is non-blocking... //if (!g_threads.registerType (SSLACCEPT_THREAD,20/*maxThreads*/,100)) // return log("thread: Failed to register thread type." ); //#ifndef PTHREADS // sanity check if ( GUARDSIZE >= STACK_SIZE ) return log("thread: Stack guard size too big."); // not more than this outstanding int32_t maxThreads = 0; for ( int32_t i = 0 ; i < m_numQueues ; i++ ) maxThreads += m_threadQueues[i].m_maxLaunched; // limit to stack we got if ( maxThreads > MAX_STACKS ) maxThreads = MAX_STACKS; // allocate the stack space s_stackAllocSize = STACK_SIZE * maxThreads + THRPAGESIZE ; // clear stack to help check for overwrites s_stackAlloc = (char *) mcalloc ( s_stackAllocSize , "ThreadStack" ); if ( ! 
s_stackAlloc ) return log("thread: Unable to allocate %"INT32" bytes for thread " "stacks.", s_stackAllocSize); log(LOG_INIT,"thread: Using %"INT32" bytes for %"INT32" thread stacks.", s_stackAllocSize,maxThreads); // align s_stack = (char *)(((uint64_t)s_stackAlloc+THRPAGESIZE-1)&~(THRPAGESIZE-1)); // new size s_stackSize = s_stackAllocSize - (s_stack - s_stackAlloc); // protect the whole stack while not in use if ( mprotect ( s_stack , s_stackSize , PROT_NONE ) ) log("thread: Call to mprotect failed: %s.",mstrerror(errno)); // test //s_stack[0] = 1; // init the linked list for ( int32_t i = 0 ; i < MAX_STACKS ; i++ ) { if ( i == MAX_STACKS - 1 ) s_next[i] = -1; else s_next[i] = i + 1; s_stackPtrs[i] = s_stack + STACK_SIZE * i; } s_head = 0; // don't do real time stuff for now return true; //#else // . keep stack size small, 128k // . if we get problems, we'll increase this to 256k // . seems like it grows dynamically from 4K to up to 2M as needed //if ( pthread_attr_setstacksize ( &s_attr , (size_t)128*1024 ) ) // return log("thread: init: pthread_attr_setstacksize: %s", // mstrerror(errno)); //pthread_attr_setschedparam ( &s_attr , PTHREAD_EXPLICIT_SCHED ); //pthread_attr_setscope ( &s_attr , PTHREAD_SCOPE_SYSTEM ); // return true; //#endif } // all types should be registered in main.cpp before any threads launch bool Threads::registerType ( char type , int32_t maxThreads , int32_t maxEntries ) { // return false and set g_errno if no more room if ( m_numQueues >= MAX_THREAD_QUEUES ) { g_errno = EBUFTOOSMALL; return log(LOG_LOGIC,"thread: registerType: Too many thread " "queues"); } // initialize the ThreadQueue class for this type if ( ! m_threadQueues[m_numQueues].init(type,maxThreads,maxEntries)) return false; // we got one more queue now m_numQueues++; return true; } int32_t Threads::getNumThreadsOutOrQueued() { int32_t n = 0; for ( int32_t i = 0 ; i < m_numQueues ; i++ ) { // skip INTERSECT_THREAD, used to intersect termlists to // resolve a query. 
i've seen these get stuck in an infinite // loop sometimes. if ( i == INTERSECT_THREAD ) continue; // skip filter threads, those get stuck sometimes if ( i == FILTER_THREAD ) continue; // tally up all other threads n += m_threadQueues[i].getNumThreadsOutOrQueued(); } return n; } int32_t Threads::getNumWriteThreadsOut() { return m_threadQueues[DISK_THREAD].getNumWriteThreadsOut(); } int32_t Threads::getNumActiveWriteUnlinkRenameThreadsOut() { // these do not countthreads that are done, and just awaiting join int32_t n = m_threadQueues[DISK_THREAD].getNumWriteThreadsOut(); n += m_threadQueues[UNLINK_THREAD].getNumActiveThreadsOut(); return n; } // . returns false (and may set errno) if failed to launch a thread // . returns true if thread added to queue successfully // . may be launched instantly or later depending on # of threads in the queue bool Threads::call ( char type , int32_t niceness , void *state , void (* callback )(void *state,ThreadEntry *t) , void *(* startRoutine)(void *state,ThreadEntry *t) ) { // debug //return false; #ifdef _VALGRIND_ return false; #endif g_errno = 0; // don't spawn any if disabled if ( m_disabled ) return false; if ( ! g_conf.m_useThreads ) return false; if ( type == DISK_THREAD && ! g_conf.m_useThreadsForDisk ) return false; if ( type == MERGE_THREAD && ! g_conf.m_useThreadsForIndexOps ) return false; if ( type == INTERSECT_THREAD && ! g_conf.m_useThreadsForIndexOps ) return false; if ( type == FILTER_THREAD && ! g_conf.m_useThreadsForSystemCalls ) return false; if ( ! m_initialized && ! init() ) return log("db: Threads init failed." ); // . sanity check // . a thread can NOT call this //if ( getpid() != s_pid ) { // fprintf(stderr,"thread: call: bad engineer\n"); // ::exit(-1); //} // don't launch for now //return false; // . sanity check // . 
ensure top 4 bytes of state is the callback //if ( *(int32_t *)state != (int32_t)callback ) { // g_errno = EBADENGINEER; // sleep(50000); // return log("thread: call: top 4 bytes of state != callback"); //} // debug msg //log("adding thread to queue, type=%"INT32"",(int32_t)type); // find the type int32_t i; for ( i = 0 ; i < m_numQueues ; i++ ) if ( m_threadQueues[i].m_threadType == type ) break; // bitch if type not added via registerType() call if ( i == m_numQueues ) { g_errno = EBADENGINEER; return log(LOG_LOGIC,"thread: addtoqueue: Unregistered " "thread type %"INT32"",(int32_t)type); } // debug msg //log("thread: call: adding entry for thread"); // . add to this queue // . returns NULL and sets g_errno on error ThreadEntry *t = m_threadQueues[i].addEntry(niceness,state, callback,startRoutine); if ( ! t ) return log("thread: Failed to add entry to thread pool: " "%s.",mstrerror(g_errno)); // debug msg //log("added"); // clear g_errno //g_errno = 0; // . try to launch as many threads as we can // . this sets g_errno on error // . if it has an error, just ignore it, our thread is queued launchLoop: if ( m_threadQueues[i].launchThread2 ( ) ) goto launchLoop; //if ( ! m_threadQueues[i].launchThread2 ( t ) && g_errno ) { // log("thread: failed thread launch: %s",mstrerror(g_errno)); // return false; //} // return false if there was an error launching the thread //if ( g_errno ) return false; // clear g_errno g_errno = 0; // success return true; } // static void launchThreadsWrapper ( int fd , void *state ) { // // debug msg // //if ( g_conf.m_timingDebugEnabled ) // // log("thread: launchThreadsWrapper: entered"); // // clean up // g_threads.cleanUp(NULL,1000); // // and launch // g_threads.launchThreads(); // } static void killStalledFiltersWrapper ( int fd , void *state ) { // bail if no pid if ( g_pid == -1 ) return; // . only kill after ticker reaches a count of 30 // . 
we are called once every second, so inc it each time int32_t timeout = g_filterTimeout; if ( timeout <= 0 ) timeout = 30; if ( g_ticker++ < timeout ) return; // debug log("threads: killing stalled filter process of age %"INT32" " "seconds and pid=%"INT32".",g_ticker,(int32_t)g_pid); // kill him int err = kill ( g_pid , 9 ); // don't kill again g_pid = -1; if ( err != 0 ) log("threads: kill filter: %s", mstrerror(err) ); } // . called by g_loop in Loop.cpp after getting a SI_QUEUE signal that it // is from when a thread exited // . we put that signal there using sigqeueue() in Threads::exit() // . this way another thread can be launched right away int32_t Threads::launchThreads ( ) { // stop launching threads if trying to exit. // only launch save tree threads. so if in the middle of saving // we allow it to complete? if ( g_process.m_mode == EXIT_MODE ) return 0; // try launching from each queue int32_t numLaunched = 0; // try to launch DISK threads last so cpu-based threads get precedence for ( int32_t i = m_numQueues - 1 ; i >= 0 ; i-- ) { // clear g_errno g_errno = 0; // launch as many threads as we can from queue #i while ( m_threadQueues[i].launchThread2( ) ) numLaunched++; // continue if no g_errno set if ( ! g_errno ) continue; // otherwise bitch about it log("thread: Failed to launch thread: %s.",mstrerror(g_errno)); } // clear g_errno g_errno = 0; return numLaunched; } // . will cancel currently running low priority threads // . will prevent any low priority threads from launching // . 
will only cancel disk threads for now void Threads::suspendLowPriorityThreads() { // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: SUSPENDING LOW-PRIORITY THREADS."); // just cancel disk threads for now for ( int32_t i = 0 ; i < m_numQueues; i++ ) m_threadQueues[i].suspendLowPriorityThreads(); } void Threads::resumeLowPriorityThreads() { // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: RESUMING LOW-PRIORITY THREADS."); for ( int32_t i = 0 ; i < m_numQueues; i++ ) m_threadQueues[i].resumeLowPriorityThreads(); } //void Threads::cancelLowPriorityThreads () { // for ( int32_t i = 0 ; i < m_numQueues; i++ ) // m_threadQueues[i].cancelLowPriorityThreads(); //} void checkList ( ThreadEntry **headPtr , ThreadEntry **tailPtr ) { ThreadEntry *t = *headPtr; // another check if ( tailPtr && *headPtr && ! *tailPtr ) { char *xx=NULL;*xx=0; } if ( tailPtr && ! *headPtr && *tailPtr ) { char *xx=NULL;*xx=0; } if ( ! t ) return; // head can not have a prev if ( t->m_prevLink ) { char *xx=NULL;*xx=0; } ThreadEntry *last = NULL; for ( ; t ; t = t->m_nextLink ) { if ( t->m_prevLink != last ) { char *xx=NULL;*xx=0; } if ( last && last->m_nextLink != t ) { char *xx=NULL;*xx=0; } last = t; } if ( ! 
tailPtr ) return; t = *tailPtr; // tail can not have a next if ( t->m_nextLink ) { char *xx=NULL;*xx=0; } last = NULL; for ( ; t ; t = t->m_prevLink ) { if ( t->m_nextLink != last ) { char *xx=NULL;*xx=0; } if ( last && last->m_prevLink != t ) { char *xx=NULL;*xx=0; } last = t; } } // remove from a head-only linked list void removeLink ( ThreadEntry **headPtr , ThreadEntry *t ) { // if ( g_conf.m_logDebugThread ) // logf(LOG_DEBUG,"thread: removeLink [t=0x%"PTRFMT"]", // (PTRTYPE)t); if ( g_conf.m_logDebugThread ) checkList ( headPtr , NULL ); if ( t->m_prevLink ) t->m_prevLink->m_nextLink = t->m_nextLink; else *headPtr = t->m_nextLink; if ( t->m_nextLink ) t->m_nextLink->m_prevLink = t->m_prevLink; t->m_nextLink = NULL; t->m_prevLink = NULL; if ( g_conf.m_logDebugThread ) checkList ( headPtr , NULL ); } // MDW: verify this::::: and the one above!!!!!!!!!!!!!!!! void removeLink2 ( ThreadEntry **headPtr , ThreadEntry **tailPtr , ThreadEntry *t ) { // if ( g_conf.m_logDebugThread ) // logf(LOG_DEBUG,"thread: removeLink2 [t=0x%"PTRFMT"]", // (PTRTYPE)t); if ( g_conf.m_logDebugThread ) checkList ( headPtr , tailPtr ); if ( t->m_prevLink ) t->m_prevLink->m_nextLink = t->m_nextLink; else *headPtr = t->m_nextLink; if ( t->m_nextLink ) t->m_nextLink->m_prevLink = t->m_prevLink; else *tailPtr = t->m_prevLink; t->m_nextLink = NULL; t->m_prevLink = NULL; if ( g_conf.m_logDebugThread ) checkList ( headPtr , tailPtr ); } // add to a head/tail linked list's tail void addLinkToTail ( ThreadEntry **headPtr , ThreadEntry **tailPtr , ThreadEntry *t ) { // if ( g_conf.m_logDebugThread ) // logf(LOG_DEBUG,"thread: addLinkToTail [t=0x%"PTRFMT"]", // (PTRTYPE)t); if ( g_conf.m_logDebugThread ) checkList ( headPtr , tailPtr ); if ( *tailPtr ) { (*tailPtr)->m_nextLink = t; t->m_nextLink = NULL; t->m_prevLink = *tailPtr; *tailPtr = t; // t is the new tail } else { *headPtr = t; *tailPtr = t; t->m_nextLink = NULL; t->m_prevLink = NULL; } if ( g_conf.m_logDebugThread ) checkList ( headPtr , 
tailPtr ); } // add to a head/tail linked list's head void addLinkToHead ( ThreadEntry **headPtr , ThreadEntry **tailPtr , ThreadEntry *t ) { // if ( g_conf.m_logDebugThread ) // logf(LOG_DEBUG,"thread: addLinkToHead [t=0x%"PTRFMT"]", // (PTRTYPE)t); if ( g_conf.m_logDebugThread ) checkList ( headPtr , tailPtr ); if ( *headPtr ) { (*headPtr)->m_prevLink = t; t->m_prevLink = NULL; t->m_nextLink = *headPtr; *headPtr = t; // t is the new head } else { *headPtr = t; *tailPtr = t; t->m_nextLink = NULL; t->m_prevLink = NULL; } if ( g_conf.m_logDebugThread ) checkList ( headPtr , tailPtr ); } // add to a head-only linked list void addLink ( ThreadEntry **headPtr , ThreadEntry *t ) { // if ( g_conf.m_logDebugThread ) // logf(LOG_DEBUG,"thread: addLink [t=0x%"PTRFMT"]", // (PTRTYPE)t); if ( g_conf.m_logDebugThread ) checkList ( headPtr , NULL ); if ( *headPtr ) { (*headPtr)->m_prevLink = t; t->m_prevLink = NULL; t->m_nextLink = *headPtr; *headPtr = t; // t is the new head } else { *headPtr = t; t->m_nextLink = NULL; t->m_prevLink = NULL; } if ( g_conf.m_logDebugThread ) checkList ( headPtr , NULL ); } /////////////////////////////////////////////////////////////////////// // functions for ThreadQueue /////////////////////////////////////////////////////////////////////// ThreadQueue::ThreadQueue ( ) { m_entries = NULL; m_entriesSize = 0; } void ThreadQueue::reset ( ) { if ( m_entries ) mfree ( m_entries , m_entriesSize , "Threads" ); m_entries = NULL; m_top = 0; } bool ThreadQueue::init ( char threadType, int32_t maxThreads, int32_t maxEntries ) { m_threadType = threadType; m_launched = 0; m_returned = 0; m_maxLaunched = maxThreads; // # of low priority threads launched and returned /* m_hiLaunched = 0; m_hiReturned = 0; m_mdLaunched = 0; m_mdReturned = 0; m_loLaunched = 0; m_loReturned = 0; // we now count write threads so we can limit that m_writesLaunched = 0; m_writesReturned = 0; // these are for disk threads, which we now limit based on read sizes m_hiLaunchedBig = 0; 
m_hiReturnedBig = 0; m_mdLaunchedBig = 0; m_mdReturnedBig = 0; m_loLaunchedBig = 0; m_loReturnedBig = 0; m_hiLaunchedMed = 0; m_hiReturnedMed = 0; m_mdLaunchedMed = 0; m_mdReturnedMed = 0; m_loLaunchedMed = 0; m_loReturnedMed = 0; m_hiLaunchedSma = 0; m_hiReturnedSma = 0; m_mdLaunchedSma = 0; m_mdReturnedSma = 0; m_loLaunchedSma = 0; m_loReturnedSma = 0; */ //m_entriesUsed = 0; //m_top = 0; m_isLowPrioritySuspended = false; // alloc space for entries m_maxEntries = maxEntries; m_entriesSize = sizeof(ThreadEntry)*m_maxEntries; m_entries = (ThreadEntry *)mmalloc ( m_entriesSize , "Threads" ); if ( ! m_entries ) return log("thread: Failed to allocate %"INT32" bytes " "for thread queue.",m_entriesSize); // debug msg //log("INIT CALLED. setting all m_isDone to 1."); // clear m_isOccupied et al for new guys //for ( int32_t i = 0 ; i < MAX_THREAD_ENTRIES ; i++ ) { for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) { m_entries[i].m_isOccupied = false; m_entries[i].m_isLaunched = false; m_entries[i].m_isDone = true; m_entries[i].m_qnum = threadType; m_entries[i].m_stack = NULL; } m_emptyHead = NULL; m_waitHead0 = NULL; m_waitHead1 = NULL; m_waitHead2 = NULL; m_waitHead3 = NULL; m_waitHead4 = NULL; m_waitHead5 = NULL; m_waitHead6 = NULL; m_waitTail0 = NULL; m_waitTail1 = NULL; m_waitTail2 = NULL; m_waitTail3 = NULL; m_waitTail4 = NULL; m_waitTail5 = NULL; m_waitTail6 = NULL; m_launchedHead = NULL; // do not spam the log with log debug msgs even if it is on char debug = g_conf.m_logDebugThread; g_conf.m_logDebugThread = 0; for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) { ThreadEntry *t = &m_entries[i]; t->m_prevLink = NULL; t->m_nextLink = NULL; addLink ( &m_emptyHead , t ); } g_conf.m_logDebugThread = debug; return true; } int32_t ThreadQueue::getNumActiveThreadsOut() { int32_t n = 0; for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) { ThreadEntry *e = &m_entries[i]; if ( ! e->m_isOccupied ) continue; if ( ! 
e->m_isLaunched ) continue; // if it is done and just waiting for a join, do not count if ( e->m_isDone ) continue; n++; } return n; } int32_t ThreadQueue::getNumThreadsOutOrQueued() { // MDW: we also need to count threads that are returned but need their // callback called so, in the case of RdbDump, the rdblist that was written // to disk can update the rdbmap before it gets saved, so it doesn't get // out of sync. Process.cpp calls .suspendMerge() to make sure that all // merge operations are suspended as well. int32_t n = 0; for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) { ThreadEntry *e = &m_entries[i]; if ( e->m_isOccupied ) n++; } return n; /* int32_t n = m_launched - m_returned; for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) { ThreadEntry *e = &m_entries[i]; if ( ! e->m_isOccupied ) continue; if ( e->m_isLaunched ) continue; if ( e->m_isDone ) continue; // do not count "reads", only count writes //if ( m_threadType == DISK_THREAD && e->m_state ) { // FileState *fs = (FileState *)e->m_state; // if ( fs->m_doWrite ) continue; //} } return n; */ } int32_t ThreadQueue::getNumWriteThreadsOut () { // only consider disk threads if ( m_threadType != DISK_THREAD ) return 0; int32_t n = 0; for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) { ThreadEntry *e = &m_entries[i]; if ( ! e->m_isOccupied ) continue; if ( ! e->m_isLaunched ) continue; if ( e->m_isDone ) continue; FileState *fs = (FileState *)e->m_state; if ( ! fs ) continue; if ( ! fs->m_doWrite ) continue; n++; } return n; } // return NULL and set g_errno on error ThreadEntry *ThreadQueue::addEntry ( int32_t niceness , void *state , void (* callback )(void *state, ThreadEntry *t) , void *(* startRoutine)(void *state, ThreadEntry *t) ) { // if we are 90% full and niceness is > 0, knock it off // int32_t max = m_maxEntries; // if ( m_threadType == DISK_THREAD && niceness > 0 ) { // max = (m_maxEntries * 90) / 100; // if ( max <= 0 ) max = 1; // } ThreadEntry *t = m_emptyHead; if ( ! 
t ) { g_errno = ENOTHREADSLOTS; static time_t s_time = 0; time_t now = getTime(); if ( now - s_time > 5 ) { log("thread: Could not add thread to queue. Already " "have %"INT32" entries.",m_maxEntries); s_time = now; } return NULL; } // sanity if ( t->m_isOccupied ) { char *xx=NULL;*xx=0; } // debug test //if ( rand() %10 == 1 ) { g_errno = ENOTHREADSLOTS; return NULL; } // get first available entry, not in use //int32_t i; //for ( i = 0 ; i < MAX_THREAD_ENTRIES ; i++ ) // for ( i = 0 ; i < max ; i++ ) // if ( ! m_entries[i].m_isOccupied ) break; // caution // //if ( i >= MAX_THREAD_ENTRIES ) { // if ( i >= max ) { // g_errno = ENOTHREADSLOTS; // static time_t s_time = 0; // time_t now = getTime(); // if ( now - s_time > 5 ) { // log("thread: Could not add thread to queue. Already " // "have %"INT32" entries.",max); // s_time = now; // } // return NULL; // } // debug msg //fprintf(stderr,"addEntry my pid=%"UINT32"\n", (int32_t)getpid() ); // get an available entry //ThreadEntry *t = &m_entries [ i ]; // debug msg //log("claiming entry state=%"UINT32", occupied=%"INT32"",(int32_t)t->m_state, // (int32_t)t->m_isOccupied); // stick it in t->m_niceness = niceness; t->m_state = state; t->m_callback = callback; t->m_startRoutine = startRoutine; t->m_isOccupied = true; t->m_isCancelled = false; t->m_stack = NULL; // debug msg //log("setting t=%"UINT32" m_isDone to 0", (int32_t)t ); t->m_isDone = false; t->m_isLaunched = false; t->m_queuedTime = gettimeofdayInMilliseconds(); t->m_readyForBail = false; t->m_allocBuf = NULL; t->m_allocSize = 0; t->m_errno = 0; t->m_bestHeadPtr = NULL; t->m_bestTailPtr = NULL; // and when the ohcrap callback gets called and the thread // is cleaned up it will check the FileState readsize and // m_isWrite to see which launch counts to decrement, so // since FileState will be corrupted, we need to store // this info directly into the thread entry. 
if ( m_threadType == DISK_THREAD && t->m_state ) { FileState *fs = (FileState *)t->m_state; t->m_bytesToGo = fs->m_bytesToGo; t->m_doWrite = fs->m_doWrite ; } else { t->m_bytesToGo = 0; t->m_doWrite = false; } // inc the used count //m_entriesUsed++; // debug msg //log("m_entriesUsed now %"INT32"",m_entriesUsed); // might have to inc top as well //if ( i == m_top ) m_top++; // there's only two linked lists we can wait in if we are not disk if ( m_threadType != DISK_THREAD ) { ThreadEntry **bestHeadPtr = NULL; ThreadEntry **bestTailPtr = NULL; if ( niceness <= 0 ) { bestHeadPtr = &m_waitHead0; bestTailPtr = &m_waitTail0; } // 'merge' threads from disk merge ops have niceness 1 else if ( niceness == 1 ) { bestHeadPtr = &m_waitHead1; bestTailPtr = &m_waitTail1; } // niceness is 2? MAX_NICENESS else { bestHeadPtr = &m_waitHead2; bestTailPtr = &m_waitTail2; } // remove from empty list removeLink ( &m_emptyHead , t ); // add to waiting list at the end addLinkToTail ( bestHeadPtr , bestTailPtr , t ); // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] " "queued %s thread for launch1. " "niceness=%"INT32". ", (PTRTYPE)t, getThreadType(), (int32_t)niceness ); return t; } // // otherwise we are a disk thread, there's 7 linked lists to use // // shortcuts //int32_t rs = t->m_bytesToGo; char nice = t->m_niceness; // get best thread candidate from best linked list of candidates ThreadEntry **bestHeadPtr = NULL; ThreadEntry **bestTailPtr = NULL; // short/med/long high priority (niceness 0) disk reads in head0/1/2 // but we can't launch one more if already at our quota. if ( ! bestHeadPtr && nice == 0 ) { //&& rs <= g_conf.m_smaReadSize ) { bestHeadPtr = &m_waitHead0; bestTailPtr = &m_waitTail0; } // if ( ! bestHeadPtr && nice == 0 && rs <= g_conf.m_medReadSize ) { // bestHeadPtr = &m_waitHead1; // bestTailPtr = &m_waitTail1; // } // if ( ! 
bestHeadPtr && nice == 0 ) { // bestHeadPtr = &m_waitHead2; // bestTailPtr = &m_waitTail2; // } // low priority (merge or dump) disk WRITES go in waithead4 if ( ! bestHeadPtr && t->m_doWrite ) { bestHeadPtr = &m_waitHead4; bestTailPtr = &m_waitTail4; } // niceness 1 read threads here if ( ! bestHeadPtr && nice == 1 ) { bestHeadPtr = &m_waitHead5; bestTailPtr = &m_waitTail5; } // niceness 2 read threads here if ( ! bestHeadPtr ) { bestHeadPtr = &m_waitHead6; bestTailPtr = &m_waitTail6; } if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] " "remove from empty list, add to wait list", (PTRTYPE)t); // remove from empty list removeLink ( &m_emptyHead , t ); // sanity if ( m_emptyHead == t ) { char *xx=NULL;*xx=0; } // add to the new waiting list at the end addLinkToTail ( bestHeadPtr , bestTailPtr , t ); // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] " "queued %s thread for launch2. " "niceness=%"INT32". ", (PTRTYPE)t, getThreadType(), (int32_t)niceness ); // success return t; } int32_t Threads::timedCleanUp (int32_t maxTime, int32_t niceness) { // skip it if exiting if ( g_process.m_mode == EXIT_MODE ) return 0; if ( ! m_needsCleanup ) return 0; //if ( g_inSigHandler ) return 0; int64_t startTime = gettimeofdayInMillisecondsLocal(); int64_t took = 0; if ( niceness >= MAX_NICENESS ) m_needsCleanup = false; //for ( int32_t i = -1 ; i <= niceness ; i++ ) { for ( int32_t i = 0 ; i <= niceness ; i++ ) { for ( int32_t j = 0 ; j < m_numQueues ; j++ ) m_threadQueues[j].timedCleanUp ( niceness ); launchThreads(); if ( maxTime < 0 ) continue; took = startTime - gettimeofdayInMillisecondsLocal(); if ( took <= maxTime ) continue; // ok, we have to cut if short... 
m_needsCleanup = true; break; } return took; } bool Threads::isHittingFile ( BigFile *bf ) { return m_threadQueues[DISK_THREAD].isHittingFile(bf); } bool ThreadQueue::isHittingFile ( BigFile *bf ) { // loop through candidates for ( int32_t i = 0 ; i < m_top; i++ ) { // point to it ThreadEntry *t = &m_entries[i]; // must be occupied to be done (sanity check) if ( ! t->m_isOccupied ) continue; // must not be done if ( t->m_isDone ) continue; // must be launched.. really?? //if ( ! t->m_isLaunched ) continue; // must be a read if ( t->m_startRoutine != readwriteWrapper_r ) continue; // int16_tcut FileState *fs = (FileState *)t->m_state; // get bigfile ptr if ( fs->m_this == bf ) return true; } return false; } void Threads::bailOnReads ( ) { m_threadQueues[DISK_THREAD].bailOnReads(); } // Process.cpp calls these callbacks before their time in order to // set EDISKSTUCK void ThreadQueue::bailOnReads ( ) { // note it log("threads: bypassing read threads"); ThreadEntry *t = m_launchedHead; ThreadEntry *nextLink = NULL; // loop through candidates //for ( int32_t i = 0 ; i < m_top; i++ ) { for ( ; t ; t = nextLink ) { // do it here in case we modify the linked list below nextLink = t->m_nextLink; // must be occupied to be done (sanity check) if ( ! t->m_isOccupied ) continue; // skip if not launched yet //if ( ! t->m_isLaunched ) continue; // must be niceness 0 if ( t->m_niceness != 0 ) continue; // must not be done if ( t->m_isDone ) continue; // must not have already called callback if ( t->m_callback == ohcrap ) continue; // must be a read if ( t->m_startRoutine != readwriteWrapper_r ) continue; // int16_tcut FileState *fs = (FileState *)t->m_state; // do not stop writes if ( fs->m_doWrite ) continue; // must be niceness 0 too! if ( fs->m_niceness != 0 ) continue; // what is this? unlaunched... 
//if ( t->m_pid == 0 ) continue; // can only bail on a thread after it copies its FileState // class into its stack so we can bypass it and free the // original FileState without causing a core. if thread // is not yet launched we have to call the callback here too // otherwise it never gets launched until the disk is unstuck! if ( ! t->m_readyForBail && t->m_isLaunched ) continue; // set error t->m_errno = EDISKSTUCK; // set this too g_errno = EDISKSTUCK; // do not allow caller to free the alloc'd buf in case // its read finally comes through! t->m_allocBuf = fs->m_allocBuf; t->m_allocSize = fs->m_allocSize; fs->m_allocBuf = NULL; fs->m_allocSize = 0; // call it t->m_callback ( t->m_state , t ); // do not re-call it... t->m_callback = ohcrap; // invalidate state (FileState usually) t->m_state = NULL; // do not launch if not yet launched if ( t->m_isLaunched ) continue; // delete him if not yet launched, otherwise we try to // launch it later with a corrupted/unstable FileState... // and that causes our launch counts to get out of whack i // think... t->m_isOccupied = false; // note it log("threads: bailing unlaunched thread"); // remove from linked list then removeLink ( &m_launchedHead , t ); addLink ( &m_emptyHead , t ); // do we have to decrement top // if ( m_top == i + 1 ) // while (m_top > 0 && ! m_entries[m_top-1].m_isOccupied) // m_top--; } } // BigFile.cpp's readwriteWrapper_r() ThreadEntry::m_callback gets set to // ohcrap() because it was taking too long to do its read and we prematurely // called its callback above in bailOnReads(). In that case we still have to // free the disk read buffer which was never used. And doneWrapper() in // BigFile.cpp is never called. void ohcrap ( void *state , ThreadEntry *t ) { // free the read buffer here then if ( t->m_allocBuf ) mfree ( t->m_allocBuf , t->m_allocSize , "RdbScan" ); log("threads: got one"); } // . cleans up any threads that have exited // . their m_isDone should be set to true // . 
don't process threads whose niceness is > maxNiceness // . return true if we cleaned one up bool ThreadQueue::timedCleanUp ( int32_t maxNiceness ) { // top: int32_t numCallbacks = 0; ThreadEntry *t = m_launchedHead; ThreadEntry *nextLink = NULL; // loop through candidates for ( ; t ; t = nextLink ) { // get it here in case we remove 't' from the linked list below nextLink = t->m_nextLink; // point to it //ThreadEntry *t = &m_entries[i]; // skip if not qualified if ( t->m_niceness > maxNiceness ) continue; // must be occupied to be done (sanity check) if ( ! t->m_isOccupied ) continue; // skip if not launched yet if ( ! t->m_isLaunched ) continue; // . we were segfaulting right here before because the thread // was setting t->m_pid and at this point it was not // set so t->m_pid was a bogus value // . thread may have seg faulted, in which case sigbadhandler() // in Loop.cpp will get it and set errno to 0x7fffffff #ifndef PTHREADS // MDW: i hafta take this out because the errno_location thing // is not working on the newer gcc if ( ! t->m_isDone && t->m_pid >= 0 && s_errnos [t->m_pid] == 0x7fffffff ) { log("thread: Got abnormal thread termination. Seems " "like the thread might have cored."); s_errnos[t->m_pid] = 0; goto again; } #endif // skip if not done yet if ( ! t->m_isDone ) continue; #ifdef PTHREADS // if pthread_create() failed it returns the errno and we // needsJoin is false, so do not try to join // to a thread if we did not create it, lest pthread_join() // cores if ( t->m_needsJoin ) { // . join up with that thread // . 
damn, sometimes he can block forever on his // call to sigqueue(), int64_t startTime = gettimeofdayInMillisecondsLocal(); int64_t took; int32_t status = pthread_join ( t->m_joinTid , NULL ); took = startTime - gettimeofdayInMillisecondsLocal(); if ( took > 50 ) { log("threads: pthread_join took %i ms", (int)took); } if ( status != 0 ) { log("threads: pthread_join %"INT64" = %s (%"INT32")", (int64_t)t->m_joinTid,mstrerror(status), status); } // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: joined1 with " "t=0x%"PTRFMT" " "jointid=0x%"XINT64".", (PTRTYPE)t,(int64_t)t->m_joinTid); // re-protect this stack mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE, PROT_NONE ); g_threads.returnStack ( t->m_si ); t->m_stack = NULL; } #else again: int status ; pid_t pid = waitpid ( t->m_pid , &status , 0 ); int err = errno; // debug the waitpid if ( g_conf.m_logDebugThread || g_process.m_exiting ) log(LOG_DEBUG,"thread: Waiting for t=0x%"PTRFMT" " "pid=%"INT32".", (PTRTYPE)t,(int32_t)t->m_pid); // bitch and continue if join failed if ( pid != t->m_pid ) { // waitpid() gets interrupted by various signals so // we need to repeat (SIGCHLD?) if ( err == EINTR ) goto again; log("thread: Call to waitpid(%"INT32") returned %"INT32": %s.", (int32_t)t->m_pid,(int32_t)pid,mstrerror(err)); continue; } // if status not 0 then process got abnormal termination if ( ! WIFEXITED(status) ) { if ( WIFSIGNALED(status) ) log("thread: Child process (pid=%i) exited " "from unhandled signal number %"INT32".", pid,(int32_t)WTERMSIG(status)); else log("thread: Child process (pid=%i) exited " "for unknown reason." 
, pid ); } //mfree ( t->m_stack , STACK_SIZE , "Threads" ); // re-protect this stack mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE, PROT_NONE ); g_threads.returnStack ( t->m_si ); t->m_stack = NULL; // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: joined with pid=%"INT32" pid=%"INT32".", (int32_t)t->m_pid,(int32_t)t->m_pid); #endif // we may get cleaned up and re-used and our niceness reassignd // right after set m_isDone to true, so save niceness //int32_t niceness = t->m_niceness; char qnum = t->m_qnum; ThreadQueue *tq = &g_threads.m_threadQueues[(int)qnum]; if ( tq != this ) { char *xx = NULL; *xx = 0; } // get read size before cleaning it up -- it could get nuked //int32_t rs = 0; bool isWrite = false; if ( tq->m_threadType == DISK_THREAD ) { // && t->m_state ) { //FileState *fs = (FileState *)t->m_state; //rs = t->m_bytesToGo; isWrite = t->m_doWrite ; } /* if ( niceness <= 0) tq->m_hiReturned++; else if ( niceness == 1) tq->m_mdReturned++; else if ( niceness >= 2) tq->m_loReturned++; // deal with the tiers for disk threads based on read sizes if ( tq->m_threadType == DISK_THREAD ) { // writes are special cases if ( isWrite ) m_writesReturned++; if ( rs >= 0 && niceness >= 2 ) { if ( rs > g_conf.m_medReadSize ) tq->m_loReturnedBig++; else if ( rs > g_conf.m_smaReadSize ) tq->m_loReturnedMed++; else tq->m_loReturnedSma++; } else if ( rs >= 0 && niceness >= 1 ) { if ( rs > g_conf.m_medReadSize ) tq->m_mdReturnedBig++; else if ( rs > g_conf.m_smaReadSize ) tq->m_mdReturnedMed++; else tq->m_mdReturnedSma++; } else if ( rs >= 0 ) { if ( rs > g_conf.m_medReadSize ) tq->m_hiReturnedBig++; else if ( rs > g_conf.m_smaReadSize ) tq->m_hiReturnedMed++; else tq->m_hiReturnedSma++; } } */ // now count him as returned m_returned++; // prepare for relaunch if we were cancelled if ( t->m_isCancelled ) { t->m_isCancelled = false; t->m_isLaunched = false; t->m_isDone = false; // take out of launched linked list removeLink ( &m_launchedHead , t ); 
// get the waiting linked list from whence we came ThreadEntry **bestHeadPtr = t->m_bestHeadPtr; ThreadEntry **bestTailPtr = t->m_bestTailPtr; // put BACK into the waiting linked list at the head addLinkToHead ( bestHeadPtr , bestTailPtr , t ); log("thread: Thread cancelled. Preparing thread " "for relaunch"); continue; } numCallbacks++; // not running any more t->m_isLaunched = false; // not occupied any more t->m_isOccupied = false; // do we have to decrement top // if ( m_top == i + 1 ) // while (m_top > 0 && ! m_entries[m_top-1].m_isOccupied) // m_top--; if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] " "remove from launched list, add to empty list", (PTRTYPE)t); // now move it to the empty list removeLink ( &m_launchedHead , t ); addLink ( &m_emptyHead , t ); // send a cancel sig to the thread in case it's still there //int err = pthread_cancel ( t->m_tid ); //if ( err != 0 ) log("thread: cleanUp: pthread_cancel: %s", // mstrerror(err) ); // one less entry occupied //m_entriesUsed--; // debug msg //log("m_entriesUsed now %"INT32"",m_entriesUsed); // one more returned //m_returned++; // clear the g_errno in case set by a previous callback //g_errno = 0; // launch as many threads as we can before calling the // callback since this may hog the CPU like Msg20 does //g_threads.launchThreads(); g_errno = 0; //g_loop.startBlockedCpuTimer(); //only allow a quickpoll if we are nice. //g_loop.canQuickPoll(t->m_niceness); makeCallback ( t ); //int64_t took = gettimeofdayInMilliseconds()-startTime; //if(took > 8 && maxNiceness > 0) { // if(g_conf.m_sequentialProfiling) // log(LOG_TIMING, // "admin: Threads spent %"INT64" ms to callback " // "%"INT32" callbacks, nice: %"INT32"", // took, numCallbacks, maxNiceness); // g_threads.m_needBottom = true; // maxNiceness = 0; //} // clear errno again g_errno = 0; if ( g_conf.m_logDebugThread ) { int64_t now = gettimeofdayInMilliseconds(); log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] %s done1. 
" "active=%"INT32" " "time since queued = %"UINT64" ms " "time since launch = %"UINT64" ms " "time since pre-exit = %"UINT64" ms " "time since exit = %"UINT64" ms", (PTRTYPE)t, getThreadType() , (int32_t)(m_launched - m_returned) , (uint64_t)(now - t->m_queuedTime), (uint64_t)(now - t->m_launchedTime), (uint64_t)(now - t->m_preExitTime) , (uint64_t)(now - t->m_exitTime) ); } } //since we need finer grained control in loop, we no longer collect //the callbacks, sort, then call them. we now call them right away //that way we can break out if we start taking too long and //give control back to udpserver. return numCallbacks != 0; } void makeCallback ( ThreadEntry *t ) { // sanity check - if in a high niceness callback, we should // only be calling niceness 0 callbacks here // no, this is only called from sleep wrappers originating from // Loop.cpp, so we should be ok //if ( g_niceness==0 && t->m_niceness ) { char *xx=NULL;*xx=0; } // save it int32_t saved = g_niceness; // log it now if ( g_conf.m_logDebugLoop || g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: enter thread callback t=0x%"PTRFMT" " //"type=%s " "state=0x%"PTRFMT" " "nice=%"INT32"", (PTRTYPE)t, //getThreadType(), (PTRTYPE)t->m_state, (int32_t)t->m_niceness); // time it? int64_t start; if ( g_conf.m_maxCallbackDelay >= 0 ) start = gettimeofdayInMillisecondsLocal(); // then set it if ( t->m_niceness >= 1 ) g_niceness = 1; else g_niceness = 0; t->m_callback ( t->m_state , t ); // time it? 
if ( g_conf.m_maxCallbackDelay >= 0 ) { int64_t elapsed = gettimeofdayInMillisecondsLocal() - start; if ( elapsed >= g_conf.m_maxCallbackDelay ) log("threads: Took %"INT64" ms to call " "thread callback niceness=%"INT32"", elapsed,(int32_t)saved); } // log it now if ( g_conf.m_logDebugLoop || g_conf.m_logDebugThread ) log(LOG_DEBUG,"loop: exit thread callback t=0x%"PTRFMT" " //"type=%s " "nice=%"INT32"", (PTRTYPE)t, //getThreadType(), (int32_t)t->m_niceness); // restore global niceness g_niceness = saved; } bool Threads::cleanUp ( ThreadEntry *t , int32_t maxNiceness ) { bool didSomething = false; loop: // assume no more cleanup needed m_needsCleanup = false; //m_needBottom = false; // debug msg //log("cleanUp"); // debug msg //log("cleaning up exited threads and calling callbacks"); for ( int32_t i = 0 ; i < m_numQueues ; i++ ) { didSomething |= m_threadQueues[i].cleanUp( t , maxNiceness ); // . if we broke from the loop //if(m_needBottom) maxNiceness = 0; } // . loop more if we got a new one // . thread will set this when about to exit // . waitpid() may be interrupted by a SIGCHLD and not get his pid if ( m_needsCleanup ) goto loop; return didSomething; } // . cleans up any threads that have exited // . their m_isDone should be set to true // . 
don't process threads whose niceness is > maxNiceness bool ThreadQueue::cleanUp ( ThreadEntry *tt , int32_t maxNiceness ) { // call all callbacks after all threads are cleaned up void (* callbacks[64])(void *state,ThreadEntry *); void *states [64]; int64_t times [64]; int64_t times2 [64]; int64_t times3 [64]; int64_t times4 [64]; ThreadEntry *tids [64]; int64_t startTime = gettimeofdayInMilliseconds(); // top: ThreadEntry *t = m_launchedHead; ThreadEntry *nextLink = NULL; int32_t numCallbacks = 0; // loop through candidates //for ( int32_t i = 0 ; i < m_top && numCallbacks < 64 ; i++ ) { for ( ; t && numCallbacks < 64 ; t = nextLink ) { // do it here in case we modify the linked list below nextLink = t->m_nextLink; // point to it //ThreadEntry *t = &m_entries[i]; // skip if not qualified if ( t->m_niceness > maxNiceness ) { //if(t->m_isDone) { // g_threads.m_needBottom = true; // //g_threads.m_needsCleanup = true; //} continue; } // must be occupied to be done (sanity check) if ( ! t->m_isOccupied ) continue; // skip if not launched yet if ( ! t->m_isLaunched ) continue; // . we were segfaulting right here before because the thread // was setting t->m_pid and at this point it was not // set so t->m_pid was a bogus value // . thread may have seg faulted, in which case sigbadhandler() // in Loop.cpp will get it and set errno to 0x7fffffff #ifndef PTHREADS // MDW: i hafta take this out because the errno_location thing // is not working on the newer gcc if ( ! t->m_isDone && t->m_pid >= 0 && s_errnos [t->m_pid] == 0x7fffffff ) { log("thread: Got abnormal thread termination. Seems " "like the thread might have cored."); s_errnos[t->m_pid] = 0; goto again; } #endif // skip if not done yet if ( ! t->m_isDone ) continue; #ifdef PTHREADS if ( t->m_needsJoin ) { // . join up with that thread // . 
damn, sometimes he can block forever on his // call to sigqueue(), int32_t status = pthread_join ( t->m_joinTid , NULL ); if ( status != 0 ) { log("threads: " "pthread_join2 %"INT64" = %s (%"INT32")", (int64_t)t->m_joinTid,mstrerror(status), status); } // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: joined2 with " "t=0x%"PTRFMT" " "jointid=0x%"XINT64".", (PTRTYPE)t,(int64_t)t->m_joinTid); // re-protect this stack mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE, PROT_NONE ); g_threads.returnStack ( t->m_si ); t->m_stack = NULL; } #else again: int status ; pid_t pid = waitpid ( t->m_pid , &status , 0 ); int err = errno; // debug the waitpid if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: Waiting for " "t=0x%"PTRFMT" pid=%"INT32".", (PTRTYPE)t,(int32_t)t->m_pid); // bitch and continue if join failed if ( pid != t->m_pid ) { // waitpid() gets interrupted by various signals so // we need to repeat (SIGCHLD?) if ( err == EINTR ) goto again; log("thread: Call to waitpid(%"INT32") returned %"INT32": %s.", (int32_t)t->m_pid,(int32_t)pid,mstrerror(err)); continue; } // if status not 0 then process got abnormal termination if ( ! WIFEXITED(status) ) { if ( WIFSIGNALED(status) ) log("thread: Child process (pid=%i) exited " "from unhandled signal number %"INT32".", pid,(int32_t)WTERMSIG(status)); else log("thread: Child process (pid=%i) exited " "for unknown reason." 
, pid ); } //mfree ( t->m_stack , STACK_SIZE , "Threads" ); // re-protect this stack mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE, PROT_NONE ); g_threads.returnStack ( t->m_si ); t->m_stack = NULL; #endif // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: joined with pid=%"INT32" pid=%"INT32".", (int32_t)t->m_pid,(int32_t)t->m_pid); // we may get cleaned up and re-used and our niceness reassignd // right after set m_isDone to true, so save niceness //int32_t niceness = t->m_niceness; char qnum = t->m_qnum; ThreadQueue *tq = &g_threads.m_threadQueues[(int)qnum]; if ( tq != this ) { char *xx = NULL; *xx = 0; } /* // get read size before cleaning it up -- it could get nuked int32_t rs = 0; bool isWrite = false; if ( tq->m_threadType == DISK_THREAD ) { // && t->m_state ) { //FileState *fs = (FileState *)t->m_state; rs = t->m_bytesToGo; isWrite = t->m_doWrite ; } if ( niceness <= 0) tq->m_hiReturned++; else if ( niceness == 1) tq->m_mdReturned++; else if ( niceness >= 2) tq->m_loReturned++; // deal with the tiers for disk threads based on read sizes if ( tq->m_threadType == DISK_THREAD ) { // writes are special cases if ( isWrite ) m_writesReturned++; if ( rs >= 0 && niceness >= 2 ) { if ( rs > g_conf.m_medReadSize ) tq->m_loReturnedBig++; else if ( rs > g_conf.m_smaReadSize ) tq->m_loReturnedMed++; else tq->m_loReturnedSma++; } else if ( rs >= 0 && niceness >= 1 ) { if ( rs > g_conf.m_medReadSize ) tq->m_mdReturnedBig++; else if ( rs > g_conf.m_smaReadSize ) tq->m_mdReturnedMed++; else tq->m_mdReturnedSma++; } else if ( rs >= 0 ) { if ( rs > g_conf.m_medReadSize ) tq->m_hiReturnedBig++; else if ( rs > g_conf.m_smaReadSize ) tq->m_hiReturnedMed++; else tq->m_hiReturnedSma++; } } */ // . we should count down here, not in the master thread // . solves Problem #2 ? // . TODO: this is not necessaruly atomic we should set // t->m_aboutToExit to true so cleanUp can periodically set // m_returned to what it should be!!! 
//g_threads.m_threadQueues[qnum].m_returned++; // now count him as returned m_returned++; // prepare for relaunch if we were cancelled if ( t->m_isCancelled ) { t->m_isCancelled = false; t->m_isLaunched = false; t->m_isDone = false; // take out of the linked list of launched threads removeLink ( &m_launchedHead , t ); // what waiting linked list did it come from? // get the waiting linked list from whence we came ThreadEntry **bestHeadPtr = t->m_bestHeadPtr; ThreadEntry **bestTailPtr = t->m_bestTailPtr; // and put it back at the head addLinkToHead ( bestHeadPtr , bestTailPtr , t ); log("thread: Thread cancelled. Preparing thread " "for relaunch"); continue; } // debug msg //log("[%"UINT32"] CLEANING UP THREAD type=%"INT32", numLaunched=%"INT32"", // m_entries[i].m_tid , m_threadType , m_launched ); // remove it // debug msg //log("CLN TID=%"UINT32" t=%"UINT32"",(int32_t)t->m_tid , (int32_t)t); //log("thread callback for tid=%"UINT32"",(int32_t)t->m_tid ); // . save important stuff before freeing up the ThreadEntry // for possible take over. // . calling the callback may launch a thread which may // claim THIS thread entry, t //void (* callback)(void *state); //callback = t->m_callback; //void *state = t->m_state; callbacks [ numCallbacks ] = t->m_callback; states [ numCallbacks ] = t->m_state; times [ numCallbacks ] = t->m_queuedTime; times2 [ numCallbacks ] = t->m_launchedTime; times3 [ numCallbacks ] = t->m_preExitTime; times4 [ numCallbacks ] = t->m_exitTime; tids [ numCallbacks ] = t; numCallbacks++; // SOLUTION: before calling the callback which may launch // another thread with this same tid, thus causing an error, // we should set these to false first: // not running any more t->m_isLaunched = false; // not occupied any more t->m_isOccupied = false; // do we have to decrement top //if ( m_top == i + 1 ) // while (m_top > 0 && ! 
m_entries[m_top-1].m_isOccupied) // m_top--; if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] " "remove from launched list, add to empty list", (PTRTYPE)t); // now move it to the empty list removeLink ( &m_launchedHead , t ); addLink ( &m_emptyHead , t ); // send a cancel sig to the thread in case it's still there //int err = pthread_cancel ( t->m_tid ); //if ( err != 0 ) log("thread: cleanUp: pthread_cancel: %s", // mstrerror(err) ); // one less entry occupied //m_entriesUsed--; // debug msg //log("m_entriesUsed now %"INT32"",m_entriesUsed); // one more returned //m_returned++; // clear the g_errno in case set by a previous callback //g_errno = 0; // launch as many threads as we can before calling the // callback since this may hog the CPU like Msg20 does //g_threads.launchThreads(); g_errno = 0; makeCallback ( t ); // int64_t took = gettimeofdayInMilliseconds()-startTime; // if(took > 8 && maxNiceness > 0) { // if(g_conf.m_sequentialProfiling) // log(LOG_TIMING, // "admin: Threads spent %"INT64" ms to callback " // "%"INT32" callbacks, nice: %"INT32"", // took, numCallbacks, maxNiceness); // g_threads.m_needBottom = true; // maxNiceness = 0; // } // clear errno again g_errno = 0; if ( g_conf.m_logDebugThread ) { int64_t now = gettimeofdayInMilliseconds(); log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] %s done2. 
" "active=%"INT32" " "time since queued = %"UINT64" ms " "time since launch = %"UINT64" ms " "time since pre-exit = %"UINT64" ms " "time since exit = %"UINT64" ms", (PTRTYPE)t, getThreadType() , (int32_t)(m_launched - m_returned) , (uint64_t)(now - t->m_queuedTime), (uint64_t)(now - t->m_launchedTime), (uint64_t)(now - t->m_preExitTime) , (uint64_t)(now - t->m_exitTime) ); } // calling thread callback //log("calling thread id %"INT32" callback", (int32_t)(t->m_tid)); // first call it's callback //callback ( state ); // clear after just in case //g_errno = 0; // debug msg //log("CLN2 TID=%"UINT32" t=%"INT32"",(int32_t)t->m_tid ,(int32_t)t); // return now if tt was specified //if ( tt ) return; } int64_t took2 = gettimeofdayInMilliseconds()-startTime; if(numCallbacks > 0 && took2 > 5) log(LOG_DEBUG, "threads: took %"INT64" ms to callback %"INT32" " "callbacks, nice: %"INT32"", took2, numCallbacks, maxNiceness); //since we need finer grained control in loop, we no longer collect //the callbacks, sort, then call them. we now call them right away //that way we can break out if we start taking too long and //give control back to udpserver. return numCallbacks != 0; /* // print out that we got them if ( g_conf.m_logDebugThread ) { int64_t now = gettimeofdayInMilliseconds(); for ( int32_t i = 0 ; i < numCallbacks ; i++ ) log(LOG_DEBUG,"thread: [tid=%"PTRFMT"] %s done3. " "active=%"INT32" " "time since queued = %"UINT64" ms " "time since launch = %"UINT64" ms " "time since pre-exit = %"UINT64" ms " "time since exit = %"UINT64" ms", (PTRTYPE)tids[i], getThreadType() , (int32_t)(m_launched - m_returned) , (uint64_t)(now - times [i]), (uint64_t)(now - times2[i]) , (uint64_t)(now - times3[i]) , (uint64_t)(now - times4[i]) ); } // . before calling callbacks, launch any other threads waiting in // this queue // . TODO: break into parts, cleanup, launch, call callbacks //while ( launchThread() ); // . sort callbacks by queued time // . 
do bubble sort cuz usually it's not too many threads we cleaned up bool flag ; void (* tmpCallback)(void *state,ThreadEntry *t); int64_t tmpTime; void *tmpState; int64_t now = gettimeofdayInMilliseconds(); bubble: flag = false; for ( int32_t i = 1 ; i < numCallbacks ; i++ ) { if ( times[i] >= times[i-1] ) continue; tmpTime = times [i ]; tmpState = states [i ]; tmpCallback = callbacks[i ]; times [i ] = times [i-1]; states [i ] = states [i-1]; tids [i ] = tids [i-1]; callbacks [i ] = callbacks[i-1]; times [i-1] = tmpTime; states [i-1] = tmpState; callbacks [i-1] = tmpCallback; flag = true; } if ( flag ) goto bubble; // call the callbacks now in order of oldest queued time first for ( int32_t i = 0 ; i < numCallbacks ; i++ ) { g_errno = 0; callbacks[i] ( states[i] , NULL ); } int64_t took = gettimeofdayInMilliseconds()-now; if(numCallbacks > 0 && took > 5) log(LOG_TIMING, "admin: took %"INT64" ms to callback %"INT32" " "callbacks, nice: %"INT32"", took, numCallbacks, maxNiceness); #if 0 //I think this is more efficient, for now we'll just use the old way //theres no reason to sort these, lets just find the top one //and call it. do it again it until we've called all of them. int32_t newNumCallbacks = numCallbacks; for ( int32_t i = 0 ; i < numCallbacks ; i++ ) { int64_t maxTime = 0; int32_t maxNdx = 0; for ( int32_t j = 0 ; j < newNumCallbacks ; j++ ) { if(maxTime >= times[i]) { maxTime = times[i]; maxNdx = i; } } g_errno = 0; callbacks[maxNdx] ( states[maxNdx] ); //copy last one into called slot and decrement newNumCallbacks--; times [maxNdx] = times [newNumCallbacks]; states [maxNdx] = states [newNumCallbacks]; callbacks [maxNdx] = callbacks[newNumCallbacks]; } #endif g_errno = 0; // close more threads if we were limited by callbacks[] size if ( numCallbacks >= 64 ) goto top; //return true if we called something back; //returns wrong value if numCallbacks was exactly 64, not too serious... 
return numCallbacks != 0; */ } // used by UdpServer to see if it should call a low priority callback bool Threads::hasHighPriorityCpuThreads() { ThreadQueue *q ; //int32_t hiActive = 0 ; ThreadEntry *t ; q = &g_threads.m_threadQueues[INTERSECT_THREAD]; t = q->m_launchedHead; for ( ; t ; t = t->m_nextLink ) if ( t->m_niceness == 0 ) return true; //hiActive += q->m_hiLaunched - q->m_hiReturned; q = &g_threads.m_threadQueues[MERGE_THREAD]; t = q->m_launchedHead; for ( ; t ; t = t->m_nextLink ) if ( t->m_niceness == 0 ) return true; //hiActive += q->m_hiLaunched - q->m_hiReturned; return false; } // used by UdpServer to see if it should call a low priority callback int32_t Threads::getNumActiveHighPriorityThreads() { ThreadQueue *q ; ThreadEntry *t; int32_t hiActive = 0 ; q = &g_threads.m_threadQueues[DISK_THREAD]; t = q->m_launchedHead; for ( ; t ; t = t->m_nextLink ) if ( t->m_niceness == 0 ) hiActive++; //hiActive += q->m_hiLaunched - q->m_hiReturned; q = &g_threads.m_threadQueues[INTERSECT_THREAD]; t = q->m_launchedHead; for ( ; t ; t = t->m_nextLink ) if ( t->m_niceness == 0 ) hiActive++; //hiActive += q->m_hiLaunched - q->m_hiReturned; q = &g_threads.m_threadQueues[MERGE_THREAD]; t = q->m_launchedHead; for ( ; t ; t = t->m_nextLink ) if ( t->m_niceness == 0 ) hiActive++; //hiActive += q->m_hiLaunched - q->m_hiReturned; return hiActive; } // . returns false if no thread launched // . returns true if thread was launched // . sets g_errno on error // . don't launch a low priority thread if a high priority thread is running // . i.e. don't launch a high niceness thread if a low niceness is running bool ThreadQueue::launchThread2 ( ) { // or if no stacks left, don't even try if ( s_head == -1 ) return false; // . how many threads are active now? // . 
NOTE: not perfectly thread safe here int64_t active = m_launched - m_returned ; // debug msg if ( g_conf.m_logDebugThread && m_threadType == DISK_THREAD ) log(LOG_DEBUG,"thread: q=%s launchThread: active=%"INT64" " "max=%"INT32".",getThreadType(), active,m_maxLaunched); // return if the max is already launched for this thread queue if ( active >= m_maxLaunched ) return false; if ( m_threadType != DISK_THREAD ) { // if one thread of this type is already out, forget it // then we can't have 100 GENERIC THREADS!!! with this... //if ( m_launchedHead ) return false; // first try niceness 0 queue ThreadEntry **bestHeadPtr = &m_waitHead0; ThreadEntry **bestTailPtr = &m_waitTail0; // if empty, try niceness 1 if ( ! *bestHeadPtr ) { bestHeadPtr = &m_waitHead1; bestTailPtr = &m_waitTail1; } // then niceness 2 if ( ! *bestHeadPtr ) { bestHeadPtr = &m_waitHead2; bestTailPtr = &m_waitTail2; } // if bother empty, that was easy if ( ! *bestHeadPtr ) return false; // do not launch a low priority merge, intersect or filter // thread if we/ have high priority cpu threads already going // on. this way a low priority spider thread will not launch // if a high priority cpu-based thread of any kind (right now // just MERGE or INTERSECT) is already running. if ( (*bestHeadPtr)->m_niceness > 0 && g_threads.hasHighPriorityCpuThreads ( ) ) return false; // otherwise launch the next one // MERGE_THREAD, INTERSECT, FILTER, UNLINK, SAVETREE return launchThreadForReals ( bestHeadPtr , bestTailPtr ); } // disk thread is much more complicated // use several queues // debug msg //log("trying to launch for type=%"INT32"",(int32_t)m_threadType); // clean up any threads that have exited //cleanUp (); // if no entries then nothing to launch //if ( m_top <= 0 ) return false; // return log("MAX. %"INT32" are launched. %"INT32" now in queue.", // active , m_entriesUsed ); // . sanity check // . 
a thread can NOT call this //if ( getpid() != s_pid ) { // fprintf(stderr,"thread: launchThread: bad engineer\n"); // ::exit(-1); //} //int64_t now = gettimeofdayInMilliseconds(); /* int64_t now = -1LL; // pick thread with lowest niceness first int32_t minNiceness = 0x7fffffff; int64_t maxWait = -1; //int32_t mini = -1; //bool minIsWrite = false; int32_t lowest = 0x7fffffff; int32_t highest = 0; // . now base our active thread counts on niceness AND read sizes // . this is only used for DISK_THREADs // . loActive* includes niceness >= 1 int32_t loActiveBig = m_loLaunchedBig - m_loReturnedBig; int32_t loActiveMed = m_loLaunchedMed - m_loReturnedMed; int32_t loActiveSma = m_loLaunchedSma - m_loReturnedSma; int32_t mdActiveBig = m_mdLaunchedBig - m_mdReturnedBig; int32_t mdActiveMed = m_mdLaunchedMed - m_mdReturnedMed; int32_t mdActiveSma = m_mdLaunchedSma - m_mdReturnedSma; int32_t hiActiveBig = m_hiLaunchedBig - m_hiReturnedBig; int32_t hiActiveMed = m_hiLaunchedMed - m_hiReturnedMed; int32_t hiActiveSma = m_hiLaunchedSma - m_hiReturnedSma; int32_t activeWrites = m_writesLaunched - m_writesReturned; // how many niceness=2 threads are currently running now? int64_t loActive = m_loLaunched - m_loReturned; int64_t mdActive = m_mdLaunched - m_mdReturned; //int64_t hiActive = m_hiLaunched - m_hiReturned; int32_t total = loActive + mdActive; */ // hi priority max // JAB: warning abatement //int64_t hiActive = m_hiLaunched - m_hiReturned; // get lowest niceness level of launched threads ThreadEntry *t = m_launchedHead; bool hasLowNicenessOut = false; int32_t activeCount = 0; int32_t spiderCount = 0; // int32_t launchedBig0 = 0 , launchedMed0 = 0 , launchedSma0 = 0; // int32_t launchedBig1 = 0 , launchedMed1 = 0 , launchedSma1 = 0; for ( ; t ; t = t->m_nextLink ) { // if he's done, skip him. 
maybe he hasn't been cleaned up yet if ( t->m_isDone ) continue; // count active out activeCount++; // get the highest niceness for all that are launched //if ( niceness > highest ) highest = niceness; //int32_t rs = t->m_bytesToGo; // is the launched thread niceness 0? (i.e. high priority) if ( t->m_niceness == 0 ) { // set a flag hasLowNicenessOut = true; // count read sizes // if (rs > g_conf.m_medReadSize ) launchedBig0++; // else if (rs > g_conf.m_smaReadSize ) launchedMed0++; // else launchedSma0++; } else { spiderCount++; } // if ( rs > g_conf.m_medReadSize ) launchedBig1++; // else if ( rs > g_conf.m_smaReadSize ) launchedMed1++; // else launchedSma1++; } // int32_t max = g_conf.m_spiderMaxDiskThreads; // if ( max <= 0 ) max = 1; // if ( activeCount >= max ) // return false; // get best thread candidate from best linked list of candidates ThreadEntry **bestHeadPtr = NULL; ThreadEntry **bestTailPtr = NULL; // short/med/long high priority (niceness 0) disk reads in head0/1/2 // but we can't launch one more if already at our quota. if ( ! bestHeadPtr && m_waitHead0 ) { //launchedSma0 < g_conf.m_queryMaxSmaDiskThreads ) { bestHeadPtr = &m_waitHead0; bestTailPtr = &m_waitTail0; } // if ( ! bestHeadPtr && // m_waitHead1 && // launchedMed0 < g_conf.m_queryMaxMedDiskThreads ) { // bestHeadPtr = &m_waitHead1; // bestTailPtr = &m_waitTail1; // } // if ( ! bestHeadPtr && // m_waitHead2 && // launchedBig0 < g_conf.m_queryMaxBigDiskThreads ) { // bestHeadPtr = &m_waitHead2; // bestTailPtr = &m_waitTail2; // } // if we have a niceness 0 disk read/write outstandind and we are // 1 or 2, do not launch! we do not want low priority disk reads // having to contend with high priority ones. // now we only do this if the 'separate disk reads' parms is true. if ( g_conf.m_separateDiskReads && hasLowNicenessOut && ! bestHeadPtr ) return false; // threads to save conf and tree/bucket files to disk go in waitHead3 // no these use SAVETREE Thread type // if ( ! 
bestHead ) { // bestHead = m_waitHead3; // bestTail = m_waitTail3; // } // do not allow too high niceness read threads out if ( spiderCount >= g_conf.m_spiderMaxDiskThreads ) return false; // low priority (merge or dump) disk WRITES go in waithead4 if ( ! bestHeadPtr && m_waitHead4 ) { bestHeadPtr = &m_waitHead4; bestTailPtr = &m_waitTail4; } // niceness 1. for merge reads so they superscede regular spider reads // niceness 1 read threads: if ( ! bestHeadPtr && m_waitHead5 ) { bestHeadPtr = &m_waitHead5; bestTailPtr = &m_waitTail5; } // niceness 2 read threads: if ( ! bestHeadPtr && m_waitHead6 ) { bestHeadPtr = &m_waitHead6; bestTailPtr = &m_waitTail6; } // if nobody waiting, return false if ( ! bestHeadPtr ) return false; // i dunno what the point of this was... so i commented it out //int32_t max2 = g_conf.m_queryMaxDiskThreads ; //if ( max2 <= 0 ) max2 = 1; // only do this check if we're a addlists/instersect thread queue //if (m_threadType == INTERSECT_THREAD&& hiActive >= max2)return false; // point to entry in the best linked list to launch from return launchThreadForReals ( bestHeadPtr , bestTailPtr ); /* // loop through candidates for ( int32_t i = 0 ; i < m_top ; i++ ) { // skip if not occupied if ( ! m_entries[i].m_isOccupied ) continue; int32_t niceness = m_entries[i].m_niceness; // get lowest niceness level of launched threads if ( m_entries[i].m_isLaunched ) { // if he's done, skip him if ( m_entries[i].m_isDone ) continue; // get the highest niceness for all that are launched if ( niceness > highest ) highest = niceness; // get the lowest niceness for all that are launched if ( niceness < lowest ) lowest = niceness; // continue now since it's already launched continue; } // . 
these filters really make it so the spider does not // impact query response time //if ( niceness >= 1 && hiActive > 0 ) continue; // don't consider any lows if one already running //if ( niceness >= 2 && loActive > 0 ) continue; // don't consider any lows if a hi already running //if ( niceness >= 2 && hiActive > 0 ) continue; // don't consider any mediums if one already running //if ( niceness == 1 && mdActive > 0 ) continue; // don't consider any mediums if a hi already running //if ( niceness == 1 && hiActive > 0 ) continue; //if ( m_threadType == DISK_THREAD ) { // if ( niceness >= 1 && hiActive > 0 ) continue; // if ( niceness >= 2 && loActive >= max ) continue; // if ( niceness == 1 && mdActive >= max ) continue; //} // treat niceness 1 as niceness 2 for ranking purposes // IFF we're not too backlogged with file merges. i.e. // IFF we're merging faster than we're dumping. // only merges and dumps have niceness 1 really. // spider disk reads are all niceness 2. // Now Rdb::addList just refuses to add data if we have too // many unmerged files on disk! // now we use niceness 1 for "real merges" so those reads take // priority over spider build reads. (8/14/12) //if(niceness == 1 ) niceness = 2; // if he doesn't beat or tie us, skip him if ( niceness > minNiceness ) continue; // no more than "max" medium and low priority threads should // be active/launched at any one time if ( niceness >= 1 && total >= max ) continue; // int16_tcut ThreadEntry *t = &m_entries[i]; // what is this guy's read size? 
// the filestate provided could have been //FileState *fs ; int32_t readSize = 0 ; bool isWrite = false; if ( m_threadType == DISK_THREAD ){//&&m_entries[i].m_state ) { //fs = (FileState *)m_entries[i].m_state; readSize = t->m_bytesToGo; isWrite = t->m_doWrite ; } if ( isWrite && activeWrites > g_conf.m_maxWriteThreads ) continue; if ( m_threadType == MERGE_THREAD || m_threadType == INTERSECT_THREAD || m_threadType == FILTER_THREAD ) if ( niceness > 0 && hiActive2 > 0 ) continue; // how many threads can be launched for this readSize/niceness? if ( niceness >= 1 && m_threadType == DISK_THREAD ) { if ( readSize > g_conf.m_medReadSize ) { if ( loActiveBig + mdActiveBig >= g_conf.m_spiderMaxBigDiskThreads ) continue; } else if ( readSize > g_conf.m_smaReadSize ) { if ( loActiveMed + mdActiveMed >= g_conf.m_spiderMaxMedDiskThreads ) continue; } else if ( loActiveSma + mdActiveSma >= g_conf.m_spiderMaxSmaDiskThreads ) continue; } else if ( niceness < 1 && m_threadType == DISK_THREAD ) { if ( readSize > g_conf.m_medReadSize ) { if ( hiActiveBig >= g_conf.m_queryMaxBigDiskThreads ) continue; } else if ( readSize > g_conf.m_smaReadSize ) { if ( hiActiveMed >= g_conf.m_queryMaxMedDiskThreads ) continue; } else if ( hiActiveSma >= g_conf.m_queryMaxSmaDiskThreads ) continue; } // be lazy with this since it uses a significant amount of cpu if ( now == -1LL ) now = gettimeofdayInMilliseconds(); // how long has this entry been waiting in the queue to launch? int64_t waited = now - m_entries[i].m_queuedTime ; // adjust "waited" if it originally had a niceness of 1 if ( m_entries[i].m_niceness >= 1 ) { // save threads gain precedence if ( m_threadType == SAVETREE_THREAD ) waited += 49999999; else if ( m_threadType == UNLINK_THREAD ) waited += 39999999; else if ( m_threadType == MERGE_THREAD ) waited += 29999999; else if ( m_threadType == INTERSECT_THREAD ) waited += 29999999; else if ( m_threadType == FILTER_THREAD ) waited += 9999999; // if its a write thread... 
do it quick else if ( isWrite ) waited += 19999999; // . if it has waited more than 500 ms it needs // to launch now... // . these values seem to be VERY well tuned on // my production machines, but they may have to // be tuned for other machines? TODO. // . they should auto-tune so when merge is more // important it should starve the other spider reads else if ( waited >= 500 ) waited += 9999999; // it hurts for these guys to wait that int32_t else waited *= 4; } // is real merge? if ( m_entries[i].m_niceness == 1 ) waited += 19999999; // watch out for clock skew if ( waited < 0 ) waited = 0; // if tied, the lowest time wins if ( niceness == minNiceness && waited <= maxWait ) continue; // we got a new winner mini = i; minNiceness = niceness; maxWait = waited; minIsWrite = isWrite; } // if no candidate, bail honorably if ( mini == -1 ) return false; // . if we've got an urgent thread (niceness=1) going on, do not allow // niceness=2 threads to launch // . actually, don't launch *ANY* if doing an urgent merge even if // no niceness=1 thread currently launched // . CAUTION! when titledb is merging it dumps tfndb and if tfndb // goes urgent,make sure it dumps out with niceness 1, otherwise // this will freeze us!! since titledb won't be able to continue // merging until tfndb finishes dumping... // . Now Rdb::addList just refuses to add data if we have too // many unmerged files on disk! Let others still read from us, // just not add to us... //if ( minNiceness >= 2 && g_numUrgentMerges > 0 ) // && lowest <= 1 ) // return false; // if we're urgent, don't launch a niceness 2 unless he's waited // at least 200ms in the queue //if ( minNiceness >= 2 && g_numUrgentMerges > 0 && maxWait < 200 ) // return false; // . if the thread to launch has niceness > lowest launched then bail // . i.e. don't launch a low-priority thread if we have highs running // . 
we no longer let a niceness of 1 prevent a niceness of 2 from // launching, this way we can launch merge threads at a niceness // of 1 w/o hurting the spidering too much, but still giving the // merge some preferential treatment over the disk so we don't // RdbDump data faster than we can finish the merge. if ( minNiceness > lowest && lowest < 1 ) return false; // . don't launch a low priority thread if there's a high launched // from any ThreadQueue // . hi priority is niceness <= 0, med/lo priority is niceness >= 1 //int64_t hiActive = g_threads.m_hiLaunched - g_threads.m_hiReturned; // now just limit it to this queue... so you can launch a low // merge thread even if there's a high disk read going on //if ( minNiceness >= 1 && hiActive > 0 ) return false; // if we're low priority and suspended is true then bail //if ( minNiceness > 0 && m_isLowPrioritySuspended ) return false; // if we're going to launch a high priority thread then CANCEL // any low priority disk-read threads already in progress //if ( minNiceness <= 0 && highest > 0 ) cancelLowPriorityThreads (); // . let's cancel ALL low priority threads if we're launching a high // . hi priority is niceness <= 0, low priority is niceness >= 1 //int64_t loActive = g_threads.m_loLaunched - g_threads.m_loReturned; int32_t realNiceness = m_entries[mini].m_niceness; //if ( realNiceness <= 0 && (loActive + mdActive) > 0 ) // // this actually cancels medium priority threads, too // g_threads.cancelLowPriorityThreads(); // point to winning entry ThreadEntry *t = &m_entries[mini]; */ } bool ThreadQueue::launchThreadForReals ( ThreadEntry **headPtr , ThreadEntry **tailPtr ) { ThreadEntry *t = *headPtr; // if descriptor was closed, just return error now, we // cannot try to re-open because the file might have been // unlinked. Sync.cpp does a DISK_THREAD but does not pass in a valid // FileState ptr because it does its own saving, so check for NULLs. 
FileState *fs = (FileState *)t->m_state; bool allocated = false; if ( m_threadType == DISK_THREAD && fs && ! fs->m_doWrite ) { // allocate the read buffer here! if ( ! fs->m_doWrite && ! fs->m_buf && t->m_bytesToGo > 0 ) { int32_t need = t->m_bytesToGo + fs->m_allocOff; char *p = (char *) mmalloc ( need , "ThreadReadBuf" ); if ( p ) { fs->m_buf = p + fs->m_allocOff; fs->m_allocBuf = p; fs->m_allocSize = need; allocated = true; } else log("thread: read buf alloc failed " "for %"INT32" bytes.",need); // just let the BigFile::readWrite_r() handle the // error for the NULL read buf } // . otherwise, they are intact, so get the real fds // . we know the stored File is still around because of that bool doWrite = fs->m_doWrite; BigFile *bb = fs->m_this; fs->m_fd1 = bb->getfd (fs->m_filenum1,!doWrite);//&fs->m_vfd1); fs->m_fd2 = bb->getfd (fs->m_filenum2,!doWrite);//&fs->m_vfd2); // is this bad? if ( fs->m_fd1 < 0 ) log("disk: fd1 is %i for %s", fs->m_fd1,bb->getFilename()); if ( fs->m_fd2 < 0 ) log("disk: fd2 is %i for %s.", fs->m_fd2,bb->getFilename()); fs->m_closeCount1 = getCloseCount_r ( fs->m_fd1 ); fs->m_closeCount2 = getCloseCount_r ( fs->m_fd2 ); } // count it as launched now, before we actually launch it m_launched++; /* // priority-based GLOBAL & LOCAL launch count if ( realNiceness <= 0 ) m_hiLaunched++; else if ( realNiceness == 1 ) m_mdLaunched++; else if ( realNiceness >= 2 ) m_loLaunched++; // deal with the tiers for disk threads based on read sizes if ( m_threadType == DISK_THREAD ) { // writes are special cases if ( t->m_doWrite ) m_writesLaunched++; //FileState *fs = (FileState *)m_entries[mini].m_state; int32_t rs = t->m_bytesToGo; // 0; //if ( fs ) rs = fs->m_bytesToGo; if ( realNiceness >= 2 ) { if ( rs > g_conf.m_medReadSize ) m_loLaunchedBig++; else if ( rs > g_conf.m_smaReadSize ) m_loLaunchedMed++; else m_loLaunchedSma++; } else if ( realNiceness >= 1 ) { if ( rs > g_conf.m_medReadSize ) m_mdLaunchedBig++; else if ( rs > g_conf.m_smaReadSize ) 
m_mdLaunchedMed++; else m_mdLaunchedSma++; } else { if ( rs > g_conf.m_medReadSize ) m_hiLaunchedBig++; else if ( rs > g_conf.m_smaReadSize ) m_hiLaunchedMed++; else m_hiLaunchedSma++; } } */ // debug msg //if ( m_threadType == 0 ) // log("creating thread, t=%"UINT32" state=%"UINT32" //launched = %"INT32"", // t , (int32_t)t->m_state , m_launched ); // and set the flag t->m_isLaunched = true; // . launch it // . this sets the pthread_t ptr for identificatoin // . returns false on error //pthread_t tmp; loop: // debug msg if ( g_conf.m_logDebugThread ) { int32_t active = m_launched - m_returned ; int64_t now = gettimeofdayInMilliseconds(); log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] launched %s thread. " "active=%"INT32" " "niceness=%"INT32". waited %"UINT64" ms in queue.", (PTRTYPE)t, getThreadType(), active, t->m_niceness , now - t->m_queuedTime); } // be lazy with this since it uses a significant amount of cpu //if ( now == -1LL ) now = gettimeofdayInMilliseconds(); int64_t now = gettimeofdayInMilliseconds(); //t->m_launchedTime = g_now; t->m_launchedTime = now; // loop2: // spawn the thread int32_t count = 0; pid_t pid; #ifndef PTHREADS //int status; //int ret; // random failure test //if ( rand() %10 == 1 ) { err = ENOMEM; goto hadError; } // malloc twice the size t->m_stackSize = STACK_SIZE; //t->m_stack = (char *)mmalloc ( t->m_stackSize , "Threads" ); int32_t si = g_threads.getStack ( ); if ( si < 0 ) { log(LOG_LOGIC,"thread: Unable to get stack. Bad engineer."); goto hadError; } t->m_si = si; t->m_stack = s_stackPtrs [ si ]; // UNprotect the whole stack so we can use it mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE , PROT_READ | PROT_WRITE ); // clear g_errno g_errno = 0; // . make another process // . do not use sig handlers, so if a child process gets any unhandled // signal (like SEGV) it will just exit pid = clone ( startUp , t->m_stack + t->m_stackSize , CLONE_FS | CLONE_FILES | CLONE_VM | //CLONE_SIGHAND | SIGCHLD , (void *)t ); // . 
we set the pid because we are the one that checks it! // . if we just let him do it, when we check in cleanup routine // we can get an uninitialized pid t->m_pid = pid; // might as well bitch if we should here if ( s_bad ) { log(LOG_LOGIC,"thread: PID received: %"INT32" > %"INT32". Bad.", s_badPid, (int32_t)MAX_PID); //char *xx = NULL; *xx = 0; } // wait for him //ret = waitpid ( -1*pid , &status , 0 ); //if ( ret != pid ) // log("waitpid(pid=%"INT32"): ret=%"INT32" err=%s", // (int32_t)pid,(int32_t)ret,mstrerror(errno)); // check if he's done //if ( ! t->m_isDone ) log("NOT DONE"); // set the pid //t->m_pid = pid; // error? if ( pid == (pid_t)-1 ) g_errno = errno; // // now use pthreads again... are they stable yet? // #else // assume it does not go through t->m_needsJoin = false; // pthread inherits our sigmask, so don't let it handle sigalrm // signals in Loop.cpp, it'll screw things up. that handler // is only meant to be called by the main process. if we end up // double calling it, this thread may think g_callback is non-null // then it gets set to NULL, then the thread cores! seen it... // sigset_t sigs; // sigemptyset ( &sigs ); // sigaddset ( &sigs , SIGALRM ); // sigaddset ( &sigs , SIGVTALRM ); // if ( sigprocmask ( SIG_BLOCK , &sigs , NULL ) < 0 ) // log("threads: failed to block sig"); // supply our own stack to make pthread_create() fast otherwise // it has slowness issues with mmap() // http://www.gossamer-threads.com/lists/linux/kernel/960227 t->m_stackSize = STACK_SIZE; //t->m_stack = (char *)mmalloc ( t->m_stackSize , "Threads" ); int32_t si = g_threads.getStack ( ); if ( si < 0 ) { log(LOG_LOGIC,"thread: Unable to get stack. 
Bad engineer."); goto hadError; } t->m_si = si; t->m_stack = s_stackPtrs [ si ]; // check it's aligned if ( (uint64_t)(t->m_stack) & (THRPAGESIZE-1) ) { char *xx=NULL;*xx=0; } // UNprotect the whole stack so we can use it mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE , PROT_READ | PROT_WRITE ); pthread_attr_t attr; pthread_attr_init ( &attr ); pthread_attr_setstack ( &attr , t->m_stack , t->m_stackSize ); if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] " "remove from wait list, add to launch list", (PTRTYPE)t); // remove from waiting linked list, whichever one it was in removeLink2 ( headPtr , tailPtr , t ); // add to 'launched' linked list addLink ( &m_launchedHead , t ); // save the waiting linked list we came from in case we get cancelled // and we have to put it back into it t->m_bestHeadPtr = headPtr; t->m_bestTailPtr = tailPtr; // debug if ( g_conf.m_logDebugThread ) log("threads: pthread_create: " "stack=%"PTRFMT" stacksize=%"INT64"" , (PTRTYPE)t->m_stack , (int64_t)t->m_stackSize ); // this returns 0 on success, or the errno otherwise g_errno = pthread_create ( &t->m_joinTid , &attr, startUp2 , t) ; // if ( sigprocmask ( SIG_UNBLOCK , &sigs , NULL ) < 0 ) // log("threads: failed to unblock sig"); #endif // we're back from pthread_create if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: Back from clone " "t=0x%"PTRFMT" pid=%"INT32".", (PTRTYPE)t,(int32_t)pid); // return true on successful creation of the thread if ( g_errno == 0 ) { // good stuff, the thread needs a join now t->m_needsJoin = true; if ( count > 0 ) log("thread: Call to clone looped %"INT32" times.", count); return true; } //#undef usleep // forever loop if ( g_errno == EAGAIN ) { if ( count++ == 0 ) log("thread: Call to clone had error: %s.", mstrerror(g_errno)); //usleep(1); //goto loop2; goto hadError; } // debug msg // log("created tid=%"INT32"",(int32_t)t->m_tid); // return true; //} // do again if g_errno is AGAIN if ( g_errno == EINTR ) { 
log("thread: Call to clone was interrupted. Trying again."); goto loop; } //#ifndef _PTHREADS_ hadError: //#endif if ( g_errno ) log("thread: pthread_create had error = %s", mstrerror(g_errno)); // it didn't launch, did it? dec the count. m_launched--; // re-protect this stack mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE, PROT_NONE ); // RETURN THE STACK g_threads.returnStack ( t->m_si ); t->m_stack = NULL; /* // priority-based LOCAL & GLOBAL launch counts if ( realNiceness <= 0 ) m_hiLaunched--; else if ( realNiceness == 1 ) m_mdLaunched--; else if ( realNiceness >= 2 ) m_loLaunched--; // . deal with the tiers for disk threads based on read sizes // . WARNING: we cannot easily change tiers dynamically // because it will throw these counts off if ( m_threadType == DISK_THREAD ) { // writes are special cases if ( t->m_doWrite ) m_writesLaunched--; //FileState *fs = (FileState *)m_entries[mini].m_state; int32_t rs = t->m_bytesToGo; // 0; //if ( fs ) rs = fs->m_bytesToGo; if ( realNiceness >= 2 ) { if ( rs > g_conf.m_medReadSize ) m_loLaunchedBig--; else if ( rs > g_conf.m_smaReadSize ) m_loLaunchedMed--; else m_loLaunchedSma--; } else if ( realNiceness >= 1 ) { if ( rs > g_conf.m_medReadSize ) m_mdLaunchedBig--; else if ( rs > g_conf.m_smaReadSize ) m_mdLaunchedMed--; else m_mdLaunchedSma--; } else { if ( rs > g_conf.m_medReadSize ) m_hiLaunchedBig--; else if ( rs > g_conf.m_smaReadSize ) m_hiLaunchedMed--; else m_hiLaunchedSma--; } } */ // unset the flag t->m_isLaunched = false; // bail on other errors log("thread: Call to clone had error: %s.", mstrerror(g_errno)); // correction on this error log("thread: Try not using so much memory. 
" "memused now =%"INT64".",g_mem.getUsedMem()); // free allocated buffer if ( allocated ) { mfree ( fs->m_allocBuf , fs->m_allocSize , "ThreadReadBuf" ); fs->m_buf = NULL; } if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] " "remove from launched list, RE-add to wait list", (PTRTYPE)t); // remove from launched linked list removeLink ( &m_launchedHead , t ); // back into the queue waiting to launch addLinkToHead ( headPtr , tailPtr , t ); // i'm not sure return value matters at this point? the thread // is queued and hopefully will launch at some point return false; /* // if this is the direct thread request do not call callback, just // return false, otherwise we get into an unexpected loop thingy if ( t == te ) return log("thread: Returning false."); // do it blocking log("thread: Calling without thread. This will crash many times. " "Please fix it."); // return false so caller will re-do without thread! // so BigFile::readwrite() will retry without thread and we won't // get into a weird loop thingy if ( te ) return false; // uint64_t profilerStart,profilerEnd; // uint64_t statStart,statEnd; //if (g_conf.m_profilingEnabled){ // address=(int32_t)t->m_startRoutine; // g_profiler.startTimer(address, __PRETTY_FUNCTION__); //} t->m_startRoutine ( t->m_state , t ); //if (g_conf.m_profilingEnabled) { // if(!g_profiler.endTimer(address, __PRETTY_FUNCTION__)) // log(LOG_WARN,"admin: Couldn't add the fn %"INT32"", // (int32_t)address); //} t->m_exitTime = gettimeofdayInMilliseconds(); // flag it for cleanup t->m_isDone = true; t->m_isLaunched = true; // clean it up cleanUp ( t , 200);//maxNiceness thread can have to be cleaned up // ignore error g_errno = 0; // we kinda launched one, so say true here return true; // false; */ } #ifndef PTHREADS static bool s_firstTime = true; #endif // threads start up with cacnellation deferred until pthreads_testcancel() // is called, but we never call that int startUp ( void *state ) { // get thread entry 
ThreadEntry *t = (ThreadEntry *)state; // no! now parent does since he is the one that needs to check it // in the cleanup routine // remember the pid //t->m_pid = getpid(); // . sanity check // . a thread can NOT call this #ifndef PTHREADS if ( getpid() == s_pid ) log("thread: Thread has same pid %i as main process.",s_pid); #endif // the cleanup handler //pthread_cleanup_push ( exitWrapper , t ) ; // t->m_state ); // our signal set sigset_t set; sigemptyset(&set); //sigaddset(&set, SIGHUP); // we need this here so if we break the gb process with gdb it // does not kill the child processes when it sends out the SIGINT. sigaddset(&set, SIGINT); // ignore the real time signal, man... //sigaddset(&set, GB_SIGRTMIN); //pthread_sigmask(SIG_BLOCK, &set, NULL); #ifndef PTHREADS sigprocmask(SIG_BLOCK, &set, NULL); #else // turn these off in the thread sigaddset ( &set , SIGALRM ); sigaddset ( &set , SIGVTALRM ); pthread_sigmask(SIG_BLOCK,&set,NULL); #endif // . what this lwp's priority be? // . can range from -20 to +20 // . the lower p, the more cpu time it gets // . this is really the niceness, not the priority int p ; // currently our niceness ranges from -1 to 2 for us if ( t->m_niceness == 2 ) p = 19 ; else if ( t->m_niceness == 1 ) p = 10 ; else p = 0 ; // remember the tid //t->m_tid = pthread_self(); // debug if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] " "in startup pid=%"INT64" pppid=%"INT32"", (PTRTYPE)t,(int64_t)getpidtid(),(int32_t)getppid()); // debug msg //fprintf(stderr,"new thread tid=%"INT32" pid=%"INT32"\n", // (int32_t)t->m_tid,(int32_t)t->m_pid); // . set this process's priority // . setpriority() is only used for SCHED_OTHER threads //if ( pthread_setschedprio ( getpidtid() , p ) ) { #ifndef PTHREADS if ( setpriority ( PRIO_PROCESS, getpidtid() , p ) < 0 ) { // do we even support logging from a thread? if ( s_firstTime ) { log("thread: Call to " "setpriority(%"UINT32",%i) in thread " "failed: %s. 
This " "message will not be repeated.", (uint32_t)getpidtid(),p,mstrerror(errno)); s_firstTime = false; } errno = 0; } #endif /* sched_param sp; int pp; int err = pthread_getschedparam ( t->m_tid , &pp , &sp ) ; if ( err ) log("thread: startUp: pthread_getschedparam: %s", mstrerror(err)); // adjust the priority p = 1; sp.sched_priority = p; // and reassign err = pthread_setschedparam ( t->m_tid , SCHED_OTHER , &sp ) ; log("thread: startUp: pthread_setschedparam(%"INT32"): %s", p,mstrerror(err)); */ // somehow, it works ok when we have this print statement delay!!! //fprintf(stderr,"thread pid = %"INT32"\n",(int32_t)getpid()); // . call the startRoutine // . IMPORTANT: this can NEVER do non-blocking stuff t->m_startRoutine ( t->m_state , t ); // pop it off //pthread_cleanup_pop ( 1 /*execute handler?*/ ); // . now throw it on g_loop's sigqueue // . the first 4 bytes of t->m_state should be t->m_callback // . no! just use 1 to tell Loop to call g_threads.cleanUp() // . TODO: pass in a ptr to cleanUpWrapper() instead of "t" // . sival_int is only 4 bytes on a 64 bit arch... sigval_t svt; svt.sival_int = 1;//(int64_t)t ; //(int)(t->m_state); // fd; // set exit time int64_t now = gettimeofdayInMilliseconds(); t->m_preExitTime = now; t->m_exitTime = now; if ( g_conf.m_logDebugThread ) { log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] " "done with startup pid=%"INT64"", (PTRTYPE)t,(int64_t)getpidtid()); } // . now mark thread as ready for removal // . do this BEFORE queueing the signal since we're still a thread!!! // . cleanUp() will take care of the rest // . cleanUp() will call pthread_join on us! t->m_isDone = true; // let Loop.cpp's sigHandler_r call g_thread.cleanUp() g_threads.m_needsCleanup = true; //if(t->m_niceness > 0) g_threads.m_needBottom = true; // . send the signal // . if queue is full g_loop will get a SIGIO and call // g_threads.cleanUp()/launchThreads() in it's doPoll() routine // . we reserve GB_SIGRTMIN itself for unblocked interrupts for // UdpServer // . 
damn, it seems that if the queue is full pthread_join is unable // to detach threads.... so sleep until it clears up // . HEY! process is supposed to send us an ECHLD signal? right? //sigqueue ( s_pid, GB_SIGRTMIN + 1 + t->m_niceness, svt ) ; // . it does not send us a signal automatically, so we must do it! // . i noticed during the linkdb rebuild we were not getting the signal //sigqueue ( s_pid, GB_SIGRTMIN + 1 + t->m_niceness, svt ) ; // i verified this breaks select() in Loop.cpp out of it's sleep //fprintf(stderr,"threads sending SIGCHLD\n"); // try a sigchld now! doesn't it already do this? no... // on 64bit arch pthread_t is 64bit and pid_t is 32bit // i dont think this makes sense with pthreads any more, they don't // use pid_t they use pthread_t #ifndef PTHREADS sigqueue ( (pid_t)(int64_t)s_pid, SIGCHLD, svt ) ; #else pthread_kill ( s_pid , SIGCHLD ); #endif return 0; } // pthread_create uses this one void *startUp2 ( void *state ) { startUp ( state ); return NULL; } // watch out, UdpServer::getEmptySlot() calls UdpServer::suspend() and we // could be in a signal handler here void ThreadQueue::suspendLowPriorityThreads() { // disable for now //return; // just return if already suspended if ( m_isLowPrioritySuspended ) return; // log it //log("ThreadQueue:: suspending low priority threads!!!!!!"); // set the flag so low priority threads won't be launched m_isLowPrioritySuspended = true; // . cancel any outstanding low priority threads that are running // . no, only cancel if we run another disk thread // . this will happen above in ThreadQueue::launchThread() //cancelLowPriorityThreads(); } // this is called by UdpServer::destroySlot() and should NOT be in a sig handlr void ThreadQueue::resumeLowPriorityThreads() { // disable for now //return; // bail if no need if ( ! 
m_isLowPrioritySuspended ) return; // turn em back on m_isLowPrioritySuspended = false; // just in case, though //if ( g_inSigHandler ) { // log(LOG_LOGIC,"thread: resumeLowPriorityThreads: In sig " // "handler."); // return; //} // try to start up some threads then g_threads.launchThreads(); } void ThreadQueue::print ( ) { // loop through candidates for ( int32_t i = 0 ; i < m_top ; i++ ) { ThreadEntry *t = &m_entries[i]; // print it log(LOG_INIT,"thread: address=%"PTRFMT" " "pid=%u state=%"PTRFMT" " "occ=%i done=%i lnch=%i", (PTRTYPE)t , t->m_pid , (PTRTYPE)t->m_state , t->m_isOccupied , t->m_isDone , t->m_isLaunched ); } } const char *ThreadQueue::getThreadType ( ) { const char *s = "unknown"; if ( m_threadType == DISK_THREAD ) s = "disk"; if ( m_threadType == MERGE_THREAD ) s = "merge"; if ( m_threadType == INTERSECT_THREAD ) s = "intersectlists"; if ( m_threadType == FILTER_THREAD ) s = "filter"; if ( m_threadType == SAVETREE_THREAD ) s = "savetree"; if ( m_threadType == UNLINK_THREAD ) s = "unlink"; if ( m_threadType == GENERIC_THREAD ) s = "generic"; return s; } #include "BigFile.h" // FileState class /* MDW: this is unused int32_t Threads::getDiskThreadLoad ( int32_t maxNiceness , int32_t *totalToRead ) { ThreadQueue *q = &m_threadQueues[DISK_THREAD]; ThreadEntry *e = q->m_entries; int32_t top = q->m_top; *totalToRead = 0; int32_t n = 0; // we really can't suspend threads cuz they might have the // mutex lock so we just cancel the disk threads here then for ( int32_t i = 0 ; i < top ; i++ ) { // get entry ThreadEntry *t = &e[i]; // skip if not occupied if ( ! t->m_isOccupied ) continue; // skip if it's nicer than what we want if (t->m_niceness > maxNiceness && ! t->m_isLaunched) continue; // skip if already done if ( t->m_isDone ) continue; // cast state data FileState *fs = (FileState *) t->m_state; // sometimes NULL, like from Sync.cpp's call if ( ! 
fs ) continue; // only remove read operations, since write operations get // the fd up front if ( t->m_doWrite ) continue; // how many byte to do //int32_t todo = fs->m_bytesToGo - fs->m_bytesDone; int32_t todo = t->m_bytesToGo; // multiply by 2 if a write if ( t->m_doWrite ) todo *= 2; // add to total bytes to read *totalToRead += todo; // count the thread n++; } return n; } */ // when a BigFile is removed, much like we remove its pages from DiskPageCache // we also remove any unlaunched reads/writes on it from the thread queue. void ThreadQueue::removeThreads ( BigFile *bf ) { // did the BigFile get hosed? that means our BigFile was // unlinked or closed before we got a chance to launch the // thread. //int32_t maxi = -1; //ThreadEntry *head ; removeThreads2 ( &m_waitHead0 , &m_waitTail0 , bf ); removeThreads2 ( &m_waitHead1 , &m_waitTail1 , bf ); removeThreads2 ( &m_waitHead2 , &m_waitTail2 , bf ); removeThreads2 ( &m_waitHead3 , &m_waitTail3 , bf ); removeThreads2 ( &m_waitHead4 , &m_waitTail4 , bf ); removeThreads2 ( &m_waitHead5 , &m_waitTail5 , bf ); removeThreads2 ( &m_waitHead6 , &m_waitTail6 , bf ); } void ThreadQueue::removeThreads2 ( ThreadEntry **headPtr , ThreadEntry **tailPtr , BigFile *bf ) { int32_t saved = g_errno; ThreadEntry *t = *headPtr; ThreadEntry *nextLink = NULL; for ( ; t ; t = nextLink ) { // do it here in case we modify the linked list below nextLink = t->m_nextLink; // get the filestate FileState *fs = (FileState *)t->m_state; // skip if NULL if ( ! fs ) continue; // skip if not match if ( fs->m_this != (void *)bf ) continue; // . let it finish writing if it is a write thread // . otherwise, if we are exiting, we could free the // buffer being written and cause the thread to core... if ( fs->m_doWrite ) { log(LOG_INFO,"disk: Not removing write thread."); continue; } // . should we really? if we renamed the file to another, // we need to recompute the offsets to read, etc.. 
so we // should fail up to Msg5 with EFILECLOSED or something... // . i think we did a rename and it got the same fd, and since // we did not remove the launched or done threads after the // rename, we're not sure if they read from the newly renamed // file or not, and our read offset was for the old file... // . at least set the error flag for doneWrapper() fs->m_errno2 = EFILECLOSED; // log it logf(LOG_INFO,"disk: Removing/flagging operation in thread " "queue. fs=0x%"PTRFMT"", (PTRTYPE)fs); // skip if already done if ( t->m_isDone ) continue; // skip if launched if ( t->m_isLaunched ) continue; // note in the log it is launched log(LOG_INFO,"disk: Thread is launched."); // tell donewrapper what happened fs->m_errno = EFILECLOSED; g_errno = EFILECLOSED; // note it //log(LOG_INFO,"disk: Removing operation from thread queue."); // remove it from the thread queue t->m_isDone = true; t->m_isLaunched = false; t->m_isOccupied = false; // keep track //maxi = i; // remove from waiting linked list, whichever one it was in removeLink2 ( headPtr , tailPtr , t ); // add to 'empty' linked list addLink ( &m_emptyHead , t ); makeCallback ( t ); } // do we have to decrement top // if ( m_top == maxi + 1 ) // while ( m_top>0 && !m_entries[m_top-1].m_isOccupied) m_top--; // this was causing us to lose a g_errno value when XmlDoc::~XmlDoc() // called BigFile::~BigFile() called removeThreads() //called removeThreads2() //g_errno = 0; g_errno = saved; } void Threads::printState() { int64_t now = gettimeofdayInMilliseconds(); for ( int32_t i = 0 ; i < m_numQueues; i++ ) { ThreadQueue *q = &m_threadQueues[i]; // int32_t loActive = q->m_loLaunched - q->m_loReturned; // int32_t mdActive = q->m_mdLaunched - q->m_mdReturned; // int32_t hiActive = q->m_hiLaunched - q->m_hiReturned; // int32_t total = loActive + mdActive + hiActive; //if( total == 0) continue; // log(LOG_TIMING, // "admin: Thread counts: type:%s " // "%"INT32":low %"INT32":med%"INT32":high %"INT32":total", // 
q->getThreadType(),loActive, mdActive, hiActive, total); for ( int32_t j = 0 ; j < q->m_top ; j++ ) { ThreadEntry *t = &q->m_entries[j]; if(!t->m_isOccupied) continue; if(t->m_isDone) { log(LOG_TIMING, "admin: Thread -done- " "nice: %"INT32" " "totalTime: %"INT64" (ms) " "queuedTime: %"INT64"(ms) " "runTime: %"INT64"(ms) " "cleanup: %"INT64"(ms) " "callback:%s", t->m_niceness, now - t->m_queuedTime, t->m_launchedTime - t->m_queuedTime, t->m_exitTime - t->m_launchedTime, now - t->m_exitTime, g_profiler. getFnName((PTRTYPE)t->m_callback)); continue; } if(t->m_isLaunched) { log(LOG_TIMING, "admin: Thread -launched- " "nice: %"INT32" " "totalTime: %"INT64"(ms) " "queuedTime: %"INT64"(ms) " "runTime: %"INT64"(ms) " "callback:%s", t->m_niceness, now - t->m_queuedTime, t->m_launchedTime - t->m_queuedTime, now - t->m_launchedTime, g_profiler. getFnName((PTRTYPE)t->m_callback)); continue; } log(LOG_TIMING, "admin: Thread -queued- " "nice: %"INT32" " "queueTime: %"INT64"(ms) " "callback:%s", t->m_niceness, now - t->m_queuedTime, g_profiler.getFnName((PTRTYPE)t->m_callback)); } } } void ThreadQueue::killAllThreads ( ) { for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) { ThreadEntry *e = &m_entries[i]; if ( ! e->m_isOccupied ) continue; if ( ! e->m_isLaunched ) continue; log("threads: killling thread id %i",(int)e->m_joinTid); pthread_kill ( e->m_joinTid , SIGKILL ); log("threads: joining with thread id %i",(int)e->m_joinTid); pthread_join ( e->m_joinTid , NULL ); } } void Threads::killAllThreads ( ) { log("threads: killing all threads"); for ( int32_t j = 0 ; j < m_numQueues ; j++ ) { ThreadQueue *tq = &m_threadQueues[j]; tq->killAllThreads(); } }