open-source-search-engine/Threads.cpp
Matt Wells e1dbaf81e9 count threads whose callback has not been called
as 'outstanding' for purposes of shutting down so
we can update the rdbmap after having written the rdblist
to disk in rdbdump.cpp.
2015-05-03 10:58:23 -07:00

2715 lines
87 KiB
C++

#include "gb-include.h"
#include "BigFile.h"
#include "Threads.h"
#include "Errno.h"
#include "Loop.h"
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/types.h> // getuid()/pid_t/getpid()
#include <sys/wait.h> // waitpid()
#include "Rdb.h" // g_mergeUrgent
#include <sched.h> // clone()
//#include "Msg16.h" // g_pid g_ticker
#include "XmlDoc.h" // g_pid g_ticker
#include "Profiler.h"
#include "Stats.h"
#include "Process.h"
// try using pthreads again
//#define PTHREADS
// use these stubs so libplotter.a works
#ifndef PTHREADS
// no-op stand-in: always reports success without touching the mutex
int pthread_mutex_lock ( pthread_mutex_t *mutex ) {
	return 0;
}
// no-op stand-in: always reports success without touching the mutex
int pthread_mutex_unlock ( pthread_mutex_t *mutex ) {
	return 0;
}
#else
#include <pthread.h>
//pthread_attr_t s_attr;
#endif
// main process id (or thread id if using pthreads)
//static pid_t s_pid = (pid_t) -1;
// on 64-bit architectures pthread_t is 64 bit and pid_t is 32 bit:
static pthread_t s_pid = (pthread_t) -1;
//pid_t getpidtid() {
// on 64-bit architectures pthread_t is 64 bit and pid_t is 32 bit:
pthread_t getpidtid() {
#ifndef PTHREADS
	// without pthreads every "thread" is its own process, so the
	// process id (widened to pthread_t) identifies the caller
	return (pthread_t)getpid();
#else
	// gettid() is a bit newer so not in our libc32, so fake it by
	// returning the pthread "thread" id instead of the process id.
	// Threads::amThread() still works with thread ids because the
	// main process has a unique thread id as well.
	return pthread_self();
#endif
}
// BIG PROBLEM:
// When doing pthread_join it doesn't always ensure a thread doesn't go
// zombie. It seems like when SIGIOs are generated by sigqueue() because
// of a full signal queue, threads start going zombie on me.
// PROBLEM #1: with using the "state" to identify a thread in the queue.
// Often a caller makes a disk access using a certain "state" var.
// He gets control back when cleanUp() calls t->m_callback. And he launches
// another read thread in there, which calls Threads::exit(state) before
// the original thread entry's m_isOccupied is set to false. So
// Threads::exit(state) mistakenly picks the thread entry from the former
// thread because it has the same state as its own thread!
// PROBLEM #2: technically, a thread in Threads::exit() can set its m_done
// bit then send the signal. When the Loop sig handler sees the done bit is
// set it decrements the global thread count even though the thread may
// not have actually exited yet!! I spawned like 1,000 threads this way!!!!!
// a global class extern'd in .h file
Threads g_threads;
// . call this before calling a ThreadEntry's m_startRoutine
// . allows us to set the priority of the thread
int startUp ( void *state ) ;
void *startUp2 ( void *state ) ;
// JAB: warning abatement
//static void launchThreadsWrapper ( int fd , void *state );
static void killStalledFiltersWrapper ( int fd , void *state );
static void makeCallback ( ThreadEntry *t ) ;
// is the caller a thread?
bool Threads::amThread () {
	// the main id has not been recorded yet, so we can not tell;
	// report "not a thread"
	if ( s_pid == (pthread_t)-1 ) return false;
	// getpidtid() hands back pthread_self() when PTHREADS is defined
	// and the process id otherwise, so a single comparison against the
	// recorded main id covers both builds
	const pthread_t me = getpidtid();
	return ( me != s_pid );
}
#ifndef PTHREADS
static int32_t s_bad = 0;
static int32_t s_badPid = -1;
#endif
#define MAX_PID 32767
#ifndef PTHREADS
static int s_errno ;
static int s_errnos [ MAX_PID + 1 ];
// this was improvised from linuxthreads/errno.c
//#define CURRENT_STACK_FRAME ({ char __csf; &__csf; })
// WARNING: you MUST compile with -DREENTRANT for this to work
// . hands each process id its own errno slot in s_errnos[]
// . pids above MAX_PID all share the single fallback s_errno; we count
//   those in s_bad and remember the last offender in s_badPid
int *__errno_location (void) {
	const int32_t myPid = (int32_t) getpid();
	// out of table range? note it and use the shared fallback
	if ( myPid > (int32_t)MAX_PID ) {
		s_bad++;
		s_badPid = myPid;
		return &s_errno;
	}
	// common case: dedicated slot for this pid
	return &s_errnos[myPid];
}
#endif
// this also limits the maximum number of outstanding (live) threads
#define MAX_STACKS 20
// stack must be page aligned for mprotect
#define THRPAGESIZE 8192
// how much of stack to use as guard space
#define GUARDSIZE (32*1024)
// . crashed in saving with 800k, so try 1M
// . must be multiple of THRPAGESIZE
//#define STACK_SIZE ((512+256) * 1024)
// pthread_create() cores in calloc() if we don't make STACK_SIZE bigger:
#define STACK_SIZE ((512+256+1024) * 1024)
// jeta was having some problems, but i don't think they were related to
// this stack size of 512k, but i will try boosting to 800k anyway.
//#define STACK_SIZE (512 * 1024)
// 256k was not enough, try 512k
//#define STACK_SIZE (256 * 1024)
// at 128k (and even 200k) some threads do not return! why?? there's
// obviously some stack overwriting going on!
//#define STACK_SIZE (128 * 1024)
static char *s_stackAlloc = NULL;
static int32_t s_stackAllocSize;
//#ifndef PTHREADS
static char *s_stack = NULL;
static int32_t s_stackSize;
static char *s_stackPtrs [ MAX_STACKS ];
//#endif
static int32_t s_next [ MAX_STACKS ];
static int32_t s_head ;
// . pops the head of the free-stack list and returns its index
// . returns -1 if none left
int32_t Threads::getStack ( ) {
	const int32_t si = s_head;
	// advance the head only if the list was not empty
	if ( si != -1 ) s_head = s_next [ si ];
	return si;
}
// pushes stack index "si" back onto the front of the free-stack list
void Threads::returnStack ( int32_t si ) {
	// when the list is empty s_head is -1, so this also terminates
	// the list correctly for the very first node
	s_next[si] = s_head;
	s_head = si;
}
void Threads::reset ( ) {
if ( s_stackAlloc ) {
mprotect(s_stackAlloc, s_stackAllocSize, PROT_READ|PROT_WRITE);
mfree ( s_stackAlloc , s_stackAllocSize,
"ThreadStack");
}
s_stackAlloc = NULL;
for ( int32_t i = 0 ; i < MAX_THREAD_QUEUES ; i++ )
m_threadQueues[i].reset();
}
// starts out uninitialized with no registered queues; init() does the rest
Threads::Threads ( ) {
	m_initialized = false;
	m_numQueues   = 0;
}
// records the id of the main process (or main thread when using pthreads)
// in s_pid so that amThread() can tell the main flow apart from threads
void Threads::setPid ( ) {
#ifndef PTHREADS
	s_pid = getpid();
#else
	// on 64bit arch pid is 32 bit and pthread_t is 64 bit
	pthread_t self = pthread_self();
	s_pid = (pthread_t)self;
	// fetch the scheduling parameters of the main thread; they were
	// only ever used by a (now disabled) debug log
	sched_param param;
	int policy;
	pthread_getschedparam ( self, &policy, &param);
#endif
}
bool Threads::init ( ) {
if ( m_initialized ) return true;
m_initialized = true;
m_needsCleanup = false;
//m_needBottom = false;
// sanity check
//if ( sizeof(pthread_t) > sizeof(pid_t) ) { char *xx=NULL;*xx=0; }
if ( sizeof(pid_t) > sizeof(pthread_t) ) { char *xx=NULL;*xx=0; }
setPid();
#ifdef _STACK_GROWS_UP
return log("thread: Stack growing up not supported.");
#endif
//g_conf.m_logDebugThread = true;
// . damn, this only applies to fork() calls, i guess
// . a quick a dirty way to restrict # of threads so we don't explode
/*
struct rlimit lim;
lim.rlim_max = 100;
if ( setrlimit(RLIMIT_NPROC,&lim) )
log("thread::init: setrlimit: %s", mstrerror(errno) );
else
log("thread::init: set max number of processes to 100");
*/
// allow threads until disabled
m_disabled = false;
// # of low priority threads launched and returned
//m_hiLaunched = 0;
//m_hiReturned = 0;
//m_loLaunched = 0;
//m_loReturned = 0;
//int32_t m_queryMaxBigDiskThreads ; // > 1M read
//int32_t m_queryMaxMedDiskThreads ; // 100k - 1M read
//int32_t m_queryMaxSmaDiskThreads ; // < 100k per read
// categorize the disk read sizes by these here
g_conf.m_bigReadSize = 0x7fffffff;
g_conf.m_medReadSize = 1000000;
g_conf.m_smaReadSize = 100000;
// . register a sleep wrapper to launch threads every 30ms
// . somtimes a bunch of threads mark themselves as done and the
// cleanUp() handler sees them as all still launched so it doesn't
// launch any new ones
//if ( ! g_loop.registerSleepCallback(30,NULL,launchThreadsWrapper))
// return log("thread: Failed to initialize timer callback.");
if ( ! g_loop.registerSleepCallback(1000,NULL,
killStalledFiltersWrapper,0))
return log("thread: Failed to initialize timer callback2.");
// debug
//log("thread: main process has pid=%"INT32"",(int32_t)s_pid);
// . set priority of the main process to 0
// . setpriority() only applies to SCHED_OTHER threads
// . priority of threads with niceness 0 will be 0
// . priority of threads with niceness 1 will be 10
// . priority of threads with niceness 2 will be 20
// . see 'man sched_setscheduler' for detail scheduling info
// . no need to call getpid(), 0 for pid means the current process
#ifndef PTHREADS
if ( setpriority ( PRIO_PROCESS, getpid() , 0 ) < 0 )
log("thread: Call to setpriority failed: %s.",
mstrerror(errno));
#endif
// make multiplier higher for raids, can do more seeks
//int32_t m = 1;
//#ifdef _LARS_
//m = 3;
//#endif
// register the types of threads here instead of in main.cpp
//if ( ! g_threads.registerType ( DISK_THREAD ,m*20/*maxThreads*/))
// try running blaster with 5 threads and you'll
// . see like a double degrade in performance for some reason!!
// . TODO: why?
// . well, this should be controlled g_conf.m_maxSpiderDiskThreads
// for niceness 1+ threads, and g_conf.m_maxPriorityDiskThreads for
// niceness 0 and below disk threads
// . 100 maxThreads out at a time, 32000 can be queued
if ( ! g_threads.registerType ( DISK_THREAD ,100/*maxThreads*/,32000))
return log("thread: Failed to register thread type." );
// . these are used by Msg5 to merge what it reads from disk
// . i raised it from 1 to 2 and got better response time from Msg10
// . i leave one open in case one is used for doing a big merge
// with high niceness cuz it would hold up high priority ones!
// . TODO: is there a better way? cancel it when UdpServer calls
// Threads::suspendLowPriorityThreads() ?
// . this used to be 2 but now defaults to 10 in Parms.cpp. i found
// i have less int32_t gray lines in the performance graph when i
// did that on trinity.
int32_t max2 = g_conf.m_maxCpuMergeThreads;
if ( max2 < 1 ) max2 = 1;
if ( ! g_threads.registerType ( MERGE_THREAD , max2,1000) )
return log("thread: Failed to register thread type." );
// will raising this from 1 to 2 make it faster too?
// i raised since global specs new servers have 2 (hyperthreaded?) cpus
int32_t max = g_conf.m_maxCpuThreads;
if ( max < 1 ) max = 1;
if ( ! g_threads.registerType ( INTERSECT_THREAD,max,200) )
return log("thread: Failed to register thread type." );
// filter thread spawned to call popen() to filter an http reply
if ( ! g_threads.registerType ( FILTER_THREAD , 1/*maxThreads*/,300) )
return log("thread: Failed to register thread type." );
// RdbTree uses this to save itself
if ( ! g_threads.registerType ( SAVETREE_THREAD,1/*maxThreads*/,100) )
return log("thread: Failed to register thread type." );
// . File.cpp spawns a rename thread for doing renames and unlinks
// . doing a tight merge on titldb can be ~250 unlinks
// . MDW up from 1 to 30 max, after doing a ddump on 3000+ collections
// it was taking forever to go one at a time through the unlink
// thread queue. seemed like a 1 second space between unlinks.
// 1/23/1014
if ( ! g_threads.registerType ( UNLINK_THREAD,30/*maxThreads*/,3000) )
return log("thread: Failed to register thread type." );
// generic multipurpose
if ( ! g_threads.registerType (GENERIC_THREAD,100/*maxThreads*/,100) )
return log("thread: Failed to register thread type." );
// for call SSL_accept() which blocks for 10ms even when socket
// is non-blocking...
//if (!g_threads.registerType (SSLACCEPT_THREAD,20/*maxThreads*/,100))
// return log("thread: Failed to register thread type." );
//#ifndef PTHREADS
// sanity check
if ( GUARDSIZE >= STACK_SIZE )
return log("thread: Stack guard size too big.");
// not more than this outstanding
int32_t maxThreads = 0;
for ( int32_t i = 0 ; i < m_numQueues ; i++ )
maxThreads += m_threadQueues[i].m_maxLaunched;
// limit to stack we got
if ( maxThreads > MAX_STACKS ) maxThreads = MAX_STACKS;
// allocate the stack space
s_stackAllocSize = STACK_SIZE * maxThreads + THRPAGESIZE ;
// clear stack to help check for overwrites
s_stackAlloc = (char *) mcalloc ( s_stackAllocSize , "ThreadStack" );
if ( ! s_stackAlloc )
return log("thread: Unable to allocate %"INT32" bytes for thread "
"stacks.", s_stackAllocSize);
log(LOG_INIT,"thread: Using %"INT32" bytes for %"INT32" thread stacks.",
s_stackAllocSize,maxThreads);
// align
s_stack = (char *)(((uint64_t)s_stackAlloc+THRPAGESIZE-1)&~(THRPAGESIZE-1));
// new size
s_stackSize = s_stackAllocSize - (s_stack - s_stackAlloc);
// protect the whole stack while not in use
if ( mprotect ( s_stack , s_stackSize , PROT_NONE ) )
log("thread: Call to mprotect failed: %s.",mstrerror(errno));
// test
//s_stack[0] = 1;
// init the linked list
for ( int32_t i = 0 ; i < MAX_STACKS ; i++ ) {
if ( i == MAX_STACKS - 1 ) s_next[i] = -1;
else s_next[i] = i + 1;
s_stackPtrs[i] = s_stack + STACK_SIZE * i;
}
s_head = 0;
// don't do real time stuff for now
return true;
//#else
// . keep stack size small, 128k
// . if we get problems, we'll increase this to 256k
// . seems like it grows dynamically from 4K to up to 2M as needed
//if ( pthread_attr_setstacksize ( &s_attr , (size_t)128*1024 ) )
// return log("thread: init: pthread_attr_setstacksize: %s",
// mstrerror(errno));
//pthread_attr_setschedparam ( &s_attr , PTHREAD_EXPLICIT_SCHED );
//pthread_attr_setscope ( &s_attr , PTHREAD_SCOPE_SYSTEM );
// return true;
//#endif
}
// all types should be registered in main.cpp before any threads launch
bool Threads::registerType ( char type , int32_t maxThreads , int32_t maxEntries ) {
// return false and set g_errno if no more room
if ( m_numQueues >= MAX_THREAD_QUEUES ) {
g_errno = EBUFTOOSMALL;
return log(LOG_LOGIC,"thread: registerType: Too many thread "
"queues");
}
// initialize the ThreadQueue class for this type
if ( ! m_threadQueues[m_numQueues].init(type,maxThreads,maxEntries))
return false;
// we got one more queue now
m_numQueues++;
return true;
}
// . returns the number of occupied thread entries summed over all queues
// . intersect and filter queues are left out of the tally
int32_t Threads::getNumThreadsOutOrQueued() {
	int32_t n = 0;
	for ( int32_t i = 0 ; i < m_numQueues ; i++ ) {
		// . identify the queue by the type it was registered with,
		//   like Threads::call() does, rather than by its position
		//   in m_threadQueues[]
		// . skip INTERSECT_THREAD, used to intersect termlists to
		//   resolve a query. i've seen these get stuck in an infinite
		//   loop sometimes.
		if ( m_threadQueues[i].m_threadType == INTERSECT_THREAD )
			continue;
		// skip filter threads, those get stuck sometimes
		if ( m_threadQueues[i].m_threadType == FILTER_THREAD )
			continue;
		// tally up all other threads
		n += m_threadQueues[i].getNumThreadsOutOrQueued();
	}
	return n;
}
// . returns false (and may set errno) if failed to launch a thread
// . returns true if thread added to queue successfully
// . may be launched instantly or later depending on # of threads in the queue
bool Threads::call ( char type ,
int32_t niceness ,
void *state ,
void (* callback )(void *state,ThreadEntry *t) ,
void *(* startRoutine)(void *state,ThreadEntry *t) ) {
// debug
//return false;
#ifdef _VALGRIND_
return false;
#endif
g_errno = 0;
// don't spawn any if disabled
if ( m_disabled ) return false;
if ( ! g_conf.m_useThreads ) return false;
if ( type == DISK_THREAD && ! g_conf.m_useThreadsForDisk )
return false;
if ( type == MERGE_THREAD && ! g_conf.m_useThreadsForIndexOps )
return false;
if ( type == INTERSECT_THREAD && ! g_conf.m_useThreadsForIndexOps )
return false;
if ( type == FILTER_THREAD && ! g_conf.m_useThreadsForSystemCalls )
return false;
if ( ! m_initialized && ! init() )
return log("db: Threads init failed." );
// . sanity check
// . a thread can NOT call this
//if ( getpid() != s_pid ) {
// fprintf(stderr,"thread: call: bad engineer\n");
// ::exit(-1);
//}
// don't launch for now
//return false;
// . sanity check
// . ensure top 4 bytes of state is the callback
//if ( *(int32_t *)state != (int32_t)callback ) {
// g_errno = EBADENGINEER;
// sleep(50000);
// return log("thread: call: top 4 bytes of state != callback");
//}
// debug msg
//log("adding thread to queue, type=%"INT32"",(int32_t)type);
// find the type
int32_t i;
for ( i = 0 ; i < m_numQueues ; i++ )
if ( m_threadQueues[i].m_threadType == type ) break;
// bitch if type not added via registerType() call
if ( i == m_numQueues ) {
g_errno = EBADENGINEER;
return log(LOG_LOGIC,"thread: addtoqueue: Unregistered "
"thread type %"INT32"",(int32_t)type);
}
// debug msg
//log("thread: call: adding entry for thread");
// . add to this queue
// . returns NULL and sets g_errno on error
ThreadEntry *t = m_threadQueues[i].addEntry(niceness,state,
callback,startRoutine);
if ( ! t ) return log("thread: Failed to add entry to thread pool: "
"%s.",mstrerror(g_errno));
// debug msg
//log("added");
// clear g_errno
//g_errno = 0;
// . try to launch as many threads as we can
// . this sets g_errno on error
// . if it has an error, just ignore it, our thread is queued
m_threadQueues[i].launchThread2 ( NULL );
//if ( ! m_threadQueues[i].launchThread2 ( t ) && g_errno ) {
// log("thread: failed thread launch: %s",mstrerror(g_errno));
// return false;
//}
// return false if there was an error launching the thread
//if ( g_errno ) return false;
// clear g_errno
g_errno = 0;
// success
return true;
}
// static void launchThreadsWrapper ( int fd , void *state ) {
// // debug msg
// //if ( g_conf.m_timingDebugEnabled )
// // log("thread: launchThreadsWrapper: entered");
// // clean up
// g_threads.cleanUp(NULL,1000);
// // and launch
// g_threads.launchThreads();
// }
// . sleep callback, registered in Threads::init() to fire once a second
// . once g_ticker has counted up to the filter timeout, sends signal 9 to
//   the filter process whose pid is in g_pid
// . "fd" and "state" are unused
static void killStalledFiltersWrapper ( int fd , void *state ) {
	// bail if no pid
	if ( g_pid == -1 ) return;
	// . only kill after ticker reaches the timeout (30 by default)
	// . we are called once every second, so inc it each time
	int32_t timeout = g_filterTimeout;
	if ( timeout <= 0 ) timeout = 30;
	if ( g_ticker++ < timeout ) return;
	// debug
	log("threads: killing stalled filter process of age %"INT32" "
	    "seconds and pid=%"INT32".",g_ticker,(int32_t)g_pid);
	// kill him
	int status = kill ( g_pid , 9 );
	// kill() returns -1 on failure and puts the reason in errno, so
	// grab errno before anything else can overwrite it
	int err = errno;
	// don't kill again
	g_pid = -1;
	if ( status != 0 ) log("threads: kill filter: %s", mstrerror(err) );
}
// . called by g_loop in Loop.cpp after getting a SI_QUEUE signal that
//   came from a thread exiting
// . we put that signal there using sigqueue() in Threads::exit()
// . this way another thread can be launched right away
// . walks the queues from last registered to first and returns how many
//   threads got launched
int32_t Threads::launchThreads ( ) {
	int32_t total = 0;
	int32_t q = m_numQueues;
	while ( --q >= 0 ) {
		// start each queue with a clean slate
		g_errno = 0;
		// keep launching from this queue until it refuses
		for ( ; m_threadQueues[q].launchThread2(NULL) ; total++ ) ;
		// bitch if the last attempt failed with an error
		if ( g_errno )
			log("thread: Failed to launch thread: %s.",
			    mstrerror(g_errno));
	}
	// do not leak a launch error to the caller
	g_errno = 0;
	return total;
}
// . will cancel currently running low priority threads
// . will prevent any low priority threads from launching
// . will only cancel disk threads for now
void Threads::suspendLowPriorityThreads() {
// debug msg
if ( g_conf.m_logDebugThread )
log(LOG_DEBUG,"thread: SUSPENDING LOW-PRIORITY THREADS.");
// just cancel disk threads for now
for ( int32_t i = 0 ; i < m_numQueues; i++ )
m_threadQueues[i].suspendLowPriorityThreads();
}
void Threads::resumeLowPriorityThreads() {
// debug msg
if ( g_conf.m_logDebugThread )
log(LOG_DEBUG,"thread: RESUMING LOW-PRIORITY THREADS.");
for ( int32_t i = 0 ; i < m_numQueues; i++ )
m_threadQueues[i].resumeLowPriorityThreads();
}
//void Threads::cancelLowPriorityThreads () {
// for ( int32_t i = 0 ; i < m_numQueues; i++ )
// m_threadQueues[i].cancelLowPriorityThreads();
//}
///////////////////////////////////////////////////////////////////////
// functions for ThreadQueue
///////////////////////////////////////////////////////////////////////
// an unused queue owns no entry buffer until init() allocates one
ThreadQueue::ThreadQueue ( ) {
	m_entriesSize = 0;
	m_entries     = NULL;
}
// releases the entry buffer (if any) and empties the queue
void ThreadQueue::reset ( ) {
	if ( m_entries != NULL ) {
		mfree ( m_entries , m_entriesSize , "Threads" );
		m_entries = NULL;
	}
	m_top = 0;
}
bool ThreadQueue::init ( char threadType, int32_t maxThreads, int32_t maxEntries ) {
m_threadType = threadType;
m_launched = 0;
m_returned = 0;
m_maxLaunched = maxThreads;
// # of low priority threads launched and returned
m_hiLaunched = 0;
m_hiReturned = 0;
m_mdLaunched = 0;
m_mdReturned = 0;
m_loLaunched = 0;
m_loReturned = 0;
// we now count write threads so we can limit that
m_writesLaunched = 0;
m_writesReturned = 0;
// these are for disk threads, which we now limit based on read sizes
m_hiLaunchedBig = 0;
m_hiReturnedBig = 0;
m_mdLaunchedBig = 0;
m_mdReturnedBig = 0;
m_loLaunchedBig = 0;
m_loReturnedBig = 0;
m_hiLaunchedMed = 0;
m_hiReturnedMed = 0;
m_mdLaunchedMed = 0;
m_mdReturnedMed = 0;
m_loLaunchedMed = 0;
m_loReturnedMed = 0;
m_hiLaunchedSma = 0;
m_hiReturnedSma = 0;
m_mdLaunchedSma = 0;
m_mdReturnedSma = 0;
m_loLaunchedSma = 0;
m_loReturnedSma = 0;
//m_entriesUsed = 0;
m_top = 0;
m_isLowPrioritySuspended = false;
// alloc space for entries
m_maxEntries = maxEntries;
m_entriesSize = sizeof(ThreadEntry)*m_maxEntries;
m_entries = (ThreadEntry *)mmalloc ( m_entriesSize , "Threads" );
if ( ! m_entries ) return log("thread: Failed to allocate %"INT32" bytes "
"for thread queue.",m_entriesSize);
// debug msg
//log("INIT CALLED. setting all m_isDone to 1.");
// clear m_isOccupied et al for new guys
//for ( int32_t i = 0 ; i < MAX_THREAD_ENTRIES ; i++ ) {
for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) {
m_entries[i].m_isOccupied = false;
m_entries[i].m_isLaunched = false;
m_entries[i].m_isDone = true;
m_entries[i].m_qnum = threadType;
m_entries[i].m_stack = NULL;
}
return true;
}
int32_t ThreadQueue::getNumThreadsOutOrQueued() {
// MDW: we also need to count threads that are returned but need their
// callback called so, in the case of RdbDump, the rdblist that was written
// to disk can update the rdbmap before it gets saved, so it doesn't get
// out of sync. Process.cpp calls .suspendMerge() to make sure that all
// merge operations are suspended as well.
int32_t n = 0;
for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) {
ThreadEntry *e = &m_entries[i];
if ( e->m_isOccupied ) n++;
}
return n;
/*
int32_t n = m_launched - m_returned;
for ( int32_t i = 0 ; i < m_maxEntries ; i++ ) {
ThreadEntry *e = &m_entries[i];
if ( ! e->m_isOccupied ) continue;
if ( e->m_isLaunched ) continue;
if ( e->m_isDone ) continue;
// do not count "reads", only count writes
//if ( m_threadType == DISK_THREAD && e->m_state ) {
// FileState *fs = (FileState *)e->m_state;
// if ( fs->m_doWrite ) continue;
//}
}
return n;
*/
}
// return NULL and set g_errno on error
ThreadEntry *ThreadQueue::addEntry ( int32_t   niceness                     ,
				     void  *state                        ,
				     void  (* callback    )(void *state,
							    ThreadEntry *t) ,
				     void *(* startRoutine)(void *state,
							    ThreadEntry *t) ) {
	// . normally every slot may be used
	// . niceness > 0 disk threads may only use the first 90% of the
	//   slots (but always at least one)
	int32_t limit = m_maxEntries;
	if ( niceness > 0 && m_threadType == DISK_THREAD ) {
		limit = (m_maxEntries * 90) / 100;
		if ( limit <= 0 ) limit = 1;
	}
	// find the first slot that is not in use
	int32_t slot = 0;
	while ( slot < limit && m_entries[slot].m_isOccupied ) slot++;
	// none free? set g_errno and complain at most every few seconds
	if ( slot >= limit ) {
		g_errno = ENOTHREADSLOTS;
		static time_t s_time = 0;
		time_t now = getTime();
		if ( now - s_time > 5 ) {
			log("thread: Could not add thread to queue. Already "
			    "have %"INT32" entries.",limit);
			s_time = now;
		}
		return NULL;
	}
	// claim the slot
	ThreadEntry *t = &m_entries [ slot ];
	// what the caller handed us
	t->m_niceness     = niceness;
	t->m_state        = state;
	t->m_callback     = callback;
	t->m_startRoutine = startRoutine;
	// life-cycle flags: occupied, but neither launched nor done
	t->m_isOccupied   = true;
	t->m_isCancelled  = false;
	t->m_isDone       = false;
	t->m_isLaunched   = false;
	t->m_readyForBail = false;
	// nothing allocated or gone wrong yet
	t->m_stack        = NULL;
	t->m_allocBuf     = NULL;
	t->m_allocSize    = 0;
	t->m_errno        = 0;
	t->m_queuedTime   = gettimeofdayInMilliseconds();
	// . when the ohcrap callback gets called and the thread is cleaned
	//   up, the clean up needs the read size and write flag to see
	//   which launch counts to decrement
	// . the FileState may be corrupted by then, so copy this info
	//   directly into the thread entry now
	t->m_bytesToGo = 0;
	t->m_doWrite   = false;
	if ( m_threadType == DISK_THREAD && state ) {
		FileState *fs = (FileState *)state;
		t->m_bytesToGo = fs->m_bytesToGo;
		t->m_doWrite   = fs->m_doWrite ;
	}
	// the used region of the entry array grows if we took the slot
	// right at its end
	if ( slot == m_top ) m_top++;
	// debug msg
	if ( g_conf.m_logDebugThread )
		log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] "
		    "queued %s thread for launch. "
		    "niceness=%"INT32". ", (PTRTYPE)t,
		    getThreadType(), (int32_t)niceness );
	return t;
}
// . cleans up finished threads in all queues, one niceness level at a
//   time from 0 up to "niceness", launching new threads after each level
// . "maxTime" is a budget in milliseconds; a negative value means no limit
// . if the budget runs out, m_needsCleanup is set again so that a later
//   call picks up where we stopped
// . returns the elapsed milliseconds measured after the last level that
//   was processed (0 if nothing needed cleaning or maxTime is negative)
int32_t Threads::timedCleanUp (int32_t maxTime, int32_t niceness) {
	if ( ! m_needsCleanup ) return 0;
	int64_t startTime = gettimeofdayInMillisecondsLocal();
	int64_t took = 0;
	// a pass over all niceness levels leaves nothing pending
	if ( niceness >= MAX_NICENESS ) m_needsCleanup = false;
	for ( int32_t i = 0 ; i <= niceness ; i++ ) {
		for ( int32_t j = 0 ; j < m_numQueues ; j++ )
			m_threadQueues[j].timedCleanUp ( i );
		launchThreads();
		// no time limit?
		if ( maxTime < 0 ) continue;
		// elapsed = now - start. (the operands used to be swapped,
		// which made this non-positive so the limit never applied)
		took = gettimeofdayInMillisecondsLocal() - startTime;
		if ( took <= maxTime ) continue;
		// ok, we have to cut it short...
		m_needsCleanup = true;
		break;
	}
	return (int32_t)took;
}
// asks the queue at index DISK_THREAD whether it has a thread on "bf"
bool Threads::isHittingFile ( BigFile *bf ) {
	ThreadQueue &diskQueue = m_threadQueues[DISK_THREAD];
	return diskQueue.isHittingFile(bf);
}
// . returns true if an occupied, not-yet-done entry in this queue runs
//   readwriteWrapper_r on a FileState whose m_this is "bf"
// . entries that have not been launched yet count as well
bool ThreadQueue::isHittingFile ( BigFile *bf ) {
	// loop through candidates
	for ( int32_t i = 0 ; i < m_top; i++ ) {
		// point to it
		ThreadEntry *t = &m_entries[i];
		// must be occupied to be done (sanity check)
		if ( ! t->m_isOccupied ) continue;
		// must not be done
		if ( t->m_isDone ) continue;
		// must be a read/write on a file
		if ( t->m_startRoutine != readwriteWrapper_r ) continue;
		// shortcut
		FileState *fs = (FileState *)t->m_state;
		// . bailOnReads() sets m_state to NULL on entries it bails
		//   on while leaving launched ones occupied and not done
		// . we can no longer tell which file such an entry is
		//   using, and must not dereference the NULL
		if ( ! fs ) continue;
		// get bigfile ptr
		if ( fs->m_this == bf ) return true;
	}
	return false;
}
void Threads::bailOnReads ( ) {
m_threadQueues[DISK_THREAD].bailOnReads();
}
// . Process.cpp calls these callbacks before their time in order to
//   set EDISKSTUCK
// . for every qualifying niceness-0 read in this queue: sets the entry's
//   m_errno and g_errno to EDISKSTUCK, moves ownership of the read buffer
//   from the FileState to the ThreadEntry, calls the entry's callback
//   right away and then replaces the callback with ohcrap
// . an entry that was never launched is removed from the queue
// . a launched entry stays occupied, with m_state set to NULL, until its
//   thread finishes
void ThreadQueue::bailOnReads ( ) {
// note it
log("threads: bypassing read threads");
// loop through candidates
for ( int32_t i = 0 ; i < m_top; i++ ) {
// point to it
ThreadEntry *t = &m_entries[i];
// must be occupied to be done (sanity check)
if ( ! t->m_isOccupied ) continue;
// skip if not launched yet
//if ( ! t->m_isLaunched ) continue;
// must be niceness 0
if ( t->m_niceness != 0 ) continue;
// must not be done
if ( t->m_isDone ) continue;
// must not have already called callback (we swap in ohcrap below once
// we have bailed on an entry)
if ( t->m_callback == ohcrap ) continue;
// must be a read
if ( t->m_startRoutine != readwriteWrapper_r ) continue;
// shortcut
FileState *fs = (FileState *)t->m_state;
// do not stop writes
if ( fs->m_doWrite ) continue;
// must be niceness 0 too!
if ( fs->m_niceness != 0 ) continue;
// what is this? unlaunched...
//if ( t->m_pid == 0 ) continue;
// can only bail on a thread after it copies its FileState
// class into its stack so we can bypass it and free the
// original FileState without causing a core. if thread
// is not yet launched we have to call the callback here too
// otherwise it never gets launched until the disk is unstuck!
if ( ! t->m_readyForBail && t->m_isLaunched ) continue;
// set error
t->m_errno = EDISKSTUCK;
// set this too
g_errno = EDISKSTUCK;
// do not allow caller to free the alloc'd buf in case
// its read finally comes through! the entry takes ownership of the
// buffer here and ohcrap() frees it later.
t->m_allocBuf = fs->m_allocBuf;
t->m_allocSize = fs->m_allocSize;
fs->m_allocBuf = NULL;
fs->m_allocSize = 0;
// call it
t->m_callback ( t->m_state , t );
// do not re-call it...
t->m_callback = ohcrap;
// invalidate state (FileState usually)
t->m_state = NULL;
// a launched thread keeps its entry until it is done
if ( t->m_isLaunched ) continue;
// delete him if not yet launched, otherwise we try to
// launch it later with a corrupted/unstable FileState...
// and that causes our launch counts to get out of whack i
// think...
t->m_isOccupied = false;
// note it
log("threads: bailing unlaunched thread");
// do we have to decrement top? only if we just freed the topmost
// entry; then drop m_top past every trailing unoccupied entry
if ( m_top == i + 1 )
while (m_top > 0 && ! m_entries[m_top-1].m_isOccupied)
m_top--;
}
}
// BigFile.cpp's readwriteWrapper_r() ThreadEntry::m_callback gets set to
// ohcrap() because it was taking too long to do its read and we prematurely
// called its callback above in bailOnReads(). In that case we still have to
// free the disk read buffer which was never used. And doneWrapper() in
// BigFile.cpp is never called.
// . frees the read buffer held by the thread entry, if it holds one, and
//   logs that we got here
// . "state" is not used
void ohcrap ( void *state , ThreadEntry *t ) {
	if ( t->m_allocBuf != NULL ) {
		// free the read buffer here then
		mfree ( t->m_allocBuf , t->m_allocSize , "RdbScan" );
	}
	log("threads: got one");
}
// . cleans up any threads that have exited
// . their m_isDone should be set to true
// . don't process threads whose niceness is > maxNiceness
bool ThreadQueue::timedCleanUp ( int32_t maxNiceness ) {
// top:
int32_t numCallbacks = 0;
// loop through candidates
for ( int32_t i = 0 ; i < m_top; i++ ) {
// point to it
ThreadEntry *t = &m_entries[i];
// skip if not qualified
if ( t->m_niceness > maxNiceness ) continue;
// must be occupied to be done (sanity check)
if ( ! t->m_isOccupied ) continue;
// skip if not launched yet
if ( ! t->m_isLaunched ) continue;
// . we were segfaulting right here before because the thread
// was setting t->m_pid and at this point it was not
// set so t->m_pid was a bogus value
// . thread may have seg faulted, in which case sigbadhandler()
// in Loop.cpp will get it and set errno to 0x7fffffff
#ifndef PTHREADS
// MDW: i hafta take this out because the errno_location thing
// is not working on the newer gcc
if ( ! t->m_isDone && t->m_pid >= 0 &&
s_errnos [t->m_pid] == 0x7fffffff ) {
log("thread: Got abnormal thread termination. Seems "
"like the thread might have cored.");
s_errnos[t->m_pid] = 0;
goto again;
}
#endif
// skip if not done yet
if ( ! t->m_isDone ) continue;
#ifdef PTHREADS
// if pthread_create() failed it returns the errno and we
// needsJoin is false, so do not try to join
// to a thread if we did not create it, lest pthread_join()
// cores
if ( t->m_needsJoin ) {
// . join up with that thread
// . damn, sometimes he can block forever on his
// call to sigqueue(),
int32_t status = pthread_join ( t->m_joinTid , NULL );
if ( status != 0 ) {
log("threads: pthread_join %"INT64" = %s (%"INT32")",
(int64_t)t->m_joinTid,mstrerror(status),
status);
}
// debug msg
if ( g_conf.m_logDebugThread )
log(LOG_DEBUG,"thread: joined1 with "
"t=0x%"PTRFMT" "
"jointid=0x%"XINT64".",
(PTRTYPE)t,(int64_t)t->m_joinTid);
g_threads.returnStack ( t->m_si );
t->m_stack = NULL;
// re-protect this stack
mprotect ( t->m_stack + GUARDSIZE ,
STACK_SIZE - GUARDSIZE,
PROT_NONE );
}
#else
again:
int status ;
pid_t pid = waitpid ( t->m_pid , &status , 0 );
int err = errno;
// debug the waitpid
if ( g_conf.m_logDebugThread || g_process.m_exiting )
log(LOG_DEBUG,"thread: Waiting for t=0x%"PTRFMT" "
"pid=%"INT32".",
(PTRTYPE)t,(int32_t)t->m_pid);
// bitch and continue if join failed
if ( pid != t->m_pid ) {
// waitpid() gets interrupted by various signals so
// we need to repeat (SIGCHLD?)
if ( err == EINTR ) goto again;
log("thread: Call to waitpid(%"INT32") returned %"INT32": %s.",
(int32_t)t->m_pid,(int32_t)pid,mstrerror(err));
continue;
}
// if status not 0 then process got abnormal termination
if ( ! WIFEXITED(status) ) {
if ( WIFSIGNALED(status) )
log("thread: Child process (pid=%i) exited "
"from unhandled signal number %"INT32".",
pid,(int32_t)WTERMSIG(status));
else
log("thread: Child process (pid=%i) exited "
"for unknown reason." , pid );
}
//mfree ( t->m_stack , STACK_SIZE , "Threads" );
g_threads.returnStack ( t->m_si );
t->m_stack = NULL;
// re-protect this stack
mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE,
PROT_NONE );
// debug msg
if ( g_conf.m_logDebugThread )
log(LOG_DEBUG,"thread: joined with pid=%"INT32" pid=%"INT32".",
(int32_t)t->m_pid,(int32_t)t->m_pid);
#endif
// we may get cleaned up and re-used and our niceness reassignd
// right after set m_isDone to true, so save niceness
int32_t niceness = t->m_niceness;
char qnum = t->m_qnum;
ThreadQueue *tq = &g_threads.m_threadQueues[(int)qnum];
if ( tq != this ) { char *xx = NULL; *xx = 0; }
// get read size before cleaning it up -- it could get nuked
int32_t rs = 0;
bool isWrite = false;
if ( tq->m_threadType == DISK_THREAD ) { // && t->m_state ) {
//FileState *fs = (FileState *)t->m_state;
rs = t->m_bytesToGo;
isWrite = t->m_doWrite ;
}
if ( niceness <= 0) tq->m_hiReturned++;
else if ( niceness == 1) tq->m_mdReturned++;
else if ( niceness >= 2) tq->m_loReturned++;
// deal with the tiers for disk threads based on read sizes
if ( tq->m_threadType == DISK_THREAD ) {
// writes are special cases
if ( isWrite ) m_writesReturned++;
if ( rs >= 0 && niceness >= 2 ) {
if ( rs > g_conf.m_medReadSize )
tq->m_loReturnedBig++;
else if ( rs > g_conf.m_smaReadSize )
tq->m_loReturnedMed++;
else
tq->m_loReturnedSma++;
}
else if ( rs >= 0 && niceness >= 1 ) {
if ( rs > g_conf.m_medReadSize )
tq->m_mdReturnedBig++;
else if ( rs > g_conf.m_smaReadSize )
tq->m_mdReturnedMed++;
else
tq->m_mdReturnedSma++;
}
else if ( rs >= 0 ) {
if ( rs > g_conf.m_medReadSize )
tq->m_hiReturnedBig++;
else if ( rs > g_conf.m_smaReadSize )
tq->m_hiReturnedMed++;
else
tq->m_hiReturnedSma++;
}
}
// now count him as returned
m_returned++;
// prepare for relaunch if we were cancelled
if ( t->m_isCancelled ) {
t->m_isCancelled = false;
t->m_isLaunched = false;
t->m_isDone = false;
log("thread: Thread cancelled. Preparing thread "
"for relaunch");
continue;
}
numCallbacks++;
// not running any more
t->m_isLaunched = false;
// not occupied any more
t->m_isOccupied = false;
// do we have to decrement top
if ( m_top == i + 1 )
while (m_top > 0 && ! m_entries[m_top-1].m_isOccupied)
m_top--;
// send a cancel sig to the thread in case it's still there
//int err = pthread_cancel ( t->m_tid );
//if ( err != 0 ) log("thread: cleanUp: pthread_cancel: %s",
// mstrerror(err) );
// one less entry occupied
//m_entriesUsed--;
// debug msg
//log("m_entriesUsed now %"INT32"",m_entriesUsed);
// one more returned
//m_returned++;
// clear the g_errno in case set by a previous callback
//g_errno = 0;
// launch as many threads as we can before calling the
// callback since this may hog the CPU like Msg20 does
//g_threads.launchThreads();
g_errno = 0;
//g_loop.startBlockedCpuTimer();
//only allow a quickpoll if we are nice.
//g_loop.canQuickPoll(t->m_niceness);
makeCallback ( t );
//int64_t took = gettimeofdayInMilliseconds()-startTime;
//if(took > 8 && maxNiceness > 0) {
// if(g_conf.m_sequentialProfiling)
// log(LOG_TIMING,
// "admin: Threads spent %"INT64" ms to callback "
// "%"INT32" callbacks, nice: %"INT32"",
// took, numCallbacks, maxNiceness);
// g_threads.m_needBottom = true;
// maxNiceness = 0;
//}
// clear errno again
g_errno = 0;
if ( g_conf.m_logDebugThread ) {
int64_t now = gettimeofdayInMilliseconds();
log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] %s done1. "
"active=%"INT32" "
"time since queued = %"UINT64" ms "
"time since launch = %"UINT64" ms "
"time since pre-exit = %"UINT64" ms "
"time since exit = %"UINT64" ms",
(PTRTYPE)t,
getThreadType() ,
(int32_t)(m_launched - m_returned) ,
(uint64_t)(now - t->m_queuedTime),
(uint64_t)(now - t->m_launchedTime),
(uint64_t)(now - t->m_preExitTime) ,
(uint64_t)(now - t->m_exitTime) );
}
}
//since we need finer grained control in loop, we no longer collect
//the callbacks, sort, then call them. we now call them right away
//that way we can break out if we start taking too long and
//give control back to udpserver.
return numCallbacks != 0;
}
// . calls the callback of thread entry "t" as t->m_callback(t->m_state,t)
// . for the duration of the callback g_niceness is set to 1 if
//   t->m_niceness >= 1 and to 0 otherwise; the previous g_niceness is
//   restored before returning
// . if g_conf.m_maxCallbackDelay is >= 0 then a callback that takes at
//   least that many milliseconds is logged
void makeCallback ( ThreadEntry *t ) {
	// sanity check - if in a high niceness callback, we should
	// only be calling niceness 0 callbacks here
	// no, this is only called from sleep wrappers originating from
	// Loop.cpp, so we should be ok
	//if ( g_niceness==0 && t->m_niceness ) { char *xx=NULL;*xx=0; }

	// save the global niceness so we can restore it below
	int32_t saved = g_niceness;

	// . snapshot the thread's niceness before calling the callback
	// . the cleanUp routines clear t->m_isOccupied before calling us, so
	//   the callback may queue a new thread that takes over this entry;
	//   reading t->m_niceness afterwards could then report the niceness
	//   of some other thread
	int32_t niceness = t->m_niceness;

	// log it now
	if ( g_conf.m_logDebugLoop || g_conf.m_logDebugThread )
		log(LOG_DEBUG,"thread: enter thread callback t=0x%"PTRFMT" "
		    //"type=%s "
		    "state=0x%"PTRFMT" "
		    "nice=%"INT32"",
		    (PTRTYPE)t,
		    //getThreadType(),
		    (PTRTYPE)t->m_state,
		    (int32_t)niceness);

	// . time it?
	// . remember whether we took a start time so we never compute an
	//   elapsed time from an uninitialized "start" should the setting
	//   change while the callback is running
	int64_t start = 0;
	bool    timed = false;
	if ( g_conf.m_maxCallbackDelay >= 0 ) {
		start = gettimeofdayInMillisecondsLocal();
		timed = true;
	}

	// then set the global niceness for the callback
	if ( niceness >= 1 ) g_niceness = 1;
	else                 g_niceness = 0;

	t->m_callback ( t->m_state , t );

	// time it?
	if ( timed && g_conf.m_maxCallbackDelay >= 0 ) {
		int64_t elapsed = gettimeofdayInMillisecondsLocal() - start;
		if ( elapsed >= g_conf.m_maxCallbackDelay )
			log("threads: Took %"INT64" ms to call "
			    "thread callback niceness=%"INT32"",
			    elapsed,(int32_t)saved);
	}

	// log it now
	if ( g_conf.m_logDebugLoop || g_conf.m_logDebugThread )
		log(LOG_DEBUG,"loop: exit thread callback t=0x%"PTRFMT" "
		    //"type=%s "
		    "nice=%"INT32"",
		    (PTRTYPE)t,
		    //getThreadType(),
		    (int32_t)niceness);

	// restore global niceness
	g_niceness = saved;
}
// . runs ThreadQueue::cleanUp() on every one of our queues, passing "t" and
//   "maxNiceness" straight through
// . keeps making passes over the queues for as long as m_needsCleanup is
//   found set again at the end of a pass
// . returns true if any queue's cleanUp() returned true on any pass
bool Threads::cleanUp ( ThreadEntry *t , int32_t maxNiceness ) {
	bool anyQueueDidWork = false;
	do {
		// assume no more cleanup needed after this pass
		m_needsCleanup = false;
		// give every queue a chance to clean up its exited threads
		for ( int32_t q = 0 ; q < m_numQueues ; q++ ) {
			ThreadQueue &queue = m_threadQueues[q];
			if ( queue.cleanUp ( t , maxNiceness ) )
				anyQueueDidWork = true;
		}
		// . go around again if we got a new one
		// . thread will set this when about to exit
		// . waitpid() may be interrupted by a SIGCHLD and not get
		//   his pid
	} while ( m_needsCleanup );
	return anyQueueDidWork;
}
// . cleans up any threads in this queue that have exited
// . their m_isDone should be set to true
// . don't process threads whose niceness is > maxNiceness
// . for each finished thread we join/reap it, give its stack back, bump
//   the "returned" counters and then call its callback right away via
//   makeCallback() rather than collecting and sorting the callbacks first
// . a thread that was cancelled is reset for relaunch instead and its
//   callback is not called
// . "tt" is not consulted
// . calls at most 64 callbacks per invocation
// . returns true if at least one callback was called
bool ThreadQueue::cleanUp ( ThreadEntry *tt , int32_t maxNiceness ) {
	int64_t startTime = gettimeofdayInMilliseconds();
	int32_t numCallbacks = 0;
	// loop through candidates
	for ( int32_t i = 0 ; i < m_top && numCallbacks < 64 ; i++ ) {
		// point to it
		ThreadEntry *t = &m_entries[i];
		// skip if not qualified
		if ( t->m_niceness > maxNiceness ) continue;
		// must be occupied to be done (sanity check)
		if ( ! t->m_isOccupied ) continue;
		// skip if not launched yet
		if ( ! t->m_isLaunched ) continue;
		// . we were segfaulting right here before because the thread
		//   was setting t->m_pid and at this point it was not
		//   set so t->m_pid was a bogus value
		// . thread may have seg faulted, in which case sigbadhandler()
		//   in Loop.cpp will get it and set errno to 0x7fffffff
#ifndef PTHREADS
		// MDW: i hafta take this out because the errno_location thing
		// is not working on the newer gcc
		if ( ! t->m_isDone && t->m_pid >= 0 &&
		     s_errnos [t->m_pid] == 0x7fffffff ) {
			log("thread: Got abnormal thread termination. Seems "
			    "like the thread might have cored.");
			s_errnos[t->m_pid] = 0;
			// reap it below even though m_isDone was never set
			goto again;
		}
#endif
		// skip if not done yet
		if ( ! t->m_isDone ) continue;
#ifdef PTHREADS
		if ( t->m_needsJoin ) {
			// . join up with that thread
			// . damn, sometimes he can block forever on his
			//   call to sigqueue(),
			int32_t status = pthread_join ( t->m_joinTid , NULL );
			if ( status != 0 ) {
				log("threads: "
				    "pthread_join2 %"INT64" = %s (%"INT32")",
				    (int64_t)t->m_joinTid,mstrerror(status),
				    status);
			}
			// debug msg
			if ( g_conf.m_logDebugThread )
				log(LOG_DEBUG,"thread: joined2 with "
				    "t=0x%"PTRFMT" "
				    "jointid=0x%"XINT64".",
				    (PTRTYPE)t,(int64_t)t->m_joinTid);
			g_threads.returnStack ( t->m_si );
			// NOTE(review): m_stack is cleared before the
			// mprotect() below, so that call is handed
			// NULL + GUARDSIZE and cannot re-protect the real
			// stack. left as is because the pthread launch path
			// is not visible from here -- confirm whether it
			// un-protects the stack before fixing the order.
			t->m_stack = NULL;
			// re-protect this stack
			mprotect ( t->m_stack + GUARDSIZE ,
				   STACK_SIZE - GUARDSIZE,
				   PROT_NONE );
		}
#else
	again:
		int status ;
		pid_t pid = waitpid ( t->m_pid , &status , 0 );
		// save errno before log() or anything else can change it
		int err = errno;
		// debug the waitpid
		if ( g_conf.m_logDebugThread )
			log(LOG_DEBUG,"thread: Waiting for "
			    "t=0x%"PTRFMT" pid=%"INT32".",
			    (PTRTYPE)t,(int32_t)t->m_pid);
		// bitch and continue if join failed
		if ( pid != t->m_pid ) {
			// waitpid() gets interrupted by various signals so
			// we need to repeat (SIGCHLD?)
			if ( err == EINTR ) goto again;
			log("thread: Call to waitpid(%"INT32") returned %"INT32": %s.",
			    (int32_t)t->m_pid,(int32_t)pid,mstrerror(err));
			continue;
		}
		// if status not 0 then process got abnormal termination
		if ( ! WIFEXITED(status) ) {
			if ( WIFSIGNALED(status) )
				log("thread: Child process (pid=%i) exited "
				    "from unhandled signal number %"INT32".",
				    pid,(int32_t)WTERMSIG(status));
			else
				log("thread: Child process (pid=%i) exited "
				    "for unknown reason." , pid );
		}
		g_threads.returnStack ( t->m_si );
		// . re-protect this stack
		// . this must be done BEFORE we forget the stack's address,
		//   otherwise mprotect() is handed NULL + GUARDSIZE and the
		//   stack stays readable and writable
		// . launchThread2() un-protects the stack again before it
		//   hands it to a new thread
		if ( t->m_stack )
			mprotect ( t->m_stack + GUARDSIZE ,
				   STACK_SIZE - GUARDSIZE,
				   PROT_NONE );
		t->m_stack = NULL;
#endif
		// debug msg
		if ( g_conf.m_logDebugThread )
			log(LOG_DEBUG,"thread: joined with pid=%"INT32" pid=%"INT32".",
			    (int32_t)t->m_pid,(int32_t)t->m_pid);
		// we may get cleaned up and re-used and our niceness reassignd
		// right after set m_isDone to true, so save niceness
		int32_t niceness = t->m_niceness;
		char qnum = t->m_qnum;
		ThreadQueue *tq = &g_threads.m_threadQueues[(int)qnum];
		// sanity check: the entry must belong to this queue. core
		// on purpose if it does not.
		if ( tq != this ) { char *xx = NULL; *xx = 0; }
		// get read size before cleaning it up -- it could get nuked
		int32_t rs = 0;
		bool isWrite = false;
		if ( tq->m_threadType == DISK_THREAD ) {
			rs      = t->m_bytesToGo;
			isWrite = t->m_doWrite ;
		}
		// count him as returned in his priority class
		if      ( niceness <= 0 ) tq->m_hiReturned++;
		else if ( niceness == 1 ) tq->m_mdReturned++;
		else if ( niceness >= 2 ) tq->m_loReturned++;
		// deal with the tiers for disk threads based on read sizes
		if ( tq->m_threadType == DISK_THREAD ) {
			// writes are special cases
			if ( isWrite ) m_writesReturned++;
			if ( rs >= 0 && niceness >= 2 ) {
				if ( rs > g_conf.m_medReadSize )
					tq->m_loReturnedBig++;
				else if ( rs > g_conf.m_smaReadSize )
					tq->m_loReturnedMed++;
				else
					tq->m_loReturnedSma++;
			}
			else if ( rs >= 0 && niceness >= 1 ) {
				if ( rs > g_conf.m_medReadSize )
					tq->m_mdReturnedBig++;
				else if ( rs > g_conf.m_smaReadSize )
					tq->m_mdReturnedMed++;
				else
					tq->m_mdReturnedSma++;
			}
			else if ( rs >= 0 ) {
				if ( rs > g_conf.m_medReadSize )
					tq->m_hiReturnedBig++;
				else if ( rs > g_conf.m_smaReadSize )
					tq->m_hiReturnedMed++;
				else
					tq->m_hiReturnedSma++;
			}
		}
		// now count him as returned
		m_returned++;
		// prepare for relaunch if we were cancelled
		if ( t->m_isCancelled ) {
			t->m_isCancelled = false;
			t->m_isLaunched  = false;
			t->m_isDone      = false;
			log("thread: Thread cancelled. Preparing thread "
			    "for relaunch");
			continue;
		}
		// . save the timestamps before freeing up the ThreadEntry
		//   for possible take over.
		// . calling the callback may launch a thread which may
		//   claim THIS thread entry, t, so the debug log below must
		//   not read them back out of t
		const int64_t queuedTime   = t->m_queuedTime;
		const int64_t launchedTime = t->m_launchedTime;
		const int64_t preExitTime  = t->m_preExitTime;
		const int64_t exitTime     = t->m_exitTime;
		numCallbacks++;
		// SOLUTION: before calling the callback which may launch
		// another thread with this same tid, thus causing an error,
		// we should set these to false first:
		// not running any more
		t->m_isLaunched = false;
		// not occupied any more
		t->m_isOccupied = false;
		// do we have to decrement top
		if ( m_top == i + 1 )
			while (m_top > 0 && ! m_entries[m_top-1].m_isOccupied)
				m_top--;
		// clear the g_errno in case set by a previous callback
		g_errno = 0;
		makeCallback ( t );
		// clear errno again
		g_errno = 0;
		if ( g_conf.m_logDebugThread ) {
			int64_t now = gettimeofdayInMilliseconds();
			log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] %s done2. "
			    "active=%"INT32" "
			    "time since queued = %"UINT64" ms  "
			    "time since launch = %"UINT64" ms  "
			    "time since pre-exit = %"UINT64" ms  "
			    "time since exit = %"UINT64" ms",
			    (PTRTYPE)t,
			    getThreadType() ,
			    (int32_t)(m_launched - m_returned) ,
			    (uint64_t)(now - queuedTime),
			    (uint64_t)(now - launchedTime),
			    (uint64_t)(now - preExitTime) ,
			    (uint64_t)(now - exitTime) );
		}
	}

	int64_t took2 = gettimeofdayInMilliseconds()-startTime;
	if(numCallbacks > 0 && took2 > 5)
		log(LOG_DEBUG, "threads: took %"INT64" ms to callback %"INT32" "
		    "callbacks, nice: %"INT32"", took2, numCallbacks, maxNiceness);

	//since we need finer grained control in loop, we no longer collect
	//the callbacks, sort, then call them.  we now call them right away
	//that way we can break out if we start taking too long and
	//give control back to udpserver.
	return numCallbacks != 0;
}
// used by UdpServer to see if it should call a low priority callback
int32_t Threads::getNumActiveHighPriorityCpuThreads() {
ThreadQueue *q ;
int32_t hiActive = 0 ;
q = &g_threads.m_threadQueues[INTERSECT_THREAD];
hiActive += q->m_hiLaunched - q->m_hiReturned;
q = &g_threads.m_threadQueues[MERGE_THREAD];
hiActive += q->m_hiLaunched - q->m_hiReturned;
return hiActive;
}
// used by UdpServer to see if it should call a low priority callback
int32_t Threads::getNumActiveHighPriorityThreads() {
ThreadQueue *q ;
int32_t hiActive = 0 ;
q = &g_threads.m_threadQueues[DISK_THREAD];
hiActive += q->m_hiLaunched - q->m_hiReturned;
q = &g_threads.m_threadQueues[INTERSECT_THREAD];
hiActive += q->m_hiLaunched - q->m_hiReturned;
q = &g_threads.m_threadQueues[MERGE_THREAD];
hiActive += q->m_hiLaunched - q->m_hiReturned;
return hiActive;
}
// . returns false if no thread launched
// . returns true if thread was launched
// . sets g_errno on error
// . don't launch a low priority thread if a high priority thread is running
// . i.e. don't launch a high niceness thread if a low niceness is running
bool ThreadQueue::launchThread2 ( ThreadEntry *te ) {
// debug msg
//log("trying to launch for type=%"INT32"",(int32_t)m_threadType);
// clean up any threads that have exited
//cleanUp ();
// if no entries then nothing to launch
if ( m_top <= 0 ) return false;
// or if no stacks left, don't even try
if ( s_head == -1 ) return false;
// . how many threads are active now?
// . NOTE: not perfectly thread safe here
int64_t active = m_launched - m_returned ;
// debug msg
if ( g_conf.m_logDebugThread )
log(LOG_DEBUG,"thread: launchThread: active=%"INT64" max=%"INT32".",
active,m_maxLaunched);
// return if the max is already launched
if ( active >= m_maxLaunched ) return false;
// do not launch a low priority merge, intersect or filter thread if we
// have high priority cpu threads already going on. this way a
// low priority spider thread will not launch if a high priority
// cpu-based thread of any kind (right now just MERGE or INTERSECT)
// is already running.
int32_t hiActive2 = g_threads.getNumActiveHighPriorityCpuThreads() ;
// return log("MAX. %"INT32" are launched. %"INT32" now in queue.",
// active , m_entriesUsed );
// . sanity check
// . a thread can NOT call this
//if ( getpid() != s_pid ) {
// fprintf(stderr,"thread: launchThread: bad engineer\n");
// ::exit(-1);
//}
//int64_t now = gettimeofdayInMilliseconds();
int64_t now = -1LL;
// pick thread with lowest niceness first
int32_t minNiceness = 0x7fffffff;
int64_t maxWait = -1;
int32_t mini = -1;
bool minIsWrite = false;
int32_t lowest = 0x7fffffff;
int32_t highest = 0;
// . now base our active thread counts on niceness AND read sizes
// . this is only used for DISK_THREADs
// . loActive* includes niceness >= 1
int32_t loActiveBig = m_loLaunchedBig - m_loReturnedBig;
int32_t loActiveMed = m_loLaunchedMed - m_loReturnedMed;
int32_t loActiveSma = m_loLaunchedSma - m_loReturnedSma;
int32_t mdActiveBig = m_mdLaunchedBig - m_mdReturnedBig;
int32_t mdActiveMed = m_mdLaunchedMed - m_mdReturnedMed;
int32_t mdActiveSma = m_mdLaunchedSma - m_mdReturnedSma;
int32_t hiActiveBig = m_hiLaunchedBig - m_hiReturnedBig;
int32_t hiActiveMed = m_hiLaunchedMed - m_hiReturnedMed;
int32_t hiActiveSma = m_hiLaunchedSma - m_hiReturnedSma;
int32_t activeWrites = m_writesLaunched - m_writesReturned;
// how many niceness=2 threads are currently running now?
int64_t loActive = m_loLaunched - m_loReturned;
int64_t mdActive = m_mdLaunched - m_mdReturned;
//int64_t hiActive = m_hiLaunched - m_hiReturned;
int32_t total = loActive + mdActive;
int32_t max = g_conf.m_spiderMaxDiskThreads;
if ( max <= 0 ) max = 1;
// hi priority max
// JAB: warning abatement
//int64_t hiActive = m_hiLaunched - m_hiReturned;
// i dunno what the point of this was... so i commented it out
//int32_t max2 = g_conf.m_queryMaxDiskThreads ;
//if ( max2 <= 0 ) max2 = 1;
// only do this check if we're a addlists/instersect thread queue
//if (m_threadType == INTERSECT_THREAD&& hiActive >= max2)return false;
// loop through candidates
for ( int32_t i = 0 ; i < m_top ; i++ ) {
// skip if not occupied
if ( ! m_entries[i].m_isOccupied ) continue;
int32_t niceness = m_entries[i].m_niceness;
// get lowest niceness level of launched threads
if ( m_entries[i].m_isLaunched ) {
// if he's done, skip him
if ( m_entries[i].m_isDone ) continue;
// get the highest niceness for all that are launched
if ( niceness > highest ) highest = niceness;
// get the lowest niceness for all that are launched
if ( niceness < lowest ) lowest = niceness;
// continue now since it's already launched
continue;
}
// . these filters really make it so the spider does not
// impact query response time
//if ( niceness >= 1 && hiActive > 0 ) continue;
// don't consider any lows if one already running
//if ( niceness >= 2 && loActive > 0 ) continue;
// don't consider any lows if a hi already running
//if ( niceness >= 2 && hiActive > 0 ) continue;
// don't consider any mediums if one already running
//if ( niceness == 1 && mdActive > 0 ) continue;
// don't consider any mediums if a hi already running
//if ( niceness == 1 && hiActive > 0 ) continue;
//if ( m_threadType == DISK_THREAD ) {
// if ( niceness >= 1 && hiActive > 0 ) continue;
// if ( niceness >= 2 && loActive >= max ) continue;
// if ( niceness == 1 && mdActive >= max ) continue;
//}
// treat niceness 1 as niceness 2 for ranking purposes
// IFF we're not too backlogged with file merges. i.e.
// IFF we're merging faster than we're dumping.
// only merges and dumps have niceness 1 really.
// spider disk reads are all niceness 2.
// Now Rdb::addList just refuses to add data if we have too
// many unmerged files on disk!
// now we use niceness 1 for "real merges" so those reads take
// priority over spider build reads. (8/14/12)
//if(niceness == 1 /*&& g_numUrgentMerges <= 0*/) niceness = 2;
// if he doesn't beat or tie us, skip him
if ( niceness > minNiceness ) continue;
// no more than "max" medium and low priority threads should
// be active/launched at any one time
if ( niceness >= 1 && total >= max ) continue;
// shortcut
ThreadEntry *t = &m_entries[i];
// what is this guy's read size?
// the filestate provided could have been
//FileState *fs ;
int32_t readSize = 0 ;
bool isWrite = false;
if ( m_threadType == DISK_THREAD ){//&&m_entries[i].m_state ) {
//fs = (FileState *)m_entries[i].m_state;
readSize = t->m_bytesToGo;
isWrite = t->m_doWrite ;
}
if ( isWrite && activeWrites > g_conf.m_maxWriteThreads )
continue;
if ( m_threadType == MERGE_THREAD ||
m_threadType == INTERSECT_THREAD ||
m_threadType == FILTER_THREAD )
if ( niceness > 0 && hiActive2 > 0 )
continue;
// how many threads can be launched for this readSize/niceness?
if ( niceness >= 1 && m_threadType == DISK_THREAD ) {
if ( readSize > g_conf.m_medReadSize ) {
if ( loActiveBig + mdActiveBig >=
g_conf.m_spiderMaxBigDiskThreads )
continue;
}
else if ( readSize > g_conf.m_smaReadSize ) {
if ( loActiveMed + mdActiveMed >=
g_conf.m_spiderMaxMedDiskThreads )
continue;
}
else if ( loActiveSma + mdActiveSma >=
g_conf.m_spiderMaxSmaDiskThreads )
continue;
}
else if ( niceness < 1 && m_threadType == DISK_THREAD ) {
if ( readSize > g_conf.m_medReadSize ) {
if ( hiActiveBig >=
g_conf.m_queryMaxBigDiskThreads )
continue;
}
else if ( readSize > g_conf.m_smaReadSize ) {
if ( hiActiveMed >=
g_conf.m_queryMaxMedDiskThreads )
continue;
}
else if ( hiActiveSma >=
g_conf.m_queryMaxSmaDiskThreads )
continue;
}
// be lazy with this since it uses a significant amount of cpu
if ( now == -1LL ) now = gettimeofdayInMilliseconds();
// how long has this entry been waiting in the queue to launch?
int64_t waited = now - m_entries[i].m_queuedTime ;
// adjust "waited" if it originally had a niceness of 1
if ( m_entries[i].m_niceness >= 1 ) {
// save threads gain precedence
if ( m_threadType == SAVETREE_THREAD )
waited += 49999999;
else if ( m_threadType == UNLINK_THREAD )
waited += 39999999;
else if ( m_threadType == MERGE_THREAD )
waited += 29999999;
else if ( m_threadType == INTERSECT_THREAD )
waited += 29999999;
else if ( m_threadType == FILTER_THREAD )
waited += 9999999;
// if its a write thread... do it quick
else if ( isWrite )
waited += 19999999;
// . if it has waited more than 500 ms it needs
// to launch now...
// . these values seem to be VERY well tuned on
// my production machines, but they may have to
// be tuned for other machines? TODO.
// . they should auto-tune so when merge is more
// important it should starve the other spider reads
else if ( waited >= 500 )
waited += 9999999;
// it hurts for these guys to wait that long
else waited *= 4;
}
// is real merge?
if ( m_entries[i].m_niceness == 1 )
waited += 19999999;
// watch out for clock skew
if ( waited < 0 ) waited = 0;
// if tied, the lowest time wins
if ( niceness == minNiceness && waited <= maxWait ) continue;
// we got a new winner
mini = i;
minNiceness = niceness;
maxWait = waited;
minIsWrite = isWrite;
}
// if no candidate, bail honorably
if ( mini == -1 ) return false;
// . if we've got an urgent thread (niceness=1) going on, do not allow
// niceness=2 threads to launch
// . actually, don't launch *ANY* if doing an urgent merge even if
// no niceness=1 thread currently launched
// . CAUTION! when titledb is merging it dumps tfndb and if tfndb
// goes urgent,make sure it dumps out with niceness 1, otherwise
// this will freeze us!! since titledb won't be able to continue
// merging until tfndb finishes dumping...
// . Now Rdb::addList just refuses to add data if we have too
// many unmerged files on disk! Let others still read from us,
// just not add to us...
//if ( minNiceness >= 2 && g_numUrgentMerges > 0 ) // && lowest <= 1 )
// return false;
// if we're urgent, don't launch a niceness 2 unless he's waited
// at least 200ms in the queue
//if ( minNiceness >= 2 && g_numUrgentMerges > 0 && maxWait < 200 )
// return false;
// . if the thread to launch has niceness > lowest launched then bail
// . i.e. don't launch a low-priority thread if we have highs running
// . we no longer let a niceness of 1 prevent a niceness of 2 from
// launching, this way we can launch merge threads at a niceness
// of 1 w/o hurting the spidering too much, but still giving the
// merge some preferential treatment over the disk so we don't
// RdbDump data faster than we can finish the merge.
if ( minNiceness > lowest && lowest < 1 ) return false;
// . don't launch a low priority thread if there's a high launched
// from any ThreadQueue
// . hi priority is niceness <= 0, med/lo priority is niceness >= 1
//int64_t hiActive = g_threads.m_hiLaunched - g_threads.m_hiReturned;
// now just limit it to this queue... so you can launch a low
// merge thread even if there's a high disk read going on
//if ( minNiceness >= 1 && hiActive > 0 ) return false;
// if we're low priority and suspended is true then bail
//if ( minNiceness > 0 && m_isLowPrioritySuspended ) return false;
// if we're going to launch a high priority thread then CANCEL
// any low priority disk-read threads already in progress
//if ( minNiceness <= 0 && highest > 0 ) cancelLowPriorityThreads ();
// . let's cancel ALL low priority threads if we're launching a high
// . hi priority is niceness <= 0, low priority is niceness >= 1
//int64_t loActive = g_threads.m_loLaunched - g_threads.m_loReturned;
int32_t realNiceness = m_entries[mini].m_niceness;
//if ( realNiceness <= 0 && (loActive + mdActive) > 0 )
// // this actually cancels medium priority threads, too
// g_threads.cancelLowPriorityThreads();
// point to winning entry
ThreadEntry *t = &m_entries[mini];
// if descriptor was closed, just return error now, we
// cannot try to re-open because the file might have been
// unlinked. Sync.cpp does a DISK_THREAD but does not pass in a valid
// FileState ptr because it does its own saving, so check for NULLs.
FileState *fs = (FileState *)t->m_state;
bool allocated = false;
if ( m_threadType == DISK_THREAD && fs && ! fs->m_doWrite ) {
// allocate the read buffer here!
if ( ! fs->m_doWrite && ! fs->m_buf && t->m_bytesToGo > 0 ) {
int32_t need = t->m_bytesToGo + fs->m_allocOff;
char *p = (char *) mmalloc ( need , "ThreadReadBuf" );
if ( p ) {
fs->m_buf = p + fs->m_allocOff;
fs->m_allocBuf = p;
fs->m_allocSize = need;
allocated = true;
}
else
log("thread: read buf alloc failed for %"INT32" "
"bytes.",need);
// just let the BigFile::readWrite_r() handle the
// error for the NULL read buf
}
// . otherwise, they are intact, so get the real fds
// . we know the stored File is still around because of that
bool doWrite = fs->m_doWrite;
BigFile *bb = fs->m_this;
fs->m_fd1 = bb->getfd (fs->m_filenum1, !doWrite, &fs->m_vfd1);
fs->m_fd2 = bb->getfd (fs->m_filenum2, !doWrite, &fs->m_vfd2);
// is this bad?
if ( fs->m_fd1 < 0 ) log("disk: fd1 is %i for %s",
fs->m_fd1,bb->m_baseFilename);
if ( fs->m_fd2 < 0 ) log("disk: fd2 is %i for %s.",
fs->m_fd2,bb->m_baseFilename);
fs->m_closeCount1 = getCloseCount_r ( fs->m_fd1 );
fs->m_closeCount2 = getCloseCount_r ( fs->m_fd2 );
}
// count it as launched now, before we actually launch it
m_launched++;
// priority-based GLOBAL & LOCAL launch count
if ( realNiceness <= 0 ) m_hiLaunched++;
else if ( realNiceness == 1 ) m_mdLaunched++;
else if ( realNiceness >= 2 ) m_loLaunched++;
// deal with the tiers for disk threads based on read sizes
if ( m_threadType == DISK_THREAD ) {
// writes are special cases
if ( minIsWrite ) m_writesLaunched++;
//FileState *fs = (FileState *)m_entries[mini].m_state;
int32_t rs = t->m_bytesToGo; // 0;
//if ( fs ) rs = fs->m_bytesToGo;
if ( realNiceness >= 2 ) {
if ( rs > g_conf.m_medReadSize )
m_loLaunchedBig++;
else if ( rs > g_conf.m_smaReadSize )
m_loLaunchedMed++;
else
m_loLaunchedSma++;
}
else if ( realNiceness >= 1 ) {
if ( rs > g_conf.m_medReadSize )
m_mdLaunchedBig++;
else if ( rs > g_conf.m_smaReadSize )
m_mdLaunchedMed++;
else
m_mdLaunchedSma++;
}
else {
if ( rs > g_conf.m_medReadSize )
m_hiLaunchedBig++;
else if ( rs > g_conf.m_smaReadSize )
m_hiLaunchedMed++;
else
m_hiLaunchedSma++;
}
}
// debug msg
//if ( m_threadType == 0 )
// log("creating thread, t=%"UINT32" state=%"UINT32" launched = %"INT32"",
// t , (int32_t)t->m_state , m_launched );
// and set the flag
t->m_isLaunched = true;
// . launch it
// . this sets the pthread_t ptr for identificatoin
// . returns false on error
//pthread_t tmp;
loop:
// debug msg
if ( g_conf.m_logDebugThread ) {
active = m_launched - m_returned ;
int64_t now = gettimeofdayInMilliseconds();
log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] launched %s thread. "
"active=%"INT64" "
"niceness=%"INT32". waited %"UINT64" ms in queue.",
(PTRTYPE)t, getThreadType(), active, realNiceness,
now - t->m_queuedTime);
}
// be lazy with this since it uses a significant amount of cpu
if ( now == -1LL ) now = gettimeofdayInMilliseconds();
//t->m_launchedTime = g_now;
t->m_launchedTime = now;
// loop2:
// spawn the thread
int32_t count = 0;
pid_t pid;
#ifndef PTHREADS
//int status;
//int ret;
// random failure test
//if ( rand() %10 == 1 ) { err = ENOMEM; goto hadError; }
// malloc twice the size
t->m_stackSize = STACK_SIZE;
//t->m_stack = (char *)mmalloc ( t->m_stackSize , "Threads" );
int32_t si = g_threads.getStack ( );
if ( si < 0 ) {
log(LOG_LOGIC,"thread: Unable to get stack. Bad engineer.");
goto hadError;
}
t->m_si = si;
t->m_stack = s_stackPtrs [ si ];
// UNprotect the whole stack so we can use it
mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE ,
PROT_READ | PROT_WRITE );
// clear g_errno
g_errno = 0;
// . make another process
// . do not use sig handlers, so if a child process gets any unhandled
// signal (like SEGV) it will just exit
pid = clone ( startUp , t->m_stack + t->m_stackSize ,
CLONE_FS | CLONE_FILES | CLONE_VM | //CLONE_SIGHAND |
SIGCHLD ,
(void *)t );
// . we set the pid because we are the one that checks it!
// . if we just let him do it, when we check in cleanup routine
// we can get an uninitialized pid
t->m_pid = pid;
// might as well bitch if we should here
if ( s_bad ) {
log(LOG_LOGIC,"thread: PID received: %"INT32" > %"INT32". Bad.",
s_badPid, (int32_t)MAX_PID);
//char *xx = NULL; *xx = 0;
}
// wait for him
//ret = waitpid ( -1*pid , &status , 0 );
//if ( ret != pid )
// log("waitpid(pid=%"INT32"): ret=%"INT32" err=%s",
// (int32_t)pid,(int32_t)ret,mstrerror(errno));
// check if he's done
//if ( ! t->m_isDone ) log("NOT DONE");
// set the pid
//t->m_pid = pid;
// error?
if ( pid == (pid_t)-1 ) g_errno = errno;
//
// now use pthreads again... are they stable yet?
//
#else
// assume it does not go through
t->m_needsJoin = false;
// pthread inherits our sigmask, so don't let it handle sigalrm
// signals in Loop.cpp, it'll screw things up. that handler
// is only meant to be called by the main process. if we end up
// double calling it, this thread may think g_callback is non-null
// then it gets set to NULL, then the thread cores! seen it...
// sigset_t sigs;
// sigemptyset ( &sigs );
// sigaddset ( &sigs , SIGALRM );
// sigaddset ( &sigs , SIGVTALRM );
// if ( sigprocmask ( SIG_BLOCK , &sigs , NULL ) < 0 )
// log("threads: failed to block sig");
// supply our own stack to make pthread_create() fast otherwise
// it has slowness issues with mmap()
// http://www.gossamer-threads.com/lists/linux/kernel/960227
t->m_stackSize = STACK_SIZE;
//t->m_stack = (char *)mmalloc ( t->m_stackSize , "Threads" );
int32_t si = g_threads.getStack ( );
if ( si < 0 ) {
log(LOG_LOGIC,"thread: Unable to get stack. Bad engineer.");
goto hadError;
}
t->m_si = si;
t->m_stack = s_stackPtrs [ si ];
// check it's aligned
if ( (uint64_t)(t->m_stack) & (THRPAGESIZE-1) ) {
char *xx=NULL;*xx=0; }
// UNprotect the whole stack so we can use it
mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE ,
PROT_READ | PROT_WRITE );
pthread_attr_t attr;
pthread_attr_init ( &attr );
pthread_attr_setstack ( &attr , t->m_stack , t->m_stackSize );
// debug
if ( g_conf.m_logDebugThread )
log("threads: pthread_create: "
"stack=%"PTRFMT" stacksize=%"INT64""
, (PTRTYPE)t->m_stack
, (int64_t)t->m_stackSize );
// this returns 0 on success, or the errno otherwise
g_errno = pthread_create ( &t->m_joinTid , &attr, startUp2 , t) ;
// if ( sigprocmask ( SIG_UNBLOCK , &sigs , NULL ) < 0 )
// log("threads: failed to unblock sig");
#endif
// we're back from pthread_create
if ( g_conf.m_logDebugThread )
log(LOG_DEBUG,"thread: Back from clone "
"t=0x%"PTRFMT" pid=%"INT32".",
(PTRTYPE)t,(int32_t)pid);
// return true on successful creation of the thread
if ( g_errno == 0 ) {
// good stuff, the thread needs a join now
t->m_needsJoin = true;
if ( count > 0 )
log("thread: Call to clone looped %"INT32" times.",count);
return true;
}
//#undef usleep
// forever loop
if ( g_errno == EAGAIN ) {
if ( count++ == 0 )
log("thread: Call to clone had error: %s.",
mstrerror(g_errno));
//usleep(1);
//goto loop2;
goto hadError;
}
// debug msg
// log("created tid=%"INT32"",(int32_t)t->m_tid);
// return true;
//}
// do again if g_errno is AGAIN
if ( g_errno == EINTR ) {
log("thread: Call to clone was interrupted. Trying again.");
goto loop;
}
//#ifndef _PTHREADS_
hadError:
//#endif
if ( g_errno )
log("thread: pthread_create had error = %s",
mstrerror(g_errno));
// it didn't launch, did it? dec the count.
m_launched--;
// priority-based LOCAL & GLOBAL launch counts
if ( realNiceness <= 0 ) m_hiLaunched--;
else if ( realNiceness == 1 ) m_mdLaunched--;
else if ( realNiceness >= 2 ) m_loLaunched--;
// . deal with the tiers for disk threads based on read sizes
// . WARNING: we cannot easily change tiers dynamically
// because it will throw these counts off
if ( m_threadType == DISK_THREAD ) {
// writes are special cases
if ( minIsWrite ) m_writesLaunched--;
//FileState *fs = (FileState *)m_entries[mini].m_state;
int32_t rs = t->m_bytesToGo; // 0;
//if ( fs ) rs = fs->m_bytesToGo;
if ( realNiceness >= 2 ) {
if ( rs > g_conf.m_medReadSize )
m_loLaunchedBig--;
else if ( rs > g_conf.m_smaReadSize )
m_loLaunchedMed--;
else
m_loLaunchedSma--;
}
else if ( realNiceness >= 1 ) {
if ( rs > g_conf.m_medReadSize )
m_mdLaunchedBig--;
else if ( rs > g_conf.m_smaReadSize )
m_mdLaunchedMed--;
else
m_mdLaunchedSma--;
}
else {
if ( rs > g_conf.m_medReadSize )
m_hiLaunchedBig--;
else if ( rs > g_conf.m_smaReadSize )
m_hiLaunchedMed--;
else
m_hiLaunchedSma--;
}
}
// unset the flag
t->m_isLaunched = false;
// bail on other errors
log("thread: Call to clone had error: %s.", mstrerror(g_errno));
// correction on this error
log("thread: Try not using so much memory. "
"memused now =%"INT64".",g_mem.getUsedMem());
// free allocated buffer
if ( allocated ) {
mfree ( fs->m_allocBuf , fs->m_allocSize , "ThreadReadBuf" );
fs->m_buf = NULL;
}
// i'm not sure return value matters at this point? the thread
// is queued and hopefully will launch at some point
return false;
// if this is the direct thread request do not call callback, just
// return false, otherwise we get into an unexpected loop thingy
if ( t == te )
return log("thread: Returning false.");
// do it blocking
log("thread: Calling without thread. This will crash many times. "
"Please fix it.");
// return false so caller will re-do without thread!
// so BigFile::readwrite() will retry without thread and we won't
// get into a wierd loop thingy
if ( te ) return false;
// uint64_t profilerStart,profilerEnd;
// uint64_t statStart,statEnd;
//if (g_conf.m_profilingEnabled){
// address=(int32_t)t->m_startRoutine;
// g_profiler.startTimer(address, __PRETTY_FUNCTION__);
//}
t->m_startRoutine ( t->m_state , t );
//if (g_conf.m_profilingEnabled) {
// if(!g_profiler.endTimer(address, __PRETTY_FUNCTION__))
// log(LOG_WARN,"admin: Couldn't add the fn %"INT32"",
// (int32_t)address);
//}
t->m_exitTime = gettimeofdayInMilliseconds();
// flag it for cleanup
t->m_isDone = true;
t->m_isLaunched = true;
// clean it up
cleanUp ( t , 200/*maxNiceness thread can have to be cleaned up*/ );
// ignore error
g_errno = 0;
// we kinda launched one, so say true here
return true; // false;
}
#ifndef PTHREADS
// one-shot flag: startUp() clears it after logging the first setpriority()
// failure so that the failure message is only printed once
static bool s_firstTime = true;
#endif
// . entry point executed by every spawned thread/child: it is handed
//   directly to clone(), or reached through startUp2() for pthread_create()
// . "state" is the ThreadEntry describing the job to run
// . blocks SIGINT (plus SIGALRM/SIGVTALRM under PTHREADS), sets the niceness
//   when not using pthreads, runs t->m_startRoutine, records the exit times,
//   marks the entry as done and then signals the main process with SIGCHLD
// . always returns 0
// . threads start up with cancellation deferred until pthread_testcancel()
//   is called, but we never call that
int startUp ( void *state ) {
	// get thread entry
	ThreadEntry *t = (ThreadEntry *)state;
	// do NOT set t->m_pid here. the parent sets it since he is the one
	// that needs to check it in the cleanup routine
#ifndef PTHREADS
	// . sanity check: we should not have the main process's pid
	// . s_pid is a pthread_t, which is 64 bits on 64-bit architectures,
	//   so narrow it explicitly to match the %i conversion
	if ( getpid() == s_pid )
		log("thread: Thread has same pid %i as main process.",
		    (int)(int64_t)s_pid);
#endif
	// our signal set
	sigset_t set;
	sigemptyset(&set);
	// we need this here so if we break the gb process with gdb it
	// does not kill the child processes when it sends out the SIGINT.
	sigaddset(&set, SIGINT);
#ifndef PTHREADS
	sigprocmask(SIG_BLOCK, &set, NULL);
#else
	// turn these off in the thread
	sigaddset ( &set , SIGALRM );
	sigaddset ( &set , SIGVTALRM );
	pthread_sigmask(SIG_BLOCK,&set,NULL);
#endif
	// . what this lwp's priority be?
	// . can range from -20 to +20
	// . the lower p, the more cpu time it gets
	// . this is really the niceness, not the priority
	int p ;
	// currently our niceness ranges from -1 to 2 for us
	if      ( t->m_niceness == 2 ) p = 19 ;
	else if ( t->m_niceness == 1 ) p = 10 ;
	else                           p = 0 ;
	// debug
	if ( g_conf.m_logDebugThread )
		log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] "
		    "in startup pid=%"INT64" pppid=%"INT32"",
		    (PTRTYPE)t,(int64_t)getpidtid(),(int32_t)getppid());
