#include "gb-include.h"

#include "BigFile.h"
#include "Threads.h"
#include "Errno.h"
#include "Loop.h"
// NOTE(review): the header names of the angle-bracket #include directives
// below were missing (the directives were bare). They are reconstructed from
// the trailing comments and from the calls this file makes (setpriority,
// waitpid, clone, pthread_*) -- confirm against version control.
#include <sys/time.h>
#include <sys/resource.h> // setpriority()
#include <sys/types.h>    // getuid()/pid_t/getpid()
#include <sys/wait.h>     // waitpid()
#include "Rdb.h"          // g_mergeUrgent
#include <sched.h>        // clone()
//#include "Msg16.h"      // g_pid g_ticker
#include "XmlDoc.h"       // g_pid g_ticker
#include "Profiler.h"
#include "Stats.h"
#include "Process.h"

// try using pthreads again
//#define PTHREADS

// use these stubs so libplotter.a works
#ifndef PTHREADS
int pthread_mutex_lock   (pthread_mutex_t *t ) { return 0; }
int pthread_mutex_unlock (pthread_mutex_t *t ) { return 0; }
#else
#include <pthread.h>
pthread_attr_t s_attr;
#endif

// main process id (or thread id if using pthreads)
//static pid_t  s_pid    = (pid_t) -1;
// on 64-bit architectures pthread_t is 64 bit and pid_t is 32 bit:
static pthread_t s_pid = (pid_t) -1;

// . returns the id of the calling thread of execution
// . with PTHREADS defined this is pthread_self(), otherwise it is the
//   process id from getpid() widened to pthread_t
// . Threads::amThread() compares this value against s_pid
//pid_t getpidtid() {
// on 64-bit architectures pthread_t is 64 bit and pid_t is 32 bit:
pthread_t getpidtid() {
#ifdef PTHREADS
	// gettid() is a bit newer so not in our libc32...
	// so kinda fake it. return the "thread" id, not process id.
	// Threads::amThread() should still work based on thread ids because
	// the main process has a unique thread id as well.
	return pthread_self();
#else
	return (pthread_t)getpid();
#endif
}

// BIG PROBLEM:
// When doing pthread_join it doesn't always ensure a thread doesn't go
// zombie. It seems like when SIGIOs are generated by sigqueue() because
// of a full signal queue, threads start going zombie on me.

