open-source-search-engine/Threads.cpp

2592 lines
82 KiB
C++
Raw Normal View History

2013-08-03 00:12:24 +04:00
#include "gb-include.h"
#include "BigFile.h"
#include "Threads.h"
#include "Errno.h"
#include "Loop.h"
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/types.h> // getuid()/pid_t/getpid()
#include <sys/wait.h> // waitpid()
#include "Rdb.h" // g_mergeUrgent
#include <sched.h> // clone()
//#include "Msg16.h" // g_pid g_ticker
#include "XmlDoc.h" // g_pid g_ticker
#include "Profiler.h"
#include "Stats.h"
#include "Process.h"
// try using pthreads again
2013-09-14 01:37:35 +04:00
//#define PTHREADS
2013-08-03 00:12:24 +04:00
// use these stubs so libplotter.a works
2013-09-14 01:37:35 +04:00
#ifndef PTHREADS
2013-08-03 00:12:24 +04:00
int pthread_mutex_lock (pthread_mutex_t *t ) { return 0; }
int pthread_mutex_unlock (pthread_mutex_t *t ) { return 0; }
#else
#include <pthread.h>
2013-08-03 00:12:24 +04:00
pthread_attr_t s_attr;
#endif
// main process id (or thread id if using pthreads)
static pid_t s_pid = (pid_t) -1;
// . returns the id used to tell the main process apart from its threads
// . with PTHREADS defined this is the pthread id squeezed into a pid_t,
//   otherwise it is the plain process id from getpid()
pid_t getpidtid() {
#ifdef PTHREADS
	// gettid() is a bit newer so not in our libc32...
	// so kinda fake it. return the "thread" id, not process id.
	// Threads::amThread() should still work based on thread ids because
	// the main process has a unique thread id as well.
	return (pid_t)pthread_self();
#else
	return getpid();
#endif
}
2013-08-03 00:12:24 +04:00
// BIG PROBLEM:
// When doing pthread_join it doesn't always ensure a thread doesn't go
// zombie. It seems like when SIGIOs are generated by sigqueue() because
// of a full signal queue, threads start going zombie on me.
// PROBLEM #1: with using the "state" to identify a thread in the queue.
// Often a caller makes a disk access using a certain "state" var.
// He gets control back when cleanUp() calls t->m_callback. And he launches
// another read thread in there, which calls Threads::exit(state) before
// the original thread entry's m_isOccupied is set to false. So
// Threads::exit(state) mistakenly picks the thread entry of the former
// thread because it has the same state as its own thread!
// PROBLEM #2: technically, a thread in Threads::exit() can set its m_done
// bit then send the signal. When the Loop sig handler sees the done bit is
// set it decrements the global thread count even though the thread may
// not have actually exited yet!! I spawned like 1,000 threads this way!!!!!
// a global class extern'd in .h file
Threads g_threads;
// . call this before calling a ThreadEntry's m_startRoutine
// . allows us to set the priority of the thread
int startUp ( void *state ) ;
void *startUp2 ( void *state ) ;
// JAB: warning abatement
//static void launchThreadsWrapper ( int fd , void *state );
static void killStalledFiltersWrapper ( int fd , void *state );
static void makeCallback ( ThreadEntry *t ) ;
// . is the caller a thread?
// . returns false while s_pid is still -1, i.e. before setPid() has
//   recorded the id of the main process
bool Threads::amThread () {
	// main id not recorded yet, nothing to compare against
	if ( s_pid == -1 ) return false;
	// anybody whose id differs from the main one is a thread
	return ( getpidtid() != s_pid );
}
2013-09-14 01:37:35 +04:00
#ifndef PTHREADS
2013-08-03 00:12:24 +04:00
static long s_bad = 0;
static long s_badPid = -1;
#endif
#define MAX_PID 32767
2013-09-14 01:37:35 +04:00
#ifndef PTHREADS
2013-08-03 00:12:24 +04:00
static int s_errno ;
static int s_errnos [ MAX_PID + 1 ];
2013-08-03 00:12:24 +04:00
// . this was improvised from linuxthreads/errno.c
// . WARNING: you MUST compile with -DREENTRANT for this to work
// . hands back a separate errno slot for each pid up to MAX_PID
// . pids above MAX_PID all share s_errno, and we tally them in s_bad and
//   remember the most recent one in s_badPid
int *__errno_location (void) {
	const long myPid = (long) getpid();
	// out of range for the table, fall back to the shared slot
	if ( myPid > (long)MAX_PID ) {
		s_bad++;
		s_badPid = myPid;
		return &s_errno;
	}
	return &s_errnos[myPid];
}
2013-08-03 00:12:24 +04:00
#endif
// this also limit the maximum number of outstanding (live) threads
#define MAX_STACKS 20
// stack must be page aligned for mprotect
#define PAGESIZE 8192
// how much of stack to use as guard space
#define GUARDSIZE (32*1024)
// . crashed in saving with 800k, so try 1M
// . must be multiple of PAGESIZE
#define STACK_SIZE ((512+256) * 1024)
// jeta was having some problems, but i don't think they were related to
// this stack size of 512k, but i will try boosting to 800k anyway.
//#define STACK_SIZE (512 * 1024)
// 256k was not enough, try 512k
//#define STACK_SIZE (256 * 1024)
// at 128k (and even 200k) some threads do not return! why?? there's
// obviously some stack overwriting going on!
//#define STACK_SIZE (128 * 1024)
static char *s_stackAlloc = NULL;
static long s_stackAllocSize;
2013-09-14 01:37:35 +04:00
#ifndef PTHREADS
2013-08-03 00:12:24 +04:00
static char *s_stack = NULL;
static long s_stackSize;
static char *s_stackPtrs [ MAX_STACKS ];
#endif
static long s_next [ MAX_STACKS ];
static long s_head ;
// . pops the index at the head of the free stack list
// . returns -1 if none left
long Threads::getStack ( ) {
	const long si = s_head;
	// free list is empty
	if ( si == -1 ) return -1;
	// the entry after us becomes the new head
	s_head = s_next [ si ];
	return si;
}
// . pushes stack index "si" back onto the head of the free stack list
// . when the list is empty s_head is -1, so the pushed entry's next
//   link becomes -1 as well and it terminates the list
void Threads::returnStack ( long si ) {
	s_next[si] = s_head;
	s_head     = si;
}
void Threads::reset ( ) {
if ( s_stackAlloc ) {
mprotect(s_stackAlloc, s_stackAllocSize, PROT_READ|PROT_WRITE);
mfree ( s_stackAlloc , s_stackAllocSize,
"ThreadStack");
}
s_stackAlloc = NULL;
for ( long i = 0 ; i < MAX_THREAD_QUEUES ; i++ )
m_threadQueues[i].reset();
}
2014-04-07 02:57:38 +04:00
// . starts out with no registered queues
// . init() has not run yet, it is called from call() on first use
Threads::Threads ( ) {
	m_initialized = false;
	m_numQueues   = 0;
}
2014-04-11 10:04:00 +04:00
void Threads::setPid ( ) {
// set s_pid to the main process id
2013-09-14 01:37:35 +04:00
#ifdef PTHREADS
s_pid = pthread_self();
2014-04-07 02:57:38 +04:00
//log(LOG_INFO,
// "threads: main process THREAD id = %lu",(long unsigned)s_pid);
pthread_t tid = pthread_self();
sched_param param;
int policy;
// scheduling parameters of target thread
pthread_getschedparam ( tid, &policy, &param);
2014-04-07 02:57:38 +04:00
//log(LOG_INFO,
// "threads: min/max thread priority settings = %li/%li (policy=%li)",
// (long)sched_get_priority_min(policy),
// (long)sched_get_priority_max(policy),
// (long)policy);
#else
2013-08-03 00:12:24 +04:00
s_pid = getpid();
#endif
2014-04-11 10:04:00 +04:00
}
bool Threads::init ( ) {
if ( m_initialized ) return true;
m_initialized = true;
m_needsCleanup = false;
//m_needBottom = false;
// sanity check
if ( sizeof(pthread_t) > sizeof(pid_t) ) { char *xx=NULL;*xx=0; }
2014-04-11 10:04:00 +04:00
setPid();
2013-08-03 00:12:24 +04:00
#ifdef _STACK_GROWS_UP
return log("thread: Stack growing up not supported.");
#endif
//g_conf.m_logDebugThread = true;
// . damn, this only applies to fork() calls, i guess
// . a quick a dirty way to restrict # of threads so we don't explode
/*
struct rlimit lim;
lim.rlim_max = 100;
if ( setrlimit(RLIMIT_NPROC,&lim) )
log("thread::init: setrlimit: %s", mstrerror(errno) );
else
log("thread::init: set max number of processes to 100");
*/
// allow threads until disabled
m_disabled = false;
// # of low priority threads launched and returned
//m_hiLaunched = 0;
//m_hiReturned = 0;
//m_loLaunched = 0;
//m_loReturned = 0;
//long m_queryMaxBigDiskThreads ; // > 1M read
//long m_queryMaxMedDiskThreads ; // 100k - 1M read
//long m_queryMaxSmaDiskThreads ; // < 100k per read
// categorize the disk read sizes by these here
g_conf.m_bigReadSize = 0x7fffffff;
g_conf.m_medReadSize = 1000000;
g_conf.m_smaReadSize = 100000;
// . register a sleep wrapper to launch threads every 30ms
// . somtimes a bunch of threads mark themselves as done and the
// cleanUp() handler sees them as all still launched so it doesn't
// launch any new ones
//if ( ! g_loop.registerSleepCallback(30,NULL,launchThreadsWrapper))
// return log("thread: Failed to initialize timer callback.");
if ( ! g_loop.registerSleepCallback(1000,NULL,
killStalledFiltersWrapper,0))
return log("thread: Failed to initialize timer callback2.");
// debug
//log("thread: main process has pid=%li",(long)s_pid);
// . set priority of the main process to 0
// . setpriority() only applies to SCHED_OTHER threads
// . priority of threads with niceness 0 will be 0
// . priority of threads with niceness 1 will be 10
// . priority of threads with niceness 2 will be 20
// . see 'man sched_setscheduler' for detail scheduling info
// . no need to call getpid(), 0 for pid means the current process
2013-09-14 01:37:35 +04:00
#ifndef PTHREADS
2013-08-03 00:12:24 +04:00
if ( setpriority ( PRIO_PROCESS, getpid() , 0 ) < 0 )
log("thread: Call to setpriority failed: %s.",
mstrerror(errno));
2013-09-14 01:37:35 +04:00
#endif
2013-08-03 00:12:24 +04:00
// make multiplier higher for raids, can do more seeks
//long m = 1;
//#ifdef _LARS_
//m = 3;
//#endif
// register the types of threads here instead of in main.cpp
//if ( ! g_threads.registerType ( DISK_THREAD ,m*20/*maxThreads*/))
// try running blaster with 5 threads and you'll
// . see like a double degrade in performance for some reason!!
// . TODO: why?
// . well, this should be controlled g_conf.m_maxSpiderDiskThreads
// for niceness 1+ threads, and g_conf.m_maxPriorityDiskThreads for
// niceness 0 and below disk threads
// . 100 maxThreads out at a time, 32000 can be queued
if ( ! g_threads.registerType ( DISK_THREAD ,100/*maxThreads*/,32000))
return log("thread: Failed to register thread type." );
// . these are used by Msg5 to merge what it reads from disk
// . i raised it from 1 to 2 and got better response time from Msg10
// . i leave one open in case one is used for doing a big merge
// with high niceness cuz it would hold up high priority ones!
// . TODO: is there a better way? cancel it when UdpServer calls
// Threads::suspendLowPriorityThreads() ?
// . this used to be 2 but now defaults to 10 in Parms.cpp. i found
// i have less long gray lines in the performance graph when i
// did that on trinity.
long max2 = g_conf.m_maxCpuMergeThreads;
if ( max2 < 1 ) max2 = 1;
if ( ! g_threads.registerType ( MERGE_THREAD , max2,1000) )
2013-08-03 00:12:24 +04:00
return log("thread: Failed to register thread type." );
// will raising this from 1 to 2 make it faster too?
// i raised since global specs new servers have 2 (hyperthreaded?) cpus
long max = g_conf.m_maxCpuThreads;
if ( max < 1 ) max = 1;
if ( ! g_threads.registerType ( INTERSECT_THREAD,max,200) )
return log("thread: Failed to register thread type." );
// filter thread spawned to call popen() to filter an http reply
if ( ! g_threads.registerType ( FILTER_THREAD , 1/*maxThreads*/,300) )
return log("thread: Failed to register thread type." );
// RdbTree uses this to save itself
if ( ! g_threads.registerType ( SAVETREE_THREAD,1/*maxThreads*/,100) )
return log("thread: Failed to register thread type." );
// . File.cpp spawns a rename thread for doing renames and unlinks
// . doing a tight merge on titldb can be ~250 unlinks
// . MDW up from 1 to 30 max, after doing a ddump on 3000+ collections
// it was taking forever to go one at a time through the unlink
// thread queue. seemed like a 1 second space between unlinks.
// 1/23/1014
if ( ! g_threads.registerType ( UNLINK_THREAD,30/*maxThreads*/,3000) )
2013-08-03 00:12:24 +04:00
return log("thread: Failed to register thread type." );
// generic multipurpose
if ( ! g_threads.registerType (GENERIC_THREAD,100/*maxThreads*/,100) )
return log("thread: Failed to register thread type." );
2013-11-16 01:47:05 +04:00
// for call SSL_accept() which blocks for 10ms even when socket
// is non-blocking...
//if (!g_threads.registerType (SSLACCEPT_THREAD,20/*maxThreads*/,100))
// return log("thread: Failed to register thread type." );
2013-08-03 00:12:24 +04:00
2013-09-14 01:37:35 +04:00
#ifndef PTHREADS
2013-08-03 00:12:24 +04:00
// sanity check
if ( GUARDSIZE >= STACK_SIZE )
return log("thread: Stack guard size too big.");
// not more than this outstanding
long maxThreads = 0;
for ( long i = 0 ; i < m_numQueues ; i++ )
maxThreads += m_threadQueues[i].m_maxLaunched;
// limit to stack we got
if ( maxThreads > MAX_STACKS ) maxThreads = MAX_STACKS;
// allocate the stack space
s_stackAllocSize = STACK_SIZE * maxThreads + PAGESIZE ;
// clear stack to help check for overwrites
s_stackAlloc = (char *) mcalloc ( s_stackAllocSize , "ThreadStack" );
if ( ! s_stackAlloc )
return log("thread: Unable to allocate %li bytes for thread "
"stacks.", s_stackAllocSize);
log(LOG_INIT,"thread: Using %li bytes for %li thread stacks.",
s_stackAllocSize,maxThreads);
// align
s_stack = (char *)(((int) s_stackAlloc + PAGESIZE-1) & ~(PAGESIZE-1));
// new size
s_stackSize = s_stackAllocSize - (s_stack - s_stackAlloc);
// protect the whole stack while not in use
if ( mprotect ( s_stack , s_stackSize , PROT_NONE ) )
log("thread: Call to mprotect failed: %s.",mstrerror(errno));
// test
//s_stack[0] = 1;
// init the linked list
for ( long i = 0 ; i < MAX_STACKS ; i++ ) {
if ( i == MAX_STACKS - 1 ) s_next[i] = -1;
else s_next[i] = i + 1;
s_stackPtrs[i] = s_stack + STACK_SIZE * i;
}
s_head = 0;
// don't do real time stuff for now
return true;
#else
// . keep stack size small, 128k
// . if we get problems, we'll increase this to 256k
// . seems like it grows dynamically from 4K to up to 2M as needed
//if ( pthread_attr_setstacksize ( &s_attr , (size_t)128*1024 ) )
// return log("thread: init: pthread_attr_setstacksize: %s",
// mstrerror(errno));
//pthread_attr_setschedparam ( &s_attr , PTHREAD_EXPLICIT_SCHED );
//pthread_attr_setscope ( &s_attr , PTHREAD_SCOPE_SYSTEM );
return true;
#endif
}
// all types should be registered in main.cpp before any threads launch
bool Threads::registerType ( char type , long maxThreads , long maxEntries ) {
// return false and set g_errno if no more room
if ( m_numQueues >= MAX_THREAD_QUEUES ) {
g_errno = EBUFTOOSMALL;
return log(LOG_LOGIC,"thread: registerType: Too many thread "
"queues");
}
// initialize the ThreadQueue class for this type
if ( ! m_threadQueues[m_numQueues].init(type,maxThreads,maxEntries))
return false;
// we got one more queue now
m_numQueues++;
return true;
}
// . sums up ThreadQueue::getNumThreadsOutOrQueued() over the queues
// . the queues at index INTERSECT_THREAD and FILTER_THREAD are left out
long Threads::getNumThreadsOutOrQueued() {
	long total = 0;
	for ( long q = 0 ; q < m_numQueues ; q++ ) {
		// . INTERSECT_THREAD is used to intersect termlists to
		//   resolve a query. i've seen these get stuck in an
		//   infinite loop sometimes.
		// . filter threads get stuck sometimes too
		const bool skip = ( q == INTERSECT_THREAD ||
				    q == FILTER_THREAD );
		// tally up all other threads
		if ( ! skip )
			total += m_threadQueues[q].getNumThreadsOutOrQueued();
	}
	return total;
}
// . returns false (and may set errno) if failed to launch a thread
// . returns true if thread added to queue successfully
// . may be launched instantly or later depending on # of threads in the queue
bool Threads::call ( char type ,
long niceness ,
void *state ,
void (* callback )(void *state,ThreadEntry *t) ,
void *(* startRoutine)(void *state,ThreadEntry *t) ) {
// debug
//return false;
#ifdef _VALGRIND_
return false;
#endif
// don't spawn any if disabled
if ( m_disabled ) return false;
if ( ! g_conf.m_useThreads ) return false;
2014-04-07 02:57:38 +04:00
if ( ! m_initialized && ! init() )
return log("db: Threads init failed." );
2013-08-03 00:12:24 +04:00
// . sanity check
// . a thread can NOT call this
//if ( getpid() != s_pid ) {
// fprintf(stderr,"thread: call: bad engineer\n");
// ::exit(-1);
//}
// don't launch for now
//return false;
// . sanity check
// . ensure top 4 bytes of state is the callback
//if ( *(long *)state != (long)callback ) {
// g_errno = EBADENGINEER;
// sleep(50000);
// return log("thread: call: top 4 bytes of state != callback");
//}
// debug msg
//log("adding thread to queue, type=%li",(long)type);
// find the type
long i;
for ( i = 0 ; i < m_numQueues ; i++ )
if ( m_threadQueues[i].m_threadType == type ) break;
// bitch if type not added via registerType() call
if ( i == m_numQueues ) {
g_errno = EBADENGINEER;
return log(LOG_LOGIC,"thread: addtoqueue: Unregistered "
2014-04-07 02:57:38 +04:00
"thread type %li",(long)type);
2013-08-03 00:12:24 +04:00
}
// debug msg
//log("thread: call: adding entry for thread");
// . add to this queue
// . returns NULL and sets g_errno on error
ThreadEntry *t = m_threadQueues[i].addEntry(niceness,state,
callback,startRoutine);
if ( ! t ) return log("thread: Failed to add entry to thread pool: "
"%s.",mstrerror(g_errno));
// debug msg
//log("added");
// clear g_errno
//g_errno = 0;
// . try to launch as many threads as we can
// . this sets g_errno on error
// . if it has an error, just ignore it, our thread is queued
m_threadQueues[i].launchThread2 ( NULL );
//if ( ! m_threadQueues[i].launchThread2 ( t ) && g_errno ) {
// log("thread: failed thread launch: %s",mstrerror(g_errno));
// return false;
//}
2013-08-03 00:12:24 +04:00
// return false if there was an error launching the thread
//if ( g_errno ) return false;
// clear g_errno
g_errno = 0;
// success
return true;
}
// static void launchThreadsWrapper ( int fd , void *state ) {
// // debug msg
// //if ( g_conf.m_timingDebugEnabled )
// // log("thread: launchThreadsWrapper: entered");
// // clean up
// g_threads.cleanUp(NULL,1000);
// // and launch
// g_threads.launchThreads();
// }
// . sleep callback, init() registers it to be called once every second
// . once g_ticker has counted up to g_filterTimeout (30 if that is not
//   positive) we SIGKILL the filter process whose pid is in g_pid
// . "fd" and "state" are not used
static void killStalledFiltersWrapper ( int fd , void *state ) {
	// bail if no pid
	if ( g_pid == -1 ) return;
	// . only kill after ticker reaches the timeout
	// . we are called once every second, so inc it each time
	long timeout = g_filterTimeout;
	if ( timeout <= 0 ) timeout = 30;
	if ( g_ticker++ < timeout ) return;
	// debug
	log("threads: killing stalled filter process of age %li "
	    "seconds and pid=%li.",g_ticker,(long)g_pid);
	// . kill him
	// . kill() just returns -1 on failure, the reason is in errno so
	//   grab that before anything else can overwrite it
	int err = 0;
	if ( kill ( g_pid , 9 ) != 0 ) err = errno;
	// don't kill again
	g_pid = -1;
	if ( err != 0 ) log("threads: kill filter: %s", mstrerror(err) );
}
// . called by g_loop in Loop.cpp after getting a SI_QUEUE signal that it
//   is from when a thread exited
// . we put that signal there using sigqueue() in Threads::exit()
// . this way another thread can be launched right away
// . goes through the queues from the last registered to the first and
//   launches as many threads as each one allows
// . returns the number of threads launched, g_errno is always clear
//   on return
long Threads::launchThreads ( ) {
	// try launching from each queue
	long numLaunched = 0;
	for ( long i = m_numQueues - 1 ; i >= 0 ; i-- ) {
		// clear g_errno
		g_errno = 0;
		// launch as many threads as we can from queue #i
		while ( m_threadQueues[i].launchThread2(NULL) ) numLaunched++;
		// continue if no g_errno set
		if ( ! g_errno ) continue;
		// otherwise bitch about it
		log("thread: Failed to launch thread: %s.",mstrerror(g_errno));
	}
	// clear g_errno
	g_errno = 0;
	return numLaunched;
}
// . will cancel currently running low priority threads
// . will prevent any low priority threads from launching
// . will only cancel disk threads for now
void Threads::suspendLowPriorityThreads() {
// debug msg
if ( g_conf.m_logDebugThread )
log(LOG_DEBUG,"thread: SUSPENDING LOW-PRIORITY THREADS.");
// just cancel disk threads for now
for ( long i = 0 ; i < m_numQueues; i++ )
m_threadQueues[i].suspendLowPriorityThreads();
}
void Threads::resumeLowPriorityThreads() {
// debug msg
if ( g_conf.m_logDebugThread )
log(LOG_DEBUG,"thread: RESUMING LOW-PRIORITY THREADS.");
for ( long i = 0 ; i < m_numQueues; i++ )
m_threadQueues[i].resumeLowPriorityThreads();
}
//void Threads::cancelLowPriorityThreads () {
// for ( long i = 0 ; i < m_numQueues; i++ )
// m_threadQueues[i].cancelLowPriorityThreads();
//}
///////////////////////////////////////////////////////////////////////
// functions for ThreadQueue
///////////////////////////////////////////////////////////////////////
// . a new queue has no entry array, init() allocates that
ThreadQueue::ThreadQueue ( ) {
	m_entriesSize = 0;
	m_entries     = NULL;
}
// . frees the entry array if we have one and marks the queue as empty
void ThreadQueue::reset ( ) {
	if ( m_entries != NULL )
		mfree ( m_entries , m_entriesSize , "Threads" );
	m_top     = 0;
	m_entries = NULL;
}
bool ThreadQueue::init ( char threadType, long maxThreads, long maxEntries ) {
m_threadType = threadType;
m_launched = 0;
m_returned = 0;
m_maxLaunched = maxThreads;
// # of low priority threads launched and returned
m_hiLaunched = 0;
m_hiReturned = 0;
m_mdLaunched = 0;
m_mdReturned = 0;
m_loLaunched = 0;
m_loReturned = 0;
// we now count write threads so we can limit that
m_writesLaunched = 0;
m_writesReturned = 0;
// these are for disk threads, which we now limit based on read sizes
m_hiLaunchedBig = 0;
m_hiReturnedBig = 0;
m_mdLaunchedBig = 0;
m_mdReturnedBig = 0;
m_loLaunchedBig = 0;
m_loReturnedBig = 0;
m_hiLaunchedMed = 0;
m_hiReturnedMed = 0;
m_mdLaunchedMed = 0;
m_mdReturnedMed = 0;
m_loLaunchedMed = 0;
m_loReturnedMed = 0;
m_hiLaunchedSma = 0;
m_hiReturnedSma = 0;
m_mdLaunchedSma = 0;
m_mdReturnedSma = 0;
m_loLaunchedSma = 0;
m_loReturnedSma = 0;
//m_entriesUsed = 0;
m_top = 0;
m_isLowPrioritySuspended = false;
// alloc space for entries
m_maxEntries = maxEntries;
m_entriesSize = sizeof(ThreadEntry)*m_maxEntries;
m_entries = (ThreadEntry *)mmalloc ( m_entriesSize , "Threads" );
if ( ! m_entries ) return log("thread: Failed to allocate %li bytes "
"for thread queue.",m_entriesSize);
// debug msg
//log("INIT CALLED. setting all m_isDone to 1.");
// clear m_isOccupied et al for new guys
//for ( long i = 0 ; i < MAX_THREAD_ENTRIES ; i++ ) {
for ( long i = 0 ; i < m_maxEntries ; i++ ) {
m_entries[i].m_isOccupied = false;
m_entries[i].m_isLaunched = false;
m_entries[i].m_isDone = true;
m_entries[i].m_qnum = threadType;
m_entries[i].m_stack = NULL;
}
return true;
}
// . returns the number of threads that are out (launched but not yet
//   counted as returned) plus the number of entries that are queued up
//   waiting to be launched
// . the scan used to skip over the queued entries without ever adding
//   them to the count, so only the "out" threads were reported
long ThreadQueue::getNumThreadsOutOrQueued() {
	// these are out
	long n = m_launched - m_returned;
	// now add in the ones that are queued but not launched yet
	for ( long i = 0 ; i < m_maxEntries ; i++ ) {
		ThreadEntry *e = &m_entries[i];
		if ( ! e->m_isOccupied ) continue;
		// launched ones are covered by the counts above
		if ( e->m_isLaunched ) continue;
		if ( e->m_isDone ) continue;
		// do not count "reads", only count writes
		//if ( m_threadType == DISK_THREAD && e->m_state ) {
		//	FileState *fs = (FileState *)e->m_state;
		//	if ( fs->m_doWrite ) continue;
		//}
		// occupied and waiting for launch
		n++;
	}
	return n;
}
// . claims the first free ThreadEntry in this queue and fills it in
// . the entry is only queued here, not launched
// . returns NULL and sets g_errno to ENOTHREADSLOTS if no slot is free
// . "state" is stored in the entry along with "callback" and
//   "startRoutine"
ThreadEntry *ThreadQueue::addEntry ( long   niceness                     ,
				     void  *state                        ,
				     void  (* callback    )(void *state,
							    ThreadEntry *t) ,
				     void *(* startRoutine)(void *state,
							    ThreadEntry *t) ) {
	// niceness > 0 disk threads may only use the first 90% of the slots
	long max = m_maxEntries;
	if ( m_threadType == DISK_THREAD && niceness > 0 ) {
		max = (m_maxEntries * 90) / 100;
		if ( max <= 0 ) max = 1;
	}
	// debug test
	//if ( rand() %10 == 1 ) { g_errno = ENOTHREADSLOTS; return NULL; }
	// get first available entry, not in use
	long i;
	for ( i = 0 ; i < max ; i++ )
		if ( ! m_entries[i].m_isOccupied ) break;
	// caution
	if ( i >= max ) {
		g_errno = ENOTHREADSLOTS;
		// do not log this more than once every few seconds
		static time_t s_time = 0;
		time_t now = getTime();
		if ( now - s_time > 5 ) {
			log("thread: Could not add thread to queue. Already "
			    "have %li entries.",max);
			s_time = now;
		}
		return NULL;
	}
	// get an available entry
	ThreadEntry *t = &m_entries [ i ];
	// stick it in
	t->m_niceness     = niceness;
	t->m_state        = state;
	t->m_callback     = callback;
	t->m_startRoutine = startRoutine;
	t->m_isOccupied   = true;
	t->m_isCancelled  = false;
	t->m_stack        = NULL;
	t->m_isDone       = false;
	t->m_isLaunched   = false;
	t->m_queuedTime   = gettimeofdayInMilliseconds();
	t->m_readyForBail = false;
	t->m_allocBuf     = NULL;
	t->m_allocSize    = 0;
	t->m_errno        = 0;
	// and when the ohcrap callback gets called and the thread
	// is cleaned up it will check the FileState readsize and
	// m_isWrite to see which launch counts to decrement, so
	// since FileState will be corrupted, we need to store
	// this info directly into the thread entry.
	if ( m_threadType == DISK_THREAD && t->m_state ) {
		FileState *fs = (FileState *)t->m_state;
		t->m_bytesToGo = fs->m_bytesToGo;
		t->m_doWrite   = fs->m_doWrite ;
	}
	else {
		t->m_bytesToGo = 0;
		t->m_doWrite   = false;
	}
	// might have to inc top as well
	if ( i == m_top ) m_top++;
	// debug msg. niceness is signed so print it with %li, not %lu
	if ( g_conf.m_logDebugThread )
		log(LOG_DEBUG,"thread: [t=0x%lx] queued %s thread for launch. "
		    "niceness=%li. ", (unsigned long)t,
		    getThreadType(), niceness );
	// success
	return t;
}
// . cleans up finished threads in every queue, one niceness level at a
//   time from 0 up to "niceness", launching new threads after each level
// . stops early and leaves m_needsCleanup set once more than "maxTime"
//   milliseconds have gone by
// . returns the milliseconds spent, or 0 if no cleanup was needed
long Threads::timedCleanUp (long maxTime, long niceness) {
	if ( ! m_needsCleanup ) return 0;
	//if ( g_inSigHandler ) return 0;
	long long startTime = gettimeofdayInMillisecondsLocal();
	long long took = 0;
	// we will be covering every niceness level
	if ( niceness >= MAX_NICENESS ) m_needsCleanup = false;
	for ( long i = 0 ; i <= niceness ; i++ ) {
		for ( long j = 0 ; j < m_numQueues ; j++ )
			m_threadQueues[j].timedCleanUp ( i );
		launchThreads();
		// . elapsed time is now minus the start
		// . this was backwards before, so "took" was never positive
		//   and we never cut it short
		took = gettimeofdayInMillisecondsLocal() - startTime;
		if ( took <= maxTime ) continue;
		// ok, we have to cut it short...
		m_needsCleanup = true;
		break;
	}
	return took;
}
// . asks the queue at index DISK_THREAD if it has a thread entry that
//   is using "bf"
bool Threads::isHittingFile ( BigFile *bf ) {
	ThreadQueue &diskQueue = m_threadQueues[DISK_THREAD];
	return diskQueue.isHittingFile ( bf );
}
// . returns true if an occupied, not yet done entry in this queue was
//   started with readwriteWrapper_r and its FileState points at "bf"
// . entries that are queued but not launched count too
bool ThreadQueue::isHittingFile ( BigFile *bf ) {
	// loop through candidates
	for ( long i = 0 ; i < m_top; i++ ) {
		// point to it
		ThreadEntry *t = &m_entries[i];
		// must be occupied to be done (sanity check)
		if ( ! t->m_isOccupied ) continue;
		// must not be done
		if ( t->m_isDone ) continue;
		// must be launched.. really??
		//if ( ! t->m_isLaunched ) continue;
		// must be a read or write
		if ( t->m_startRoutine != readwriteWrapper_r ) continue;
		// shortcut
		FileState *fs = (FileState *)t->m_state;
		// . bailOnReads() sets m_state to NULL on entries it has
		//   bypassed and those can still be occupied and not done
		// . we no longer know what file such an entry is on
		if ( ! fs ) continue;
		// get bigfile ptr
		if ( fs->m_this == bf ) return true;
	}
	return false;
}
void Threads::bailOnReads ( ) {
m_threadQueues[DISK_THREAD].bailOnReads();
}
// . Process.cpp calls these callbacks before their time in order to
//   set EDISKSTUCK
// . for every niceness 0 read entry that is not done yet we set
//   EDISKSTUCK, call its callback right here and then swap the callback
//   for ohcrap() so it is not called a second time
// . entries that were not launched yet are released from the queue
void ThreadQueue::bailOnReads ( ) {
	// note it
	log("threads: bypassing read threads");
	// loop through candidates
	for ( long i = 0 ; i < m_top; i++ ) {
		// point to it
		ThreadEntry *t = &m_entries[i];
		// must be occupied to be done (sanity check)
		if ( ! t->m_isOccupied ) continue;
		// skip if not launched yet
		//if ( ! t->m_isLaunched ) continue;
		// must be niceness 0
		if ( t->m_niceness != 0 ) continue;
		// must not be done
		if ( t->m_isDone ) continue;
		// . must not have already called callback
		// . this also keeps us from touching m_state below after
		//   we have set it to NULL on an earlier pass
		if ( t->m_callback == ohcrap ) continue;
		// must be a read or write thread from BigFile
		if ( t->m_startRoutine != readwriteWrapper_r ) continue;
		// shortcut
		FileState *fs = (FileState *)t->m_state;
		// do not stop writes
		if ( fs->m_doWrite ) continue;
		// must be niceness 0 too!
		if ( fs->m_niceness != 0 ) continue;
		// what is this? unlaunched...
		//if ( t->m_pid == 0 ) continue;
		// can only bail on a thread after it copies its FileState
		// class into its stack so we can bypass it and free the
		// original FileState without causing a core. if thread
		// is not yet launched we have to call the callback here too
		// otherwise it never gets launched until the disk is unstuck!
		if ( ! t->m_readyForBail && t->m_isLaunched ) continue;
		// set error
		t->m_errno = EDISKSTUCK;
		// set this too
		g_errno = EDISKSTUCK;
		// do not allow caller to free the alloc'd buf in case
		// its read finally comes through! the entry takes over the
		// buffer and ohcrap() frees it later.
		t->m_allocBuf = fs->m_allocBuf;
		t->m_allocSize = fs->m_allocSize;
		fs->m_allocBuf = NULL;
		fs->m_allocSize = 0;
		// call it
		t->m_callback ( t->m_state , t );
		// do not re-call it...
		t->m_callback = ohcrap;
		// invalidate state (FileState usually)
		t->m_state = NULL;
		// if it was launched then leave the entry in the queue
		// NOTE(review): presumably timedCleanUp() releases it once
		// the thread is done -- confirm
		if ( t->m_isLaunched ) continue;
		// delete him if not yet launched, otherwise we try to
		// launch it later with a corrupted/unstable FileState...
		// and that causes our launch counts to get out of whack i
		// think...
		t->m_isOccupied = false;
		// note it
		log("threads: bailing unlaunched thread");
		// if that was the top entry, lower m_top past any
		// unoccupied entries under it
		if ( m_top == i + 1 )
			while (m_top > 0 && ! m_entries[m_top-1].m_isOccupied)
				m_top--;
	}
}
// BigFile.cpp's readwriteWrapper_r() ThreadEntry::m_callback gets set to
// ohcrap() because it was taking too long to do its read and we prematurely
// called its callback above in bailOnReads(). In that case we still have to
// free the disk read buffer which was never used. And doneWrapper() in
// BigFile.cpp is never called.
// . stand-in callback for a thread entry that bailOnReads() bypassed
// . frees the read buffer that was handed over to the entry, if any
// . "state" is not used
void ohcrap ( void *state , ThreadEntry *t ) {
	// free the read buffer here then
	char *buf = t->m_allocBuf;
	if ( buf != NULL ) mfree ( buf , t->m_allocSize , "RdbScan" );
	log("threads: got one");
}
// . cleans up any threads that have exited
// . their m_isDone should be set to true
// . don't process threads whose niceness is > maxNiceness
bool ThreadQueue::timedCleanUp ( long maxNiceness ) {
// top:
long numCallbacks = 0;
// loop through candidates
for ( long i = 0 ; i < m_top; i++ ) {
// point to it
ThreadEntry *t = &m_entries[i];
// skip if not qualified
if ( t->m_niceness > maxNiceness ) continue;
// must be occupied to be done (sanity check)
if ( ! t->m_isOccupied ) continue;
// skip if not launched yet
if ( ! t->m_isLaunched ) continue;
// . we were segfaulting right here before because the thread
// was setting t->m_pid and at this point it was not
// set so t->m_pid was a bogus value
// . thread may have seg faulted, in which case sigbadhandler()
// in Loop.cpp will get it and set errno to 0x7fffffff
2013-09-14 01:37:35 +04:00
#ifndef PTHREADS
2013-08-03 00:12:24 +04:00
// MDW: i hafta take this out because the errno_location thing
// is not working on the newer gcc
if ( ! t->m_isDone && t->m_pid >= 0 &&
s_errnos [t->m_pid] == 0x7fffffff ) {
log("thread: Got abnormal thread termination. Seems "
"like the thread might have cored.");
s_errnos[t->m_pid] = 0;
goto again;
}
#endif
// skip if not done yet
if ( ! t->m_isDone ) continue;
2013-09-14 01:37:35 +04:00
#ifdef PTHREADS
2013-08-03 00:12:24 +04:00
// if pthread_create() failed it returns the errno and we
// needsJoin is false, so do not try to join
// to a thread if we did not create it, lest pthread_join()
// cores
if ( t->m_needsJoin ) {
// . join up with that thread
// . damn, sometimes he can block forever on his
// call to sigqueue(),
long status = pthread_join ( t->m_joinTid , NULL );
if ( status != 0 ) {
log("threads: pthread_join %li = %s (%li)",
(long)t->m_joinTid,mstrerror(status),
status);
}
// debug msg
if ( g_conf.m_logDebugThread )
log(LOG_DEBUG,"thread: joined1 with t=0x%lx "
"jointid=0x%lx.",
(long)t,(long)t->m_joinTid);
}
2013-08-03 00:12:24 +04:00
#else
again:
int status ;
pid_t pid = waitpid ( t->m_pid , &status , 0 );
int err = errno;
// debug the waitpid
if ( g_conf.m_logDebugThread || g_process.m_exiting )
log(LOG_DEBUG,"thread: Waiting for t=0x%lx pid=%li.",
(unsigned long)t,(long)t->m_pid);
2013-08-03 00:12:24 +04:00
// bitch and continue if join failed
if ( pid != t->m_pid ) {
// waitpid() gets interrupted by various signals so
// we need to repeat (SIGCHLD?)
if ( err == EINTR ) goto again;
log("thread: Call to waitpid(%li) returned %li: %s.",
(long)t->m_pid,(long)pid,mstrerror(err));
continue;
}
// if status not 0 then process got abnormal termination
if ( ! WIFEXITED(status) ) {
if ( WIFSIGNALED(status) )
log("thread: Child process (pid=%i) exited "
"from unhandled signal number %li.",
pid,(long)WTERMSIG(status));
else
log("thread: Child process (pid=%i) exited "
"for unknown reason." , pid );
}
//mfree ( t->m_stack , STACK_SIZE , "Threads" );
g_threads.returnStack ( t->m_si );
t->m_stack = NULL;
// re-protect this stack
mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE,
PROT_NONE );
// debug msg
if ( g_conf.m_logDebugThread )
log(LOG_DEBUG,"thread: joined with pid=%li pid=%li.",
(long)t->m_pid,(long)t->m_pid);
#endif
2013-08-03 00:12:24 +04:00
// we may get cleaned up and re-used and our niceness reassignd
// right after set m_isDone to true, so save niceness
long niceness = t->m_niceness;
char qnum = t->m_qnum;
ThreadQueue *tq = &g_threads.m_threadQueues[(int)qnum];
if ( tq != this ) { char *xx = NULL; *xx = 0; }
// get read size before cleaning it up -- it could get nuked
long rs = 0;
bool isWrite = false;
if ( tq->m_threadType == DISK_THREAD ) { // && t->m_state ) {
//FileState *fs = (FileState *)t->m_state;
rs = t->m_bytesToGo;
isWrite = t->m_doWrite ;
}
if ( niceness <= 0) tq->m_hiReturned++;
else if ( niceness == 1) tq->m_mdReturned++;
else if ( niceness >= 2) tq->m_loReturned++;
// deal with the tiers for disk threads based on read sizes
if ( tq->m_threadType == DISK_THREAD ) {
// writes are special cases
if ( isWrite ) m_writesReturned++;
if ( rs >= 0 && niceness >= 2 ) {
if ( rs > g_conf.m_medReadSize )
tq->m_loReturnedBig++;
else if ( rs > g_conf.m_smaReadSize )
tq->m_loReturnedMed++;
else
tq->m_loReturnedSma++;
}
else if ( rs >= 0 && niceness >= 1 ) {
if ( rs > g_conf.m_medReadSize )
tq->m_mdReturnedBig++;
else if ( rs > g_conf.m_smaReadSize )
tq->m_mdReturnedMed++;
else
tq->m_mdReturnedSma++;
}
else if ( rs >= 0 ) {
if ( rs > g_conf.m_medReadSize )
tq->m_hiReturnedBig++;
else if ( rs > g_conf.m_smaReadSize )
tq->m_hiReturnedMed++;
else
tq->m_hiReturnedSma++;
}
}
// now count him as returned
m_returned++;
// prepare for relaunch if we were cancelled
if ( t->m_isCancelled ) {
t->m_isCancelled = false;
t->m_isLaunched = false;
t->m_isDone = false;
log("thread: Thread cancelled. Preparing thread "
"for relaunch");
continue;
}
numCallbacks++;
// not running any more
t->m_isLaunched = false;
// not occupied any more
t->m_isOccupied = false;
// do we have to decrement top
if ( m_top == i + 1 )
while (m_top > 0 && ! m_entries[m_top-1].m_isOccupied)
m_top--;
// send a cancel sig to the thread in case it's still there
//int err = pthread_cancel ( t->m_tid );
//if ( err != 0 ) log("thread: cleanUp: pthread_cancel: %s",
// mstrerror(err) );
// one less entry occupied
//m_entriesUsed--;
// debug msg
//log("m_entriesUsed now %li",m_entriesUsed);
// one more returned
//m_returned++;
// clear the g_errno in case set by a previous callback
//g_errno = 0;
// launch as many threads as we can before calling the
// callback since this may hog the CPU like Msg20 does
//g_threads.launchThreads();
g_errno = 0;
//g_loop.startBlockedCpuTimer();
//only allow a quickpoll if we are nice.
//g_loop.canQuickPoll(t->m_niceness);
makeCallback ( t );
//long long took = gettimeofdayInMilliseconds()-startTime;
//if(took > 8 && maxNiceness > 0) {
// if(g_conf.m_sequentialProfiling)
// log(LOG_TIMING,
// "admin: Threads spent %lli ms to callback "
// "%li callbacks, nice: %li",
// took, numCallbacks, maxNiceness);
// g_threads.m_needBottom = true;
// maxNiceness = 0;
//}
// clear errno again
g_errno = 0;
if ( g_conf.m_logDebugThread ) {
long long now = gettimeofdayInMilliseconds();
log(LOG_DEBUG,"thread: [t=0x%lx] %s done1. "
2013-08-03 00:12:24 +04:00
"active=%li "
"time since queued = %llu ms "
"time since launch = %llu ms "
"time since pre-exit = %llu ms "
"time since exit = %llu ms",
(unsigned long)t,
2013-08-03 00:12:24 +04:00
getThreadType() ,
(long)(m_launched - m_returned) ,
now - t->m_queuedTime,
now - t->m_launchedTime,
now - t->m_preExitTime ,
now - t->m_exitTime );
}
}
//since we need finer grained control in loop, we no longer collect
//the callbacks, sort, then call them. we now call them right away
//that way we can break out if we start taking too long and
//give control back to udpserver.
return numCallbacks != 0;
}
// . call the completion callback of thread entry "t"
// . g_niceness is forced to 1 (if t->m_niceness >= 1) or 0 while the callback
//   runs and is restored to its previous value afterwards
// . if g_conf.m_maxCallbackDelay is >= 0 the callback is timed and a message
//   is logged when it took at least that many milliseconds
// . ThreadQueue::cleanUp() marks the entry as unoccupied BEFORE calling us,
//   so the callback may launch a thread that re-claims "t". every field of
//   "t" needed after the callback is therefore copied out up front.
void makeCallback ( ThreadEntry *t ) {
	// sanity check - if in a high niceness callback, we should
	// only be calling niceness 0 callbacks here
	// no, this is only called from sleep wrappers originating from
	// Loop.cpp, so we should be ok
	//if ( g_niceness==0 && t->m_niceness ) { char *xx=NULL;*xx=0; }

	// remember the global niceness so we can restore it on the way out
	long saved = g_niceness;
	// snapshot the entry's niceness; "t" may be recycled by the callback
	long threadNiceness = (long)t->m_niceness;

	// log it now
	if ( g_conf.m_logDebugLoop || g_conf.m_logDebugThread )
		log(LOG_DEBUG,"thread: enter thread callback t=0x%lx "
		    //"type=%s "
		    "state=0x%lx "
		    "nice=%li",
		    (unsigned long)t,
		    //getThreadType(),
		    (unsigned long)t->m_state,
		    threadNiceness);

	// time it? decide once, up front, so "start" is never read
	// uninitialized if the setting changes while the callback runs
	bool timeIt = ( g_conf.m_maxCallbackDelay >= 0 );
	long long start = 0;
	if ( timeIt ) start = gettimeofdayInMillisecondsLocal();

	// run the callback at niceness 0 or 1 only
	if ( threadNiceness >= 1 ) g_niceness = 1;
	else                       g_niceness = 0;

	t->m_callback ( t->m_state , t );

	// complain if the callback hogged the cpu for too long. re-check the
	// setting so that turning the check off mid-callback silences it.
	if ( timeIt && g_conf.m_maxCallbackDelay >= 0 ) {
		long long elapsed = gettimeofdayInMillisecondsLocal() - start;
		if ( elapsed >= g_conf.m_maxCallbackDelay )
			log("threads: Took %lli ms to call "
			    "thread callback niceness=%li",
			    elapsed,(long)saved);
	}

	// log it now. use the snapshot, not t->m_niceness, since the entry
	// may belong to a different thread by now.
	if ( g_conf.m_logDebugLoop || g_conf.m_logDebugThread )
		log(LOG_DEBUG,"loop: exit thread callback t=0x%lx "
		    //"type=%s "
		    "nice=%li",
		    (unsigned long)t,
		    //getThreadType(),
		    threadNiceness);

	// restore global niceness
	g_niceness = saved;
}
// . run ThreadQueue::cleanUp() on every queue, passing "t" and "maxNiceness"
//   straight through
// . keeps making passes over all the queues for as long as m_needsCleanup
//   gets set again during a pass
// . returns true if any queue reported that it called a callback
bool Threads::cleanUp ( ThreadEntry *t , long maxNiceness ) {
	bool calledBack = false;
	do {
		// assume this pass leaves nothing behind. a thread will set
		// this again when it is about to exit, and waitpid() may be
		// interrupted by a SIGCHLD and not get his pid.
		m_needsCleanup = false;
		// give every queue a chance, never short-circuiting
		for ( long q = 0 ; q < m_numQueues ; q++ ) {
			if ( m_threadQueues[q].cleanUp ( t , maxNiceness ) )
				calledBack = true;
		}
	} while ( m_needsCleanup );
	return calledBack;
}
// . cleans up any threads in this queue that have exited
// . their m_isDone should be set to true
// . don't process threads whose niceness is > maxNiceness
// . "tt" is not used
// . for every finished thread we: reap it (pthread_join() when built with
//   PTHREADS, waitpid() otherwise), bump the "returned" counters, release
//   the ThreadEntry and then call its callback through makeCallback()
// . callbacks are called right away, one by one, rather than being collected
//   and sorted first. at most 64 callbacks are made per call.
// . returns true if at least one callback was called
bool ThreadQueue::cleanUp ( ThreadEntry *tt , long maxNiceness ) {
	long long startTime = gettimeofdayInMilliseconds();
	long numCallbacks = 0;
	// . loop through candidates
	// . m_top is re-read on every iteration because we lower it below
	for ( long i = 0 ; i < m_top && numCallbacks < 64 ; i++ ) {
		// point to it
		ThreadEntry *t = &m_entries[i];
		// skip if not qualified
		if ( t->m_niceness > maxNiceness ) continue;
		// must be occupied to be done (sanity check)
		if ( ! t->m_isOccupied ) continue;
		// skip if not launched yet
		if ( ! t->m_isLaunched ) continue;
		// set if the thread died without getting to set m_isDone
		bool cored = false;
		// . we were segfaulting right here before because the thread
		//   was setting t->m_pid and at this point it was not
		//   set so t->m_pid was a bogus value
		// . thread may have seg faulted, in which case sigbadhandler()
		//   in Loop.cpp will get it and set errno to 0x7fffffff
#ifndef PTHREADS
		// MDW: i hafta take this out because the errno_location thing
		// is not working on the newer gcc
		if ( ! t->m_isDone && t->m_pid >= 0 &&
		     s_errnos [t->m_pid] == 0x7fffffff ) {
			log("thread: Got abnormal thread termination. Seems "
			    "like the thread might have cored.");
			s_errnos[t->m_pid] = 0;
			// still needs to be reaped and called back below
			cored = true;
		}
#endif
		// skip if not done yet
		if ( ! t->m_isDone && ! cored ) continue;

#ifdef PTHREADS
		if ( t->m_needsJoin ) {
			// . join up with that thread
			// . damn, sometimes he can block forever on his
			//   call to sigqueue(),
			long status = pthread_join ( t->m_joinTid , NULL );
			if ( status != 0 ) {
				log("threads: pthread_join2 %li = %s (%li)",
				    (long)t->m_joinTid,mstrerror(status),
				    status);
			}
			// debug msg
			if ( g_conf.m_logDebugThread )
				log(LOG_DEBUG,"thread: joined2 with t=0x%lx "
				    "jointid=0x%lx.",
				    (long)t,(long)t->m_joinTid);
		}
#else
		int   status ;
		pid_t pid ;
		int   err ;
		// waitpid() gets interrupted by various signals so
		// we need to repeat (SIGCHLD?)
		do {
			pid = waitpid ( t->m_pid , &status , 0 );
			err = errno;
			// debug the waitpid
			if ( g_conf.m_logDebugThread )
				log(LOG_DEBUG,"thread: Waiting for t=0x%lx pid=%li.",
				    (long)t,(long)t->m_pid);
		} while ( pid != t->m_pid && err == EINTR );
		// bitch and continue if join failed
		if ( pid != t->m_pid ) {
			log("thread: Call to waitpid(%li) returned %li: %s.",
			    (long)t->m_pid,(long)pid,mstrerror(err));
			continue;
		}
		// if status not 0 then process got abnormal termination
		if ( ! WIFEXITED(status) ) {
			if ( WIFSIGNALED(status) )
				log("thread: Child process (pid=%i) exited "
				    "from unhandled signal number %li.",
				    pid,(long)WTERMSIG(status));
			else
				log("thread: Child process (pid=%i) exited "
				    "for unknown reason." , pid );
		}
		// . re-protect this stack
		// . this must happen BEFORE m_stack is cleared, otherwise
		//   mprotect() is handed NULL + GUARDSIZE and the real stack
		//   is never re-protected
		if ( t->m_stack )
			mprotect ( t->m_stack + GUARDSIZE ,
				   STACK_SIZE - GUARDSIZE, PROT_NONE );
		//mfree ( t->m_stack , STACK_SIZE , "Threads" );
		g_threads.returnStack ( t->m_si );
		t->m_stack = NULL;
#endif
		// debug msg
		if ( g_conf.m_logDebugThread )
			log(LOG_DEBUG,"thread: joined with pid=%li pid=%li.",
			    (long)t->m_pid,(long)t->m_pid);

		// we may get cleaned up and re-used and our niceness
		// reassigned right after m_isDone is set to true, so save
		// niceness
		long niceness = t->m_niceness;
		char qnum     = t->m_qnum;
		ThreadQueue *tq = &g_threads.m_threadQueues[(int)qnum];
		// the entry must belong to this queue. core if it does not.
		if ( tq != this ) { char *xx = NULL; *xx = 0; }
		// get read size before cleaning it up -- it could get nuked
		long rs      = 0;
		bool isWrite = false;
		if ( tq->m_threadType == DISK_THREAD ) { // && t->m_state ) {
			rs      = t->m_bytesToGo;
			isWrite = t->m_doWrite ;
		}
		// per-priority returned counts
		if      ( niceness <= 0 ) tq->m_hiReturned++;
		else if ( niceness == 1 ) tq->m_mdReturned++;
		else if ( niceness >= 2 ) tq->m_loReturned++;
		// deal with the tiers for disk threads based on read sizes
		if ( tq->m_threadType == DISK_THREAD ) {
			// writes are special cases
			if ( isWrite ) m_writesReturned++;
			if ( rs >= 0 && niceness >= 2 ) {
				if ( rs > g_conf.m_medReadSize )
					tq->m_loReturnedBig++;
				else if ( rs > g_conf.m_smaReadSize )
					tq->m_loReturnedMed++;
				else
					tq->m_loReturnedSma++;
			}
			else if ( rs >= 0 && niceness >= 1 ) {
				if ( rs > g_conf.m_medReadSize )
					tq->m_mdReturnedBig++;
				else if ( rs > g_conf.m_smaReadSize )
					tq->m_mdReturnedMed++;
				else
					tq->m_mdReturnedSma++;
			}
			else if ( rs >= 0 ) {
				if ( rs > g_conf.m_medReadSize )
					tq->m_hiReturnedBig++;
				else if ( rs > g_conf.m_smaReadSize )
					tq->m_hiReturnedMed++;
				else
					tq->m_hiReturnedSma++;
			}
		}
		// . we should count down here, not in the master thread
		// . solves Problem #2 ?
		// . TODO: this is not necessarily atomic we should set
		//   t->m_aboutToExit to true so cleanUp can periodically set
		//   m_returned to what it should be!!!
		// now count him as returned
		m_returned++;
		// prepare for relaunch if we were cancelled
		if ( t->m_isCancelled ) {
			t->m_isCancelled = false;
			t->m_isLaunched  = false;
			t->m_isDone      = false;
			log("thread: Thread cancelled. Preparing thread "
			    "for relaunch");
			continue;
		}
		// . save the timestamps before freeing up the ThreadEntry
		//   for possible take over
		// . calling the callback may launch a thread which may
		//   claim THIS thread entry, t
		long long queuedTime   = t->m_queuedTime;
		long long launchedTime = t->m_launchedTime;
		long long preExitTime  = t->m_preExitTime;
		long long exitTime     = t->m_exitTime;
		numCallbacks++;
		// SOLUTION: before calling the callback which may launch
		// another thread with this same tid, thus causing an error,
		// we should set these to false first:
		// not running any more
		t->m_isLaunched = false;
		// not occupied any more
		t->m_isOccupied = false;
		// do we have to decrement top
		if ( m_top == i + 1 )
			while (m_top > 0 && ! m_entries[m_top-1].m_isOccupied)
				m_top--;
		// clear the g_errno in case set by a previous callback
		g_errno = 0;
		makeCallback ( t );
		// clear errno again
		g_errno = 0;
		// report using the saved timestamps since "t" may have been
		// taken over by a new thread during the callback
		if ( g_conf.m_logDebugThread ) {
			long long now = gettimeofdayInMilliseconds();
			log(LOG_DEBUG,"thread: [t=0x%lx] %s done2. "
			    "active=%li "
			    "time since queued = %llu ms "
			    "time since launch = %llu ms "
			    "time since pre-exit = %llu ms "
			    "time since exit = %llu ms",
			    (unsigned long)t,
			    getThreadType() ,
			    (long)(m_launched - m_returned) ,
			    now - queuedTime,
			    now - launchedTime,
			    now - preExitTime ,
			    now - exitTime );
		}
	}

	long long took2 = gettimeofdayInMilliseconds()-startTime;
	if(numCallbacks > 0 && took2 > 5)
		log(LOG_DEBUG, "threads: took %lli ms to callback %li "
		    "callbacks, nice: %li", took2, numCallbacks, maxNiceness);

	//since we need finer grained control in loop, we no longer collect
	//the callbacks, sort, then call them. we now call them right away
	//that way we can break out if we start taking too long and
	//give control back to udpserver.
	return numCallbacks != 0;
}
// used by UdpServer to see if it should call a low priority callback
long Threads::getNumActiveHighPriorityCpuThreads() {
ThreadQueue *q ;
long hiActive = 0 ;
q = &g_threads.m_threadQueues[INTERSECT_THREAD];
hiActive += q->m_hiLaunched - q->m_hiReturned;
q = &g_threads.m_threadQueues[MERGE_THREAD];
hiActive += q->m_hiLaunched - q->m_hiReturned;
return hiActive;
}
// used by UdpServer to see if it should call a low priority callback
long Threads::getNumActiveHighPriorityThreads() {
ThreadQueue *q ;
long hiActive = 0 ;
q = &g_threads.m_threadQueues[DISK_THREAD];
hiActive += q->m_hiLaunched - q->m_hiReturned;
q = &g_threads.m_threadQueues[INTERSECT_THREAD];
hiActive += q->m_hiLaunched - q->m_hiReturned;
q = &g_threads.m_threadQueues[MERGE_THREAD];
hiActive += q->m_hiLaunched - q->m_hiReturned;
return hiActive;
}
// . returns false if no thread launched
// . returns true if thread was launched
// . sets g_errno on error
// . don't launch a low priority thread if a high priority thread is running
// . i.e. don't launch a high niceness thread if a low niceness is running
bool ThreadQueue::launchThread2 ( ThreadEntry *te ) {
2013-08-03 00:12:24 +04:00
// debug msg
//log("trying to launch for type=%li",(long)m_threadType);
// clean up any threads that have exited
//cleanUp ();
// if no entries then nothing to launch
if ( m_top <= 0 ) return false;
// or if no stacks left, don't even try
if ( s_head == -1 ) return false;
// . how many threads are active now?
// . NOTE: not perfectly thread safe here
long long active = m_launched - m_returned ;
// debug msg
if ( g_conf.m_logDebugThread )
log(LOG_DEBUG,"thread: launchThread: active=%lli max=%li.",
active,m_maxLaunched);
// return if the max is already launched
if ( active >= m_maxLaunched ) return false;
// do not launch a low priority merge, intersect or filter thread if we
2013-08-03 00:12:24 +04:00
// have high priority cpu threads already going on. this way a
// low priority spider thread will not launch if a high priority
// cpu-based thread of any kind (right now just MERGE or INTERSECT)
// is already running.
long hiActive2 = g_threads.getNumActiveHighPriorityCpuThreads() ;
// return log("MAX. %li are launched. %li now in queue.",
// active , m_entriesUsed );
// . sanity check
// . a thread can NOT call this
//if ( getpid() != s_pid ) {
// fprintf(stderr,"thread: launchThread: bad engineer\n");
// ::exit(-1);
//}
//long long now = gettimeofdayInMilliseconds();
long long now = -1LL;
// pick thread with lowest niceness first
long minNiceness = 0x7fffffff;
long long maxWait = -1;
long mini = -1;
bool minIsWrite = false;
long lowest = 0x7fffffff;
long highest = 0;
// . now base our active thread counts on niceness AND read sizes
// . this is only used for DISK_THREADs
// . loActive* includes niceness >= 1
long loActiveBig = m_loLaunchedBig - m_loReturnedBig;
long loActiveMed = m_loLaunchedMed - m_loReturnedMed;
long loActiveSma = m_loLaunchedSma - m_loReturnedSma;
long mdActiveBig = m_mdLaunchedBig - m_mdReturnedBig;
long mdActiveMed = m_mdLaunchedMed - m_mdReturnedMed;
long mdActiveSma = m_mdLaunchedSma - m_mdReturnedSma;
long hiActiveBig = m_hiLaunchedBig - m_hiReturnedBig;
long hiActiveMed = m_hiLaunchedMed - m_hiReturnedMed;
long hiActiveSma = m_hiLaunchedSma - m_hiReturnedSma;
long activeWrites = m_writesLaunched - m_writesReturned;
// how many niceness=2 threads are currently running now?
long long loActive = m_loLaunched - m_loReturned;
long long mdActive = m_mdLaunched - m_mdReturned;
//long long hiActive = m_hiLaunched - m_hiReturned;
long total = loActive + mdActive;
long max = g_conf.m_spiderMaxDiskThreads;
if ( max <= 0 ) max = 1;
// hi priority max
// JAB: warning abatement
//long long hiActive = m_hiLaunched - m_hiReturned;
// i dunno what the point of this was... so i commented it out
//long max2 = g_conf.m_queryMaxDiskThreads ;
//if ( max2 <= 0 ) max2 = 1;
// only do this check if we're an addlists/intersect thread queue
2013-08-03 00:12:24 +04:00
//if (m_threadType == INTERSECT_THREAD&& hiActive >= max2)return false;
// loop through candidates
for ( long i = 0 ; i < m_top ; i++ ) {
// skip if not occupied
if ( ! m_entries[i].m_isOccupied ) continue;
long niceness = m_entries[i].m_niceness;
// get lowest niceness level of launched threads
if ( m_entries[i].m_isLaunched ) {
// if he's done, skip him
if ( m_entries[i].m_isDone ) continue;
// get the highest niceness for all that are launched
if ( niceness > highest ) highest = niceness;
// get the lowest niceness for all that are launched
if ( niceness < lowest ) lowest = niceness;
// continue now since it's already launched
continue;
}
// . these filters really make it so the spider does not
// impact query response time
//if ( niceness >= 1 && hiActive > 0 ) continue;
// don't consider any lows if one already running
//if ( niceness >= 2 && loActive > 0 ) continue;
// don't consider any lows if a hi already running
//if ( niceness >= 2 && hiActive > 0 ) continue;
// don't consider any mediums if one already running
//if ( niceness == 1 && mdActive > 0 ) continue;
// don't consider any mediums if a hi already running
//if ( niceness == 1 && hiActive > 0 ) continue;
//if ( m_threadType == DISK_THREAD ) {
// if ( niceness >= 1 && hiActive > 0 ) continue;
// if ( niceness >= 2 && loActive >= max ) continue;
// if ( niceness == 1 && mdActive >= max ) continue;
//}
// treat niceness 1 as niceness 2 for ranking purposes
// IFF we're not too backlogged with file merges. i.e.
// IFF we're merging faster than we're dumping.
// only merges and dumps have niceness 1 really.
// spider disk reads are all niceness 2.
// Now Rdb::addList just refuses to add data if we have too
// many unmerged files on disk!
// now we use niceness 1 for "real merges" so those reads take
// priority over spider build reads. (8/14/12)
//if(niceness == 1 /*&& g_numUrgentMerges <= 0*/) niceness = 2;
// if he doesn't beat or tie us, skip him
if ( niceness > minNiceness ) continue;
// no more than "max" medium and low priority threads should
// be active/launched at any one time
if ( niceness >= 1 && total >= max ) continue;
// shortcut
ThreadEntry *t = &m_entries[i];
// what is this guy's read size?
// the filestate provided could have been
//FileState *fs ;
long readSize = 0 ;
bool isWrite = false;
if ( m_threadType == DISK_THREAD ){//&&m_entries[i].m_state ) {
//fs = (FileState *)m_entries[i].m_state;
readSize = t->m_bytesToGo;
isWrite = t->m_doWrite ;
}
if ( isWrite && activeWrites > g_conf.m_maxWriteThreads )
continue;
if ( m_threadType == MERGE_THREAD ||
m_threadType == INTERSECT_THREAD ||
m_threadType == FILTER_THREAD )
if ( niceness > 0 && hiActive2 > 0 )
continue;
// how many threads can be launched for this readSize/niceness?
if ( niceness >= 1 && m_threadType == DISK_THREAD ) {
if ( readSize > g_conf.m_medReadSize ) {
if ( loActiveBig + mdActiveBig >=
g_conf.m_spiderMaxBigDiskThreads )
continue;
}
else if ( readSize > g_conf.m_smaReadSize ) {
if ( loActiveMed + mdActiveMed >=
g_conf.m_spiderMaxMedDiskThreads )
continue;
}
else if ( loActiveSma + mdActiveSma >=
g_conf.m_spiderMaxSmaDiskThreads )
continue;
}
else if ( niceness < 1 && m_threadType == DISK_THREAD ) {
if ( readSize > g_conf.m_medReadSize ) {
if ( hiActiveBig >=
g_conf.m_queryMaxBigDiskThreads )
continue;
}
else if ( readSize > g_conf.m_smaReadSize ) {
if ( hiActiveMed >=
g_conf.m_queryMaxMedDiskThreads )
continue;
}
else if ( hiActiveSma >=
g_conf.m_queryMaxSmaDiskThreads )
continue;
}
// be lazy with this since it uses a significant amount of cpu
if ( now == -1LL ) now = gettimeofdayInMilliseconds();
// how long has this entry been waiting in the queue to launch?
long long waited = now - m_entries[i].m_queuedTime ;
// adjust "waited" if it originally had a niceness of 1
if ( m_entries[i].m_niceness >= 1 ) {
// save threads gain precedence
if ( m_threadType == SAVETREE_THREAD )
waited += 49999999;
else if ( m_threadType == UNLINK_THREAD )
waited += 39999999;
else if ( m_threadType == MERGE_THREAD )
waited += 29999999;
else if ( m_threadType == INTERSECT_THREAD )
waited += 29999999;
else if ( m_threadType == FILTER_THREAD )
waited += 9999999;
// if its a write thread... do it quick
else if ( isWrite )
waited += 19999999;
// . if it has waited more than 500 ms it needs
// to launch now...
// . these values seem to be VERY well tuned on
// my production machines, but they may have to
// be tuned for other machines? TODO.
// . they should auto-tune so when merge is more
// important it should starve the other spider reads
else if ( waited >= 500 )
waited += 9999999;
// it hurts for these guys to wait that long
else waited *= 4;
}
// is real merge?
if ( m_entries[i].m_niceness == 1 )
waited += 19999999;
// watch out for clock skew
if ( waited < 0 ) waited = 0;
// if tied, the lowest time wins
if ( niceness == minNiceness && waited <= maxWait ) continue;
// we got a new winner
mini = i;
minNiceness = niceness;
maxWait = waited;
minIsWrite = isWrite;
}
// if no candidate, bail honorably
if ( mini == -1 ) return false;
// . if we've got an urgent thread (niceness=1) going on, do not allow
// niceness=2 threads to launch
// . actually, don't launch *ANY* if doing an urgent merge even if
// no niceness=1 thread currently launched
// . CAUTION! when titledb is merging it dumps tfndb and if tfndb
// goes urgent,make sure it dumps out with niceness 1, otherwise
// this will freeze us!! since titledb won't be able to continue
// merging until tfndb finishes dumping...
// . Now Rdb::addList just refuses to add data if we have too
// many unmerged files on disk! Let others still read from us,
// just not add to us...
//if ( minNiceness >= 2 && g_numUrgentMerges > 0 ) // && lowest <= 1 )
// return false;
// if we're urgent, don't launch a niceness 2 unless he's waited
// at least 200ms in the queue
//if ( minNiceness >= 2 && g_numUrgentMerges > 0 && maxWait < 200 )
// return false;
// . if the thread to launch has niceness > lowest launched then bail
// . i.e. don't launch a low-priority thread if we have highs running
// . we no longer let a niceness of 1 prevent a niceness of 2 from
// launching, this way we can launch merge threads at a niceness
// of 1 w/o hurting the spidering too much, but still giving the
// merge some preferential treatment over the disk so we don't
// RdbDump data faster than we can finish the merge.
if ( minNiceness > lowest && lowest < 1 ) return false;
// . don't launch a low priority thread if there's a high launched
// from any ThreadQueue
// . hi priority is niceness <= 0, med/lo priority is niceness >= 1
//long long hiActive = g_threads.m_hiLaunched - g_threads.m_hiReturned;
// now just limit it to this queue... so you can launch a low
// merge thread even if there's a high disk read going on
//if ( minNiceness >= 1 && hiActive > 0 ) return false;
// if we're low priority and suspended is true then bail
//if ( minNiceness > 0 && m_isLowPrioritySuspended ) return false;
// if we're going to launch a high priority thread then CANCEL
// any low priority disk-read threads already in progress
//if ( minNiceness <= 0 && highest > 0 ) cancelLowPriorityThreads ();
// . let's cancel ALL low priority threads if we're launching a high
// . hi priority is niceness <= 0, low priority is niceness >= 1
//long long loActive = g_threads.m_loLaunched - g_threads.m_loReturned;
long realNiceness = m_entries[mini].m_niceness;
//if ( realNiceness <= 0 && (loActive + mdActive) > 0 )
// // this actually cancels medium priority threads, too
// g_threads.cancelLowPriorityThreads();
// point to winning entry
ThreadEntry *t = &m_entries[mini];
// if descriptor was closed, just return error now, we
// cannot try to re-open because the file might have been
// unlinked. Sync.cpp does a DISK_THREAD but does not pass in a valid
// FileState ptr because it does its own saving, so check for NULLs.
FileState *fs = (FileState *)t->m_state;
bool allocated = false;
if ( m_threadType == DISK_THREAD && fs && ! fs->m_doWrite ) {
// allocate the read buffer here!
if ( ! fs->m_doWrite && ! fs->m_buf && t->m_bytesToGo > 0 ) {
long need = t->m_bytesToGo + fs->m_allocOff;
char *p = (char *) mmalloc ( need , "ThreadReadBuf" );
if ( p ) {
fs->m_buf = p + fs->m_allocOff;
fs->m_allocBuf = p;
fs->m_allocSize = need;
allocated = true;
}
else
log("thread: read buf alloc failed for %li "
"bytes.",need);
// just let the BigFile::readWrite_r() handle the
// error for the NULL read buf
}
// . otherwise, they are intact, so get the real fds
// . we know the stored File is still around because of that
bool doWrite = fs->m_doWrite;
BigFile *bb = fs->m_this;
fs->m_fd1 = bb->getfd (fs->m_filenum1, !doWrite, &fs->m_vfd1);
fs->m_fd2 = bb->getfd (fs->m_filenum2, !doWrite, &fs->m_vfd2);
// is this bad?
if ( fs->m_fd1 < 0 ) log("disk: fd1 is %i for %s",
fs->m_fd1,bb->m_baseFilename);
if ( fs->m_fd2 < 0 ) log("disk: fd2 is %i for %s.",
fs->m_fd2,bb->m_baseFilename);
fs->m_closeCount1 = getCloseCount_r ( fs->m_fd1 );
fs->m_closeCount2 = getCloseCount_r ( fs->m_fd2 );
}
// count it as launched now, before we actually launch it
m_launched++;
// priority-based GLOBAL & LOCAL launch count
if ( realNiceness <= 0 ) m_hiLaunched++;
else if ( realNiceness == 1 ) m_mdLaunched++;
else if ( realNiceness >= 2 ) m_loLaunched++;
// deal with the tiers for disk threads based on read sizes
if ( m_threadType == DISK_THREAD ) {
// writes are special cases
if ( minIsWrite ) m_writesLaunched++;
//FileState *fs = (FileState *)m_entries[mini].m_state;
long rs = t->m_bytesToGo; // 0;
//if ( fs ) rs = fs->m_bytesToGo;
if ( realNiceness >= 2 ) {
if ( rs > g_conf.m_medReadSize )
m_loLaunchedBig++;
else if ( rs > g_conf.m_smaReadSize )
m_loLaunchedMed++;
else
m_loLaunchedSma++;
}
else if ( realNiceness >= 1 ) {
if ( rs > g_conf.m_medReadSize )
m_mdLaunchedBig++;
else if ( rs > g_conf.m_smaReadSize )
m_mdLaunchedMed++;
else
m_mdLaunchedSma++;
}
else {
if ( rs > g_conf.m_medReadSize )
m_hiLaunchedBig++;
else if ( rs > g_conf.m_smaReadSize )
m_hiLaunchedMed++;
else
m_hiLaunchedSma++;
}
}
// debug msg
//if ( m_threadType == 0 )
// log("creating thread, t=%lu state=%lu launched = %li",
// t , (long)t->m_state , m_launched );
// and set the flag
t->m_isLaunched = true;
// . launch it
// . this sets the pthread_t ptr for identification
// . returns false on error
//pthread_t tmp;
loop:
// debug msg
if ( g_conf.m_logDebugThread ) {
active = m_launched - m_returned ;
long long now = gettimeofdayInMilliseconds();
log(LOG_DEBUG,"thread: [t=0x%lx] launched %s thread. "
"active=%lli "
2013-08-03 00:12:24 +04:00
"niceness=%lu. waited %llu ms in queue.",
(unsigned long)t, getThreadType(), active, realNiceness,
2013-08-03 00:12:24 +04:00
now - t->m_queuedTime);
}
// be lazy with this since it uses a significant amount of cpu
if ( now == -1LL ) now = gettimeofdayInMilliseconds();
//t->m_launchedTime = g_now;
t->m_launchedTime = now;
// loop2:
2013-08-03 00:12:24 +04:00
// spawn the thread
long count = 0;
pid_t pid;
2013-09-14 01:37:35 +04:00
#ifndef PTHREADS
2013-08-03 00:12:24 +04:00
//int status;
//int ret;
// random failure test
//if ( rand() %10 == 1 ) { err = ENOMEM; goto hadError; }
// malloc twice the size
t->m_stackSize = STACK_SIZE;
//t->m_stack = (char *)mmalloc ( t->m_stackSize , "Threads" );
long si = g_threads.getStack ( );
if ( si < 0 ) {
log(LOG_LOGIC,"thread: Unable to get stack. Bad engineer.");
goto hadError;
}
t->m_si = si;
t->m_stack = s_stackPtrs [ si ];
// UNprotect the whole stack so we can use it
mprotect ( t->m_stack + GUARDSIZE , STACK_SIZE - GUARDSIZE ,
PROT_READ | PROT_WRITE );
// clear g_errno
g_errno = 0;
// . make another process
// . do not use sig handlers, so if a child process gets any unhandled
// signal (like SEGV) it will just exit
pid = clone ( startUp , t->m_stack + t->m_stackSize ,
CLONE_FS | CLONE_FILES | CLONE_VM | //CLONE_SIGHAND |
SIGCHLD ,
(void *)t );
// . we set the pid because we are the one that checks it!
// . if we just let him do it, when we check in cleanup routine
// we can get an uninitialized pid
t->m_pid = pid;
// might as well bitch if we should here
if ( s_bad ) {
log(LOG_LOGIC,"thread: PID received: %li > %li. Bad.",
s_badPid, (long)MAX_PID);
//char *xx = NULL; *xx = 0;
}
// wait for him
//ret = waitpid ( -1*pid , &status , 0 );
//if ( ret != pid )
// log("waitpid(pid=%li): ret=%li err=%s",
// (long)pid,(long)ret,mstrerror(errno));
// check if he's done
//if ( ! t->m_isDone ) log("NOT DONE");
// set the pid
//t->m_pid = pid;
// error?
if ( pid == (pid_t)-1 ) g_errno = errno;
//
// now use pthreads again... are they stable yet?
//
#else
// assume it does not go through
t->m_needsJoin = false;
// pthread inherits our sigmask, so don't let it handle sigalrm
// signals in Loop.cpp, it'll screw things up. that handler
// is only meant to be called by the main process. if we end up
// double calling it, this thread may think g_callback is non-null
// then it gets set to NULL, then the thread cores! seen it...
sigset_t sigs;
sigemptyset ( &sigs );
sigaddset ( &sigs , SIGALRM );
if ( sigprocmask ( SIG_BLOCK , &sigs , NULL ) < 0 )
log("threads: failed to block sig");
// this returns 0 on success, or the errno otherwise
g_errno = pthread_create ( &t->m_joinTid , &s_attr, startUp2 , t) ;
2013-08-03 00:12:24 +04:00
if ( sigprocmask ( SIG_UNBLOCK , &sigs , NULL ) < 0 )
log("threads: failed to unblock sig");
2013-08-03 00:12:24 +04:00
#endif
// we're back from pthread_create
if ( g_conf.m_logDebugThread )
log(LOG_DEBUG,"thread: Back from clone t=0x%lx pid=%li.",
2013-08-03 00:12:24 +04:00
(long)t,(long)pid);
// return true on successful creation of the thread
if ( g_errno == 0 ) {
// good stuff, the thread needs a join now
t->m_needsJoin = true;
2013-08-03 00:12:24 +04:00
if ( count > 0 )
log("thread: Call to clone looped %li times.",count);
return true;
}
//#undef usleep
2013-08-03 00:12:24 +04:00
// forever loop
if ( g_errno == EAGAIN ) {
if ( count++ == 0 )
log("thread: Call to clone had error: %s.",
mstrerror(g_errno));
//usleep(1);
//goto loop2;
goto hadError;
2013-08-03 00:12:24 +04:00
}
// debug msg
// log("created tid=%li",(long)t->m_tid);
// return true;
//}
// do again if g_errno is AGAIN
if ( g_errno == EINTR ) {
log("thread: Call to clone was interrupted. Trying again.");
goto loop;
}
//#ifndef _PTHREADS_
2013-08-03 00:12:24 +04:00
hadError:
//#endif
if ( g_errno )
log("thread: pthread_create had error = %s",
mstrerror(g_errno));
2013-08-03 00:12:24 +04:00
// it didn't launch, did it? dec the count.
m_launched--;
// priority-based LOCAL & GLOBAL launch counts
if ( realNiceness <= 0 ) m_hiLaunched--;
else if ( realNiceness == 1 ) m_mdLaunched--;
else if ( realNiceness >= 2 ) m_loLaunched--;
// . deal with the tiers for disk threads based on read sizes
// . WARNING: we cannot easily change tiers dynamically
// because it will throw these counts off
if ( m_threadType == DISK_THREAD ) {
// writes are special cases
if ( minIsWrite ) m_writesLaunched--;
//FileState *fs = (FileState *)m_entries[mini].m_state;
long rs = t->m_bytesToGo; // 0;
//if ( fs ) rs = fs->m_bytesToGo;
if ( realNiceness >= 2 ) {
if ( rs > g_conf.m_medReadSize )
m_loLaunchedBig--;
else if ( rs > g_conf.m_smaReadSize )
m_loLaunchedMed--;
else
m_loLaunchedSma--;
}
else if ( realNiceness >= 1 ) {
if ( rs > g_conf.m_medReadSize )
m_mdLaunchedBig--;
else if ( rs > g_conf.m_smaReadSize )
m_mdLaunchedMed--;
else
m_mdLaunchedSma--;
}
else {
if ( rs > g_conf.m_medReadSize )
m_hiLaunchedBig--;
else if ( rs > g_conf.m_smaReadSize )
m_hiLaunchedMed--;
else
m_hiLaunchedSma--;
}
}
// unset the flag
t->m_isLaunched = false;
// bail on other errors
log("thread: Call to clone had error: %s.", mstrerror(g_errno));
// correction on this error
log("thread: Try not using so much memory. "
"memused now =%lli.",g_mem.getUsedMem());
// free allocated buffer
if ( allocated ) {
2014-02-03 18:27:32 +04:00
mfree ( fs->m_allocBuf , fs->m_allocSize , "ThreadReadBuf" );
2013-08-03 00:12:24 +04:00
fs->m_buf = NULL;
}
// i'm not sure return value matters at this point? the thread
// is queued and hopefully will launch at some point
return false;
2013-08-03 00:12:24 +04:00
// if this is the direct thread request do not call callback, just
// return false, otherwise we get into an unexpected loop thingy
2013-08-03 00:12:24 +04:00
if ( t == te )
return log("thread: Returning false.");
// do it blocking
log("thread: Calling without thread. This will crash many times. "
"Please fix it.");
// return false so caller will re-do without thread!
// so BigFile::readwrite() will retry without thread and we won't
// get into a wierd loop thingy
if ( te ) return false;
2013-08-03 00:12:24 +04:00
// unsigned long long profilerStart,profilerEnd;
// unsigned long long statStart,statEnd;
//if (g_conf.m_profilingEnabled){
// address=(long)t->m_startRoutine;
// g_profiler.startTimer(address, __PRETTY_FUNCTION__);
//}
t->m_startRoutine ( t->m_state , t );
//if (g_conf.m_profilingEnabled) {
// if(!g_profiler.endTimer(address, __PRETTY_FUNCTION__))
// log(LOG_WARN,"admin: Couldn't add the fn %li",
// (long)address);
//}
t->m_exitTime = gettimeofdayInMilliseconds();
// flag it for cleanup
t->m_isDone = true;
t->m_isLaunched = true;
// clean it up
cleanUp ( t , 200/*maxNiceness thread can have to be cleaned up*/ );
// ignore error
g_errno = 0;
// we kinda launched one, so say true here
return true; // false;
}
2013-09-14 01:37:35 +04:00
#ifndef PTHREADS
2013-08-03 00:12:24 +04:00
// . one-shot guard used by startUp() below
// . startUp() clears it after logging its first setpriority() failure so
//   that the warning is only logged once
// . only needed in the non-pthreads build since that is the only build
//   in which startUp() calls setpriority()
static bool s_firstTime = true;
2013-09-14 01:37:35 +04:00
#endif
2013-08-03 00:12:24 +04:00
// threads start up with cacnellation deferred until pthreads_testcancel()
// is called, but we never call that
int startUp ( void *state ) {
// get thread entry
ThreadEntry *t = (ThreadEntry *)state;
// no! now parent does since he is the one that needs to check it
// in the cleanup routine
// remember the pid
//t->m_pid = getpid();
// . sanity check
// . a thread can NOT call this
2013-09-14 01:37:35 +04:00
#ifndef PTHREADS
2013-08-03 00:12:24 +04:00
if ( getpid() == s_pid )
log("thread: Thread has same pid %i as main process.",s_pid);
#endif
// the cleanup handler
//pthread_cleanup_push ( exitWrapper , t ) ; // t->m_state );
// our signal set
sigset_t set;
sigemptyset(&set);
//sigaddset(&set, SIGHUP);
// we need this here so if we break the gb process with gdb it
// does not kill the child processes when it sends out the SIGINT.
sigaddset(&set, SIGINT);
// ignore the real time signal, man...
//sigaddset(&set, GB_SIGRTMIN);
//pthread_sigmask(SIG_BLOCK, &set, NULL);
2013-09-14 01:37:35 +04:00
#ifndef PTHREADS
2013-08-03 00:12:24 +04:00
sigprocmask(SIG_BLOCK, &set, NULL);
#else
pthread_sigmask(SIG_BLOCK,&set,NULL);
#endif
2013-08-03 00:12:24 +04:00
// . what this lwp's priority be?
// . can range from -20 to +20
// . the lower p, the more cpu time it gets
// . this is really the niceness, not the priority
int p ;
// currently our niceness ranges from -1 to 2 for us
if ( t->m_niceness == 2 ) p = 19 ;
else if ( t->m_niceness == 1 ) p = 10 ;
else p = 0 ;
// remember the tid
//t->m_tid = pthread_self();
// debug
if ( g_conf.m_logDebugThread )
log(LOG_DEBUG,"thread: [t=0x%lx] in startup pid=%li pppid=%li",
(unsigned long)t,(long)getpidtid(),(long)getppid());
2013-08-03 00:12:24 +04:00
// debug msg
//fprintf(stderr,"new thread tid=%li pid=%li\n",
// (long)t->m_tid,(long)t->m_pid);
// . set this process's priority
// . setpriority() is only used for SCHED_OTHER threads
2013-09-14 01:37:35 +04:00
//if ( pthread_setschedprio ( getpidtid() , p ) ) {
#ifndef PTHREADS
if ( setpriority ( PRIO_PROCESS, getpidtid() , p ) < 0 ) {
2013-08-03 00:12:24 +04:00
// do we even support logging from a thread?
if ( s_firstTime ) {
log("thread: Call to setpriority(%lu,%i) in thread "
2013-08-03 00:12:24 +04:00
"failed: %s. This "
"message will not be repeated.",
(unsigned long)getpidtid(),p,mstrerror(errno));
2013-08-03 00:12:24 +04:00
s_firstTime = false;
}
errno = 0;
}
2013-09-14 01:37:35 +04:00
#endif
2013-08-03 00:12:24 +04:00
/*
sched_param sp;
int pp;
int err = pthread_getschedparam ( t->m_tid , &pp , &sp ) ;
if ( err ) log("thread: startUp: pthread_getschedparam: %s",
mstrerror(err));
// adjust the priority
p = 1;
sp.sched_priority = p;
// and reassign
err = pthread_setschedparam ( t->m_tid , SCHED_OTHER , &sp ) ;
log("thread: startUp: pthread_setschedparam(%li): %s",
p,mstrerror(err));
*/
// somehow, it works ok when we have this print statement delay!!!
//fprintf(stderr,"thread pid = %li\n",(long)getpid());
// . call the startRoutine
// . IMPORTANT: this can NEVER do non-blocking stuff
t->m_startRoutine ( t->m_state , t );
// pop it off
//pthread_cleanup_pop ( 1 /*execute handler?*/ );
// . now throw it on g_loop's sigqueue
// . the first 4 bytes of t->m_state should be t->m_callback
// . no! just use 1 to tell Loop to call g_threads.cleanUp()
// . TODO: pass in a ptr to cleanUpWrapper() instead of "t"
sigval_t svt;
svt.sival_int = (int)t ; //(int)(t->m_state); // fd;
// set exit time
long long now = gettimeofdayInMilliseconds();
t->m_preExitTime = now;
t->m_exitTime = now;
if ( g_conf.m_logDebugThread ) {
log(LOG_DEBUG,"thread: [t=0x%lx] done with startup pid=%li",
(unsigned long)t,(long)getpidtid());
2013-08-03 00:12:24 +04:00
}
// . now mark thread as ready for removal
// . do this BEFORE queing the signal since we're still a thread!!!
// . cleanUp() will take care of the rest
// . cleanUp() will call pthread_join on us!
t->m_isDone = true;
// let Loop.cpp's sigHandler_r call g_thread.cleanUp()
g_threads.m_needsCleanup = true;
//if(t->m_niceness > 0) g_threads.m_needBottom = true;
// . send the signal
// . if queue is full g_loop will get a SIGIO and call
// g_threads.cleanUp()/launchThreads() in it's doPoll() routine
// . we reserve GB_SIGRTMIN itself for unblocked interrupts for
// UdpServer
// . damn, it seems that if the queue is full pthread_join is unable
// to detach threads.... so sleep until it clears up
// . HEY! process is supposed to send us an ECHLD signal? right?
//sigqueue ( s_pid, GB_SIGRTMIN + 1 + t->m_niceness, svt ) ;
// . it does not send us a signal automatically, so we must do it!
// . i noticed during the linkdb rebuild we were not getting the signal
sigqueue ( s_pid, GB_SIGRTMIN + 1 + t->m_niceness, svt ) ;
return 0;
}
// . pthread_create() needs a start routine that returns a void ptr, so it
//   is given this one
// . just runs startUp() on "state" and throws away its int return value
void *startUp2 ( void *state ) {
	(void) startUp ( state );
	return NULL;
}
// . raise the flag that keeps low priority threads from being launched
// . watch out, UdpServer::getEmptySlot() calls UdpServer::suspend() and we
//   could be in a signal handler here
// . low priority threads that are already running are NOT cancelled here
void ThreadQueue::suspendLowPriorityThreads() {
	// nothing to do if the flag is already up
	if ( ! m_isLowPrioritySuspended )
		m_isLowPrioritySuspended = true;
}
// . undo suspendLowPriorityThreads() and try to launch whatever was held up
// . this is called by UdpServer::destroySlot() and should NOT be in a
//   sig handler
// . does nothing if we were not suspended
void ThreadQueue::resumeLowPriorityThreads() {
	if ( m_isLowPrioritySuspended ) {
		// turn em back on
		m_isLowPrioritySuspended = false;
		// and try to start up some threads then
		g_threads.launchThreads();
	}
}
void ThreadQueue::print ( ) {
// loop through candidates
for ( long i = 0 ; i < m_top ; i++ ) {
ThreadEntry *t = &m_entries[i];
// print it
log(LOG_INIT,"thread: address=%lu pid=%u state=%lu "
"occ=%i done=%i lnch=%i",
(unsigned long)t , t->m_pid ,
2013-08-03 00:12:24 +04:00
(unsigned long)t->m_state , t->m_isOccupied , t->m_isDone ,
t->m_isLaunched );
}
}
const char *ThreadQueue::getThreadType ( ) {
const char *s = "unknown";
if ( m_threadType == DISK_THREAD ) s = "disk";
if ( m_threadType == MERGE_THREAD ) s = "merge";
if ( m_threadType == INTERSECT_THREAD ) s = "intersectlists";
2013-08-03 00:12:24 +04:00
if ( m_threadType == FILTER_THREAD ) s = "filter";
if ( m_threadType == SAVETREE_THREAD ) s = "savetree";
if ( m_threadType == UNLINK_THREAD ) s = "unlink";
if ( m_threadType == GENERIC_THREAD ) s = "generic";
return s;
}
#include "BigFile.h" // FileState class
// . count the pending read operations in the disk thread queue
// . an entry is counted if it is occupied, is not done, has a non-NULL
//   state and is not a write
// . entries nicer than "maxNiceness" are skipped only if they have not
//   been launched yet; launched ones are counted whatever their niceness
// . sets *totalToRead to the sum of m_bytesToGo over the counted entries
// . returns the number of entries counted
// . nothing is cancelled or removed here, we only look
// . NOTE(review): this used to have a "multiply by 2 if a write" step, but
//   it came after the check that skips writes so it could never execute
//   and was removed. if writes are supposed to count double towards the
//   load then the write skip below is what needs to go -- confirm against
//   the callers.
long Threads::getDiskThreadLoad ( long maxNiceness , long *totalToRead ) {
	ThreadQueue *q   = &m_threadQueues[DISK_THREAD];
	ThreadEntry *e   = q->m_entries;
	long         top = q->m_top;
	*totalToRead = 0;
	long n = 0;
	for ( long i = 0 ; i < top ; i++ ) {
		// get entry
		ThreadEntry *t = &e[i];
		// skip if not occupied
		if ( ! t->m_isOccupied ) continue;
		// skip if it's nicer than what we want, unless it is
		// launched already
		if ( t->m_niceness > maxNiceness && ! t->m_isLaunched )
			continue;
		// skip if already done
		if ( t->m_isDone ) continue;
		// state is sometimes NULL, like from Sync.cpp's call
		if ( ! t->m_state ) continue;
		// only read operations are counted
		if ( t->m_doWrite ) continue;
		// how many bytes to do
		long todo = t->m_bytesToGo;
		// add to total bytes to read
		*totalToRead += todo;
		// count the thread
		n++;
	}
	return n;
}
// . when a BigFile is removed, much like we remove its pages from
//   DiskPageCache, we also remove any unlaunched reads on it from the
//   thread queue
// . every occupied read entry whose FileState points at "bf" gets its
//   fs->m_errno2 set to EFILECLOSED, even if it is launched or done
// . entries that are neither done nor launched are also freed and handed
//   to makeCallback() with fs->m_errno and g_errno set to EFILECLOSED
// . write entries are never touched
// . g_errno is cleared before we return
void ThreadQueue::removeThreads ( BigFile *bf ) {
	// did the BigFile get hosed? that means our BigFile was
	// unlinked or closed before we got a chance to launch the
	// thread.
	// index of the last entry we removed, -1 if none
	long maxi = -1;
	for ( long i = 0 ; i < m_top ; i++ ) {
		ThreadEntry *t = &m_entries[i];
		// skip if not occupied
		if ( ! t->m_isOccupied ) continue;
		// get the filestate
		FileState *fs = (FileState *)t->m_state;
		// skip if NULL
		if ( ! fs ) continue;
		// skip if not match
		if ( fs->m_this != (void *)bf ) continue;
		// . let it finish writing if it is a write thread
		// . otherwise, if we are exiting, we could free the
		//   buffer being written and cause the thread to core...
		if ( fs->m_doWrite ) {
			log(LOG_INFO,"disk: Not removing write thread.");
			continue;
		}
		// . should we really? if we renamed the file to another,
		//   we need to recompute the offsets to read, etc.. so we
		//   should fail up to Msg5 with EFILECLOSED or something...
		// . i think we did a rename and it got the same fd, and since
		//   we did not remove the launched or done threads after the
		//   rename, we're not sure if they read from the newly renamed
		//   file or not, and our read offset was for the old file...
		// . at least set the error flag for doneWrapper()
		fs->m_errno2 = EFILECLOSED;
		// log it
		logf(LOG_INFO,"disk: Removing/flagging operation in thread "
		     "queue. fs=0x%lx", (long)fs);
		// skip if already done
		if ( t->m_isDone ) continue;
		// skip if launched
		if ( t->m_isLaunched ) continue;
		// . if we make it here the thread was never launched
		// . this used to log "Thread is launched." which is the
		//   opposite of what we just checked
		log(LOG_INFO,"disk: Thread is not launched. Removing it "
		    "from thread queue.");
		// tell donewrapper what happened
		fs->m_errno = EFILECLOSED;
		g_errno     = EFILECLOSED;
		// . remove it from the thread queue
		// . the slot is freed BEFORE the callback is made
		t->m_isDone     = true;
		t->m_isLaunched = false;
		t->m_isOccupied = false;
		// keep track
		maxi = i;
		makeCallback ( t );
	}
	// . do we have to decrement top?
	// . only if the last entry we removed was the top one
	if ( m_top == maxi + 1 )
		while ( m_top>0 && !m_entries[m_top-1].m_isOccupied) m_top--;
	g_errno = 0;
}
void Threads::printState() {
long long now = gettimeofdayInMilliseconds();
for ( long i = 0 ; i < m_numQueues; i++ ) {
ThreadQueue *q = &m_threadQueues[i];
long loActive = q->m_loLaunched - q->m_loReturned;
long mdActive = q->m_mdLaunched - q->m_mdReturned;
long hiActive = q->m_hiLaunched - q->m_hiReturned;
long total = loActive + mdActive + hiActive;
if( total == 0) continue;
log(LOG_TIMING,
"admin: Thread counts: type:%s "
"%li:low %li:med %li:high %li:total",
q->getThreadType(), loActive, mdActive, hiActive, total);
for ( long j = 0 ; j < q->m_top ; j++ ) {
ThreadEntry *t = &q->m_entries[j];
if(!t->m_isOccupied) continue;
if(t->m_isDone) {
log(LOG_TIMING,
"admin: Thread -done- "
"nice: %li "
"totalTime: %lli (ms) "
"queuedTime: %lli(ms) "
"runTime: %lli(ms) "
"cleanup: %lli(ms) "
"callback:%s",
t->m_niceness,
now - t->m_queuedTime,
t->m_launchedTime - t->m_queuedTime,
t->m_exitTime - t->m_launchedTime,
now - t->m_exitTime,
g_profiler.getFnName((long)t->m_callback));
continue;
}
if(t->m_isLaunched) {
log(LOG_TIMING,
"admin: Thread -launched- "
"nice: %li "
"totalTime: %lli(ms) "
"queuedTime: %lli(ms) "
"runTime: %lli(ms) "
"callback:%s",
t->m_niceness,
now - t->m_queuedTime,
t->m_launchedTime - t->m_queuedTime,
now - t->m_launchedTime,
g_profiler.getFnName((long)t->m_callback));
continue;
}
log(LOG_TIMING,
"admin: Thread -queued- "
"nice: %li "
"queueTime: %lli(ms) "
"callback:%s",
t->m_niceness,
now - t->m_queuedTime,
g_profiler.getFnName((long)t->m_callback));
}
}
}