#ifndef PTHREADS
	// . set this process's priority
	// . setpriority() is only used for SCHED_OTHER threads
	if ( setpriority ( PRIO_PROCESS, getpidtid() , p ) < 0 ) {
		// do we even support logging from a thread?
		if ( s_firstTime ) {
			log("thread: Call to "
			    "setpriority(%"UINT32",%i) in thread "
			    "failed: %s. This "
			    "message will not be repeated.",
			    (uint32_t)getpidtid(),p,mstrerror(errno));
			s_firstTime = false;
		}
		// a failed setpriority() is not fatal, forget the error
		errno = 0;
	}
#endif
	// . call the startRoutine
	// . IMPORTANT: this can NEVER do non-blocking stuff
	t->m_startRoutine ( t->m_state , t );
	// . payload for the signal we queue below
	// . just use 1 to tell Loop to call g_threads.cleanUp()
	// . sival_int is only 4 bytes on a 64 bit arch so we can not pass "t"
	sigval_t svt;
	svt.sival_int = 1;
	// set exit time
	int64_t now = gettimeofdayInMilliseconds();
	t->m_preExitTime = now;
	t->m_exitTime    = now;
	if ( g_conf.m_logDebugThread ) {
		log(LOG_DEBUG,"thread: [t=0x%"PTRFMT"] "
		    "done with startup pid=%"INT64"",
		    (PTRTYPE)t,(int64_t)getpidtid());
	}
	// . now mark thread as ready for removal
	// . do this BEFORE queing the signal since we're still a thread!!!
	// . cleanUp() will take care of the rest
	// . cleanUp() will call pthread_join on us!
	t->m_isDone = true;
	// let Loop.cpp's sigHandler_r call g_thread.cleanUp()
	g_threads.m_needsCleanup = true;
	// . the main process does not get a signal automatically when we
	//   finish, so we must send the SIGCHLD ourselves
	// . i verified this breaks select() in Loop.cpp out of it's sleep
	// . on 64bit arch pthread_t is 64bit and pid_t is 32bit, hence the
	//   cast of s_pid for sigqueue()