// PROBLEM #1: with using the "state" to identify a thread in the queue.
// Often a caller makes a disk access using a certain "state" var.
// He gets control back when cleanUp() calls t->m_callback. And he launches
// another read thread in there, which calls Threads::exit(state) before
// the original thread entry's m_isOccupied is set to false. So
// Threads::exit(state) mistakenly picks the thread entry from the former
// thread because it has the same state as its thread!
// PROBLEM #2: technically, a thread in Threads::exit() can set its m_done // bit then send the signal. When the Loop sig handler sees the done bit is // set it decrements the global thread count even though the thread may // not have actually exited yet!! I spawned like 1,000 threads this way!!!!! // a global class extern'd in .h file Threads g_threads; // . call this before calling a ThreadEntry's m_startRoutine // . allows us to set the priority of the thread int startUp ( void *state ) ; void *startUp2 ( void *state ) ; // JAB: warning abatement //static void launchThreadsWrapper ( int fd , void *state ); static void killStalledFiltersWrapper ( int fd , void *state ); static void makeCallback ( ThreadEntry *t ) ; // is the caller a thread? bool Threads::amThread () { if ( s_pid == (pthread_t)-1 ) return false; return ( getpidtid() != s_pid ); } #ifndef PTHREADS static int32_t s_bad = 0; static int32_t s_badPid = -1; #endif #define MAX_PID 32767 #ifndef PTHREADS static int s_errno ; static int s_errnos [ MAX_PID + 1 ]; // this was improvised from linuxthreads/errno.c //#define CURRENT_STACK_FRAME ({ char __csf; &__csf; }) // WARNING: you MUST compile with -DREENTRANT for this to work int *__errno_location (void) { int32_t pid = (int32_t) getpid(); //if ( pid == s_pid ) return &g_errno; if ( pid <= (int32_t)MAX_PID ) return &s_errnos[pid]; s_bad++; s_badPid = pid; return &s_errno; } #endif // this also limit the maximum number of outstanding (live) threads #define MAX_STACKS 20 // stack must be page aligned for mprotect #define THRPAGESIZE 8192 // how much of stack to use as guard space #define GUARDSIZE (32*1024) // . crashed in saving with 800k, so try 1M // . must be multiple of THRPAGESIZE #define STACK_SIZE ((512+256) * 1024) // jeta was having some problems, but i don't think they were related to // this stack size of 512k, but i will try boosting to 800k anyway. 
//#define STACK_SIZE (512 * 1024) // 256k was not enough, try 512k //#define STACK_SIZE (256 * 1024) // at 128k (and even 200k) some threads do not return! why?? there's // obviously some stack overwriting going on! //#define STACK_SIZE (128 * 1024) static char *s_stackAlloc = NULL; static int32_t s_stackAllocSize; #ifndef PTHREADS static char *s_stack = NULL; static int32_t s_stackSize; static char *s_stackPtrs [ MAX_STACKS ]; #endif static int32_t s_next [ MAX_STACKS ]; static int32_t s_head ; // returns NULL if none left int32_t Threads::getStack ( ) { if ( s_head == -1 ) return -1; int32_t i = s_head; s_head = s_next [ s_head ]; return i; } void Threads::returnStack ( int32_t si ) { if ( s_head == -1 ) { s_head = si; s_next[si] = -1; return; } s_next[si] = s_head; s_head = si; } void Threads::reset ( ) { if ( s_stackAlloc ) { mprotect(s_stackAlloc, s_stackAllocSize, PROT_READ|PROT_WRITE); mfree ( s_stackAlloc , s_stackAllocSize, "ThreadStack"); } s_stackAlloc = NULL; for ( int32_t i = 0 ; i < MAX_THREAD_QUEUES ; i++ ) m_threadQueues[i].reset(); } Threads::Threads ( ) { m_numQueues = 0; m_initialized = false; } void Threads::setPid ( ) { // set s_pid to the main process id #ifdef PTHREADS s_pid = (pid_t)pthread_self(); //log(LOG_INFO, // "threads: main process THREAD id = %"UINT32"",(int32_t unsigned)s_pid); pthread_t tid = pthread_self(); sched_param param; int policy; // scheduling parameters of target thread pthread_getschedparam ( tid, &policy, ¶m); //log(LOG_INFO, // "threads: min/max thread priority settings = %"INT32"/%"INT32" (policy=%"INT32")", // (int32_t)sched_get_priority_min(policy), // (int32_t)sched_get_priority_max(policy), // (int32_t)policy); #else s_pid = getpid(); #endif } bool Threads::init ( ) { if ( m_initialized ) return true; m_initialized = true; m_needsCleanup = false; //m_needBottom = false; // sanity check //if ( sizeof(pthread_t) > sizeof(pid_t) ) { char *xx=NULL;*xx=0; } if ( sizeof(pid_t) > sizeof(pthread_t) ) { char 
*xx=NULL;*xx=0; } setPid(); #ifdef _STACK_GROWS_UP return log("thread: Stack growing up not supported."); #endif //g_conf.m_logDebugThread = true; // . damn, this only applies to fork() calls, i guess // . a quick a dirty way to restrict # of threads so we don't explode /* struct rlimit lim; lim.rlim_max = 100; if ( setrlimit(RLIMIT_NPROC,&lim) ) log("thread::init: setrlimit: %s", mstrerror(errno) ); else log("thread::init: set max number of processes to 100"); */ // allow threads until disabled m_disabled = false; // # of low priority threads launched and returned //m_hiLaunched = 0; //m_hiReturned = 0; //m_loLaunched = 0; //m_loReturned = 0; //int32_t m_queryMaxBigDiskThreads ; // > 1M read //int32_t m_queryMaxMedDiskThreads ; // 100k - 1M read //int32_t m_queryMaxSmaDiskThreads ; // < 100k per read // categorize the disk read sizes by these here g_conf.m_bigReadSize = 0x7fffffff; g_conf.m_medReadSize = 1000000; g_conf.m_smaReadSize = 100000; // . register a sleep wrapper to launch threads every 30ms // . somtimes a bunch of threads mark themselves as done and the // cleanUp() handler sees them as all still launched so it doesn't // launch any new ones //if ( ! g_loop.registerSleepCallback(30,NULL,launchThreadsWrapper)) // return log("thread: Failed to initialize timer callback."); if ( ! g_loop.registerSleepCallback(1000,NULL, killStalledFiltersWrapper,0)) return log("thread: Failed to initialize timer callback2."); // debug //log("thread: main process has pid=%"INT32"",(int32_t)s_pid); // . set priority of the main process to 0 // . setpriority() only applies to SCHED_OTHER threads // . priority of threads with niceness 0 will be 0 // . priority of threads with niceness 1 will be 10 // . priority of threads with niceness 2 will be 20 // . see 'man sched_setscheduler' for detail scheduling info // . 
no need to call getpid(), 0 for pid means the current process #ifndef PTHREADS if ( setpriority ( PRIO_PROCESS, getpid() , 0 ) < 0 ) log("thread: Call to setpriority failed: %s.", mstrerror(errno)); #endif // make multiplier higher for raids, can do more seeks //int32_t m = 1; //#ifdef _LARS_ //m = 3; //#endif // register the types of threads here instead of in main.cpp //if ( ! g_threads.registerType ( DISK_THREAD ,m*20/*maxThreads*/)) // try running blaster with 5 threads and you'll // . see like a double degrade in performance for some reason!! // . TODO: why? // . well, this should be controlled g_conf.m_maxSpiderDiskThreads // for niceness 1+ threads, and g_conf.m_maxPriorityDiskThreads for // niceness 0 and below disk threads // . 100 maxThreads out at a time, 32000 can be queued if ( ! g_threads.registerType ( DISK_THREAD ,100/*maxThreads*/,32000)) return log("thread: Failed to register thread type." ); // . these are used by Msg5 to merge what it reads from disk // . i raised it from 1 to 2 and got better response time from Msg10 // . i leave one open in case one is used for doing a big merge // with high niceness cuz it would hold up high priority ones! // . TODO: is there a better way? cancel it when UdpServer calls // Threads::suspendLowPriorityThreads() ? // . this used to be 2 but now defaults to 10 in Parms.cpp. i found // i have less int32_t gray lines in the performance graph when i // did that on trinity. int32_t max2 = g_conf.m_maxCpuMergeThreads; if ( max2 < 1 ) max2 = 1; if ( ! g_threads.registerType ( MERGE_THREAD , max2,1000) ) return log("thread: Failed to register thread type." ); // will raising this from 1 to 2 make it faster too? // i raised since global specs new servers have 2 (hyperthreaded?) cpus int32_t max = g_conf.m_maxCpuThreads; if ( max < 1 ) max = 1; if ( ! g_threads.registerType ( INTERSECT_THREAD,max,200) ) return log("thread: Failed to register thread type." 
); // filter thread spawned to call popen() to filter an http reply if ( ! g_threads.registerType ( FILTER_THREAD , 1/*maxThreads*/,300) ) return log("thread: Failed to register thread type." ); // RdbTree uses this to save itself if ( ! g_threads.registerType ( SAVETREE_THREAD,1/*maxThreads*/,100) ) return log("thread: Failed to register thread type." ); // . File.cpp spawns a rename thread for doing renames and unlinks // . doing a tight merge on titldb can be ~250 unlinks // . MDW up from 1 to 30 max, after doing a ddump on 3000+ collections // it was taking forever to go one at a time through the unlink // thread queue. seemed like a 1 second space between unlinks. // 1/23/1014 if ( ! g_threads.registerType ( UNLINK_THREAD,30/*maxThreads*/,3000) ) return log("thread: Failed to register thread type." ); // generic multipurpose if ( ! g_threads.registerType (GENERIC_THREAD,100/*maxThreads*/,100) ) return log("thread: Failed to register thread type." ); // for call SSL_accept() which blocks for 10ms even when socket // is non-blocking... //if (!g_threads.registerType (SSLACCEPT_THREAD,20/*maxThreads*/,100)) // return log("thread: Failed to register thread type." ); #ifndef PTHREADS // sanity check if ( GUARDSIZE >= STACK_SIZE ) return log("thread: Stack guard size too big."); // not more than this outstanding int32_t maxThreads = 0; for ( int32_t i = 0 ; i < m_numQueues ; i++ ) maxThreads += m_threadQueues[i].m_maxLaunched; // limit to stack we got if ( maxThreads > MAX_STACKS ) maxThreads = MAX_STACKS; // allocate the stack space s_stackAllocSize = STACK_SIZE * maxThreads + THRPAGESIZE ; // clear stack to help check for overwrites s_stackAlloc = (char *) mcalloc ( s_stackAllocSize , "ThreadStack" ); if ( ! 
s_stackAlloc ) return log("thread: Unable to allocate %"INT32" bytes for thread " "stacks.", s_stackAllocSize); log(LOG_INIT,"thread: Using %"INT32" bytes for %"INT32" thread stacks.", s_stackAllocSize,maxThreads); // align s_stack = (char *)(((int)s_stackAlloc+THRPAGESIZE-1)&~(THRPAGESIZE-1)); // new size s_stackSize = s_stackAllocSize - (s_stack - s_stackAlloc); // protect the whole stack while not in use if ( mprotect ( s_stack , s_stackSize , PROT_NONE ) ) log("thread: Call to mprotect failed: %s.",mstrerror(errno)); // test //s_stack[0] = 1; // init the linked list for ( int32_t i = 0 ; i < MAX_STACKS ; i++ ) { if ( i == MAX_STACKS - 1 ) s_next[i] = -1; else s_next[i] = i + 1; s_stackPtrs[i] = s_stack + STACK_SIZE * i; } s_head = 0; // don't do real time stuff for now return true; #else // . keep stack size small, 128k // . if we get problems, we'll increase this to 256k // . seems like it grows dynamically from 4K to up to 2M as needed //if ( pthread_attr_setstacksize ( &s_attr , (size_t)128*1024 ) ) // return log("thread: init: pthread_attr_setstacksize: %s", // mstrerror(errno)); //pthread_attr_setschedparam ( &s_attr , PTHREAD_EXPLICIT_SCHED ); //pthread_attr_setscope ( &s_attr , PTHREAD_SCOPE_SYSTEM ); return true; #endif } // all types should be registered in main.cpp before any threads launch bool Threads::registerType ( char type , int32_t maxThreads , int32_t maxEntries ) { // return false and set g_errno if no more room if ( m_numQueues >= MAX_THREAD_QUEUES ) { g_errno = EBUFTOOSMALL; return log(LOG_LOGIC,"thread: registerType: Too many thread " "queues"); } // initialize the ThreadQueue class for this type if ( ! m_threadQueues[m_numQueues].init(type,maxThreads,maxEntries)) return false; // we got one more queue now m_numQueues++; return true; } int32_t Threads::getNumThreadsOutOrQueued() { int32_t n = 0; for ( int32_t i = 0 ; i < m_numQueues ; i++ ) { // skip INTERSECT_THREAD, used to intersect termlists to // resolve a query. 
i've seen these get stuck in an infinite // loop sometimes. if ( i == INTERSECT_THREAD ) continue; // skip filter threads, those get stuck sometimes if ( i == FILTER_THREAD ) continue; // tally up all other threads n += m_threadQueues[i].getNumThreadsOutOrQueued(); } return n; } // . returns false (and may set errno) if failed to launch a thread // . returns true if thread added to queue successfully // . may be launched instantly or later depending on # of threads in the queue bool Threads::call ( char type , int32_t niceness , void *state , void (* callback )(void *state,ThreadEntry *t) , void *(* startRoutine)(void *state,ThreadEntry *t) ) { // debug //return false; #ifdef _VALGRIND_ return false; #endif g_errno = 0; // don't spawn any if disabled if ( m_disabled ) return false; if ( ! g_conf.m_useThreads ) return false; if ( type == DISK_THREAD && ! g_conf.m_useThreadsForDisk ) return false; if ( type == MERGE_THREAD && ! g_conf.m_useThreadsForIndexOps ) return false; if ( type == INTERSECT_THREAD && ! g_conf.m_useThreadsForIndexOps ) return false; if ( type == FILTER_THREAD && ! g_conf.m_useThreadsForSystemCalls ) return false; if ( ! m_initialized && ! init() ) return log("db: Threads init failed." ); // . sanity check // . a thread can NOT call this //if ( getpid() != s_pid ) { // fprintf(stderr,"thread: call: bad engineer\n"); // ::exit(-1); //} // don't launch for now //return false; // . sanity check // . 
ensure top 4 bytes of state is the callback //if ( *(int32_t *)state != (int32_t)callback ) { // g_errno = EBADENGINEER; // sleep(50000); // return log("thread: call: top 4 bytes of state != callback"); //} // debug msg //log("adding thread to queue, type=%"INT32"",(int32_t)type); // find the type int32_t i; for ( i = 0 ; i < m_numQueues ; i++ ) if ( m_threadQueues[i].m_threadType == type ) break; // bitch if type not added via registerType() call if ( i == m_numQueues ) { g_errno = EBADENGINEER; return log(LOG_LOGIC,"thread: addtoqueue: Unregistered " "thread type %"INT32"",(int32_t)type); } // debug msg //log("thread: call: adding entry for thread"); // . add to this queue // . returns NULL and sets g_errno on error ThreadEntry *t = m_threadQueues[i].addEntry(niceness,state, callback,startRoutine); if ( ! t ) return log("thread: Failed to add entry to thread pool: " "%s.",mstrerror(g_errno)); // debug msg //log("added"); // clear g_errno //g_errno = 0; // . try to launch as many threads as we can // . this sets g_errno on error // . if it has an error, just ignore it, our thread is queued m_threadQueues[i].launchThread2 ( NULL ); //if ( ! m_threadQueues[i].launchThread2 ( t ) && g_errno ) { // log("thread: failed thread launch: %s",mstrerror(g_errno)); // return false; //} // return false if there was an error launching the thread //if ( g_errno ) return false; // clear g_errno g_errno = 0; // success return true; } // static void launchThreadsWrapper ( int fd , void *state ) { // // debug msg // //if ( g_conf.m_timingDebugEnabled ) // // log("thread: launchThreadsWrapper: entered"); // // clean up // g_threads.cleanUp(NULL,1000); // // and launch // g_threads.launchThreads(); // } static void killStalledFiltersWrapper ( int fd , void *state ) { // bail if no pid if ( g_pid == -1 ) return; // . only kill after ticker reaches a count of 30 // . 
we are called once every second, so inc it each time int32_t timeout = g_filterTimeout; if ( timeout <= 0 ) timeout = 30; if ( g_ticker++ < timeout ) return; // debug log("threads: killing stalled filter process of age %"INT32" " "seconds and pid=%"INT32".",g_ticker,(int32_t)g_pid); // kill him int err = kill ( g_pid , 9 ); // don't kill again g_pid = -1; if ( err != 0 ) log("threads: kill filter: %s", mstrerror(err) ); } // . called by g_loop in Loop.cpp after getting a SI_QUEUE signal that it // is from when a thread exited // . we put that signal there using sigqeueue() in Threads::exit() // . this way another thread can be launched right away int32_t Threads::launchThreads ( ) { // try launching from each queue int32_t numLaunched = 0; for ( int32_t i = m_numQueues - 1 ; i >= 0 ; i-- ) { // clear g_errno g_errno = 0; // launch as many threads as we can from queue #i while ( m_threadQueues[i].launchThread2(NULL) ) numLaunched++; // continue if no g_errno set if ( ! g_errno ) continue; // otherwise bitch about it log("thread: Failed to launch thread: %s.",mstrerror(g_errno)); } // clear g_errno g_errno = 0; return numLaunched; } // . will cancel currently running low priority threads // . will prevent any low priority threads from launching // . 
will only cancel disk threads for now void Threads::suspendLowPriorityThreads() { // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: SUSPENDING LOW-PRIORITY THREADS."); // just cancel disk threads for now for ( int32_t i = 0 ; i < m_numQueues; i++ ) m_threadQueues[i].suspendLowPriorityThreads(); } void Threads::resumeLowPriorityThreads() { // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: RESUMING LOW-PRIORITY THREADS."); for ( int32_t i = 0 ; i < m_numQueues; i++ ) m_threadQueues[i].resumeLowPriorityThreads(); } //void Threads::cancelLowPriorityThreads () { // for ( int32_t i = 0 ; i < m_numQueues; i++ ) // m_threadQueues[i].cancelLowPriorityThreads(); //} /////////////////////////////////////////////////////////////////////// // functions for ThreadQueue /////////////////////////////////////////////////////////////////////// ThreadQueue::ThreadQueue ( ) { m_entries = NULL; m_entriesSize = 0; } void ThreadQueue::reset ( ) { if ( m_entries ) mfree ( m_entries , m_entriesSize , "Threads" ); m_entries = NULL; m_top = 0; } bool ThreadQueue::init ( char threadType, int32_t maxThreads, int32_t maxEntries ) { m_threadType = threadType; m_launched = 0; m_returned = 0; m_maxLaunched = maxThreads; // # of low priority threads launched and returned m_hiLaunched = 0; m_hiReturned = 0; m_mdLaunched = 0; m_mdReturned = 0; m_loLaunched = 0; m_loReturned = 0; // we now count write threads so we can limit that m_writesLaunched = 0; m_writesReturned = 0; // these are for disk threads, which we now limit based on read sizes m_hiLaunchedBig = 0; m_hiReturnedBig = 0; m_mdLaunchedBig = 0; m_mdReturnedBig = 0; m_loLaunchedBig = 0; m_loReturnedBig = 0; m_hiLaunchedMed = 0; m_hiReturnedMed = 0; m_mdLaunchedMed = 0; m_mdReturnedMed = 0; m_loLaunchedMed = 0; m_loReturnedMed = 0; m_hiLaunchedSma = 0; m_hiReturnedSma = 0; m_mdLaunchedSma = 0; m_mdReturnedSma = 0; m_loLaunchedSma = 0; m_loReturnedSma = 0; //m_entriesUsed = 0; m_top = 0; m_isLowPrioritySuspended 
= false; // alloc space for entries m_maxEntries = maxEntries; m_entriesSize = sizeof(ThreadEntry)*m_maxEntries; m_entries = (ThreadEntry *)mmalloc ( m_entriesSize , "Threads" ); if ( ! m_entries ) return log("thread: Failed to allocate %"INT32" bytes " "for thread queue.",m_entriesSize); // debug msg //log("INIT CALLED. setting all m_isDone to 1."); // clear m_isOccupied et al for new guys //for ( int32_t i = 0 ; i < MAX_THREAD_ENTRIES ; i++ ) { for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) { m_entries[i].m_isOccupied = false; m_entries[i].m_isLaunched = false; m_entries[i].m_isDone = true; m_entries[i].m_qnum = threadType; m_entries[i].m_stack = NULL; } return true; } int32_t ThreadQueue::getNumThreadsOutOrQueued() { int32_t n = m_launched - m_returned; for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) { ThreadEntry *e = &m_entries[i]; if ( ! e->m_isOccupied ) continue; if ( e->m_isLaunched ) continue; if ( e->m_isDone ) continue; // do not count "reads", only count writes //if ( m_threadType == DISK_THREAD && e->m_state ) { // FileState *fs = (FileState *)e->m_state; // if ( fs->m_doWrite ) continue; //} } return n; } // return NULL and set g_errno on error ThreadEntry *ThreadQueue::addEntry ( int32_t niceness , void *state , void (* callback )(void *state, ThreadEntry *t) , void *(* startRoutine)(void *state, ThreadEntry *t) ) { // if we are 90% full and niceness is > 0, knock it off int32_t max = m_maxEntries; if ( m_threadType == DISK_THREAD && niceness > 0 ) { max = (m_maxEntries * 90) / 100; if ( max <= 0 ) max = 1; } // debug test //if ( rand() %10 == 1 ) { g_errno = ENOTHREADSLOTS; return NULL; } // get first available entry, not in use int32_t i; //for ( i = 0 ; i < MAX_THREAD_ENTRIES ; i++ ) for ( i = 0 ; i < max ; i++ ) if ( ! 
m_entries[i].m_isOccupied ) break; // caution //if ( i >= MAX_THREAD_ENTRIES ) { if ( i >= max ) { g_errno = ENOTHREADSLOTS; static time_t s_time = 0; time_t now = getTime(); if ( now - s_time > 5 ) { log("thread: Could not add thread to queue. Already " "have %"INT32" entries.",max); s_time = now; } return NULL; } // debug msg //fprintf(stderr,"addEntry my pid=%"UINT32"\n", (int32_t)getpid() ); // get an available entry ThreadEntry *t = &m_entries [ i ]; // debug msg //log("claiming entry state=%"UINT32", occupied=%"INT32"",(int32_t)t->m_state, // (int32_t)t->m_isOccupied); // stick it in t->m_niceness = niceness; t->m_state = state; t->m_callback = callback; t->m_startRoutine = startRoutine; t->m_isOccupied = true; t->m_isCancelled = false; t->m_stack = NULL; // debug msg //log("setting t=%"UINT32" m_isDone to 0", (int32_t)t ); t->m_isDone = false; t->m_isLaunched = false; t->m_queuedTime = gettimeofdayInMilliseconds(); t->m_readyForBail = false; t->m_allocBuf = NULL; t->m_allocSize = 0; t->m_errno = 0; // and when the ohcrap callback gets called and the thread // is cleaned up it will check the FileState readsize and // m_isWrite to see which launch counts to decrement, so // since FileState will be corrupted, we need to store // this info directly into the thread entry. if ( m_threadType == DISK_THREAD && t->m_state ) { FileState *fs = (FileState *)t->m_state; t->m_bytesToGo = fs->m_bytesToGo; t->m_doWrite = fs->m_doWrite ; } else { t->m_bytesToGo = 0; t->m_doWrite = false; } // inc the used count //m_entriesUsed++; // debug msg //log("m_entriesUsed now %"INT32"",m_entriesUsed); // might have to inc top as well if ( i == m_top ) m_top++; // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] " "queued %s thread for launch. " "niceness=%"INT32". ", (PTRTYPE)t, getThreadType(), (int32_t)niceness ); // success return t; } int32_t Threads::timedCleanUp (int32_t maxTime, int32_t niceness) { if ( ! 
m_needsCleanup ) return 0; //if ( g_inSigHandler ) return 0; int64_t startTime = gettimeofdayInMillisecondsLocal(); int64_t took = 0; if ( niceness >= MAX_NICENESS ) m_needsCleanup = false; //for ( int32_t i = -1 ; i <= niceness ; i++ ) { for ( int32_t i = 0 ; i <= niceness ; i++ ) { for ( int32_t j = 0 ; j < m_numQueues ; j++ ) m_threadQueues[j].timedCleanUp ( i ); launchThreads(); if ( maxTime < 0 ) continue; took = startTime - gettimeofdayInMillisecondsLocal(); if ( took <= maxTime ) continue; // ok, we have to cut if int16_t... m_needsCleanup = true; break; } return took; } bool Threads::isHittingFile ( BigFile *bf ) { return m_threadQueues[DISK_THREAD].isHittingFile(bf); } bool ThreadQueue::isHittingFile ( BigFile *bf ) { // loop through candidates for ( int32_t i = 0 ; i < m_top; i++ ) { // point to it ThreadEntry *t = &m_entries[i]; // must be occupied to be done (sanity check) if ( ! t->m_isOccupied ) continue; // must not be done if ( t->m_isDone ) continue; // must be launched.. really?? //if ( ! t->m_isLaunched ) continue; // must be a read if ( t->m_startRoutine != readwriteWrapper_r ) continue; // int16_tcut FileState *fs = (FileState *)t->m_state; // get bigfile ptr if ( fs->m_this == bf ) return true; } return false; } void Threads::bailOnReads ( ) { m_threadQueues[DISK_THREAD].bailOnReads(); } // Process.cpp calls these callbacks before their time in order to // set EDISKSTUCK void ThreadQueue::bailOnReads ( ) { // note it log("threads: bypassing read threads"); // loop through candidates for ( int32_t i = 0 ; i < m_top; i++ ) { // point to it ThreadEntry *t = &m_entries[i]; // must be occupied to be done (sanity check) if ( ! t->m_isOccupied ) continue; // skip if not launched yet //if ( ! 
t->m_isLaunched ) continue; // must be niceness 0 if ( t->m_niceness != 0 ) continue; // must not be done if ( t->m_isDone ) continue; // must not have already called callback if ( t->m_callback == ohcrap ) continue; // must be a read if ( t->m_startRoutine != readwriteWrapper_r ) continue; // int16_tcut FileState *fs = (FileState *)t->m_state; // do not stop writes if ( fs->m_doWrite ) continue; // must be niceness 0 too! if ( fs->m_niceness != 0 ) continue; // what is this? unlaunched... //if ( t->m_pid == 0 ) continue; // can only bail on a thread after it copies its FileState // class into its stack so we can bypass it and free the // original FileState without causing a core. if thread // is not yet launched we have to call the callback here too // otherwise it never gets launched until the disk is unstuck! if ( ! t->m_readyForBail && t->m_isLaunched ) continue; // set error t->m_errno = EDISKSTUCK; // set this too g_errno = EDISKSTUCK; // do not allow caller to free the alloc'd buf in case // its read finally comes through! t->m_allocBuf = fs->m_allocBuf; t->m_allocSize = fs->m_allocSize; fs->m_allocBuf = NULL; fs->m_allocSize = 0; // call it t->m_callback ( t->m_state , t ); // do not re-call it... t->m_callback = ohcrap; // invalidate state (FileState usually) t->m_state = NULL; // do not launch if not yet launched if ( t->m_isLaunched ) continue; // delete him if not yet launched, otherwise we try to // launch it later with a corrupted/unstable FileState... // and that causes our launch counts to get out of whack i // think... t->m_isOccupied = false; // note it log("threads: bailing unlaunched thread"); // do we have to decrement top if ( m_top == i + 1 ) while (m_top > 0 && ! m_entries[m_top-1].m_isOccupied) m_top--; } } // BigFile.cpp's readwriteWrapper_r() ThreadEntry::m_callback gets set to // ohcrap() because it was taking too int32_t to do its read and we prematurely // called its callback above in bailOnReads(). 
In that case we still have to // free the disk read buffer which was never used. And doneWrapper() in // BigFile.cpp is never called. void ohcrap ( void *state , ThreadEntry *t ) { // free the read buffer here then if ( t->m_allocBuf ) mfree ( t->m_allocBuf , t->m_allocSize , "RdbScan" ); log("threads: got one"); } // . cleans up any threads that have exited // . their m_isDone should be set to true // . don't process threads whose niceness is > maxNiceness bool ThreadQueue::timedCleanUp ( int32_t maxNiceness ) { // top: int32_t numCallbacks = 0; // loop through candidates for ( int32_t i = 0 ; i < m_top; i++ ) { // point to it ThreadEntry *t = &m_entries[i]; // skip if not qualified if ( t->m_niceness > maxNiceness ) continue; // must be occupied to be done (sanity check) if ( ! t->m_isOccupied ) continue; // skip if not launched yet if ( ! t->m_isLaunched ) continue; // . we were segfaulting right here before because the thread // was setting t->m_pid and at this point it was not // set so t->m_pid was a bogus value // . thread may have seg faulted, in which case sigbadhandler() // in Loop.cpp will get it and set errno to 0x7fffffff #ifndef PTHREADS // MDW: i hafta take this out because the errno_location thing // is not working on the newer gcc if ( ! t->m_isDone && t->m_pid >= 0 && s_errnos [t->m_pid] == 0x7fffffff ) { log("thread: Got abnormal thread termination. Seems " "like the thread might have cored."); s_errnos[t->m_pid] = 0; goto again; } #endif // skip if not done yet if ( ! t->m_isDone ) continue; #ifdef PTHREADS // if pthread_create() failed it returns the errno and we // needsJoin is false, so do not try to join // to a thread if we did not create it, lest pthread_join() // cores if ( t->m_needsJoin ) { // . join up with that thread // . 
damn, sometimes he can block forever on his // call to sigqueue(), int32_t status = pthread_join ( t->m_joinTid , NULL ); if ( status != 0 ) { log("threads: pthread_join %"INT32" = %s (%"INT32")", (int32_t)t->m_joinTid,mstrerror(status), status); } // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: joined1 with " "t=0x%"PTRFMT" " "jointid=0x%"XINT32".", (PTRTYPE)t,(int32_t)t->m_joinTid); } #else again: int status ; pid_t pid = waitpid ( t->m_pid , &status , 0 ); int err = errno; // debug the waitpid if ( g_conf.m_logDebugThread || g_process.m_exiting ) log(LOG_DEBUG,"thread: Waiting for t=0x%"PTRFMT" " "pid=%"INT32".", (PTRTYPE)t,(int32_t)t->m_pid); // bitch and continue if join failed if ( pid != t->m_pid ) { // waitpid() gets interrupted by various signals so // we need to repeat (SIGCHLD?) if ( err == EINTR ) goto again; log("thread: Call to waitpid(%"INT32") returned %"INT32": %s.", (int32_t)t->m_pid,(int32_t)pid,mstrerror(err)); continue; } // if status not 0 then process got abnormal termination if ( ! WIFEXITED(status) ) { if ( WIFSIGNALED(status) ) log("thread: Child process (pid=%i) exited " "from unhandled signal number %"INT32".", pid,(int32_t)WTERMSIG(status)); else log("thread: Child process (pid=%i) exited " "for unknown reason." 
, pid ); } //mfree ( t->m_stack , STACK_SIZE , "Threads" ); g_threads.returnStack ( t->m_si ); t->m_stack = NULL; // re-protect this stack mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE, PROT_NONE ); // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: joined with pid=%"INT32" pid=%"INT32".", (int32_t)t->m_pid,(int32_t)t->m_pid); #endif // we may get cleaned up and re-used and our niceness reassignd // right after set m_isDone to true, so save niceness int32_t niceness = t->m_niceness; char qnum = t->m_qnum; ThreadQueue *tq = &g_threads.m_threadQueues[(int)qnum]; if ( tq != this ) { char *xx = NULL; *xx = 0; } // get read size before cleaning it up -- it could get nuked int32_t rs = 0; bool isWrite = false; if ( tq->m_threadType == DISK_THREAD ) { // && t->m_state ) { //FileState *fs = (FileState *)t->m_state; rs = t->m_bytesToGo; isWrite = t->m_doWrite ; } if ( niceness <= 0) tq->m_hiReturned++; else if ( niceness == 1) tq->m_mdReturned++; else if ( niceness >= 2) tq->m_loReturned++; // deal with the tiers for disk threads based on read sizes if ( tq->m_threadType == DISK_THREAD ) { // writes are special cases if ( isWrite ) m_writesReturned++; if ( rs >= 0 && niceness >= 2 ) { if ( rs > g_conf.m_medReadSize ) tq->m_loReturnedBig++; else if ( rs > g_conf.m_smaReadSize ) tq->m_loReturnedMed++; else tq->m_loReturnedSma++; } else if ( rs >= 0 && niceness >= 1 ) { if ( rs > g_conf.m_medReadSize ) tq->m_mdReturnedBig++; else if ( rs > g_conf.m_smaReadSize ) tq->m_mdReturnedMed++; else tq->m_mdReturnedSma++; } else if ( rs >= 0 ) { if ( rs > g_conf.m_medReadSize ) tq->m_hiReturnedBig++; else if ( rs > g_conf.m_smaReadSize ) tq->m_hiReturnedMed++; else tq->m_hiReturnedSma++; } } // now count him as returned m_returned++; // prepare for relaunch if we were cancelled if ( t->m_isCancelled ) { t->m_isCancelled = false; t->m_isLaunched = false; t->m_isDone = false; log("thread: Thread cancelled. 
Preparing thread " "for relaunch"); continue; } numCallbacks++; // not running any more t->m_isLaunched = false; // not occupied any more t->m_isOccupied = false; // do we have to decrement top if ( m_top == i + 1 ) while (m_top > 0 && ! m_entries[m_top-1].m_isOccupied) m_top--; // send a cancel sig to the thread in case it's still there //int err = pthread_cancel ( t->m_tid ); //if ( err != 0 ) log("thread: cleanUp: pthread_cancel: %s", // mstrerror(err) ); // one less entry occupied //m_entriesUsed--; // debug msg //log("m_entriesUsed now %"INT32"",m_entriesUsed); // one more returned //m_returned++; // clear the g_errno in case set by a previous callback //g_errno = 0; // launch as many threads as we can before calling the // callback since this may hog the CPU like Msg20 does //g_threads.launchThreads(); g_errno = 0; //g_loop.startBlockedCpuTimer(); //only allow a quickpoll if we are nice. //g_loop.canQuickPoll(t->m_niceness); makeCallback ( t ); //int64_t took = gettimeofdayInMilliseconds()-startTime; //if(took > 8 && maxNiceness > 0) { // if(g_conf.m_sequentialProfiling) // log(LOG_TIMING, // "admin: Threads spent %"INT64" ms to callback " // "%"INT32" callbacks, nice: %"INT32"", // took, numCallbacks, maxNiceness); // g_threads.m_needBottom = true; // maxNiceness = 0; //} // clear errno again g_errno = 0; if ( g_conf.m_logDebugThread ) { int64_t now = gettimeofdayInMilliseconds(); log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] %s done1. " "active=%"INT32" " "time since queued = %"UINT64" ms " "time since launch = %"UINT64" ms " "time since pre-exit = %"UINT64" ms " "time since exit = %"UINT64" ms", (PTRTYPE)t, getThreadType() , (int32_t)(m_launched - m_returned) , (uint64_t)(now - t->m_queuedTime), (uint64_t)(now - t->m_launchedTime), (uint64_t)(now - t->m_preExitTime) , (uint64_t)(now - t->m_exitTime) ); } } //since we need finer grained control in loop, we no longer collect //the callbacks, sort, then call them. 
we now call them right away //that way we can break out if we start taking too int32_t and //give control back to udpserver. return numCallbacks != 0; } void makeCallback ( ThreadEntry *t ) { // sanity check - if in a high niceness callback, we should // only be calling niceness 0 callbacks here // no, this is only called from sleep wrappers originating from // Loop.cpp, so we should be ok //if ( g_niceness==0 && t->m_niceness ) { char *xx=NULL;*xx=0; } // save it int32_t saved = g_niceness; // log it now if ( g_conf.m_logDebugLoop || g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: enter thread callback t=0x%"PTRFMT" " //"type=%s " "state=0x%"PTRFMT" " "nice=%"INT32"", (PTRTYPE)t, //getThreadType(), (PTRTYPE)t->m_state, (int32_t)t->m_niceness); // time it? int64_t start; if ( g_conf.m_maxCallbackDelay >= 0 ) start = gettimeofdayInMillisecondsLocal(); // then set it if ( t->m_niceness >= 1 ) g_niceness = 1; else g_niceness = 0; t->m_callback ( t->m_state , t ); // time it? if ( g_conf.m_maxCallbackDelay >= 0 ) { int64_t elapsed = gettimeofdayInMillisecondsLocal() - start; if ( elapsed >= g_conf.m_maxCallbackDelay ) log("threads: Took %"INT64" ms to call " "thread callback niceness=%"INT32"", elapsed,(int32_t)saved); } // log it now if ( g_conf.m_logDebugLoop || g_conf.m_logDebugThread ) log(LOG_DEBUG,"loop: exit thread callback t=0x%"PTRFMT" " //"type=%s " "nice=%"INT32"", (PTRTYPE)t, //getThreadType(), (int32_t)t->m_niceness); // restore global niceness g_niceness = saved; } bool Threads::cleanUp ( ThreadEntry *t , int32_t maxNiceness ) { bool didSomething = false; loop: // assume no more cleanup needed m_needsCleanup = false; //m_needBottom = false; // debug msg //log("cleanUp"); // debug msg //log("cleaning up exited threads and calling callbacks"); for ( int32_t i = 0 ; i < m_numQueues ; i++ ) { didSomething |= m_threadQueues[i].cleanUp( t , maxNiceness ); // . if we broke from the loop //if(m_needBottom) maxNiceness = 0; } // . 
loop more if we got a new one // . thread will set this when about to exit // . waitpid() may be interrupted by a SIGCHLD and not get his pid if ( m_needsCleanup ) goto loop; return didSomething; } // . cleans up any threads that have exited // . their m_isDone should be set to true // . don't process threads whose niceness is > maxNiceness bool ThreadQueue::cleanUp ( ThreadEntry *tt , int32_t maxNiceness ) { // call all callbacks after all threads are cleaned up void (* callbacks[64])(void *state,ThreadEntry *); void *states [64]; int64_t times [64]; int64_t times2 [64]; int64_t times3 [64]; int64_t times4 [64]; ThreadEntry *tids [64]; int64_t startTime = gettimeofdayInMilliseconds(); top: int32_t numCallbacks = 0; // loop through candidates for ( int32_t i = 0 ; i < m_top && numCallbacks < 64 ; i++ ) { // point to it ThreadEntry *t = &m_entries[i]; // skip if not qualified if ( t->m_niceness > maxNiceness ) { //if(t->m_isDone) { // g_threads.m_needBottom = true; // //g_threads.m_needsCleanup = true; //} continue; } // must be occupied to be done (sanity check) if ( ! t->m_isOccupied ) continue; // skip if not launched yet if ( ! t->m_isLaunched ) continue; // . we were segfaulting right here before because the thread // was setting t->m_pid and at this point it was not // set so t->m_pid was a bogus value // . thread may have seg faulted, in which case sigbadhandler() // in Loop.cpp will get it and set errno to 0x7fffffff #ifndef PTHREADS // MDW: i hafta take this out because the errno_location thing // is not working on the newer gcc if ( ! t->m_isDone && t->m_pid >= 0 && s_errnos [t->m_pid] == 0x7fffffff ) { log("thread: Got abnormal thread termination. Seems " "like the thread might have cored."); s_errnos[t->m_pid] = 0; goto again; } #endif // skip if not done yet if ( ! t->m_isDone ) continue; #ifdef PTHREADS if ( t->m_needsJoin ) { // . join up with that thread // . 
damn, sometimes he can block forever on his // call to sigqueue(), int32_t status = pthread_join ( t->m_joinTid , NULL ); if ( status != 0 ) { log("threads: " "pthread_join2 %"INT32" = %s (%"INT32")", (int32_t)t->m_joinTid,mstrerror(status), status); } // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: joined2 with " "t=0x%"PTRFMT" " "jointid=0x%"XINT32".", (PTRTYPE)t,(int32_t)t->m_joinTid); } #else again: int status ; pid_t pid = waitpid ( t->m_pid , &status , 0 ); int err = errno; // debug the waitpid if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: Waiting for " "t=0x%"PTRFMT" pid=%"INT32".", (PTRTYPE)t,(int32_t)t->m_pid); // bitch and continue if join failed if ( pid != t->m_pid ) { // waitpid() gets interrupted by various signals so // we need to repeat (SIGCHLD?) if ( err == EINTR ) goto again; log("thread: Call to waitpid(%"INT32") returned %"INT32": %s.", (int32_t)t->m_pid,(int32_t)pid,mstrerror(err)); continue; } // if status not 0 then process got abnormal termination if ( ! WIFEXITED(status) ) { if ( WIFSIGNALED(status) ) log("thread: Child process (pid=%i) exited " "from unhandled signal number %"INT32".", pid,(int32_t)WTERMSIG(status)); else log("thread: Child process (pid=%i) exited " "for unknown reason." 
, pid ); } //mfree ( t->m_stack , STACK_SIZE , "Threads" ); g_threads.returnStack ( t->m_si ); t->m_stack = NULL; // re-protect this stack mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE, PROT_NONE ); #endif // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: joined with pid=%"INT32" pid=%"INT32".", (int32_t)t->m_pid,(int32_t)t->m_pid); // we may get cleaned up and re-used and our niceness reassignd // right after set m_isDone to true, so save niceness int32_t niceness = t->m_niceness; char qnum = t->m_qnum; ThreadQueue *tq = &g_threads.m_threadQueues[(int)qnum]; if ( tq != this ) { char *xx = NULL; *xx = 0; } // get read size before cleaning it up -- it could get nuked int32_t rs = 0; bool isWrite = false; if ( tq->m_threadType == DISK_THREAD ) { // && t->m_state ) { //FileState *fs = (FileState *)t->m_state; rs = t->m_bytesToGo; isWrite = t->m_doWrite ; } if ( niceness <= 0) tq->m_hiReturned++; else if ( niceness == 1) tq->m_mdReturned++; else if ( niceness >= 2) tq->m_loReturned++; // deal with the tiers for disk threads based on read sizes if ( tq->m_threadType == DISK_THREAD ) { // writes are special cases if ( isWrite ) m_writesReturned++; if ( rs >= 0 && niceness >= 2 ) { if ( rs > g_conf.m_medReadSize ) tq->m_loReturnedBig++; else if ( rs > g_conf.m_smaReadSize ) tq->m_loReturnedMed++; else tq->m_loReturnedSma++; } else if ( rs >= 0 && niceness >= 1 ) { if ( rs > g_conf.m_medReadSize ) tq->m_mdReturnedBig++; else if ( rs > g_conf.m_smaReadSize ) tq->m_mdReturnedMed++; else tq->m_mdReturnedSma++; } else if ( rs >= 0 ) { if ( rs > g_conf.m_medReadSize ) tq->m_hiReturnedBig++; else if ( rs > g_conf.m_smaReadSize ) tq->m_hiReturnedMed++; else tq->m_hiReturnedSma++; } } // . we should count down here, not in the master thread // . solves Problem #2 ? // . TODO: this is not necessaruly atomic we should set // t->m_aboutToExit to true so cleanUp can periodically set // m_returned to what it should be!!! 
//g_threads.m_threadQueues[qnum].m_returned++; // now count him as returned m_returned++; // prepare for relaunch if we were cancelled if ( t->m_isCancelled ) { t->m_isCancelled = false; t->m_isLaunched = false; t->m_isDone = false; log("thread: Thread cancelled. Preparing thread " "for relaunch"); continue; } // debug msg //log("[%"UINT32"] CLEANING UP THREAD type=%"INT32", numLaunched=%"INT32"", // m_entries[i].m_tid , m_threadType , m_launched ); // remove it // debug msg //log("CLN TID=%"UINT32" t=%"UINT32"",(int32_t)t->m_tid , (int32_t)t); //log("thread callback for tid=%"UINT32"",(int32_t)t->m_tid ); // . save important stuff before freeing up the ThreadEntry // for possible take over. // . calling the callback may launch a thread which may // claim THIS thread entry, t //void (* callback)(void *state); //callback = t->m_callback; //void *state = t->m_state; callbacks [ numCallbacks ] = t->m_callback; states [ numCallbacks ] = t->m_state; times [ numCallbacks ] = t->m_queuedTime; times2 [ numCallbacks ] = t->m_launchedTime; times3 [ numCallbacks ] = t->m_preExitTime; times4 [ numCallbacks ] = t->m_exitTime; tids [ numCallbacks ] = t; numCallbacks++; // SOLUTION: before calling the callback which may launch // another thread with this same tid, thus causing an error, // we should set these to false first: // not running any more t->m_isLaunched = false; // not occupied any more t->m_isOccupied = false; // do we have to decrement top if ( m_top == i + 1 ) while (m_top > 0 && ! 
m_entries[m_top-1].m_isOccupied) m_top--; // send a cancel sig to the thread in case it's still there //int err = pthread_cancel ( t->m_tid ); //if ( err != 0 ) log("thread: cleanUp: pthread_cancel: %s", // mstrerror(err) ); // one less entry occupied //m_entriesUsed--; // debug msg //log("m_entriesUsed now %"INT32"",m_entriesUsed); // one more returned //m_returned++; // clear the g_errno in case set by a previous callback //g_errno = 0; // launch as many threads as we can before calling the // callback since this may hog the CPU like Msg20 does //g_threads.launchThreads(); g_errno = 0; makeCallback ( t ); // int64_t took = gettimeofdayInMilliseconds()-startTime; // if(took > 8 && maxNiceness > 0) { // if(g_conf.m_sequentialProfiling) // log(LOG_TIMING, // "admin: Threads spent %"INT64" ms to callback " // "%"INT32" callbacks, nice: %"INT32"", // took, numCallbacks, maxNiceness); // g_threads.m_needBottom = true; // maxNiceness = 0; // } // clear errno again g_errno = 0; if ( g_conf.m_logDebugThread ) { int64_t now = gettimeofdayInMilliseconds(); log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] %s done2. 
" "active=%"INT32" " "time since queued = %"UINT64" ms " "time since launch = %"UINT64" ms " "time since pre-exit = %"UINT64" ms " "time since exit = %"UINT64" ms", (PTRTYPE)t, getThreadType() , (int32_t)(m_launched - m_returned) , (uint64_t)(now - t->m_queuedTime), (uint64_t)(now - t->m_launchedTime), (uint64_t)(now - t->m_preExitTime) , (uint64_t)(now - t->m_exitTime) ); } // calling thread callback //log("calling thread id %"INT32" callback", (int32_t)(t->m_tid)); // first call it's callback //callback ( state ); // clear after just in case //g_errno = 0; // debug msg //log("CLN2 TID=%"UINT32" t=%"INT32"",(int32_t)t->m_tid ,(int32_t)t); // return now if tt was specified //if ( tt ) return; } int64_t took2 = gettimeofdayInMilliseconds()-startTime; if(numCallbacks > 0 && took2 > 5) log(LOG_DEBUG, "threads: took %"INT64" ms to callback %"INT32" " "callbacks, nice: %"INT32"", took2, numCallbacks, maxNiceness); //since we need finer grained control in loop, we no longer collect //the callbacks, sort, then call them. we now call them right away //that way we can break out if we start taking too int32_t and //give control back to udpserver. return numCallbacks != 0; // print out that we got them if ( g_conf.m_logDebugThread ) { int64_t now = gettimeofdayInMilliseconds(); for ( int32_t i = 0 ; i < numCallbacks ; i++ ) log(LOG_DEBUG,"thread: [tid=%"PTRFMT"] %s done3. " "active=%"INT32" " "time since queued = %"UINT64" ms " "time since launch = %"UINT64" ms " "time since pre-exit = %"UINT64" ms " "time since exit = %"UINT64" ms", (PTRTYPE)tids[i], getThreadType() , (int32_t)(m_launched - m_returned) , (uint64_t)(now - times [i]), (uint64_t)(now - times2[i]) , (uint64_t)(now - times3[i]) , (uint64_t)(now - times4[i]) ); } // . before calling callbacks, launch any other threads waiting in // this queue // . TODO: break into parts, cleanup, launch, call callbacks //while ( launchThread() ); // . sort callbacks by queued time // . 
do bubble sort cuz usually it's not too many threads we cleaned up bool flag ; void (* tmpCallback)(void *state,ThreadEntry *t); int64_t tmpTime; void *tmpState; int64_t now = gettimeofdayInMilliseconds(); bubble: flag = false; for ( int32_t i = 1 ; i < numCallbacks ; i++ ) { if ( times[i] >= times[i-1] ) continue; tmpTime = times [i ]; tmpState = states [i ]; tmpCallback = callbacks[i ]; times [i ] = times [i-1]; states [i ] = states [i-1]; tids [i ] = tids [i-1]; callbacks [i ] = callbacks[i-1]; times [i-1] = tmpTime; states [i-1] = tmpState; callbacks [i-1] = tmpCallback; flag = true; } if ( flag ) goto bubble; // call the callbacks now in order of oldest queued time first for ( int32_t i = 0 ; i < numCallbacks ; i++ ) { g_errno = 0; callbacks[i] ( states[i] , NULL ); } int64_t took = gettimeofdayInMilliseconds()-now; if(numCallbacks > 0 && took > 5) log(LOG_TIMING, "admin: took %"INT64" ms to callback %"INT32" " "callbacks, nice: %"INT32"", took, numCallbacks, maxNiceness); #if 0 //I think this is more efficient, for now we'll just use the old way //theres no reason to sort these, lets just find the top one //and call it. do it again it until we've called all of them. int32_t newNumCallbacks = numCallbacks; for ( int32_t i = 0 ; i < numCallbacks ; i++ ) { int64_t maxTime = 0; int32_t maxNdx = 0; for ( int32_t j = 0 ; j < newNumCallbacks ; j++ ) { if(maxTime >= times[i]) { maxTime = times[i]; maxNdx = i; } } g_errno = 0; callbacks[maxNdx] ( states[maxNdx] ); //copy last one into called slot and decrement newNumCallbacks--; times [maxNdx] = times [newNumCallbacks]; states [maxNdx] = states [newNumCallbacks]; callbacks [maxNdx] = callbacks[newNumCallbacks]; } #endif g_errno = 0; // close more threads if we were limited by callbacks[] size if ( numCallbacks >= 64 ) goto top; //return true if we called something back; //returns wrong value if numCallbacks was exactly 64, not too serious... 
return numCallbacks != 0; } // used by UdpServer to see if it should call a low priority callback int32_t Threads::getNumActiveHighPriorityCpuThreads() { ThreadQueue *q ; int32_t hiActive = 0 ; q = &g_threads.m_threadQueues[INTERSECT_THREAD]; hiActive += q->m_hiLaunched - q->m_hiReturned; q = &g_threads.m_threadQueues[MERGE_THREAD]; hiActive += q->m_hiLaunched - q->m_hiReturned; return hiActive; } // used by UdpServer to see if it should call a low priority callback int32_t Threads::getNumActiveHighPriorityThreads() { ThreadQueue *q ; int32_t hiActive = 0 ; q = &g_threads.m_threadQueues[DISK_THREAD]; hiActive += q->m_hiLaunched - q->m_hiReturned; q = &g_threads.m_threadQueues[INTERSECT_THREAD]; hiActive += q->m_hiLaunched - q->m_hiReturned; q = &g_threads.m_threadQueues[MERGE_THREAD]; hiActive += q->m_hiLaunched - q->m_hiReturned; return hiActive; } // . returns false if no thread launched // . returns true if thread was launched // . sets g_errno on error // . don't launch a low priority thread if a high priority thread is running // . i.e. don't launch a high niceness thread if a low niceness is running bool ThreadQueue::launchThread2 ( ThreadEntry *te ) { // debug msg //log("trying to launch for type=%"INT32"",(int32_t)m_threadType); // clean up any threads that have exited //cleanUp (); // if no entries then nothing to launch if ( m_top <= 0 ) return false; // or if no stacks left, don't even try if ( s_head == -1 ) return false; // . how many threads are active now? // . NOTE: not perfectly thread safe here int64_t active = m_launched - m_returned ; // debug msg if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: launchThread: active=%"INT64" max=%"INT32".", active,m_maxLaunched); // return if the max is already launched if ( active >= m_maxLaunched ) return false; // do not launch a low priority merge, intersect or filter thread if we // have high priority cpu threads already going on. 
this way a // low priority spider thread will not launch if a high priority // cpu-based thread of any kind (right now just MERGE or INTERSECT) // is already running. int32_t hiActive2 = g_threads.getNumActiveHighPriorityCpuThreads() ; // return log("MAX. %"INT32" are launched. %"INT32" now in queue.", // active , m_entriesUsed ); // . sanity check // . a thread can NOT call this //if ( getpid() != s_pid ) { // fprintf(stderr,"thread: launchThread: bad engineer\n"); // ::exit(-1); //} //int64_t now = gettimeofdayInMilliseconds(); int64_t now = -1LL; // pick thread with lowest niceness first int32_t minNiceness = 0x7fffffff; int64_t maxWait = -1; int32_t mini = -1; bool minIsWrite = false; int32_t lowest = 0x7fffffff; int32_t highest = 0; // . now base our active thread counts on niceness AND read sizes // . this is only used for DISK_THREADs // . loActive* includes niceness >= 1 int32_t loActiveBig = m_loLaunchedBig - m_loReturnedBig; int32_t loActiveMed = m_loLaunchedMed - m_loReturnedMed; int32_t loActiveSma = m_loLaunchedSma - m_loReturnedSma; int32_t mdActiveBig = m_mdLaunchedBig - m_mdReturnedBig; int32_t mdActiveMed = m_mdLaunchedMed - m_mdReturnedMed; int32_t mdActiveSma = m_mdLaunchedSma - m_mdReturnedSma; int32_t hiActiveBig = m_hiLaunchedBig - m_hiReturnedBig; int32_t hiActiveMed = m_hiLaunchedMed - m_hiReturnedMed; int32_t hiActiveSma = m_hiLaunchedSma - m_hiReturnedSma; int32_t activeWrites = m_writesLaunched - m_writesReturned; // how many niceness=2 threads are currently running now? int64_t loActive = m_loLaunched - m_loReturned; int64_t mdActive = m_mdLaunched - m_mdReturned; //int64_t hiActive = m_hiLaunched - m_hiReturned; int32_t total = loActive + mdActive; int32_t max = g_conf.m_spiderMaxDiskThreads; if ( max <= 0 ) max = 1; // hi priority max // JAB: warning abatement //int64_t hiActive = m_hiLaunched - m_hiReturned; // i dunno what the point of this was... 
so i commented it out //int32_t max2 = g_conf.m_queryMaxDiskThreads ; //if ( max2 <= 0 ) max2 = 1; // only do this check if we're a addlists/instersect thread queue //if (m_threadType == INTERSECT_THREAD&& hiActive >= max2)return false; // loop through candidates for ( int32_t i = 0 ; i < m_top ; i++ ) { // skip if not occupied if ( ! m_entries[i].m_isOccupied ) continue; int32_t niceness = m_entries[i].m_niceness; // get lowest niceness level of launched threads if ( m_entries[i].m_isLaunched ) { // if he's done, skip him if ( m_entries[i].m_isDone ) continue; // get the highest niceness for all that are launched if ( niceness > highest ) highest = niceness; // get the lowest niceness for all that are launched if ( niceness < lowest ) lowest = niceness; // continue now since it's already launched continue; } // . these filters really make it so the spider does not // impact query response time //if ( niceness >= 1 && hiActive > 0 ) continue; // don't consider any lows if one already running //if ( niceness >= 2 && loActive > 0 ) continue; // don't consider any lows if a hi already running //if ( niceness >= 2 && hiActive > 0 ) continue; // don't consider any mediums if one already running //if ( niceness == 1 && mdActive > 0 ) continue; // don't consider any mediums if a hi already running //if ( niceness == 1 && hiActive > 0 ) continue; //if ( m_threadType == DISK_THREAD ) { // if ( niceness >= 1 && hiActive > 0 ) continue; // if ( niceness >= 2 && loActive >= max ) continue; // if ( niceness == 1 && mdActive >= max ) continue; //} // treat niceness 1 as niceness 2 for ranking purposes // IFF we're not too backlogged with file merges. i.e. // IFF we're merging faster than we're dumping. // only merges and dumps have niceness 1 really. // spider disk reads are all niceness 2. // Now Rdb::addList just refuses to add data if we have too // many unmerged files on disk! // now we use niceness 1 for "real merges" so those reads take // priority over spider build reads. 
(8/14/12) //if(niceness == 1 /*&& g_numUrgentMerges <= 0*/) niceness = 2; // if he doesn't beat or tie us, skip him if ( niceness > minNiceness ) continue; // no more than "max" medium and low priority threads should // be active/launched at any one time if ( niceness >= 1 && total >= max ) continue; // int16_tcut ThreadEntry *t = &m_entries[i]; // what is this guy's read size? // the filestate provided could have been //FileState *fs ; int32_t readSize = 0 ; bool isWrite = false; if ( m_threadType == DISK_THREAD ){//&&m_entries[i].m_state ) { //fs = (FileState *)m_entries[i].m_state; readSize = t->m_bytesToGo; isWrite = t->m_doWrite ; } if ( isWrite && activeWrites > g_conf.m_maxWriteThreads ) continue; if ( m_threadType == MERGE_THREAD || m_threadType == INTERSECT_THREAD || m_threadType == FILTER_THREAD ) if ( niceness > 0 && hiActive2 > 0 ) continue; // how many threads can be launched for this readSize/niceness? if ( niceness >= 1 && m_threadType == DISK_THREAD ) { if ( readSize > g_conf.m_medReadSize ) { if ( loActiveBig + mdActiveBig >= g_conf.m_spiderMaxBigDiskThreads ) continue; } else if ( readSize > g_conf.m_smaReadSize ) { if ( loActiveMed + mdActiveMed >= g_conf.m_spiderMaxMedDiskThreads ) continue; } else if ( loActiveSma + mdActiveSma >= g_conf.m_spiderMaxSmaDiskThreads ) continue; } else if ( niceness < 1 && m_threadType == DISK_THREAD ) { if ( readSize > g_conf.m_medReadSize ) { if ( hiActiveBig >= g_conf.m_queryMaxBigDiskThreads ) continue; } else if ( readSize > g_conf.m_smaReadSize ) { if ( hiActiveMed >= g_conf.m_queryMaxMedDiskThreads ) continue; } else if ( hiActiveSma >= g_conf.m_queryMaxSmaDiskThreads ) continue; } // be lazy with this since it uses a significant amount of cpu if ( now == -1LL ) now = gettimeofdayInMilliseconds(); // how int32_t has this entry been waiting in the queue to launch? 
int64_t waited = now - m_entries[i].m_queuedTime ; // adjust "waited" if it originally had a niceness of 1 if ( m_entries[i].m_niceness >= 1 ) { // save threads gain precedence if ( m_threadType == SAVETREE_THREAD ) waited += 49999999; else if ( m_threadType == UNLINK_THREAD ) waited += 39999999; else if ( m_threadType == MERGE_THREAD ) waited += 29999999; else if ( m_threadType == INTERSECT_THREAD ) waited += 29999999; else if ( m_threadType == FILTER_THREAD ) waited += 9999999; // if its a write thread... do it quick else if ( isWrite ) waited += 19999999; // . if it has waited more than 500 ms it needs // to launch now... // . these values seem to be VERY well tuned on // my production machines, but they may have to // be tuned for other machines? TODO. // . they should auto-tune so when merge is more // important it should starve the other spider reads else if ( waited >= 500 ) waited += 9999999; // it hurts for these guys to wait that int32_t else waited *= 4; } // is real merge? if ( m_entries[i].m_niceness == 1 ) waited += 19999999; // watch out for clock skew if ( waited < 0 ) waited = 0; // if tied, the lowest time wins if ( niceness == minNiceness && waited <= maxWait ) continue; // we got a new winner mini = i; minNiceness = niceness; maxWait = waited; minIsWrite = isWrite; } // if no candidate, bail honorably if ( mini == -1 ) return false; // . if we've got an urgent thread (niceness=1) going on, do not allow // niceness=2 threads to launch // . actually, don't launch *ANY* if doing an urgent merge even if // no niceness=1 thread currently launched // . CAUTION! when titledb is merging it dumps tfndb and if tfndb // goes urgent,make sure it dumps out with niceness 1, otherwise // this will freeze us!! since titledb won't be able to continue // merging until tfndb finishes dumping... // . Now Rdb::addList just refuses to add data if we have too // many unmerged files on disk! Let others still read from us, // just not add to us... 
//if ( minNiceness >= 2 && g_numUrgentMerges > 0 ) // && lowest <= 1 ) // return false; // if we're urgent, don't launch a niceness 2 unless he's waited // at least 200ms in the queue //if ( minNiceness >= 2 && g_numUrgentMerges > 0 && maxWait < 200 ) // return false; // . if the thread to launch has niceness > lowest launched then bail // . i.e. don't launch a low-priority thread if we have highs running // . we no longer let a niceness of 1 prevent a niceness of 2 from // launching, this way we can launch merge threads at a niceness // of 1 w/o hurting the spidering too much, but still giving the // merge some preferential treatment over the disk so we don't // RdbDump data faster than we can finish the merge. if ( minNiceness > lowest && lowest < 1 ) return false; // . don't launch a low priority thread if there's a high launched // from any ThreadQueue // . hi priority is niceness <= 0, med/lo priority is niceness >= 1 //int64_t hiActive = g_threads.m_hiLaunched - g_threads.m_hiReturned; // now just limit it to this queue... so you can launch a low // merge thread even if there's a high disk read going on //if ( minNiceness >= 1 && hiActive > 0 ) return false; // if we're low priority and suspended is true then bail //if ( minNiceness > 0 && m_isLowPrioritySuspended ) return false; // if we're going to launch a high priority thread then CANCEL // any low priority disk-read threads already in progress //if ( minNiceness <= 0 && highest > 0 ) cancelLowPriorityThreads (); // . let's cancel ALL low priority threads if we're launching a high // . 
hi priority is niceness <= 0, low priority is niceness >= 1 //int64_t loActive = g_threads.m_loLaunched - g_threads.m_loReturned; int32_t realNiceness = m_entries[mini].m_niceness; //if ( realNiceness <= 0 && (loActive + mdActive) > 0 ) // // this actually cancels medium priority threads, too // g_threads.cancelLowPriorityThreads(); // point to winning entry ThreadEntry *t = &m_entries[mini]; // if descriptor was closed, just return error now, we // cannot try to re-open because the file might have been // unlinked. Sync.cpp does a DISK_THREAD but does not pass in a valid // FileState ptr because it does its own saving, so check for NULLs. FileState *fs = (FileState *)t->m_state; bool allocated = false; if ( m_threadType == DISK_THREAD && fs && ! fs->m_doWrite ) { // allocate the read buffer here! if ( ! fs->m_doWrite && ! fs->m_buf && t->m_bytesToGo > 0 ) { int32_t need = t->m_bytesToGo + fs->m_allocOff; char *p = (char *) mmalloc ( need , "ThreadReadBuf" ); if ( p ) { fs->m_buf = p + fs->m_allocOff; fs->m_allocBuf = p; fs->m_allocSize = need; allocated = true; } else log("thread: read buf alloc failed for %"INT32" " "bytes.",need); // just let the BigFile::readWrite_r() handle the // error for the NULL read buf } // . otherwise, they are intact, so get the real fds // . we know the stored File is still around because of that bool doWrite = fs->m_doWrite; BigFile *bb = fs->m_this; fs->m_fd1 = bb->getfd (fs->m_filenum1, !doWrite, &fs->m_vfd1); fs->m_fd2 = bb->getfd (fs->m_filenum2, !doWrite, &fs->m_vfd2); // is this bad? 
if ( fs->m_fd1 < 0 ) log("disk: fd1 is %i for %s", fs->m_fd1,bb->m_baseFilename); if ( fs->m_fd2 < 0 ) log("disk: fd2 is %i for %s.", fs->m_fd2,bb->m_baseFilename); fs->m_closeCount1 = getCloseCount_r ( fs->m_fd1 ); fs->m_closeCount2 = getCloseCount_r ( fs->m_fd2 ); } // count it as launched now, before we actually launch it m_launched++; // priority-based GLOBAL & LOCAL launch count if ( realNiceness <= 0 ) m_hiLaunched++; else if ( realNiceness == 1 ) m_mdLaunched++; else if ( realNiceness >= 2 ) m_loLaunched++; // deal with the tiers for disk threads based on read sizes if ( m_threadType == DISK_THREAD ) { // writes are special cases if ( minIsWrite ) m_writesLaunched++; //FileState *fs = (FileState *)m_entries[mini].m_state; int32_t rs = t->m_bytesToGo; // 0; //if ( fs ) rs = fs->m_bytesToGo; if ( realNiceness >= 2 ) { if ( rs > g_conf.m_medReadSize ) m_loLaunchedBig++; else if ( rs > g_conf.m_smaReadSize ) m_loLaunchedMed++; else m_loLaunchedSma++; } else if ( realNiceness >= 1 ) { if ( rs > g_conf.m_medReadSize ) m_mdLaunchedBig++; else if ( rs > g_conf.m_smaReadSize ) m_mdLaunchedMed++; else m_mdLaunchedSma++; } else { if ( rs > g_conf.m_medReadSize ) m_hiLaunchedBig++; else if ( rs > g_conf.m_smaReadSize ) m_hiLaunchedMed++; else m_hiLaunchedSma++; } } // debug msg //if ( m_threadType == 0 ) // log("creating thread, t=%"UINT32" state=%"UINT32" launched = %"INT32"", // t , (int32_t)t->m_state , m_launched ); // and set the flag t->m_isLaunched = true; // . launch it // . this sets the pthread_t ptr for identificatoin // . returns false on error //pthread_t tmp; loop: // debug msg if ( g_conf.m_logDebugThread ) { active = m_launched - m_returned ; int64_t now = gettimeofdayInMilliseconds(); log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] launched %s thread. " "active=%"INT64" " "niceness=%"INT32". 
waited %"UINT64" ms in queue.", (PTRTYPE)t, getThreadType(), active, realNiceness, now - t->m_queuedTime); } // be lazy with this since it uses a significant amount of cpu if ( now == -1LL ) now = gettimeofdayInMilliseconds(); //t->m_launchedTime = g_now; t->m_launchedTime = now; // loop2: // spawn the thread int32_t count = 0; pid_t pid; #ifndef PTHREADS //int status; //int ret; // random failure test //if ( rand() %10 == 1 ) { err = ENOMEM; goto hadError; } // malloc twice the size t->m_stackSize = STACK_SIZE; //t->m_stack = (char *)mmalloc ( t->m_stackSize , "Threads" ); int32_t si = g_threads.getStack ( ); if ( si < 0 ) { log(LOG_LOGIC,"thread: Unable to get stack. Bad engineer."); goto hadError; } t->m_si = si; t->m_stack = s_stackPtrs [ si ]; // UNprotect the whole stack so we can use it mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE , PROT_READ | PROT_WRITE ); // clear g_errno g_errno = 0; // . make another process // . do not use sig handlers, so if a child process gets any unhandled // signal (like SEGV) it will just exit pid = clone ( startUp , t->m_stack + t->m_stackSize , CLONE_FS | CLONE_FILES | CLONE_VM | //CLONE_SIGHAND | SIGCHLD , (void *)t ); // . we set the pid because we are the one that checks it! // . if we just let him do it, when we check in cleanup routine // we can get an uninitialized pid t->m_pid = pid; // might as well bitch if we should here if ( s_bad ) { log(LOG_LOGIC,"thread: PID received: %"INT32" > %"INT32". Bad.", s_badPid, (int32_t)MAX_PID); //char *xx = NULL; *xx = 0; } // wait for him //ret = waitpid ( -1*pid , &status , 0 ); //if ( ret != pid ) // log("waitpid(pid=%"INT32"): ret=%"INT32" err=%s", // (int32_t)pid,(int32_t)ret,mstrerror(errno)); // check if he's done //if ( ! t->m_isDone ) log("NOT DONE"); // set the pid //t->m_pid = pid; // error? if ( pid == (pid_t)-1 ) g_errno = errno; // // now use pthreads again... are they stable yet? 
// #else // assume it does not go through t->m_needsJoin = false; // pthread inherits our sigmask, so don't let it handle sigalrm // signals in Loop.cpp, it'll screw things up. that handler // is only meant to be called by the main process. if we end up // double calling it, this thread may think g_callback is non-null // then it gets set to NULL, then the thread cores! seen it... sigset_t sigs; sigemptyset ( &sigs ); sigaddset ( &sigs , SIGALRM ); sigaddset ( &sigs , SIGVTALRM ); if ( sigprocmask ( SIG_BLOCK , &sigs , NULL ) < 0 ) log("threads: failed to block sig"); // this returns 0 on success, or the errno otherwise g_errno = pthread_create ( &t->m_joinTid , &s_attr, startUp2 , t) ; if ( sigprocmask ( SIG_UNBLOCK , &sigs , NULL ) < 0 ) log("threads: failed to unblock sig"); #endif // we're back from pthread_create if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: Back from clone " "t=0x%"PTRFMT" pid=%"INT32".", (PTRTYPE)t,(int32_t)pid); // return true on successful creation of the thread if ( g_errno == 0 ) { // good stuff, the thread needs a join now t->m_needsJoin = true; if ( count > 0 ) log("thread: Call to clone looped %"INT32" times.",count); return true; } //#undef usleep // forever loop if ( g_errno == EAGAIN ) { if ( count++ == 0 ) log("thread: Call to clone had error: %s.", mstrerror(g_errno)); //usleep(1); //goto loop2; goto hadError; } // debug msg // log("created tid=%"INT32"",(int32_t)t->m_tid); // return true; //} // do again if g_errno is AGAIN if ( g_errno == EINTR ) { log("thread: Call to clone was interrupted. Trying again."); goto loop; } //#ifndef _PTHREADS_ hadError: //#endif if ( g_errno ) log("thread: pthread_create had error = %s", mstrerror(g_errno)); // it didn't launch, did it? dec the count. m_launched--; // priority-based LOCAL & GLOBAL launch counts if ( realNiceness <= 0 ) m_hiLaunched--; else if ( realNiceness == 1 ) m_mdLaunched--; else if ( realNiceness >= 2 ) m_loLaunched--; // . 
deal with the tiers for disk threads based on read sizes // . WARNING: we cannot easily change tiers dynamically // because it will throw these counts off if ( m_threadType == DISK_THREAD ) { // writes are special cases if ( minIsWrite ) m_writesLaunched--; //FileState *fs = (FileState *)m_entries[mini].m_state; int32_t rs = t->m_bytesToGo; // 0; //if ( fs ) rs = fs->m_bytesToGo; if ( realNiceness >= 2 ) { if ( rs > g_conf.m_medReadSize ) m_loLaunchedBig--; else if ( rs > g_conf.m_smaReadSize ) m_loLaunchedMed--; else m_loLaunchedSma--; } else if ( realNiceness >= 1 ) { if ( rs > g_conf.m_medReadSize ) m_mdLaunchedBig--; else if ( rs > g_conf.m_smaReadSize ) m_mdLaunchedMed--; else m_mdLaunchedSma--; } else { if ( rs > g_conf.m_medReadSize ) m_hiLaunchedBig--; else if ( rs > g_conf.m_smaReadSize ) m_hiLaunchedMed--; else m_hiLaunchedSma--; } } // unset the flag t->m_isLaunched = false; // bail on other errors log("thread: Call to clone had error: %s.", mstrerror(g_errno)); // correction on this error log("thread: Try not using so much memory. " "memused now =%"INT64".",g_mem.getUsedMem()); // free allocated buffer if ( allocated ) { mfree ( fs->m_allocBuf , fs->m_allocSize , "ThreadReadBuf" ); fs->m_buf = NULL; } // i'm not sure return value matters at this point? the thread // is queued and hopefully will launch at some point return false; // if this is the direct thread request do not call callback, just // return false, otherwise we get into an unexpected loop thingy if ( t == te ) return log("thread: Returning false."); // do it blocking log("thread: Calling without thread. This will crash many times. " "Please fix it."); // return false so caller will re-do without thread! 
// so BigFile::readwrite() will retry without thread and we won't // get into a wierd loop thingy if ( te ) return false; // uint64_t profilerStart,profilerEnd; // uint64_t statStart,statEnd; //if (g_conf.m_profilingEnabled){ // address=(int32_t)t->m_startRoutine; // g_profiler.startTimer(address, __PRETTY_FUNCTION__); //} t->m_startRoutine ( t->m_state , t ); //if (g_conf.m_profilingEnabled) { // if(!g_profiler.endTimer(address, __PRETTY_FUNCTION__)) // log(LOG_WARN,"admin: Couldn't add the fn %"INT32"", // (int32_t)address); //} t->m_exitTime = gettimeofdayInMilliseconds(); // flag it for cleanup t->m_isDone = true; t->m_isLaunched = true; // clean it up cleanUp ( t , 200/*maxNiceness thread can have to be cleaned up*/ ); // ignore error g_errno = 0; // we kinda launched one, so say true here return true; // false; } #ifndef PTHREADS static bool s_firstTime = true; #endif // threads start up with cacnellation deferred until pthreads_testcancel() // is called, but we never call that int startUp ( void *state ) { // get thread entry ThreadEntry *t = (ThreadEntry *)state; // no! now parent does since he is the one that needs to check it // in the cleanup routine // remember the pid //t->m_pid = getpid(); // . sanity check // . a thread can NOT call this #ifndef PTHREADS if ( getpid() == s_pid ) log("thread: Thread has same pid %i as main process.",s_pid); #endif // the cleanup handler //pthread_cleanup_push ( exitWrapper , t ) ; // t->m_state ); // our signal set sigset_t set; sigemptyset(&set); //sigaddset(&set, SIGHUP); // we need this here so if we break the gb process with gdb it // does not kill the child processes when it sends out the SIGINT. sigaddset(&set, SIGINT); // ignore the real time signal, man... //sigaddset(&set, GB_SIGRTMIN); //pthread_sigmask(SIG_BLOCK, &set, NULL); #ifndef PTHREADS sigprocmask(SIG_BLOCK, &set, NULL); #else pthread_sigmask(SIG_BLOCK,&set,NULL); #endif // . what this lwp's priority be? // . can range from -20 to +20 // . 
the lower p, the more cpu time it gets // . this is really the niceness, not the priority int p ; // currently our niceness ranges from -1 to 2 for us if ( t->m_niceness == 2 ) p = 19 ; else if ( t->m_niceness == 1 ) p = 10 ; else p = 0 ; // remember the tid //t->m_tid = pthread_self(); // debug if ( g_conf.m_logDebugThread ) log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] " "in startup pid=%"INT32" pppid=%"INT32"", (PTRTYPE)t,(int32_t)getpidtid(),(int32_t)getppid()); // debug msg //fprintf(stderr,"new thread tid=%"INT32" pid=%"INT32"\n", // (int32_t)t->m_tid,(int32_t)t->m_pid); // . set this process's priority // . setpriority() is only used for SCHED_OTHER threads //if ( pthread_setschedprio ( getpidtid() , p ) ) { #ifndef PTHREADS if ( setpriority ( PRIO_PROCESS, getpidtid() , p ) < 0 ) { // do we even support logging from a thread? if ( s_firstTime ) { log("thread: Call to " "setpriority(%"UINT32",%i) in thread " "failed: %s. This " "message will not be repeated.", (uint32_t)getpidtid(),p,mstrerror(errno)); s_firstTime = false; } errno = 0; } #endif /* sched_param sp; int pp; int err = pthread_getschedparam ( t->m_tid , &pp , &sp ) ; if ( err ) log("thread: startUp: pthread_getschedparam: %s", mstrerror(err)); // adjust the priority p = 1; sp.sched_priority = p; // and reassign err = pthread_setschedparam ( t->m_tid , SCHED_OTHER , &sp ) ; log("thread: startUp: pthread_setschedparam(%"INT32"): %s", p,mstrerror(err)); */ // somehow, it works ok when we have this print statement delay!!! //fprintf(stderr,"thread pid = %"INT32"\n",(int32_t)getpid()); // . call the startRoutine // . IMPORTANT: this can NEVER do non-blocking stuff t->m_startRoutine ( t->m_state , t ); // pop it off //pthread_cleanup_pop ( 1 /*execute handler?*/ ); // . now throw it on g_loop's sigqueue // . the first 4 bytes of t->m_state should be t->m_callback // . no! just use 1 to tell Loop to call g_threads.cleanUp() // . TODO: pass in a ptr to cleanUpWrapper() instead of "t" // . 
sival_int is only 4 bytes on a 64 bit arch... sigval_t svt; svt.sival_int = 1;//(int64_t)t ; //(int)(t->m_state); // fd; // set exit time int64_t now = gettimeofdayInMilliseconds(); t->m_preExitTime = now; t->m_exitTime = now; if ( g_conf.m_logDebugThread ) { log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] " "done with startup pid=%"INT32"", (PTRTYPE)t,(int32_t)getpidtid()); } // . now mark thread as ready for removal // . do this BEFORE queing the signal since we're still a thread!!! // . cleanUp() will take care of the rest // . cleanUp() will call pthread_join on us! t->m_isDone = true; // let Loop.cpp's sigHandler_r call g_thread.cleanUp() g_threads.m_needsCleanup = true; //if(t->m_niceness > 0) g_threads.m_needBottom = true; // . send the signal // . if queue is full g_loop will get a SIGIO and call // g_threads.cleanUp()/launchThreads() in it's doPoll() routine // . we reserve GB_SIGRTMIN itself for unblocked interrupts for // UdpServer // . damn, it seems that if the queue is full pthread_join is unable // to detach threads.... so sleep until it clears up // . HEY! process is supposed to send us an ECHLD signal? right? //sigqueue ( s_pid, GB_SIGRTMIN + 1 + t->m_niceness, svt ) ; // . it does not send us a signal automatically, so we must do it! // . i noticed during the linkdb rebuild we were not getting the signal //sigqueue ( s_pid, GB_SIGRTMIN + 1 + t->m_niceness, svt ) ; // i verified this breaks select() in Loop.cpp out of it's sleep //fprintf(stderr,"threads sending SIGCHLD\n"); // try a sigchld now! doesn't it already do this? no... 
sigqueue ( s_pid, SIGCHLD, svt ) ; return 0; } // pthread_create uses this one void *startUp2 ( void *state ) { startUp ( state ); return NULL; } // watch out, UdpServer::getEmptySlot() calls UdpServer::suspend() and we // could be in a signal handler here void ThreadQueue::suspendLowPriorityThreads() { // disable for now //return; // just return if already suspended if ( m_isLowPrioritySuspended ) return; // log it //log("ThreadQueue:: suspending low priority threads!!!!!!"); // set the flag so low priority threads won't be launched m_isLowPrioritySuspended = true; // . cancel any outstanding low priority threads that are running // . no, only cancel if we run another disk thread // . this will happen above in ThreadQueue::launchThread() //cancelLowPriorityThreads(); } // this is called by UdpServer::destroySlot() and should NOT be in a sig handlr void ThreadQueue::resumeLowPriorityThreads() { // disable for now //return; // bail if no need if ( ! m_isLowPrioritySuspended ) return; // turn em back on m_isLowPrioritySuspended = false; // just in case, though //if ( g_inSigHandler ) { // log(LOG_LOGIC,"thread: resumeLowPriorityThreads: In sig " // "handler."); // return; //} // try to start up some threads then g_threads.launchThreads(); } void ThreadQueue::print ( ) { // loop through candidates for ( int32_t i = 0 ; i < m_top ; i++ ) { ThreadEntry *t = &m_entries[i]; // print it log(LOG_INIT,"thread: address=%"PTRFMT" " "pid=%u state=%"PTRFMT" " "occ=%i done=%i lnch=%i", (PTRTYPE)t , t->m_pid , (PTRTYPE)t->m_state , t->m_isOccupied , t->m_isDone , t->m_isLaunched ); } } const char *ThreadQueue::getThreadType ( ) { const char *s = "unknown"; if ( m_threadType == DISK_THREAD ) s = "disk"; if ( m_threadType == MERGE_THREAD ) s = "merge"; if ( m_threadType == INTERSECT_THREAD ) s = "intersectlists"; if ( m_threadType == FILTER_THREAD ) s = "filter"; if ( m_threadType == SAVETREE_THREAD ) s = "savetree"; if ( m_threadType == UNLINK_THREAD ) s = "unlink"; if ( 
m_threadType == GENERIC_THREAD ) s = "generic"; return s; } #include "BigFile.h" // FileState class int32_t Threads::getDiskThreadLoad ( int32_t maxNiceness , int32_t *totalToRead ) { ThreadQueue *q = &m_threadQueues[DISK_THREAD]; ThreadEntry *e = q->m_entries; int32_t top = q->m_top; *totalToRead = 0; int32_t n = 0; // we really can't suspend threads cuz they might have the // mutex lock so we just cancel the disk threads here then for ( int32_t i = 0 ; i < top ; i++ ) { // get entry ThreadEntry *t = &e[i]; // skip if not occupied if ( ! t->m_isOccupied ) continue; // skip if it's nicer than what we want if (t->m_niceness > maxNiceness && ! t->m_isLaunched) continue; // skip if already done if ( t->m_isDone ) continue; // cast state data FileState *fs = (FileState *) t->m_state; // sometimes NULL, like from Sync.cpp's call if ( ! fs ) continue; // only remove read operations, since write operations get // the fd up front if ( t->m_doWrite ) continue; // how many byte to do //int32_t todo = fs->m_bytesToGo - fs->m_bytesDone; int32_t todo = t->m_bytesToGo; // multiply by 2 if a write if ( t->m_doWrite ) todo *= 2; // add to total bytes to read *totalToRead += todo; // count the thread n++; } return n; } // when a BigFile is removed, much like we remove its pages from DiskPageCache // we also remove any unlaunched reads/writes on it from the thread queue. void ThreadQueue::removeThreads ( BigFile *bf ) { // did the BigFile get hosed? that means our BigFile was // unlinked or closed before we got a chance to launch the // thread. int32_t maxi = -1; for ( int32_t i = 0 ; i < m_top ; i++ ) { ThreadEntry *t = &m_entries[i]; // skip if not occupied if ( ! t->m_isOccupied ) continue; // get the filestate FileState *fs = (FileState *)t->m_state; // skip if NULL if ( ! fs ) continue; // skip if not match if ( fs->m_this != (void *)bf ) continue; // . let it finish writing if it is a write thread // . 
otherwise, if we are exiting, we could free the // buffer being written and cause the thread to core... if ( fs->m_doWrite ) { log(LOG_INFO,"disk: Not removing write thread."); continue; } // . should we really? if we renamed the file to another, // we need to recompute the offsets to read, etc.. so we // should fail up to Msg5 with EFILECLOSED or something... // . i think we did a rename and it got the same fd, and since // we did not remove the launched or done threads after the // rename, we're not sure if they read from the newly renamed // file or not, and our read offset was for the old file... // . at least set the error flag for doneWrapper() fs->m_errno2 = EFILECLOSED; // log it logf(LOG_INFO,"disk: Removing/flagging operation in thread " "queue. fs=0x%"PTRFMT"", (PTRTYPE)fs); // skip if already done if ( t->m_isDone ) continue; // skip if launched if ( t->m_isLaunched ) continue; // note in the log it is launched log(LOG_INFO,"disk: Thread is launched."); // tell donewrapper what happened fs->m_errno = EFILECLOSED; g_errno = EFILECLOSED; // note it //log(LOG_INFO,"disk: Removing operation from thread queue."); // remove it from the thread queue t->m_isDone = true; t->m_isLaunched = false; t->m_isOccupied = false; // keep track maxi = i; makeCallback ( t ); } // do we have to decrement top if ( m_top == maxi + 1 ) while ( m_top>0 && !m_entries[m_top-1].m_isOccupied) m_top--; g_errno = 0; } void Threads::printState() { int64_t now = gettimeofdayInMilliseconds(); for ( int32_t i = 0 ; i < m_numQueues; i++ ) { ThreadQueue *q = &m_threadQueues[i]; int32_t loActive = q->m_loLaunched - q->m_loReturned; int32_t mdActive = q->m_mdLaunched - q->m_mdReturned; int32_t hiActive = q->m_hiLaunched - q->m_hiReturned; int32_t total = loActive + mdActive + hiActive; if( total == 0) continue; log(LOG_TIMING, "admin: Thread counts: type:%s " "%"INT32":low %"INT32":med %"INT32":high %"INT32":total", q->getThreadType(), loActive, mdActive, hiActive, total); for ( int32_t j = 0 
; j < q->m_top ; j++ ) { ThreadEntry *t = &q->m_entries[j]; if(!t->m_isOccupied) continue; if(t->m_isDone) { log(LOG_TIMING, "admin: Thread -done- " "nice: %"INT32" " "totalTime: %"INT64" (ms) " "queuedTime: %"INT64"(ms) " "runTime: %"INT64"(ms) " "cleanup: %"INT64"(ms) " "callback:%s", t->m_niceness, now - t->m_queuedTime, t->m_launchedTime - t->m_queuedTime, t->m_exitTime - t->m_launchedTime, now - t->m_exitTime, g_profiler. getFnName((PTRTYPE)t->m_callback)); continue; } if(t->m_isLaunched) { log(LOG_TIMING, "admin: Thread -launched- " "nice: %"INT32" " "totalTime: %"INT64"(ms) " "queuedTime: %"INT64"(ms) " "runTime: %"INT64"(ms) " "callback:%s", t->m_niceness, now - t->m_queuedTime, t->m_launchedTime - t->m_queuedTime, now - t->m_launchedTime, g_profiler. getFnName((PTRTYPE)t->m_callback)); continue; } log(LOG_TIMING, "admin: Thread -queued- " "nice: %"INT32" " "queueTime: %"INT64"(ms) " "callback:%s", t->m_niceness, now - t->m_queuedTime, g_profiler.getFnName((PTRTYPE)t->m_callback)); } } }