#include "gb-include.h"

#include "BigFile.h"
#include "Threads.h"
#include "Errno.h"
#include "Loop.h"
// NOTE(review): the targets of the system #include directives in this file
// were lost (the angle-bracket contents were stripped). They are restored
// here from the surviving trailing comments and from the calls this file
// makes; confirm against version control.
#include <sys/time.h>      // presumably needed by setpriority() -- TODO confirm
#include <sys/resource.h>  // presumably setpriority()/PRIO_PROCESS -- TODO confirm
#include <sys/types.h>     // getuid()/pid_t/getpid()
#include <sys/wait.h>      // waitpid()
#include "Rdb.h"           // g_mergeUrgent
#include <sched.h>         // clone()
//#include "Msg16.h"       // g_pid g_ticker
#include "XmlDoc.h"        // g_pid g_ticker
#include "Profiler.h"
#include "Stats.h"
#include "Process.h"

// try using pthreads again
//#define PTHREADS

// use these stubs so libplotter.a works
#ifndef PTHREADS
int pthread_mutex_lock   (pthread_mutex_t *t ) { return 0; }
int pthread_mutex_unlock (pthread_mutex_t *t ) { return 0; }
#else
#include <pthread.h>
pthread_attr_t s_attr;
#endif

// main process id (or thread id if using pthreads); -1 until Threads::init()
static pid_t s_pid = (pid_t) -1;

// . returns the id used to tell the main process apart from its threads
// . with PTHREADS this is pthread_self() cast to pid_t, otherwise getpid()
pid_t getpidtid() {
#ifdef PTHREADS
	// gettid() is a bit newer so not in our libc32...
	// so kinda fake it. return the "thread" id, not process id.
	// Threads::amThread() should still work based on thread ids because
	// the main process has a unique thread id as well.
	return (pid_t)pthread_self();
#else
	return getpid();
#endif
}

// BIG PROBLEM:
// When doing pthread_join it doesn't always ensure a thread doesn't go
// zombie. It seems like when SIGIOs are generated by sigqueue() because
// of a full signal queue, threads start going zombie on me.

// PROBLEM #1: with using the "state" to identify a thread in the queue.
// Often a caller makes a disk access using a certain "state" var.
// He gets control back when cleanUp() calls t->m_callback. And he launches
// another read thread in there, which calls Threads::exit(state) before
// the original thread entry's m_isOccupied is set to false. So
// Threads::exit(state) mistakenly picks the thread entry from the former
// thread because it has the same state as its thread!