#ifndef PTHREADS
	sigqueue ( (pid_t)(int64_t)s_pid, SIGCHLD, svt ) ;
#else
	pthread_kill ( s_pid , SIGCHLD );
#endif
	return 0;
}
// . adapter that gives startUp() the "void *fn(void *)" signature which
//   pthread_create() needs
// . startUp()'s return value is dropped and NULL is always returned
void *startUp2 ( void *state ) {
	(void)startUp ( state );
	return NULL;
}
// . raise the flag that keeps low priority threads from being launched
// . only the flag is touched, nothing that is already running is cancelled
//   here (the cancelLowPriorityThreads() call was disabled)
// . watch out, UdpServer::getEmptySlot() calls UdpServer::suspend() and we
//   could be in a signal handler here
void ThreadQueue::suspendLowPriorityThreads() {
	// nothing to do when we are already suspended
	if ( ! m_isLowPrioritySuspended )
		m_isLowPrioritySuspended = true;
}
// . lower the suspended flag and give waiting threads a chance to launch
// . a no-op when low priority threads are not currently suspended
// . this is called by UdpServer::destroySlot() and should NOT be in a sig
//   handler
void ThreadQueue::resumeLowPriorityThreads() {
	if ( m_isLowPrioritySuspended ) {
		// turn em back on
		m_isLowPrioritySuspended = false;
		// try to start up some threads then
		g_threads.launchThreads();
	}
}
void ThreadQueue::print ( ) {
// loop through candidates
for ( int32_t i = 0 ; i < m_top ; i++ ) {
ThreadEntry *t = &m_entries[i];
// print it
log(LOG_INIT,"thread: address=%"PTRFMT" "
"pid=%u state=%"PTRFMT" "
"occ=%i done=%i lnch=%i",
(PTRTYPE)t , t->m_pid ,
(PTRTYPE)t->m_state , t->m_isOccupied , t->m_isDone ,
t->m_isLaunched );
}
}
// . return a short human readable name for this queue's m_threadType
// . returns "unknown" when the type matches none of the known constants
// . checked from the last constant to the first so the result is the same
//   as a chain of overwriting assignments tested in the opposite order
const char *ThreadQueue::getThreadType ( ) {
	if ( m_threadType == GENERIC_THREAD   ) return "generic";
	if ( m_threadType == UNLINK_THREAD    ) return "unlink";
	if ( m_threadType == SAVETREE_THREAD  ) return "savetree";
	if ( m_threadType == FILTER_THREAD    ) return "filter";
	if ( m_threadType == INTERSECT_THREAD ) return "intersectlists";
	if ( m_threadType == MERGE_THREAD     ) return "merge";
	if ( m_threadType == DISK_THREAD      ) return "disk";
	return "unknown";
}
#include "BigFile.h" // FileState class
// . measure how much read work is pending in the disk thread queue
// . returns the number of disk READ entries that are occupied and not done
// . sets *totalToRead to the sum of m_bytesToGo over those same entries
// . an entry that has not been launched is only counted if its niceness is
//   <= maxNiceness; an entry that is already launched is counted regardless
//   of its niceness
// . write entries are never counted
// . this only inspects the queue, it does not modify or cancel anything
int32_t Threads::getDiskThreadLoad ( int32_t maxNiceness , int32_t *totalToRead ) {
	ThreadQueue *q   = &m_threadQueues[DISK_THREAD];
	ThreadEntry *e   = q->m_entries;
	int32_t      top = q->m_top;
	*totalToRead = 0;
	int32_t n = 0;
	for ( int32_t i = 0 ; i < top ; i++ ) {
		// get entry
		ThreadEntry *t = &e[i];
		// skip if not occupied
		if ( ! t->m_isOccupied ) continue;
		// skip if it's nicer than what we want, unless it is
		// already running
		if ( t->m_niceness > maxNiceness && ! t->m_isLaunched )
			continue;
		// skip if already done
		if ( t->m_isDone ) continue;
		// state is sometimes NULL, like from Sync.cpp's call
		if ( ! t->m_state ) continue;
		// . only count read operations
		// . the old "double the bytes if a write" adjustment that
		//   used to follow this check could never execute, so it
		//   has been dropped
		if ( t->m_doWrite ) continue;
		// how many bytes to do
		int32_t todo = t->m_bytesToGo;
		// add to total bytes to read
		*totalToRead += todo;
		// count the thread
		n++;
	}
	return n;
}
// . when a BigFile is removed, much like we remove its pages from
//   DiskPageCache we also remove any unlaunched reads on it from the
//   thread queue
// . write entries on "bf" are left alone so they can finish
// . every read entry on "bf" gets fs->m_errno2 set to EFILECLOSED, even if
//   it is already launched or done
// . read entries that are neither done nor launched are removed from the
//   queue and their callback is called with g_errno set to EFILECLOSED
// . g_errno is cleared before returning
void ThreadQueue::removeThreads ( BigFile *bf ) {
	// did the BigFile get hosed? that means our BigFile was
	// unlinked or closed before we got a chance to launch the
	// thread.
	// index of the last entry we removed, -1 if none
	int32_t maxi = -1;
	for ( int32_t i = 0 ; i < m_top ; i++ ) {
		ThreadEntry *t = &m_entries[i];
		// skip if not occupied
		if ( ! t->m_isOccupied ) continue;
		// get the filestate
		FileState *fs = (FileState *)t->m_state;
		// skip if NULL
		if ( ! fs ) continue;
		// skip if not match
		if ( fs->m_this != (void *)bf ) continue;
		// . let it finish writing if it is a write thread
		// . otherwise, if we are exiting, we could free the
		//   buffer being written and cause the thread to core...
		if ( fs->m_doWrite ) {
			log(LOG_INFO,"disk: Not removing write thread.");
			continue;
		}
		// . should we really? if we renamed the file to another,
		//   we need to recompute the offsets to read, etc.. so we
		//   should fail up to Msg5 with EFILECLOSED or something...
		// . i think we did a rename and it got the same fd, and since
		//   we did not remove the launched or done threads after the
		//   rename, we're not sure if they read from the newly renamed
		//   file or not, and our read offset was for the old file...
		// . at least set the error flag for doneWrapper()
		fs->m_errno2 = EFILECLOSED;
		// log it
		logf(LOG_INFO,"disk: Removing/flagging operation in thread "
		     "queue. fs=0x%"PTRFMT"", (PTRTYPE)fs);
		// skip if already done
		if ( t->m_isDone ) continue;
		// skip if launched
		if ( t->m_isLaunched ) continue;
		// . only entries still waiting in the queue get here
		// . this used to log "Thread is launched.", which is the
		//   opposite of what is true at this point
		log(LOG_INFO,"disk: Thread is not launched. "
		    "Removing it from queue.");
		// tell donewrapper what happened
		fs->m_errno = EFILECLOSED;
		g_errno     = EFILECLOSED;
		// remove it from the thread queue. free the slot BEFORE
		// calling the callback.
		t->m_isDone     = true;
		t->m_isLaunched = false;
		t->m_isOccupied = false;
		// keep track
		maxi = i;
		makeCallback ( t );
	}
	// if the last entry we removed was the top one, shrink m_top past
	// any trailing unoccupied entries
	if ( m_top == maxi + 1 )
		while ( m_top>0 && !m_entries[m_top-1].m_isOccupied) m_top--;
	g_errno = 0;
}
void Threads::printState() {
int64_t now = gettimeofdayInMilliseconds();
for ( int32_t i = 0 ; i < m_numQueues; i++ ) {
ThreadQueue *q = &m_threadQueues[i];
int32_t loActive = q->m_loLaunched - q->m_loReturned;
int32_t mdActive = q->m_mdLaunched - q->m_mdReturned;
int32_t hiActive = q->m_hiLaunched - q->m_hiReturned;
int32_t total = loActive + mdActive + hiActive;
if( total == 0) continue;
log(LOG_TIMING,
"admin: Thread counts: type:%s "
"%"INT32":low %"INT32":med %"INT32":high %"INT32":total",
q->getThreadType(), loActive, mdActive, hiActive, total);
for ( int32_t j = 0 ; j < q->m_top ; j++ ) {
ThreadEntry *t = &q->m_entries[j];
if(!t->m_isOccupied) continue;
if(t->m_isDone) {
log(LOG_TIMING,
"admin: Thread -done- "
"nice: %"INT32" "
"totalTime: %"INT64" (ms) "
"queuedTime: %"INT64"(ms) "
"runTime: %"INT64"(ms) "
"cleanup: %"INT64"(ms) "
"callback:%s",
t->m_niceness,
now - t->m_queuedTime,
t->m_launchedTime - t->m_queuedTime,
t->m_exitTime - t->m_launchedTime,
now - t->m_exitTime,
g_profiler.
getFnName((PTRTYPE)t->m_callback));
continue;
}
if(t->m_isLaunched) {
log(LOG_TIMING,
"admin: Thread -launched- "
"nice: %"INT32" "
"totalTime: %"INT64"(ms) "
"queuedTime: %"INT64"(ms) "
"runTime: %"INT64"(ms) "
"callback:%s",
t->m_niceness,
now - t->m_queuedTime,
t->m_launchedTime - t->m_queuedTime,
now - t->m_launchedTime,
g_profiler.
getFnName((PTRTYPE)t->m_callback));
continue;
}
log(LOG_TIMING,
"admin: Thread -queued- "
"nice: %"INT32" "
"queueTime: %"INT64"(ms) "
"callback:%s",
t->m_niceness,
now - t->m_queuedTime,
g_profiler.getFnName((PTRTYPE)t->m_callback));
}
}
}