// PROBLEM #2: technically, a thread in Threads::exit() can set its m_done
// bit then send the signal.
When the Loop sig handler sees the done bit is // set it decrements the global thread count even though the thread may // not have actually exited yet!! I spawned like 1,000 threads this way!!!!! // a global class extern'd in .h file Threads g_threads; // . call this before calling a ThreadEntry's m_startRoutine // . allows us to set the priority of the thread int startUp ( void *state ) ; void *startUp2 ( void *state ) ; // JAB: warning abatement //static void launchThreadsWrapper ( int fd , void *state ); static void killStalledFiltersWrapper ( int fd , void *state ); static void makeCallback ( ThreadEntry *t ) ; // is the caller a thread? bool Threads::amThread () { if ( s_pid == -1 ) return false; return ( getpidtid() != s_pid ); } #ifndef PTHREADS static long s_bad = 0; static long s_badPid = -1; #endif #define MAX_PID 32767 #ifndef PTHREADS static int s_errno ; static int s_errnos [ MAX_PID + 1 ]; // this was improvised from linuxthreads/errno.c //#define CURRENT_STACK_FRAME ({ char __csf; &__csf; }) // WARNING: you MUST compile with -DREENTRANT for this to work int *__errno_location (void) { long pid = (long) getpid(); //if ( pid == s_pid ) return &g_errno; if ( pid <= (long)MAX_PID ) return &s_errnos[pid]; s_bad++; s_badPid = pid; return &s_errno; } #endif // this also limit the maximum number of outstanding (live) threads #define MAX_STACKS 20 // stack must be page aligned for mprotect #define PAGESIZE 8192 // how much of stack to use as guard space #define GUARDSIZE (32*1024) // . crashed in saving with 800k, so try 1M // . must be multiple of PAGESIZE #define STACK_SIZE ((512+256) * 1024) // jeta was having some problems, but i don't think they were related to // this stack size of 512k, but i will try boosting to 800k anyway. //#define STACK_SIZE (512 * 1024) // 256k was not enough, try 512k //#define STACK_SIZE (256 * 1024) // at 128k (and even 200k) some threads do not return! why?? there's // obviously some stack overwriting going on! 
//#define STACK_SIZE (128 * 1024) static char *s_stackAlloc = NULL; static long s_stackAllocSize; #ifndef PTHREADS static char *s_stack = NULL; static long s_stackSize; static char *s_stackPtrs [ MAX_STACKS ]; #endif static long s_next [ MAX_STACKS ]; static long s_head ; // returns NULL if none left long Threads::getStack ( ) { if ( s_head == -1 ) return -1; long i = s_head; s_head = s_next [ s_head ]; return i; } void Threads::returnStack ( long si ) { if ( s_head == -1 ) { s_head = si; s_next[si] = -1; return; } s_next[si] = s_head; s_head = si; } void Threads::reset ( ) { if ( s_stackAlloc ) { mprotect(s_stackAlloc, s_stackAllocSize, PROT_READ|PROT_WRITE); mfree ( s_stackAlloc , s_stackAllocSize, "ThreadStack"); } s_stackAlloc = NULL; for ( long i = 0 ; i < MAX_THREAD_QUEUES ; i++ ) m_threadQueues[i].reset(); } bool Threads::init ( ) { m_needsCleanup = false; //m_needBottom = false; // sanity check if ( sizeof(pthread_t) > sizeof(pid_t) ) { char *xx=NULL;*xx=0; } // set s_pid to the main process id #ifdef PTHREADS s_pid = pthread_self(); log("threads: main process THREAD id = %lu",(long unsigned)s_pid); pthread_t tid = pthread_self(); sched_param param; int policy; // scheduling parameters of target thread pthread_getschedparam ( tid, &policy, ¶m); log("threads: min/max thread priority settings = %li/%li (policy=%li)", (long)sched_get_priority_min(policy), (long)sched_get_priority_max(policy), (long)policy); #else s_pid = getpid(); #endif #ifdef _STACK_GROWS_UP return log("thread: Stack growing up not supported."); #endif //g_conf.m_logDebugThread = true; // . damn, this only applies to fork() calls, i guess // . 
a quick a dirty way to restrict # of threads so we don't explode /* struct rlimit lim; lim.rlim_max = 100; if ( setrlimit(RLIMIT_NPROC,&lim) ) log("thread::init: setrlimit: %s", mstrerror(errno) ); else log("thread::init: set max number of processes to 100"); */ // allow threads until disabled m_disabled = false; // # of low priority threads launched and returned //m_hiLaunched = 0; //m_hiReturned = 0; //m_loLaunched = 0; //m_loReturned = 0; //long m_queryMaxBigDiskThreads ; // > 1M read //long m_queryMaxMedDiskThreads ; // 100k - 1M read //long m_queryMaxSmaDiskThreads ; // < 100k per read // categorize the disk read sizes by these here g_conf.m_bigReadSize = 0x7fffffff; g_conf.m_medReadSize = 1000000; g_conf.m_smaReadSize = 100000; // . register a sleep wrapper to launch threads every 30ms // . somtimes a bunch of threads mark themselves as done and the // cleanUp() handler sees them as all still launched so it doesn't // launch any new ones //if ( ! g_loop.registerSleepCallback(30,NULL,launchThreadsWrapper)) // return log("thread: Failed to initialize timer callback."); if ( ! g_loop.registerSleepCallback(1000,NULL, killStalledFiltersWrapper,0)) return log("thread: Failed to initialize timer callback2."); // debug //log("thread: main process has pid=%li",(long)s_pid); // . set priority of the main process to 0 // . setpriority() only applies to SCHED_OTHER threads // . priority of threads with niceness 0 will be 0 // . priority of threads with niceness 1 will be 10 // . priority of threads with niceness 2 will be 20 // . see 'man sched_setscheduler' for detail scheduling info // . no need to call getpid(), 0 for pid means the current process #ifndef PTHREADS if ( setpriority ( PRIO_PROCESS, getpid() , 0 ) < 0 ) log("thread: Call to setpriority failed: %s.", mstrerror(errno)); #endif // make multiplier higher for raids, can do more seeks //long m = 1; //#ifdef _LARS_ //m = 3; //#endif // register the types of threads here instead of in main.cpp //if ( ! 
g_threads.registerType ( DISK_THREAD ,m*20/*maxThreads*/)) // try running blaster with 5 threads and you'll // . see like a double degrade in performance for some reason!! // . TODO: why? // . well, this should be controlled g_conf.m_maxSpiderDiskThreads // for niceness 1+ threads, and g_conf.m_maxPriorityDiskThreads for // niceness 0 and below disk threads // . 100 maxThreads out at a time, 32000 can be queued if ( ! g_threads.registerType ( DISK_THREAD ,100/*maxThreads*/,32000)) return log("thread: Failed to register thread type." ); // . these are used by Msg5 to merge what it reads from disk // . i raised it from 1 to 2 and got better response time from Msg10 // . i leave one open in case one is used for doing a big merge // with high niceness cuz it would hold up high priority ones! // . TODO: is there a better way? cancel it when UdpServer calls // Threads::suspendLowPriorityThreads() ? if ( ! g_threads.registerType ( MERGE_THREAD , 2/*maxThreads*/,100) ) return log("thread: Failed to register thread type." ); // will raising this from 1 to 2 make it faster too? // i raised since global specs new servers have 2 (hyperthreaded?) cpus long max = g_conf.m_maxCpuThreads; if ( max < 1 ) max = 1; if ( ! g_threads.registerType ( INTERSECT_THREAD,max,200) ) return log("thread: Failed to register thread type." ); // filter thread spawned to call popen() to filter an http reply if ( ! g_threads.registerType ( FILTER_THREAD , 1/*maxThreads*/,300) ) return log("thread: Failed to register thread type." ); // RdbTree uses this to save itself if ( ! g_threads.registerType ( SAVETREE_THREAD,1/*maxThreads*/,100) ) return log("thread: Failed to register thread type." ); // . File.cpp spawns a rename thread for doing renames and unlinks // . doing a tight merge on titldb can be ~250 unlinks if ( ! g_threads.registerType ( UNLINK_THREAD,1/*maxThreads*/,3000) ) return log("thread: Failed to register thread type." ); // generic multipurpose if ( ! 
g_threads.registerType (GENERIC_THREAD,100/*maxThreads*/,100) ) return log("thread: Failed to register thread type." ); // for call SSL_accept() which blocks for 10ms even when socket // is non-blocking... //if (!g_threads.registerType (SSLACCEPT_THREAD,20/*maxThreads*/,100)) // return log("thread: Failed to register thread type." ); #ifndef PTHREADS // sanity check if ( GUARDSIZE >= STACK_SIZE ) return log("thread: Stack guard size too big."); // not more than this outstanding long maxThreads = 0; for ( long i = 0 ; i < m_numQueues ; i++ ) maxThreads += m_threadQueues[i].m_maxLaunched; // limit to stack we got if ( maxThreads > MAX_STACKS ) maxThreads = MAX_STACKS; // allocate the stack space s_stackAllocSize = STACK_SIZE * maxThreads + PAGESIZE ; // clear stack to help check for overwrites s_stackAlloc = (char *) mcalloc ( s_stackAllocSize , "ThreadStack" ); if ( ! s_stackAlloc ) return log("thread: Unable to allocate %li bytes for thread " "stacks.", s_stackAllocSize); log(LOG_INIT,"thread: Using %li bytes for %li thread stacks.", s_stackAllocSize,maxThreads); // align s_stack = (char *)(((int) s_stackAlloc + PAGESIZE-1) & ~(PAGESIZE-1)); // new size s_stackSize = s_stackAllocSize - (s_stack - s_stackAlloc); // protect the whole stack while not in use if ( mprotect ( s_stack , s_stackSize , PROT_NONE ) ) log("thread: Call to mprotect failed: %s.",mstrerror(errno)); // test //s_stack[0] = 1; // init the linked list for ( long i = 0 ; i < MAX_STACKS ; i++ ) { if ( i == MAX_STACKS - 1 ) s_next[i] = -1; else s_next[i] = i + 1; s_stackPtrs[i] = s_stack + STACK_SIZE * i; } s_head = 0; // don't do real time stuff for now return true; #else // . keep stack size small, 128k // . if we get problems, we'll increase this to 256k // . 
seems like it grows dynamically from 4K to up to 2M as needed //if ( pthread_attr_setstacksize ( &s_attr , (size_t)128*1024 ) ) // return log("thread: init: pthread_attr_setstacksize: %s", // mstrerror(errno)); //pthread_attr_setschedparam ( &s_attr , PTHREAD_EXPLICIT_SCHED ); //pthread_attr_setscope ( &s_attr , PTHREAD_SCOPE_SYSTEM ); return true; #endif } // all types should be registered in main.cpp before any threads launch bool Threads::registerType ( char type , long maxThreads , long maxEntries ) { // return false and set g_errno if no more room if ( m_numQueues >= MAX_THREAD_QUEUES ) { g_errno = EBUFTOOSMALL; return log(LOG_LOGIC,"thread: registerType: Too many thread " "queues"); } // initialize the ThreadQueue class for this type if ( ! m_threadQueues[m_numQueues].init(type,maxThreads,maxEntries)) return false; // we got one more queue now m_numQueues++; return true; } long Threads::getNumThreadsOutOrQueued() { long n = 0; for ( long i = 0 ; i < m_numQueues ; i++ ) { // skip INTERSECT_THREAD, used to intersect termlists to // resolve a query. i've seen these get stuck in an infinite // loop sometimes. if ( i == INTERSECT_THREAD ) continue; // skip filter threads, those get stuck sometimes if ( i == FILTER_THREAD ) continue; // tally up all other threads n += m_threadQueues[i].getNumThreadsOutOrQueued(); } return n; } // . returns false (and may set errno) if failed to launch a thread // . returns true if thread added to queue successfully // . may be launched instantly or later depending on # of threads in the queue bool Threads::call ( char type , long niceness , void *state , void (* callback )(void *state,ThreadEntry *t) , void *(* startRoutine)(void *state,ThreadEntry *t) ) { // debug //return false; #ifdef _VALGRIND_ return false; #endif // don't spawn any if disabled if ( m_disabled ) return false; if ( ! g_conf.m_useThreads ) return false; // . sanity check // . 
a thread can NOT call this //if ( getpid() != s_pid ) { // fprintf(stderr,"thread: call: bad engineer\n"); // ::exit(-1); //} // don't launch for now //return false; // . sanity check // . ensure top 4 bytes of state is the callback //if ( *(long *)state != (long)callback ) { // g_errno = EBADENGINEER; // sleep(50000); // return log("thread: call: top 4 bytes of state != callback"); //} // debug msg //log("adding thread to queue, type=%li",(long)type); // find the type long i; for ( i = 0 ; i < m_numQueues ; i++ ) if ( m_threadQueues[i].m_threadType == type ) break; // bitch if type not added via registerType() call if ( i == m_numQueues ) { g_errno = EBADENGINEER; return log(LOG_LOGIC,"thread: addtoqueue: Unregistered " "thread type"); } // debug msg //log("thread: call: adding entry for thread"); // . add to this queue // . returns NULL and sets g_errno on error ThreadEntry *t = m_threadQueues[i].addEntry(niceness,state, callback,startRoutine); if ( ! t ) return log("thread: Failed to add entry to thread pool: " "%s.",mstrerror(g_errno)); // debug msg //log("added"); // clear g_errno //g_errno = 0; // . try to launch as many threads as we can // . this sets g_errno on error // . if it has an error, just ignore it, our thread is queued m_threadQueues[i].launchThread ( t ) ; // return false if there was an error launching the thread //if ( g_errno ) return false; // clear g_errno g_errno = 0; // success return true; } // static void launchThreadsWrapper ( int fd , void *state ) { // // debug msg // //if ( g_conf.m_timingDebugEnabled ) // // log("thread: launchThreadsWrapper: entered"); // // clean up // g_threads.cleanUp(NULL,1000); // // and launch // g_threads.launchThreads(); // } static void killStalledFiltersWrapper ( int fd , void *state ) { // bail if no pid if ( g_pid == -1 ) return; // . only kill after ticker reaches a count of 30 // . 
we are called once every second, so inc it each time long timeout = g_filterTimeout; if ( timeout <= 0 ) timeout = 30; if ( g_ticker++ < timeout ) return; // debug log("threads: killing stalled filter process of age %li " "seconds and pid=%li.",g_ticker,(long)g_pid); // kill him int err = kill ( g_pid , 9 ); // don't kill again g_pid = -1; if ( err != 0 ) log("threads: kill filter: %s", mstrerror(err) ); } // . called by g_loop in Loop.cpp after getting a SI_QUEUE signal that it // is from when a thread exited // . we put that signal there using sigqeueue() in Threads::exit() // . this way another thread can be launched right away long Threads::launchThreads ( ) { // try launching from each queue long numLaunched = 0; for ( long i = m_numQueues - 1 ; i >= 0 ; i-- ) { // clear g_errno g_errno = 0; // launch as many threads as we can from queue #i while ( m_threadQueues[i].launchThread ( ) ) numLaunched++; // continue if no g_errno set if ( ! g_errno ) continue; // otherwise bitch about it log("thread: Failed to launch thread: %s.",mstrerror(g_errno)); } // clear g_errno g_errno = 0; return numLaunched; } // . will cancel currently running low priority threads // . will prevent any low priority threads from launching // . 
will only cancel disk threads for now void Threads::suspendLowPriorityThreads() { // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: SUSPENDING LOW-PRIORITY THREADS."); // just cancel disk threads for now for ( long i = 0 ; i < m_numQueues; i++ ) m_threadQueues[i].suspendLowPriorityThreads(); } void Threads::resumeLowPriorityThreads() { // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: RESUMING LOW-PRIORITY THREADS."); for ( long i = 0 ; i < m_numQueues; i++ ) m_threadQueues[i].resumeLowPriorityThreads(); } //void Threads::cancelLowPriorityThreads () { // for ( long i = 0 ; i < m_numQueues; i++ ) // m_threadQueues[i].cancelLowPriorityThreads(); //} /////////////////////////////////////////////////////////////////////// // functions for ThreadQueue /////////////////////////////////////////////////////////////////////// ThreadQueue::ThreadQueue ( ) { m_entries = NULL; m_entriesSize = 0; } void ThreadQueue::reset ( ) { if ( m_entries ) mfree ( m_entries , m_entriesSize , "Threads" ); m_entries = NULL; m_top = 0; } bool ThreadQueue::init ( char threadType, long maxThreads, long maxEntries ) { m_threadType = threadType; m_launched = 0; m_returned = 0; m_maxLaunched = maxThreads; // # of low priority threads launched and returned m_hiLaunched = 0; m_hiReturned = 0; m_mdLaunched = 0; m_mdReturned = 0; m_loLaunched = 0; m_loReturned = 0; // we now count write threads so we can limit that m_writesLaunched = 0; m_writesReturned = 0; // these are for disk threads, which we now limit based on read sizes m_hiLaunchedBig = 0; m_hiReturnedBig = 0; m_mdLaunchedBig = 0; m_mdReturnedBig = 0; m_loLaunchedBig = 0; m_loReturnedBig = 0; m_hiLaunchedMed = 0; m_hiReturnedMed = 0; m_mdLaunchedMed = 0; m_mdReturnedMed = 0; m_loLaunchedMed = 0; m_loReturnedMed = 0; m_hiLaunchedSma = 0; m_hiReturnedSma = 0; m_mdLaunchedSma = 0; m_mdReturnedSma = 0; m_loLaunchedSma = 0; m_loReturnedSma = 0; //m_entriesUsed = 0; m_top = 0; m_isLowPrioritySuspended = false; // 
alloc space for entries m_maxEntries = maxEntries; m_entriesSize = sizeof(ThreadEntry)*m_maxEntries; m_entries = (ThreadEntry *)mmalloc ( m_entriesSize , "Threads" ); if ( ! m_entries ) return log("thread: Failed to allocate %li bytes " "for thread queue.",m_entriesSize); // debug msg //log("INIT CALLED. setting all m_isDone to 1."); // clear m_isOccupied et al for new guys //for ( long i = 0 ; i < MAX_THREAD_ENTRIES ; i++ ) { for ( long i = 0 ; i < m_maxEntries ; i++ ) { m_entries[i].m_isOccupied = false; m_entries[i].m_isLaunched = false; m_entries[i].m_isDone = true; m_entries[i].m_qnum = threadType; m_entries[i].m_stack = NULL; } return true; } long ThreadQueue::getNumThreadsOutOrQueued() { long n = m_launched - m_returned; for ( long i = 0 ; i < m_maxEntries ; i++ ) { ThreadEntry *e = &m_entries[i]; if ( ! e->m_isOccupied ) continue; if ( e->m_isLaunched ) continue; if ( e->m_isDone ) continue; // do not count "reads", only count writes //if ( m_threadType == DISK_THREAD && e->m_state ) { // FileState *fs = (FileState *)e->m_state; // if ( fs->m_doWrite ) continue; //} } return n; } // return NULL and set g_errno on error ThreadEntry *ThreadQueue::addEntry ( long niceness , void *state , void (* callback )(void *state, ThreadEntry *t) , void *(* startRoutine)(void *state, ThreadEntry *t) ) { // if we are 90% full and niceness is > 0, knock it off long max = m_maxEntries; if ( m_threadType == DISK_THREAD && niceness > 0 ) { max = (m_maxEntries * 90) / 100; if ( max <= 0 ) max = 1; } // debug test //if ( rand() %10 == 1 ) { g_errno = ENOTHREADSLOTS; return NULL; } // get first available entry, not in use long i; //for ( i = 0 ; i < MAX_THREAD_ENTRIES ; i++ ) for ( i = 0 ; i < max ; i++ ) if ( ! m_entries[i].m_isOccupied ) break; // caution //if ( i >= MAX_THREAD_ENTRIES ) { if ( i >= max ) { g_errno = ENOTHREADSLOTS; static time_t s_time = 0; time_t now = getTime(); if ( now - s_time > 5 ) { log("thread: Could not add thread to queue. 
Already " "have %li entries.",max); s_time = now; } return NULL; } // debug msg //fprintf(stderr,"addEntry my pid=%lu\n", (long)getpid() ); // get an available entry ThreadEntry *t = &m_entries [ i ]; // debug msg //log("claiming entry state=%lu, occupied=%li",(long)t->m_state, // (long)t->m_isOccupied); // stick it in t->m_niceness = niceness; t->m_state = state; t->m_callback = callback; t->m_startRoutine = startRoutine; t->m_isOccupied = true; t->m_isCancelled = false; t->m_stack = NULL; // debug msg //log("setting t=%lu m_isDone to 0", (long)t ); t->m_isDone = false; t->m_isLaunched = false; t->m_queuedTime = gettimeofdayInMilliseconds(); t->m_readyForBail = false; t->m_allocBuf = NULL; t->m_allocSize = 0; t->m_errno = 0; // and when the ohcrap callback gets called and the thread // is cleaned up it will check the FileState readsize and // m_isWrite to see which launch counts to decrement, so // since FileState will be corrupted, we need to store // this info directly into the thread entry. if ( m_threadType == DISK_THREAD && t->m_state ) { FileState *fs = (FileState *)t->m_state; t->m_bytesToGo = fs->m_bytesToGo; t->m_doWrite = fs->m_doWrite ; } else { t->m_bytesToGo = 0; t->m_doWrite = false; } // inc the used count //m_entriesUsed++; // debug msg //log("m_entriesUsed now %li",m_entriesUsed); // might have to inc top as well if ( i == m_top ) m_top++; // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: [t=0x%lx] queued %s thread for launch. " "niceness=%lu. ", (unsigned long)t, getThreadType(), niceness ); // success return t; } long Threads::timedCleanUp (long maxTime, long niceness) { if ( ! 
m_needsCleanup ) return 0; //if ( g_inSigHandler ) return 0; long long startTime = gettimeofdayInMillisecondsLocal(); long long took = 0; if ( niceness >= MAX_NICENESS ) m_needsCleanup = false; //for ( long i = -1 ; i <= niceness ; i++ ) { for ( long i = 0 ; i <= niceness ; i++ ) { for ( long j = 0 ; j < m_numQueues ; j++ ) m_threadQueues[j].timedCleanUp ( i ); launchThreads(); took = startTime - gettimeofdayInMillisecondsLocal(); if ( took <= maxTime ) continue; // ok, we have to cut if short... m_needsCleanup = true; break; } return took; } bool Threads::isHittingFile ( BigFile *bf ) { return m_threadQueues[DISK_THREAD].isHittingFile(bf); } bool ThreadQueue::isHittingFile ( BigFile *bf ) { // loop through candidates for ( long i = 0 ; i < m_top; i++ ) { // point to it ThreadEntry *t = &m_entries[i]; // must be occupied to be done (sanity check) if ( ! t->m_isOccupied ) continue; // must not be done if ( t->m_isDone ) continue; // must be launched.. really?? //if ( ! t->m_isLaunched ) continue; // must be a read if ( t->m_startRoutine != readwriteWrapper_r ) continue; // shortcut FileState *fs = (FileState *)t->m_state; // get bigfile ptr if ( fs->m_this == bf ) return true; } return false; } void Threads::bailOnReads ( ) { m_threadQueues[DISK_THREAD].bailOnReads(); } // Process.cpp calls these callbacks before their time in order to // set EDISKSTUCK void ThreadQueue::bailOnReads ( ) { // note it log("threads: bypassing read threads"); // loop through candidates for ( long i = 0 ; i < m_top; i++ ) { // point to it ThreadEntry *t = &m_entries[i]; // must be occupied to be done (sanity check) if ( ! t->m_isOccupied ) continue; // skip if not launched yet //if ( ! 
t->m_isLaunched ) continue; // must be niceness 0 if ( t->m_niceness != 0 ) continue; // must not be done if ( t->m_isDone ) continue; // must not have already called callback if ( t->m_callback == ohcrap ) continue; // must be a read if ( t->m_startRoutine != readwriteWrapper_r ) continue; // shortcut FileState *fs = (FileState *)t->m_state; // do not stop writes if ( fs->m_doWrite ) continue; // must be niceness 0 too! if ( fs->m_niceness != 0 ) continue; // what is this? unlaunched... //if ( t->m_pid == 0 ) continue; // can only bail on a thread after it copies its FileState // class into its stack so we can bypass it and free the // original FileState without causing a core. if thread // is not yet launched we have to call the callback here too // otherwise it never gets launched until the disk is unstuck! if ( ! t->m_readyForBail && t->m_isLaunched ) continue; // set error t->m_errno = EDISKSTUCK; // set this too g_errno = EDISKSTUCK; // do not allow caller to free the alloc'd buf in case // its read finally comes through! t->m_allocBuf = fs->m_allocBuf; t->m_allocSize = fs->m_allocSize; fs->m_allocBuf = NULL; fs->m_allocSize = 0; // call it t->m_callback ( t->m_state , t ); // do not re-call it... t->m_callback = ohcrap; // invalidate state (FileState usually) t->m_state = NULL; // do not launch if not yet launched if ( t->m_isLaunched ) continue; // delete him if not yet launched, otherwise we try to // launch it later with a corrupted/unstable FileState... // and that causes our launch counts to get out of whack i // think... t->m_isOccupied = false; // note it log("threads: bailing unlaunched thread"); // do we have to decrement top if ( m_top == i + 1 ) while (m_top > 0 && ! m_entries[m_top-1].m_isOccupied) m_top--; } } // BigFile.cpp's readwriteWrapper_r() ThreadEntry::m_callback gets set to // ohcrap() because it was taking too long to do its read and we prematurely // called its callback above in bailOnReads(). 
In that case we still have to // free the disk read buffer which was never used. And doneWrapper() in // BigFile.cpp is never called. void ohcrap ( void *state , ThreadEntry *t ) { // free the read buffer here then if ( t->m_allocBuf ) mfree ( t->m_allocBuf , t->m_allocSize , "RdbScan" ); log("threads: got one"); } // . cleans up any threads that have exited // . their m_isDone should be set to true // . don't process threads whose niceness is > maxNiceness bool ThreadQueue::timedCleanUp ( long maxNiceness ) { // top: long numCallbacks = 0; // loop through candidates for ( long i = 0 ; i < m_top; i++ ) { // point to it ThreadEntry *t = &m_entries[i]; // skip if not qualified if ( t->m_niceness > maxNiceness ) continue; // must be occupied to be done (sanity check) if ( ! t->m_isOccupied ) continue; // skip if not launched yet if ( ! t->m_isLaunched ) continue; // . we were segfaulting right here before because the thread // was setting t->m_pid and at this point it was not // set so t->m_pid was a bogus value // . thread may have seg faulted, in which case sigbadhandler() // in Loop.cpp will get it and set errno to 0x7fffffff #ifndef PTHREADS // MDW: i hafta take this out because the errno_location thing // is not working on the newer gcc if ( ! t->m_isDone && t->m_pid >= 0 && s_errnos [t->m_pid] == 0x7fffffff ) { log("thread: Got abnormal thread termination. Seems " "like the thread might have cored."); s_errnos[t->m_pid] = 0; goto again; } #endif // skip if not done yet if ( ! t->m_isDone ) continue; #ifdef PTHREADS // if pthread_create() failed it returns the errno and we // needsJoin is false, so do not try to join // to a thread if we did not create it, lest pthread_join() // cores if ( t->m_needsJoin ) { // . join up with that thread // . 
damn, sometimes he can block forever on his // call to sigqueue(), long status = pthread_join ( t->m_joinTid , NULL ); if ( status != 0 ) { log("threads: pthread_join %li = %s (%li)", (long)t->m_joinTid,mstrerror(status), status); } // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: joined1 with t=0x%lx " "jointid=0x%lx.", (long)t,(long)t->m_joinTid); } #else again: int status ; pid_t pid = waitpid ( t->m_pid , &status , 0 ); int err = errno; // debug the waitpid if ( g_conf.m_logDebugThread || g_process.m_exiting ) log(LOG_DEBUG,"thread: Waiting for t=0x%lx pid=%li.", (unsigned long)t,(long)t->m_pid); // bitch and continue if join failed if ( pid != t->m_pid ) { // waitpid() gets interrupted by various signals so // we need to repeat (SIGCHLD?) if ( err == EINTR ) goto again; log("thread: Call to waitpid(%li) returned %li: %s.", (long)t->m_pid,(long)pid,mstrerror(err)); continue; } // if status not 0 then process got abnormal termination if ( ! WIFEXITED(status) ) { if ( WIFSIGNALED(status) ) log("thread: Child process (pid=%i) exited " "from unhandled signal number %li.", pid,(long)WTERMSIG(status)); else log("thread: Child process (pid=%i) exited " "for unknown reason." 
, pid ); } //mfree ( t->m_stack , STACK_SIZE , "Threads" ); g_threads.returnStack ( t->m_si ); t->m_stack = NULL; // re-protect this stack mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE, PROT_NONE ); // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: joined with pid=%li pid=%li.", (long)t->m_pid,(long)t->m_pid); #endif // we may get cleaned up and re-used and our niceness reassignd // right after set m_isDone to true, so save niceness long niceness = t->m_niceness; char qnum = t->m_qnum; ThreadQueue *tq = &g_threads.m_threadQueues[(int)qnum]; if ( tq != this ) { char *xx = NULL; *xx = 0; } // get read size before cleaning it up -- it could get nuked long rs = 0; bool isWrite = false; if ( tq->m_threadType == DISK_THREAD ) { // && t->m_state ) { //FileState *fs = (FileState *)t->m_state; rs = t->m_bytesToGo; isWrite = t->m_doWrite ; } if ( niceness <= 0) tq->m_hiReturned++; else if ( niceness == 1) tq->m_mdReturned++; else if ( niceness >= 2) tq->m_loReturned++; // deal with the tiers for disk threads based on read sizes if ( tq->m_threadType == DISK_THREAD ) { // writes are special cases if ( isWrite ) m_writesReturned++; if ( rs >= 0 && niceness >= 2 ) { if ( rs > g_conf.m_medReadSize ) tq->m_loReturnedBig++; else if ( rs > g_conf.m_smaReadSize ) tq->m_loReturnedMed++; else tq->m_loReturnedSma++; } else if ( rs >= 0 && niceness >= 1 ) { if ( rs > g_conf.m_medReadSize ) tq->m_mdReturnedBig++; else if ( rs > g_conf.m_smaReadSize ) tq->m_mdReturnedMed++; else tq->m_mdReturnedSma++; } else if ( rs >= 0 ) { if ( rs > g_conf.m_medReadSize ) tq->m_hiReturnedBig++; else if ( rs > g_conf.m_smaReadSize ) tq->m_hiReturnedMed++; else tq->m_hiReturnedSma++; } } // now count him as returned m_returned++; // prepare for relaunch if we were cancelled if ( t->m_isCancelled ) { t->m_isCancelled = false; t->m_isLaunched = false; t->m_isDone = false; log("thread: Thread cancelled. 
Preparing thread " "for relaunch"); continue; } numCallbacks++; // not running any more t->m_isLaunched = false; // not occupied any more t->m_isOccupied = false; // do we have to decrement top if ( m_top == i + 1 ) while (m_top > 0 && ! m_entries[m_top-1].m_isOccupied) m_top--; // send a cancel sig to the thread in case it's still there //int err = pthread_cancel ( t->m_tid ); //if ( err != 0 ) log("thread: cleanUp: pthread_cancel: %s", // mstrerror(err) ); // one less entry occupied //m_entriesUsed--; // debug msg //log("m_entriesUsed now %li",m_entriesUsed); // one more returned //m_returned++; // clear the g_errno in case set by a previous callback //g_errno = 0; // launch as many threads as we can before calling the // callback since this may hog the CPU like Msg20 does //g_threads.launchThreads(); g_errno = 0; //g_loop.startBlockedCpuTimer(); //only allow a quickpoll if we are nice. //g_loop.canQuickPoll(t->m_niceness); makeCallback ( t ); //long long took = gettimeofdayInMilliseconds()-startTime; //if(took > 8 && maxNiceness > 0) { // if(g_conf.m_sequentialProfiling) // log(LOG_TIMING, // "admin: Threads spent %lli ms to callback " // "%li callbacks, nice: %li", // took, numCallbacks, maxNiceness); // g_threads.m_needBottom = true; // maxNiceness = 0; //} // clear errno again g_errno = 0; if ( g_conf.m_logDebugThread ) { long long now = gettimeofdayInMilliseconds(); log(LOG_DEBUG,"thread: [t=0x%lx] %s done1. " "active=%li " "time since queued = %llu ms " "time since launch = %llu ms " "time since pre-exit = %llu ms " "time since exit = %llu ms", (unsigned long)t, getThreadType() , (long)(m_launched - m_returned) , now - t->m_queuedTime, now - t->m_launchedTime, now - t->m_preExitTime , now - t->m_exitTime ); } } //since we need finer grained control in loop, we no longer collect //the callbacks, sort, then call them. we now call them right away //that way we can break out if we start taking too long and //give control back to udpserver. 
return numCallbacks != 0; } void makeCallback ( ThreadEntry *t ) { // sanity check - if in a high niceness callback, we should // only be calling niceness 0 callbacks here // no, this is only called from sleep wrappers originating from // Loop.cpp, so we should be ok //if ( g_niceness==0 && t->m_niceness ) { char *xx=NULL;*xx=0; } // save it long saved = g_niceness; // log it now if ( g_conf.m_logDebugLoop || g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: enter thread callback t=0x%lx " //"type=%s " "state=0x%lx " "nice=%li", (long)t, //getThreadType(), (long)t->m_state, (long)t->m_niceness); // time it? long long start; if ( g_conf.m_maxCallbackDelay >= 0 ) start = gettimeofdayInMillisecondsLocal(); // then set it if ( t->m_niceness >= 1 ) g_niceness = 1; else g_niceness = 0; t->m_callback ( t->m_state , t ); // time it? if ( g_conf.m_maxCallbackDelay >= 0 ) { long long elapsed = gettimeofdayInMillisecondsLocal() - start; if ( elapsed >= g_conf.m_maxCallbackDelay ) log("threads: Took %lli ms to call " "thread callback niceness=%li", elapsed,(long)saved); } // log it now if ( g_conf.m_logDebugLoop || g_conf.m_logDebugThread ) log(LOG_DEBUG,"loop: exit thread callback t=0x%lx " //"type=%s " "nice=%li", (long)t, //getThreadType(), (long)t->m_niceness); // restore global niceness g_niceness = saved; } bool Threads::cleanUp ( ThreadEntry *t , long maxNiceness ) { bool didSomething = false; loop: // assume no more cleanup needed m_needsCleanup = false; //m_needBottom = false; // debug msg //log("cleanUp"); // debug msg //log("cleaning up exited threads and calling callbacks"); for ( long i = 0 ; i < m_numQueues ; i++ ) { didSomething |= m_threadQueues[i].cleanUp( t , maxNiceness ); // . if we broke from the loop //if(m_needBottom) maxNiceness = 0; } // . loop more if we got a new one // . thread will set this when about to exit // . waitpid() may be interrupted by a SIGCHLD and not get his pid if ( m_needsCleanup ) goto loop; return didSomething; } // . 
cleans up any threads that have exited // . their m_isDone should be set to true // . don't process threads whose niceness is > maxNiceness bool ThreadQueue::cleanUp ( ThreadEntry *tt , long maxNiceness ) { // call all callbacks after all threads are cleaned up void (* callbacks[64])(void *state,ThreadEntry *); void *states [64]; long long times [64]; long long times2 [64]; long long times3 [64]; long long times4 [64]; ThreadEntry *tids [64]; long long startTime = gettimeofdayInMilliseconds(); top: long numCallbacks = 0; // loop through candidates for ( long i = 0 ; i < m_top && numCallbacks < 64 ; i++ ) { // point to it ThreadEntry *t = &m_entries[i]; // skip if not qualified if ( t->m_niceness > maxNiceness ) { //if(t->m_isDone) { // g_threads.m_needBottom = true; // //g_threads.m_needsCleanup = true; //} continue; } // must be occupied to be done (sanity check) if ( ! t->m_isOccupied ) continue; // skip if not launched yet if ( ! t->m_isLaunched ) continue; // . we were segfaulting right here before because the thread // was setting t->m_pid and at this point it was not // set so t->m_pid was a bogus value // . thread may have seg faulted, in which case sigbadhandler() // in Loop.cpp will get it and set errno to 0x7fffffff #ifndef PTHREADS // MDW: i hafta take this out because the errno_location thing // is not working on the newer gcc if ( ! t->m_isDone && t->m_pid >= 0 && s_errnos [t->m_pid] == 0x7fffffff ) { log("thread: Got abnormal thread termination. Seems " "like the thread might have cored."); s_errnos[t->m_pid] = 0; goto again; } #endif // skip if not done yet if ( ! t->m_isDone ) continue; #ifdef PTHREADS if ( t->m_needsJoin ) { // . join up with that thread // . 
damn, sometimes he can block forever on his // call to sigqueue(), long status = pthread_join ( t->m_joinTid , NULL ); if ( status != 0 ) { log("threads: pthread_join2 %li = %s (%li)", (long)t->m_joinTid,mstrerror(status), status); } // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: joined2 with t=0x%lx " "jointid=0x%lx.", (long)t,(long)t->m_joinTid); } #else again: int status ; pid_t pid = waitpid ( t->m_pid , &status , 0 ); int err = errno; // debug the waitpid if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: Waiting for t=0x%lx pid=%li.", (long)t,(long)t->m_pid); // bitch and continue if join failed if ( pid != t->m_pid ) { // waitpid() gets interrupted by various signals so // we need to repeat (SIGCHLD?) if ( err == EINTR ) goto again; log("thread: Call to waitpid(%li) returned %li: %s.", (long)t->m_pid,(long)pid,mstrerror(err)); continue; } // if status not 0 then process got abnormal termination if ( ! WIFEXITED(status) ) { if ( WIFSIGNALED(status) ) log("thread: Child process (pid=%i) exited " "from unhandled signal number %li.", pid,(long)WTERMSIG(status)); else log("thread: Child process (pid=%i) exited " "for unknown reason." 
, pid ); } //mfree ( t->m_stack , STACK_SIZE , "Threads" ); g_threads.returnStack ( t->m_si ); t->m_stack = NULL; // re-protect this stack mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE, PROT_NONE ); #endif // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: joined with pid=%li pid=%li.", (long)t->m_pid,(long)t->m_pid); // we may get cleaned up and re-used and our niceness reassignd // right after set m_isDone to true, so save niceness long niceness = t->m_niceness; char qnum = t->m_qnum; ThreadQueue *tq = &g_threads.m_threadQueues[(int)qnum]; if ( tq != this ) { char *xx = NULL; *xx = 0; } // get read size before cleaning it up -- it could get nuked long rs = 0; bool isWrite = false; if ( tq->m_threadType == DISK_THREAD ) { // && t->m_state ) { //FileState *fs = (FileState *)t->m_state; rs = t->m_bytesToGo; isWrite = t->m_doWrite ; } if ( niceness <= 0) tq->m_hiReturned++; else if ( niceness == 1) tq->m_mdReturned++; else if ( niceness >= 2) tq->m_loReturned++; // deal with the tiers for disk threads based on read sizes if ( tq->m_threadType == DISK_THREAD ) { // writes are special cases if ( isWrite ) m_writesReturned++; if ( rs >= 0 && niceness >= 2 ) { if ( rs > g_conf.m_medReadSize ) tq->m_loReturnedBig++; else if ( rs > g_conf.m_smaReadSize ) tq->m_loReturnedMed++; else tq->m_loReturnedSma++; } else if ( rs >= 0 && niceness >= 1 ) { if ( rs > g_conf.m_medReadSize ) tq->m_mdReturnedBig++; else if ( rs > g_conf.m_smaReadSize ) tq->m_mdReturnedMed++; else tq->m_mdReturnedSma++; } else if ( rs >= 0 ) { if ( rs > g_conf.m_medReadSize ) tq->m_hiReturnedBig++; else if ( rs > g_conf.m_smaReadSize ) tq->m_hiReturnedMed++; else tq->m_hiReturnedSma++; } } // . we should count down here, not in the master thread // . solves Problem #2 ? // . TODO: this is not necessaruly atomic we should set // t->m_aboutToExit to true so cleanUp can periodically set // m_returned to what it should be!!! 
//g_threads.m_threadQueues[qnum].m_returned++; // now count him as returned m_returned++; // prepare for relaunch if we were cancelled if ( t->m_isCancelled ) { t->m_isCancelled = false; t->m_isLaunched = false; t->m_isDone = false; log("thread: Thread cancelled. Preparing thread " "for relaunch"); continue; } // debug msg //log("[%lu] CLEANING UP THREAD type=%li, numLaunched=%li", // m_entries[i].m_tid , m_threadType , m_launched ); // remove it // debug msg //log("CLN TID=%lu t=%lu",(long)t->m_tid , (long)t); //log("thread callback for tid=%lu",(long)t->m_tid ); // . save important stuff before freeing up the ThreadEntry // for possible take over. // . calling the callback may launch a thread which may // claim THIS thread entry, t //void (* callback)(void *state); //callback = t->m_callback; //void *state = t->m_state; callbacks [ numCallbacks ] = t->m_callback; states [ numCallbacks ] = t->m_state; times [ numCallbacks ] = t->m_queuedTime; times2 [ numCallbacks ] = t->m_launchedTime; times3 [ numCallbacks ] = t->m_preExitTime; times4 [ numCallbacks ] = t->m_exitTime; tids [ numCallbacks ] = t; numCallbacks++; // SOLUTION: before calling the callback which may launch // another thread with this same tid, thus causing an error, // we should set these to false first: // not running any more t->m_isLaunched = false; // not occupied any more t->m_isOccupied = false; // do we have to decrement top if ( m_top == i + 1 ) while (m_top > 0 && ! 
m_entries[m_top-1].m_isOccupied) m_top--; // send a cancel sig to the thread in case it's still there //int err = pthread_cancel ( t->m_tid ); //if ( err != 0 ) log("thread: cleanUp: pthread_cancel: %s", // mstrerror(err) ); // one less entry occupied //m_entriesUsed--; // debug msg //log("m_entriesUsed now %li",m_entriesUsed); // one more returned //m_returned++; // clear the g_errno in case set by a previous callback //g_errno = 0; // launch as many threads as we can before calling the // callback since this may hog the CPU like Msg20 does //g_threads.launchThreads(); g_errno = 0; makeCallback ( t ); // long long took = gettimeofdayInMilliseconds()-startTime; // if(took > 8 && maxNiceness > 0) { // if(g_conf.m_sequentialProfiling) // log(LOG_TIMING, // "admin: Threads spent %lli ms to callback " // "%li callbacks, nice: %li", // took, numCallbacks, maxNiceness); // g_threads.m_needBottom = true; // maxNiceness = 0; // } // clear errno again g_errno = 0; if ( g_conf.m_logDebugThread ) { long long now = gettimeofdayInMilliseconds(); log(LOG_DEBUG,"thread: [t=0x%lx] %s done2. 
" "active=%li " "time since queued = %llu ms " "time since launch = %llu ms " "time since pre-exit = %llu ms " "time since exit = %llu ms", (unsigned long)t, getThreadType() , (long)(m_launched - m_returned) , now - t->m_queuedTime, now - t->m_launchedTime, now - t->m_preExitTime , now - t->m_exitTime ); } // calling thread callback //log("calling thread id %li callback", (long)(t->m_tid)); // first call it's callback //callback ( state ); // clear after just in case //g_errno = 0; // debug msg //log("CLN2 TID=%lu t=%li",(long)t->m_tid ,(long)t); // return now if tt was specified //if ( tt ) return; } long long took2 = gettimeofdayInMilliseconds()-startTime; if(numCallbacks > 0 && took2 > 5) log(LOG_DEBUG, "threads: took %lli ms to callback %li " "callbacks, nice: %li", took2, numCallbacks, maxNiceness); //since we need finer grained control in loop, we no longer collect //the callbacks, sort, then call them. we now call them right away //that way we can break out if we start taking too long and //give control back to udpserver. return numCallbacks != 0; // print out that we got them if ( g_conf.m_logDebugThread ) { long long now = gettimeofdayInMilliseconds(); for ( long i = 0 ; i < numCallbacks ; i++ ) log(LOG_DEBUG,"thread: [tid=%lu] %s done3. " "active=%li " "time since queued = %llu ms " "time since launch = %llu ms " "time since pre-exit = %llu ms " "time since exit = %llu ms", (unsigned long)tids[i], getThreadType() , (long)(m_launched - m_returned) , now - times [i], now - times2[i] , now - times3[i] , now - times4[i] ); } // . before calling callbacks, launch any other threads waiting in // this queue // . TODO: break into parts, cleanup, launch, call callbacks //while ( launchThread() ); // . sort callbacks by queued time // . 
do bubble sort cuz usually it's not too many threads we cleaned up bool flag ; void (* tmpCallback)(void *state,ThreadEntry *t); long long tmpTime; void *tmpState; long long now = gettimeofdayInMilliseconds(); bubble: flag = false; for ( long i = 1 ; i < numCallbacks ; i++ ) { if ( times[i] >= times[i-1] ) continue; tmpTime = times [i ]; tmpState = states [i ]; tmpCallback = callbacks[i ]; times [i ] = times [i-1]; states [i ] = states [i-1]; tids [i ] = tids [i-1]; callbacks [i ] = callbacks[i-1]; times [i-1] = tmpTime; states [i-1] = tmpState; callbacks [i-1] = tmpCallback; flag = true; } if ( flag ) goto bubble; // call the callbacks now in order of oldest queued time first for ( long i = 0 ; i < numCallbacks ; i++ ) { g_errno = 0; callbacks[i] ( states[i] , NULL ); } long long took = gettimeofdayInMilliseconds()-now; if(numCallbacks > 0 && took > 5) log(LOG_TIMING, "admin: took %lli ms to callback %li " "callbacks, nice: %li", took, numCallbacks, maxNiceness); #if 0 //I think this is more efficient, for now we'll just use the old way //theres no reason to sort these, lets just find the top one //and call it. do it again it until we've called all of them. long newNumCallbacks = numCallbacks; for ( long i = 0 ; i < numCallbacks ; i++ ) { long long maxTime = 0; long maxNdx = 0; for ( long j = 0 ; j < newNumCallbacks ; j++ ) { if(maxTime >= times[i]) { maxTime = times[i]; maxNdx = i; } } g_errno = 0; callbacks[maxNdx] ( states[maxNdx] ); //copy last one into called slot and decrement newNumCallbacks--; times [maxNdx] = times [newNumCallbacks]; states [maxNdx] = states [newNumCallbacks]; callbacks [maxNdx] = callbacks[newNumCallbacks]; } #endif g_errno = 0; // close more threads if we were limited by callbacks[] size if ( numCallbacks >= 64 ) goto top; //return true if we called something back; //returns wrong value if numCallbacks was exactly 64, not too serious... 
return numCallbacks != 0; } // used by UdpServer to see if it should call a low priority callback long Threads::getNumActiveHighPriorityCpuThreads() { ThreadQueue *q ; long hiActive = 0 ; q = &g_threads.m_threadQueues[INTERSECT_THREAD]; hiActive += q->m_hiLaunched - q->m_hiReturned; q = &g_threads.m_threadQueues[MERGE_THREAD]; hiActive += q->m_hiLaunched - q->m_hiReturned; return hiActive; } // used by UdpServer to see if it should call a low priority callback long Threads::getNumActiveHighPriorityThreads() { ThreadQueue *q ; long hiActive = 0 ; q = &g_threads.m_threadQueues[DISK_THREAD]; hiActive += q->m_hiLaunched - q->m_hiReturned; q = &g_threads.m_threadQueues[INTERSECT_THREAD]; hiActive += q->m_hiLaunched - q->m_hiReturned; q = &g_threads.m_threadQueues[MERGE_THREAD]; hiActive += q->m_hiLaunched - q->m_hiReturned; return hiActive; } // . returns false if no thread launched // . returns true if thread was launched // . sets g_errno on error // . don't launch a low priority thread if a high priority thread is running // . i.e. don't launch a high niceness thread if a low niceness is running bool ThreadQueue::launchThread ( ThreadEntry *te ) { // debug msg //log("trying to launch for type=%li",(long)m_threadType); // clean up any threads that have exited //cleanUp (); // if no entries then nothing to launch if ( m_top <= 0 ) return false; // or if no stacks left, don't even try if ( s_head == -1 ) return false; // . how many threads are active now? // . NOTE: not perfectly thread safe here long long active = m_launched - m_returned ; // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: launchThread: active=%lli max=%li.", active,m_maxLaunched); // return if the max is already launched if ( active >= m_maxLaunched ) return false; // do not launch a low priority merge, intersect or filter thread if we // have high priority cpu threads already going on. 
this way a // low priority spider thread will not launch if a high priority // cpu-based thread of any kind (right now just MERGE or INTERSECT) // is already running. long hiActive2 = g_threads.getNumActiveHighPriorityCpuThreads() ; // return log("MAX. %li are launched. %li now in queue.", // active , m_entriesUsed ); // . sanity check // . a thread can NOT call this //if ( getpid() != s_pid ) { // fprintf(stderr,"thread: launchThread: bad engineer\n"); // ::exit(-1); //} //long long now = gettimeofdayInMilliseconds(); long long now = -1LL; // pick thread with lowest niceness first long minNiceness = 0x7fffffff; long long maxWait = -1; long mini = -1; bool minIsWrite = false; long lowest = 0x7fffffff; long highest = 0; // . now base our active thread counts on niceness AND read sizes // . this is only used for DISK_THREADs // . loActive* includes niceness >= 1 long loActiveBig = m_loLaunchedBig - m_loReturnedBig; long loActiveMed = m_loLaunchedMed - m_loReturnedMed; long loActiveSma = m_loLaunchedSma - m_loReturnedSma; long mdActiveBig = m_mdLaunchedBig - m_mdReturnedBig; long mdActiveMed = m_mdLaunchedMed - m_mdReturnedMed; long mdActiveSma = m_mdLaunchedSma - m_mdReturnedSma; long hiActiveBig = m_hiLaunchedBig - m_hiReturnedBig; long hiActiveMed = m_hiLaunchedMed - m_hiReturnedMed; long hiActiveSma = m_hiLaunchedSma - m_hiReturnedSma; long activeWrites = m_writesLaunched - m_writesReturned; // how many niceness=2 threads are currently running now? long long loActive = m_loLaunched - m_loReturned; long long mdActive = m_mdLaunched - m_mdReturned; //long long hiActive = m_hiLaunched - m_hiReturned; long total = loActive + mdActive; long max = g_conf.m_spiderMaxDiskThreads; if ( max <= 0 ) max = 1; // hi priority max // JAB: warning abatement //long long hiActive = m_hiLaunched - m_hiReturned; // i dunno what the point of this was... 
so i commented it out //long max2 = g_conf.m_queryMaxDiskThreads ; //if ( max2 <= 0 ) max2 = 1; // only do this check if we're a addlists/instersect thread queue //if (m_threadType == INTERSECT_THREAD&& hiActive >= max2)return false; // loop through candidates for ( long i = 0 ; i < m_top ; i++ ) { // skip if not occupied if ( ! m_entries[i].m_isOccupied ) continue; long niceness = m_entries[i].m_niceness; // get lowest niceness level of launched threads if ( m_entries[i].m_isLaunched ) { // if he's done, skip him if ( m_entries[i].m_isDone ) continue; // get the highest niceness for all that are launched if ( niceness > highest ) highest = niceness; // get the lowest niceness for all that are launched if ( niceness < lowest ) lowest = niceness; // continue now since it's already launched continue; } // . these filters really make it so the spider does not // impact query response time //if ( niceness >= 1 && hiActive > 0 ) continue; // don't consider any lows if one already running //if ( niceness >= 2 && loActive > 0 ) continue; // don't consider any lows if a hi already running //if ( niceness >= 2 && hiActive > 0 ) continue; // don't consider any mediums if one already running //if ( niceness == 1 && mdActive > 0 ) continue; // don't consider any mediums if a hi already running //if ( niceness == 1 && hiActive > 0 ) continue; //if ( m_threadType == DISK_THREAD ) { // if ( niceness >= 1 && hiActive > 0 ) continue; // if ( niceness >= 2 && loActive >= max ) continue; // if ( niceness == 1 && mdActive >= max ) continue; //} // treat niceness 1 as niceness 2 for ranking purposes // IFF we're not too backlogged with file merges. i.e. // IFF we're merging faster than we're dumping. // only merges and dumps have niceness 1 really. // spider disk reads are all niceness 2. // Now Rdb::addList just refuses to add data if we have too // many unmerged files on disk! // now we use niceness 1 for "real merges" so those reads take // priority over spider build reads. 
(8/14/12) //if(niceness == 1 /*&& g_numUrgentMerges <= 0*/) niceness = 2; // if he doesn't beat or tie us, skip him if ( niceness > minNiceness ) continue; // no more than "max" medium and low priority threads should // be active/launched at any one time if ( niceness >= 1 && total >= max ) continue; // shortcut ThreadEntry *t = &m_entries[i]; // what is this guy's read size? // the filestate provided could have been //FileState *fs ; long readSize = 0 ; bool isWrite = false; if ( m_threadType == DISK_THREAD ){//&&m_entries[i].m_state ) { //fs = (FileState *)m_entries[i].m_state; readSize = t->m_bytesToGo; isWrite = t->m_doWrite ; } if ( isWrite && activeWrites > g_conf.m_maxWriteThreads ) continue; if ( m_threadType == MERGE_THREAD || m_threadType == INTERSECT_THREAD || m_threadType == FILTER_THREAD ) if ( niceness > 0 && hiActive2 > 0 ) continue; // how many threads can be launched for this readSize/niceness? if ( niceness >= 1 && m_threadType == DISK_THREAD ) { if ( readSize > g_conf.m_medReadSize ) { if ( loActiveBig + mdActiveBig >= g_conf.m_spiderMaxBigDiskThreads ) continue; } else if ( readSize > g_conf.m_smaReadSize ) { if ( loActiveMed + mdActiveMed >= g_conf.m_spiderMaxMedDiskThreads ) continue; } else if ( loActiveSma + mdActiveSma >= g_conf.m_spiderMaxSmaDiskThreads ) continue; } else if ( niceness < 1 && m_threadType == DISK_THREAD ) { if ( readSize > g_conf.m_medReadSize ) { if ( hiActiveBig >= g_conf.m_queryMaxBigDiskThreads ) continue; } else if ( readSize > g_conf.m_smaReadSize ) { if ( hiActiveMed >= g_conf.m_queryMaxMedDiskThreads ) continue; } else if ( hiActiveSma >= g_conf.m_queryMaxSmaDiskThreads ) continue; } // be lazy with this since it uses a significant amount of cpu if ( now == -1LL ) now = gettimeofdayInMilliseconds(); // how long has this entry been waiting in the queue to launch? 
long long waited = now - m_entries[i].m_queuedTime ; // adjust "waited" if it originally had a niceness of 1 if ( m_entries[i].m_niceness >= 1 ) { // save threads gain precedence if ( m_threadType == SAVETREE_THREAD ) waited += 49999999; else if ( m_threadType == UNLINK_THREAD ) waited += 39999999; else if ( m_threadType == MERGE_THREAD ) waited += 29999999; else if ( m_threadType == INTERSECT_THREAD ) waited += 29999999; else if ( m_threadType == FILTER_THREAD ) waited += 9999999; // if its a write thread... do it quick else if ( isWrite ) waited += 19999999; // . if it has waited more than 500 ms it needs // to launch now... // . these values seem to be VERY well tuned on // my production machines, but they may have to // be tuned for other machines? TODO. // . they should auto-tune so when merge is more // important it should starve the other spider reads else if ( waited >= 500 ) waited += 9999999; // it hurts for these guys to wait that long else waited *= 4; } // is real merge? if ( m_entries[i].m_niceness == 1 ) waited += 19999999; // watch out for clock skew if ( waited < 0 ) waited = 0; // if tied, the lowest time wins if ( niceness == minNiceness && waited <= maxWait ) continue; // we got a new winner mini = i; minNiceness = niceness; maxWait = waited; minIsWrite = isWrite; } // if no candidate, bail honorably if ( mini == -1 ) return false; // . if we've got an urgent thread (niceness=1) going on, do not allow // niceness=2 threads to launch // . actually, don't launch *ANY* if doing an urgent merge even if // no niceness=1 thread currently launched // . CAUTION! when titledb is merging it dumps tfndb and if tfndb // goes urgent,make sure it dumps out with niceness 1, otherwise // this will freeze us!! since titledb won't be able to continue // merging until tfndb finishes dumping... // . Now Rdb::addList just refuses to add data if we have too // many unmerged files on disk! Let others still read from us, // just not add to us... 
//if ( minNiceness >= 2 && g_numUrgentMerges > 0 ) // && lowest <= 1 ) // return false; // if we're urgent, don't launch a niceness 2 unless he's waited // at least 200ms in the queue //if ( minNiceness >= 2 && g_numUrgentMerges > 0 && maxWait < 200 ) // return false; // . if the thread to launch has niceness > lowest launched then bail // . i.e. don't launch a low-priority thread if we have highs running // . we no longer let a niceness of 1 prevent a niceness of 2 from // launching, this way we can launch merge threads at a niceness // of 1 w/o hurting the spidering too much, but still giving the // merge some preferential treatment over the disk so we don't // RdbDump data faster than we can finish the merge. if ( minNiceness > lowest && lowest < 1 ) return false; // . don't launch a low priority thread if there's a high launched // from any ThreadQueue // . hi priority is niceness <= 0, med/lo priority is niceness >= 1 //long long hiActive = g_threads.m_hiLaunched - g_threads.m_hiReturned; // now just limit it to this queue... so you can launch a low // merge thread even if there's a high disk read going on //if ( minNiceness >= 1 && hiActive > 0 ) return false; // if we're low priority and suspended is true then bail //if ( minNiceness > 0 && m_isLowPrioritySuspended ) return false; // if we're going to launch a high priority thread then CANCEL // any low priority disk-read threads already in progress //if ( minNiceness <= 0 && highest > 0 ) cancelLowPriorityThreads (); // . let's cancel ALL low priority threads if we're launching a high // . 
hi priority is niceness <= 0, low priority is niceness >= 1 //long long loActive = g_threads.m_loLaunched - g_threads.m_loReturned; long realNiceness = m_entries[mini].m_niceness; //if ( realNiceness <= 0 && (loActive + mdActive) > 0 ) // // this actually cancels medium priority threads, too // g_threads.cancelLowPriorityThreads(); // point to winning entry ThreadEntry *t = &m_entries[mini]; // if descriptor was closed, just return error now, we // cannot try to re-open because the file might have been // unlinked. Sync.cpp does a DISK_THREAD but does not pass in a valid // FileState ptr because it does its own saving, so check for NULLs. FileState *fs = (FileState *)t->m_state; bool allocated = false; if ( m_threadType == DISK_THREAD && fs && ! fs->m_doWrite ) { // allocate the read buffer here! if ( ! fs->m_doWrite && ! fs->m_buf && t->m_bytesToGo > 0 ) { long need = t->m_bytesToGo + fs->m_allocOff; char *p = (char *) mmalloc ( need , "ThreadReadBuf" ); if ( p ) { fs->m_buf = p + fs->m_allocOff; fs->m_allocBuf = p; fs->m_allocSize = need; allocated = true; } else log("thread: read buf alloc failed for %li " "bytes.",need); // just let the BigFile::readWrite_r() handle the // error for the NULL read buf } // . otherwise, they are intact, so get the real fds // . we know the stored File is still around because of that bool doWrite = fs->m_doWrite; BigFile *bb = fs->m_this; fs->m_fd1 = bb->getfd (fs->m_filenum1, !doWrite, &fs->m_vfd1); fs->m_fd2 = bb->getfd (fs->m_filenum2, !doWrite, &fs->m_vfd2); // is this bad? 
if ( fs->m_fd1 < 0 ) log("disk: fd1 is %i for %s", fs->m_fd1,bb->m_baseFilename); if ( fs->m_fd2 < 0 ) log("disk: fd2 is %i for %s.", fs->m_fd2,bb->m_baseFilename); fs->m_closeCount1 = getCloseCount_r ( fs->m_fd1 ); fs->m_closeCount2 = getCloseCount_r ( fs->m_fd2 ); } // count it as launched now, before we actually launch it m_launched++; // priority-based GLOBAL & LOCAL launch count if ( realNiceness <= 0 ) m_hiLaunched++; else if ( realNiceness == 1 ) m_mdLaunched++; else if ( realNiceness >= 2 ) m_loLaunched++; // deal with the tiers for disk threads based on read sizes if ( m_threadType == DISK_THREAD ) { // writes are special cases if ( minIsWrite ) m_writesLaunched++; //FileState *fs = (FileState *)m_entries[mini].m_state; long rs = t->m_bytesToGo; // 0; //if ( fs ) rs = fs->m_bytesToGo; if ( realNiceness >= 2 ) { if ( rs > g_conf.m_medReadSize ) m_loLaunchedBig++; else if ( rs > g_conf.m_smaReadSize ) m_loLaunchedMed++; else m_loLaunchedSma++; } else if ( realNiceness >= 1 ) { if ( rs > g_conf.m_medReadSize ) m_mdLaunchedBig++; else if ( rs > g_conf.m_smaReadSize ) m_mdLaunchedMed++; else m_mdLaunchedSma++; } else { if ( rs > g_conf.m_medReadSize ) m_hiLaunchedBig++; else if ( rs > g_conf.m_smaReadSize ) m_hiLaunchedMed++; else m_hiLaunchedSma++; } } // debug msg //if ( m_threadType == 0 ) // log("creating thread, t=%lu state=%lu launched = %li", // t , (long)t->m_state , m_launched ); // and set the flag t->m_isLaunched = true; // . launch it // . this sets the pthread_t ptr for identificatoin // . returns false on error //pthread_t tmp; loop: // debug msg if ( g_conf.m_logDebugThread ) { active = m_launched - m_returned ; long long now = gettimeofdayInMilliseconds(); log(LOG_DEBUG,"thread: [t=0x%lx] launched %s thread. " "active=%lli " "niceness=%lu. 
waited %llu ms in queue.", (unsigned long)t, getThreadType(), active, realNiceness, now - t->m_queuedTime); } // be lazy with this since it uses a significant amount of cpu if ( now == -1LL ) now = gettimeofdayInMilliseconds(); //t->m_launchedTime = g_now; t->m_launchedTime = now; loop2: // spawn the thread long count = 0; pid_t pid; #ifndef PTHREADS //int status; //int ret; // random failure test //if ( rand() %10 == 1 ) { err = ENOMEM; goto hadError; } // malloc twice the size t->m_stackSize = STACK_SIZE; //t->m_stack = (char *)mmalloc ( t->m_stackSize , "Threads" ); long si = g_threads.getStack ( ); if ( si < 0 ) { log(LOG_LOGIC,"thread: Unable to get stack. Bad engineer."); goto hadError; } t->m_si = si; t->m_stack = s_stackPtrs [ si ]; // UNprotect the whole stack so we can use it mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE , PROT_READ | PROT_WRITE ); // clear g_errno g_errno = 0; // . make another process // . do not use sig handlers, so if a child process gets any unhandled // signal (like SEGV) it will just exit pid = clone ( startUp , t->m_stack + t->m_stackSize , CLONE_FS | CLONE_FILES | CLONE_VM | //CLONE_SIGHAND | SIGCHLD , (void *)t ); // . we set the pid because we are the one that checks it! // . if we just let him do it, when we check in cleanup routine // we can get an uninitialized pid t->m_pid = pid; // might as well bitch if we should here if ( s_bad ) { log(LOG_LOGIC,"thread: PID received: %li > %li. Bad.", s_badPid, (long)MAX_PID); //char *xx = NULL; *xx = 0; } // wait for him //ret = waitpid ( -1*pid , &status , 0 ); //if ( ret != pid ) // log("waitpid(pid=%li): ret=%li err=%s", // (long)pid,(long)ret,mstrerror(errno)); // check if he's done //if ( ! t->m_isDone ) log("NOT DONE"); // set the pid //t->m_pid = pid; // error? if ( pid == (pid_t)-1 ) g_errno = errno; // // now use pthreads again... are they stable yet? 
// #else // assume it does not go through t->m_needsJoin = false; // pthread inherits our sigmask, so don't let it handle sigalrm // signals in Loop.cpp, it'll screw things up. that handler // is only meant to be called by the main process. if we end up // double calling it, this thread may think g_callback is non-null // then it gets set to NULL, then the thread cores! seen it... sigset_t sigs; sigemptyset ( &sigs ); sigaddset ( &sigs , SIGALRM ); if ( sigprocmask ( SIG_BLOCK , &sigs , NULL ) < 0 ) log("threads: failed to block sig"); // this returns 0 on success, or the errno otherwise g_errno = pthread_create ( &t->m_joinTid , &s_attr, startUp2 , t) ; if ( sigprocmask ( SIG_UNBLOCK , &sigs , NULL ) < 0 ) log("threads: failed to unblock sig"); #endif // we're back from pthread_create if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: Back from clone t=0x%lx pid=%li.", (long)t,(long)pid); // return true on successful creation of the thread if ( g_errno == 0 ) { // good stuff, the thread needs a join now t->m_needsJoin = true; if ( count > 0 ) log("thread: Call to clone looped %li times.",count); return true; } #undef usleep // forever loop if ( g_errno == EAGAIN ) { if ( count++ == 0 ) log("thread: Call to clone had error: %s.", mstrerror(g_errno)); usleep(1); goto loop2; } // debug msg // log("created tid=%li",(long)t->m_tid); // return true; //} // do again if g_errno is AGAIN if ( g_errno == EINTR ) { log("thread: Call to clone was interrupted. Trying again."); goto loop; } #ifndef PTHREADS hadError: #endif if ( g_errno ) log("thread: pthread_create had error = %s", mstrerror(g_errno)); // it didn't launch, did it? dec the count. m_launched--; // priority-based LOCAL & GLOBAL launch counts if ( realNiceness <= 0 ) m_hiLaunched--; else if ( realNiceness == 1 ) m_mdLaunched--; else if ( realNiceness >= 2 ) m_loLaunched--; // . deal with the tiers for disk threads based on read sizes // . 
WARNING: we cannot easily change tiers dynamically // because it will throw these counts off if ( m_threadType == DISK_THREAD ) { // writes are special cases if ( minIsWrite ) m_writesLaunched--; //FileState *fs = (FileState *)m_entries[mini].m_state; long rs = t->m_bytesToGo; // 0; //if ( fs ) rs = fs->m_bytesToGo; if ( realNiceness >= 2 ) { if ( rs > g_conf.m_medReadSize ) m_loLaunchedBig--; else if ( rs > g_conf.m_smaReadSize ) m_loLaunchedMed--; else m_loLaunchedSma--; } else if ( realNiceness >= 1 ) { if ( rs > g_conf.m_medReadSize ) m_mdLaunchedBig--; else if ( rs > g_conf.m_smaReadSize ) m_mdLaunchedMed--; else m_mdLaunchedSma--; } else { if ( rs > g_conf.m_medReadSize ) m_hiLaunchedBig--; else if ( rs > g_conf.m_smaReadSize ) m_hiLaunchedMed--; else m_hiLaunchedSma--; } } // unset the flag t->m_isLaunched = false; // bail on other errors log("thread: Call to clone had error: %s.", mstrerror(g_errno)); // correction on this error log("thread: Try not using so much memory. " "memused now =%lli.",g_mem.getUsedMem()); // free allocated buffer if ( allocated ) { mfree ( fs->m_buf , fs->m_allocSize , "ThreadReadBuf" ); fs->m_buf = NULL; } // if this is the direct thread request do not call callback, just // return false if ( t == te ) return log("thread: Returning false."); // do it blocking log("thread: Calling without thread. This will crash many times. 
" "Please fix it."); // unsigned long long profilerStart,profilerEnd; // unsigned long long statStart,statEnd; //if (g_conf.m_profilingEnabled){ // address=(long)t->m_startRoutine; // g_profiler.startTimer(address, __PRETTY_FUNCTION__); //} t->m_startRoutine ( t->m_state , t ); //if (g_conf.m_profilingEnabled) { // if(!g_profiler.endTimer(address, __PRETTY_FUNCTION__)) // log(LOG_WARN,"admin: Couldn't add the fn %li", // (long)address); //} t->m_exitTime = gettimeofdayInMilliseconds(); // flag it for cleanup t->m_isDone = true; t->m_isLaunched = true; // clean it up cleanUp ( t , 200/*maxNiceness thread can have to be cleaned up*/ ); // ignore error g_errno = 0; // we kinda launched one, so say true here return true; // false; } #ifndef PTHREADS static bool s_firstTime = true; #endif // threads start up with cacnellation deferred until pthreads_testcancel() // is called, but we never call that int startUp ( void *state ) { // get thread entry ThreadEntry *t = (ThreadEntry *)state; // no! now parent does since he is the one that needs to check it // in the cleanup routine // remember the pid //t->m_pid = getpid(); // . sanity check // . a thread can NOT call this #ifndef PTHREADS if ( getpid() == s_pid ) log("thread: Thread has same pid %i as main process.",s_pid); #endif // the cleanup handler //pthread_cleanup_push ( exitWrapper , t ) ; // t->m_state ); // our signal set sigset_t set; sigemptyset(&set); //sigaddset(&set, SIGHUP); // we need this here so if we break the gb process with gdb it // does not kill the child processes when it sends out the SIGINT. sigaddset(&set, SIGINT); // ignore the real time signal, man... //sigaddset(&set, GB_SIGRTMIN); //pthread_sigmask(SIG_BLOCK, &set, NULL); #ifndef PTHREADS sigprocmask(SIG_BLOCK, &set, NULL); #else pthread_sigmask(SIG_BLOCK,&set,NULL); #endif // . what this lwp's priority be? // . can range from -20 to +20 // . the lower p, the more cpu time it gets // . 
this is really the niceness, not the priority int p ; // currently our niceness ranges from -1 to 2 for us if ( t->m_niceness == 2 ) p = 19 ; else if ( t->m_niceness == 1 ) p = 10 ; else p = 0 ; // remember the tid //t->m_tid = pthread_self(); // debug if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: [t=0x%lx] in startup pid=%li pppid=%li", (unsigned long)t,(long)getpidtid(),(long)getppid()); // debug msg //fprintf(stderr,"new thread tid=%li pid=%li\n", // (long)t->m_tid,(long)t->m_pid); // . set this process's priority // . setpriority() is only used for SCHED_OTHER threads //if ( pthread_setschedprio ( getpidtid() , p ) ) { #ifndef PTHREADS if ( setpriority ( PRIO_PROCESS, getpidtid() , p ) < 0 ) { // do we even support logging from a thread? if ( s_firstTime ) { log("thread: Call to setpriority(%lu,%i) in thread " "failed: %s. This " "message will not be repeated.", (unsigned long)getpidtid(),p,mstrerror(errno)); s_firstTime = false; } errno = 0; } #endif /* sched_param sp; int pp; int err = pthread_getschedparam ( t->m_tid , &pp , &sp ) ; if ( err ) log("thread: startUp: pthread_getschedparam: %s", mstrerror(err)); // adjust the priority p = 1; sp.sched_priority = p; // and reassign err = pthread_setschedparam ( t->m_tid , SCHED_OTHER , &sp ) ; log("thread: startUp: pthread_setschedparam(%li): %s", p,mstrerror(err)); */ // somehow, it works ok when we have this print statement delay!!! //fprintf(stderr,"thread pid = %li\n",(long)getpid()); // . call the startRoutine // . IMPORTANT: this can NEVER do non-blocking stuff t->m_startRoutine ( t->m_state , t ); // pop it off //pthread_cleanup_pop ( 1 /*execute handler?*/ ); // . now throw it on g_loop's sigqueue // . the first 4 bytes of t->m_state should be t->m_callback // . no! just use 1 to tell Loop to call g_threads.cleanUp() // . 
TODO: pass in a ptr to cleanUpWrapper() instead of "t" sigval_t svt; svt.sival_int = (int)t ; //(int)(t->m_state); // fd; // set exit time long long now = gettimeofdayInMilliseconds(); t->m_preExitTime = now; t->m_exitTime = now; if ( g_conf.m_logDebugThread ) { log(LOG_DEBUG,"thread: [t=0x%lx] done with startup pid=%li", (unsigned long)t,(long)getpidtid()); } // . now mark thread as ready for removal // . do this BEFORE queing the signal since we're still a thread!!! // . cleanUp() will take care of the rest // . cleanUp() will call pthread_join on us! t->m_isDone = true; // let Loop.cpp's sigHandler_r call g_thread.cleanUp() g_threads.m_needsCleanup = true; //if(t->m_niceness > 0) g_threads.m_needBottom = true; // . send the signal // . if queue is full g_loop will get a SIGIO and call // g_threads.cleanUp()/launchThreads() in it's doPoll() routine // . we reserve GB_SIGRTMIN itself for unblocked interrupts for // UdpServer // . damn, it seems that if the queue is full pthread_join is unable // to detach threads.... so sleep until it clears up // . HEY! process is supposed to send us an ECHLD signal? right? //sigqueue ( s_pid, GB_SIGRTMIN + 1 + t->m_niceness, svt ) ; // . it does not send us a signal automatically, so we must do it! // . i noticed during the linkdb rebuild we were not getting the signal sigqueue ( s_pid, GB_SIGRTMIN + 1 + t->m_niceness, svt ) ; return 0; } // pthread_create uses this one void *startUp2 ( void *state ) { startUp ( state ); return NULL; } // watch out, UdpServer::getEmptySlot() calls UdpServer::suspend() and we // could be in a signal handler here void ThreadQueue::suspendLowPriorityThreads() { // disable for now //return; // just return if already suspended if ( m_isLowPrioritySuspended ) return; // log it //log("ThreadQueue:: suspending low priority threads!!!!!!"); // set the flag so low priority threads won't be launched m_isLowPrioritySuspended = true; // . cancel any outstanding low priority threads that are running // . 
no, only cancel if we run another disk thread // . this will happen above in ThreadQueue::launchThread() //cancelLowPriorityThreads(); } // this is called by UdpServer::destroySlot() and should NOT be in a sig handlr void ThreadQueue::resumeLowPriorityThreads() { // disable for now //return; // bail if no need if ( ! m_isLowPrioritySuspended ) return; // turn em back on m_isLowPrioritySuspended = false; // just in case, though //if ( g_inSigHandler ) { // log(LOG_LOGIC,"thread: resumeLowPriorityThreads: In sig " // "handler."); // return; //} // try to start up some threads then g_threads.launchThreads(); } void ThreadQueue::print ( ) { // loop through candidates for ( long i = 0 ; i < m_top ; i++ ) { ThreadEntry *t = &m_entries[i]; // print it log(LOG_INIT,"thread: address=%lu pid=%u state=%lu " "occ=%i done=%i lnch=%i", (unsigned long)t , t->m_pid , (unsigned long)t->m_state , t->m_isOccupied , t->m_isDone , t->m_isLaunched ); } } const char *ThreadQueue::getThreadType ( ) { const char *s = "unknown"; if ( m_threadType == DISK_THREAD ) s = "disk"; if ( m_threadType == MERGE_THREAD ) s = "merge"; if ( m_threadType == INTERSECT_THREAD ) s = "intersectlists"; if ( m_threadType == FILTER_THREAD ) s = "filter"; if ( m_threadType == SAVETREE_THREAD ) s = "savetree"; if ( m_threadType == UNLINK_THREAD ) s = "unlink"; if ( m_threadType == GENERIC_THREAD ) s = "generic"; return s; } #include "BigFile.h" // FileState class long Threads::getDiskThreadLoad ( long maxNiceness , long *totalToRead ) { ThreadQueue *q = &m_threadQueues[DISK_THREAD]; ThreadEntry *e = q->m_entries; long top = q->m_top; *totalToRead = 0; long n = 0; // we really can't suspend threads cuz they might have the // mutex lock so we just cancel the disk threads here then for ( long i = 0 ; i < top ; i++ ) { // get entry ThreadEntry *t = &e[i]; // skip if not occupied if ( ! t->m_isOccupied ) continue; // skip if it's nicer than what we want if (t->m_niceness > maxNiceness && ! 
t->m_isLaunched) continue; // skip if already done if ( t->m_isDone ) continue; // cast state data FileState *fs = (FileState *) t->m_state; // sometimes NULL, like from Sync.cpp's call if ( ! fs ) continue; // only remove read operations, since write operations get // the fd up front if ( t->m_doWrite ) continue; // how many byte to do //long todo = fs->m_bytesToGo - fs->m_bytesDone; long todo = t->m_bytesToGo; // multiply by 2 if a write if ( t->m_doWrite ) todo *= 2; // add to total bytes to read *totalToRead += todo; // count the thread n++; } return n; } // when a BigFile is removed, much like we remove its pages from DiskPageCache // we also remove any unlaunched reads/writes on it from the thread queue. void ThreadQueue::removeThreads ( BigFile *bf ) { // did the BigFile get hosed? that means our BigFile was // unlinked or closed before we got a chance to launch the // thread. long maxi = -1; for ( long i = 0 ; i < m_top ; i++ ) { ThreadEntry *t = &m_entries[i]; // skip if not occupied if ( ! t->m_isOccupied ) continue; // get the filestate FileState *fs = (FileState *)t->m_state; // skip if NULL if ( ! fs ) continue; // skip if not match if ( fs->m_this != (void *)bf ) continue; // . let it finish writing if it is a write thread // . otherwise, if we are exiting, we could free the // buffer being written and cause the thread to core... if ( fs->m_doWrite ) { log(LOG_INFO,"disk: Not removing write thread."); continue; } // . should we really? if we renamed the file to another, // we need to recompute the offsets to read, etc.. so we // should fail up to Msg5 with EFILECLOSED or something... // . i think we did a rename and it got the same fd, and since // we did not remove the launched or done threads after the // rename, we're not sure if they read from the newly renamed // file or not, and our read offset was for the old file... // . 
at least set the error flag for doneWrapper() fs->m_errno2 = EFILECLOSED; // log it logf(LOG_INFO,"disk: Removing/flagging operation in thread " "queue. fs=0x%lx", (long)fs); // skip if already done if ( t->m_isDone ) continue; // skip if launched if ( t->m_isLaunched ) continue; // note in the log it is launched log(LOG_INFO,"disk: Thread is launched."); // tell donewrapper what happened fs->m_errno = EFILECLOSED; g_errno = EFILECLOSED; // note it //log(LOG_INFO,"disk: Removing operation from thread queue."); // remove it from the thread queue t->m_isDone = true; t->m_isLaunched = false; t->m_isOccupied = false; // keep track maxi = i; makeCallback ( t ); } // do we have to decrement top if ( m_top == maxi + 1 ) while ( m_top>0 && !m_entries[m_top-1].m_isOccupied) m_top--; g_errno = 0; } void Threads::printState() { long long now = gettimeofdayInMilliseconds(); for ( long i = 0 ; i < m_numQueues; i++ ) { ThreadQueue *q = &m_threadQueues[i]; long loActive = q->m_loLaunched - q->m_loReturned; long mdActive = q->m_mdLaunched - q->m_mdReturned; long hiActive = q->m_hiLaunched - q->m_hiReturned; long total = loActive + mdActive + hiActive; if( total == 0) continue; log(LOG_TIMING, "admin: Thread counts: type:%s " "%li:low %li:med %li:high %li:total", q->getThreadType(), loActive, mdActive, hiActive, total); for ( long j = 0 ; j < q->m_top ; j++ ) { ThreadEntry *t = &q->m_entries[j]; if(!t->m_isOccupied) continue; if(t->m_isDone) { log(LOG_TIMING, "admin: Thread -done- " "nice: %li " "totalTime: %lli (ms) " "queuedTime: %lli(ms) " "runTime: %lli(ms) " "cleanup: %lli(ms) " "callback:%s", t->m_niceness, now - t->m_queuedTime, t->m_launchedTime - t->m_queuedTime, t->m_exitTime - t->m_launchedTime, now - t->m_exitTime, g_profiler.getFnName((long)t->m_callback)); continue; } if(t->m_isLaunched) { log(LOG_TIMING, "admin: Thread -launched- " "nice: %li " "totalTime: %lli(ms) " "queuedTime: %lli(ms) " "runTime: %lli(ms) " "callback:%s", t->m_niceness, now - t->m_queuedTime, 
t->m_launchedTime - t->m_queuedTime, now - t->m_launchedTime, g_profiler.getFnName((long)t->m_callback)); continue; } log(LOG_TIMING, "admin: Thread -queued- " "nice: %li " "queueTime: %lli(ms) " "callback:%s", t->m_niceness, now - t->m_queuedTime, g_profiler.getFnName((long)t->m_callback)); } } }