#include "gb-include.h" #include "Loop.h" #include "Threads.h" // g_threads.launchThreads() #include "UdpServer.h" // g_udpServer2.makeCallbacks() #include "HttpServer.h" // g_httpServer.m_tcp.m_numQueued #include "Profiler.h" #include "Process.h" #include "PageParser.h" #include "Threads.h" #include "Stats.h" // raised from 5000 to 10000 because we have more UdpSlots now and Multicast // will call g_loop.registerSleepCallback() if it fails to get a UdpSlot to // send on. #define MAX_SLOTS 10000 // apple mac os x does not have real-time signal support // #ifdef __APPLE__ // #define _POLLONLY_ // #endif // TODO: . if signal queue overflows another signal is sent // . capture that signal and use poll or something??? // Tricky Gotchas: // TODO: if an event happens on a TCP fd/socket before we fully accept it // we should just register it then call the read callback in case // we just missed a ready for reading signal!!!!! // TODO: signals can be gotten off the queue after we've closed an fd // in which case the handler should be removed from Loop's registry // BEFORE being closed... so the handler will be NULL... ??? // NOTE: keep in mind that the signals might be delayed or be really fast! // TODO: don't mask signals, catch them as they arrive? (like in phhttpd) // . set this to false to disable async signal handling // . that will make our udp servers less responsive bool g_isHot = true; // extern this for all to use bool g_inSigHandler = false ; // so we know if interrupts are supposed to be enabled/disabled bool g_interruptsOn = false; // are some signals to call g_udpServer2.makeCallbacks() queued? bool g_someAreQueued = false; int32_t g_numAlarms = 0; int32_t g_numVTAlarms = 0; int32_t g_numQuickPolls = 0; int32_t g_missedQuickPolls = 0; int32_t g_numSigChlds = 0; int32_t g_numSigQueues = 0; int32_t g_numSigOthers = 0; // since we can't call gettimeofday() while in a sig handler, we use this // and update it periodically to keep it somewhat accurate int64_t g_now = 0; //int64_t g_nowGlobal = 0; int64_t g_nowApprox = 0; char g_inWaitState = false; // a global class extern'd in .h file Loop g_loop; // the global niceness char g_niceness = 0; // we make sure the same callback/handler is not hogging the cpu when it is // niceness 0 and we do not interrupt it, so this is a critical check class UdpSlot *g_callSlot = NULL; int32_t g_lastTransId = 0; int32_t g_transIdCount = 0; // keep the sig wait time static so we can change it based on m_minTick static struct timespec s_sigWaitTime ; static struct timespec s_sigWaitTime2 ; static struct timespec* s_sigWaitTimePtr ; // use this in case we unregister the "next" callback static Slot *s_callbacksNext; // this is defined in main.cpp //extern bool mainShutdown ( bool urgent ); // set it from milliseconds void Loop::setSigWaitTime ( int32_t ms ) { int32_t secs = ms / 1000; ms -= secs * 1000; s_sigWaitTime.tv_sec = secs; s_sigWaitTime.tv_nsec = ms * 1000000; } // free up all our mem void Loop::reset() { if ( m_slots ) { log(LOG_DEBUG,"db: resetting loop"); mfree ( m_slots , MAX_SLOTS * sizeof(Slot) , "Loop" ); } m_slots = NULL; /* for ( int32_t i = 0 ; i < MAX_NUM_FDS+2 ; i++ ) { Slot *s = m_readSlots [ i ]; while ( s ) { Slot *next = s->m_next; mfree ( s , sizeof(Slot) ,"Loop" ); s = next; } m_readSlots [ i ] = NULL; s = m_writeSlots [ i ]; while ( s ) { Slot *next = s->m_next; mfree ( s , sizeof(Slot) , "Loop" ); s = next; } m_writeSlots [ i ] = NULL; } */ } //static void sigHandler_r ( int x , siginfo_t *info , void *v ) ; //static void sigHandlerRT ( int x , siginfo_t *info , void *v ) ; static void sigbadHandler ( int x , siginfo_t *info , void *y ) ; static void sigpwrHandler ( int x , siginfo_t *info , void *y ) ; static void sighupHandler ( int x , siginfo_t *info , void *y ) ; //static void sigioHandler ( int x , siginfo_t *info , void *y ) ; static void sigalrmHandler( int x , siginfo_t *info , void *y ) ; static void sigvtalrmHandler( int x , siginfo_t *info , void *y ) ; //int32_t g_fdWriteBits[MAX_NUM_FDS/32]; //int32_t g_fdReadBits [MAX_NUM_FDS/32]; void Loop::unregisterReadCallback ( int fd, void *state , void (* callback)(int fd,void *state), bool silent ){ if ( fd < 0 ) return; // from reading unregisterCallback ( m_readSlots , fd , state , callback , silent ); } void Loop::unregisterWriteCallback ( int fd, void *state , void (* callback)(int fd,void *state)){ // from writing unregisterCallback ( m_writeSlots , fd , state , callback ); } void Loop::unregisterSleepCallback ( void *state , void (* callback)(int fd,void *state)){ unregisterCallback (m_readSlots,MAX_NUM_FDS,state,callback); } static fd_set s_selectMaskRead; static fd_set s_selectMaskWrite; static fd_set s_selectMaskExcept; static int s_readFds[MAX_NUM_FDS]; static int32_t s_numReadFds = 0; static int s_writeFds[MAX_NUM_FDS]; static int32_t s_numWriteFds = 0; void Loop::unregisterCallback ( Slot **slots , int fd , void *state , void (* callback)(int fd,void *state) , bool silent ) { // bad fd if ( fd < 0 ) {log(LOG_LOGIC, "loop: fd to unregister is negative.");return;} // set a flag if we found it bool found = false; // slots is m_readSlots OR m_writeSlots Slot *s = slots [ fd ]; Slot *lastSlot = NULL; // . keep track of new min tick for sleep callbacks // . sleep a min of 40ms so g_now is somewhat up to date int32_t min = 40; // 0x7fffffff; int32_t lastMin = min; // chain through all callbacks registerd with this fd while ( s ) { // get the next slot (NULL if no more) Slot *next = s->m_next; // if we're unregistering a sleep callback // we might have to recalculate m_minTick if ( s->m_tick < min ) { lastMin = min; min = s->m_tick; } // skip this slot if callbacks don't match if ( s->m_callback != callback ) { lastSlot = s; goto skip; } // skip this slot if states don't match if ( s->m_state != state ) { lastSlot = s; goto skip; } // free this slot since it callback matches "callback" //mfree ( s , sizeof(Slot) , "Loop" ); returnSlot ( s ); found = true; // if the last one, then remove the FD from s_fdList // so and clear a bit so doPoll() function is fast if ( slots[fd] == s && s->m_next == NULL ) { for (int32_t i = 0; i < s_numReadFds ; i++ ) { if ( s_readFds[i] != fd ) continue; s_readFds[i] = s_readFds[s_numReadFds-1]; s_numReadFds--; // remove from select mask too FD_CLR(fd,&s_selectMaskRead ); break; } for (int32_t i = 0; i < s_numWriteFds ; i++ ) { if ( s_writeFds[i] != fd ) continue; s_writeFds[i] = s_writeFds[s_numWriteFds-1]; s_numWriteFds--; // remove from select mask too FD_CLR(fd,&s_selectMaskWrite); if ( g_conf.m_logDebugLoop ) log("loop: clearing fd=%"INT32" from " "write #wrts=%"INT32"" ,(int32_t)fd,(int32_t)s_numWriteFds); // FD_CLR(fd,&s_selectMaskExcept); break; } } // debug msg //log("Loop::unregistered fd=%"INT32" state=%"UINT32"", fd, (int32_t)state ); // revert back to old min if this is the Slot we're removing min = lastMin; // excise the previous slot from linked list if ( lastSlot ) lastSlot->m_next = next; else slots[fd] = next; // watch out if we're in the previous callback, we need to // fix the linked list in callCallbacks_ass if ( s_callbacksNext == s ) s_callbacksNext = next; skip: // advance to the next slot s = next; } // set our new minTick if we were unregistering a sleep callback if ( fd == MAX_NUM_FDS ) { m_minTick = min; // . set s_sigWaitTime to m_minTick // . 1 billion nanoseconds = 1 second // . m_minTick is in milliseconds, 1000 ms in a second // . multiply m_minTick in ms by 1 million to get nano setSigWaitTime ( m_minTick ); } // return now if found if ( found ) return; // . otherwise, bitch if we're not silent // . HttpServer.cpp always calls this even if it did not register its // File's fd just to make sure. if ( silent ) return; return; // sometimes the socket is abruptly closed and that calls the // unregisterWriteCallback() for us... so skip this log(LOG_LOGIC, "loop: unregisterCallback: callback not found (fd=%i).",fd); } bool Loop::registerReadCallback ( int fd, void *state, void (* callback)(int fd,void *state ) , int32_t niceness ) { // the "true" answers the question "for reading?" if ( addSlot ( true, fd, state, callback, niceness ) ) return true; return log("loop: Unable to register read callback."); } bool Loop::registerWriteCallback ( int fd, void *state, void (* callback)(int fd, void *state ) , int32_t niceness ) { // the "false" answers the question "for reading?" if ( addSlot ( false, fd, state, callback, niceness ) )return true; return log("loop: Unable to register write callback."); } // tick is in milliseconds bool Loop::registerSleepCallback ( int32_t tick , void *state, void (* callback)(int fd,void *state ) , int32_t niceness ) { if ( ! addSlot ( true, MAX_NUM_FDS, state, callback , niceness ,tick) ) return log("loop: Unable to register sleep callback"); if ( tick < m_minTick ) m_minTick = tick; // wait this int32_t in the sig wait loop setSigWaitTime ( m_minTick ); return true; } // . returns false and sets g_errno on error bool Loop::addSlot ( bool forReading , int fd, void *state, void (* callback)(int fd, void *state), int32_t niceness , int32_t tick ) { // ensure fd is >= 0 if ( fd < 0 ) { g_errno = EBADENGINEER; return log(LOG_LOGIC,"loop: fd to register is negative."); } // sanity if ( fd > MAX_NUM_FDS ) { log("loop: bad fd of %"INT32"",(int32_t)fd); char *xx=NULL;*xx=0; } // . ensure fd not already registered with this callback/state // . prevent dups so you can keep calling register w/o fear Slot *s; if ( forReading ) s = m_readSlots [ fd ]; else s = m_writeSlots [ fd ]; while ( s ) { if ( s->m_callback == callback && s->m_state == state ) { // don't set g_errno for this anymore, just bitch //g_errno = EBADENGINEER; log(LOG_LOGIC,"loop: fd %i is already registered.",fd); return true; } s = s->m_next; } // . make a new slot // . TODO: implement mprimealloc() to pre-alloc slots for us for speed //s = (Slot *) mmalloc ( sizeof(Slot ) ,"Loop"); s = getEmptySlot ( ); if ( ! s ) return false; // for pointing to slot already in position for fd Slot *next ; // store ourselves in the slot for this fd if ( forReading ) { next = m_readSlots [ fd ]; m_readSlots [ fd ] = s; // if not already registered, add to list if ( fdMAX_NUM_FDS){char *xx=NULL;*xx=0;} } // fd == MAX_NUM_FDS if it's a sleep callback //if ( fd < MAX_NUM_FDS ) { //FD_SET ( fd , &m_readfds ); //FD_SET ( fd , &m_exceptfds ); //} } else { next = m_writeSlots [ fd ]; m_writeSlots [ fd ] = s; //FD_SET ( fd , &m_writefds ); // if not already registered, add to list if ( fdMAX_NUM_FDS){char *xx=NULL;*xx=0;} } } // set our callback and state s->m_callback = callback; s->m_state = state; // point to the guy that was registered for fd before us s->m_next = next; // save our niceness for doPoll() s->m_niceness = niceness; // store the tick for sleep wrappers (should be max for others) s->m_tick = tick; // and the last called time for sleep wrappers only really if ( fd == MAX_NUM_FDS ) s->m_lastCall = gettimeofdayInMilliseconds(); // debug msg //log("Loop::registered fd=%i state=%"UINT32"",fd,state); // if fd == MAX_NUM_FDS if it's a sleep callback if ( fd == MAX_NUM_FDS ) return true; // watch out for big bogus fds used for thread exit callbacks if ( fd > MAX_NUM_FDS ) return true; // set fd non-blocking return setNonBlocking ( fd , niceness ) ; } // . now make sure we're listening for an interrupt on this fd // . set it non-blocing and enable signal catching for it // . listen for an interrupt for this fd bool Loop::setNonBlocking ( int fd , int32_t niceness ) { retry: int flags = fcntl ( fd , F_GETFL ) ; if ( flags < 0 ) { // valgrind if ( errno == EINTR ) goto retry; g_errno = errno; return log("loop: fcntl(F_GETFL): %s.",strerror(errno)); } retry9: if ( fcntl ( fd, F_SETFL, flags|O_NONBLOCK|O_ASYNC) < 0 ) { // valgrind if ( errno == EINTR ) goto retry9; g_errno = errno; return log("loop: fcntl(NONBLOCK): %s.",strerror(errno)); } // we use select()/poll now so skip stuff below return true; /* retry8: // tell kernel to send the signal to us when fd is ready for read/write if ( fcntl (fd, F_SETOWN , getpid() ) < 0 ) { g_errno = errno; // valgrind if ( errno == EINTR ) goto retry8; return log("loop: fcntl(F_SETOWN): %s.",strerror(errno)); } // . tell kernel what signal we'd like to recieve when this happens // . additional signal info (including fd) should be available #ifdef _POLLONLY_ return true; #endif // trunc nicess cuz we only get GB_SIGRTMIN+1 to GB_SIGRTMIN+2 signals if ( niceness < -1 ) niceness = -1; if ( niceness > MAX_NICENESS ) niceness = MAX_NICENESS; // debug msg //log("fd on niceness = %"INT32" sig = %"INT32"",niceness,GB_SIGRTMIN +1+niceness); retry6: // . tell kernel to send this signal when fd is ready for read/write // . reserve GB_SIGRTMIN for unmaskable interrupts (niceness = -1) // as used by high priority udp server, g_udpServer2 if ( fcntl (fd, F_SETSIG , GB_SIGRTMIN + 1 + niceness ) < 0 ) { // valgrind if ( errno == EINTR ) goto retry6; g_errno = errno; return log("loop: fcntl(F_SETSIG): %s.",strerror(errno)); } return true; */ } // . if "forReading" is true call callbacks registered for reading on "fd" // . if "forReading" is false call callbacks registered for writing on "fd" // . if fd is MAX_NUM_FDS and "forReading" is true call all sleepy callbacks void Loop::callCallbacks_ass ( bool forReading , int fd , int64_t now , int32_t niceness ) { // debug msg //if ( g_conf.m_logDebugUdp ) log("callCallbacks_ass start"); //if ( fd != 1024 ) { //if (forReading) fprintf(stderr,"got read sig on fd=%"INT32"\n",(int32_t)fd); //else fprintf(stderr,"got write sig on fd=%"INT32"\n",(int32_t)fd); //} // save the g_errno to send to all callbacks int saved_errno = g_errno; // get the first Slot in the chain that is waiting on this fd Slot *s ; //if ( forReading ) s = m_readSlots [ fd ]; //else s = m_writeSlots [ fd ]; s = m_readSlots [ fd ]; // ensure we called something int32_t numCalled = 0; // a hack fix if ( niceness == -1 && m_inQuickPoll ) niceness = 0; // . now call all the callbacks // . most will re-register themselves (i.e. call registerCallback...() //int64_t startTime = gettimeofdayInMilliseconds(); while ( s ) { // skip this slot if he has no callback if ( ! s->m_callback ) continue; // NOTE: callback can unregister fd for Slot s, so get next //Slot *next = s->m_next; s_callbacksNext = s->m_next; // watch out if clock was set back if ( s->m_lastCall > now ) s->m_lastCall = now; // if we're a sleep callback, check to make sure not premature if ( fd == MAX_NUM_FDS && s->m_lastCall + s->m_tick > now ) { s = s_callbacksNext; continue; } // skip if not a niceness match if ( niceness == 0 && s->m_niceness != 0 ) { s = s_callbacksNext; continue; } // update the lastCall timestamp for this slot if ( fd == MAX_NUM_FDS ) s->m_lastCall = now; // . debug msg // . this is called a lot cuz we process all dgrams/whatever // in one clump so there's a lot of redundant signals //if ( g_conf.m_logDebugUdp && fd != 1024 ) // log("Loop::callCallbacks_ass: for fd=%"INT32" state=%"UINT32"", // fd,(int32_t)s->m_state); // do the callback //int32_t address = 0; // uint64_t profilerStart,profilerEnd; // uint64_t statStart, statEnd; /* if(g_conf.m_profilingEnabled){ address=(int32_t)s->m_callback; g_profiler.startTimer(address, __PRETTY_FUNCTION__); //profilerStart=gettimeofdayInMillisecondsLocal(); //statStart = gettimeofdayInMilliseconds(); } */ //startBlockedCpuTimer(); // log it now if ( g_conf.m_logDebugLoop ) log(LOG_DEBUG,"loop: enter fd callback fd=%"INT32" " "nice=%"INT32"",(int32_t)fd,(int32_t)s->m_niceness); // sanity check. -1 no longer supported if ( s->m_niceness < 0 ) { char *xx=NULL;*xx=0; } // save it int32_t saved = g_niceness; // set the niceness g_niceness = s->m_niceness; // make sure not 2 if ( g_niceness >= 2 ) g_niceness = 1; // sanity check -- need to be able to quickpoll! //if ( s->m_niceness > 0 && ! g_loop.m_canQuickPoll ) { // char *xx=NULL;*xx=0; } s->m_callback ( fd , s->m_state ); // restore it g_niceness = saved; // log it now if ( g_conf.m_logDebugLoop ) log(LOG_DEBUG,"loop: exit fd callback fd=%"INT32" " "nice=%"INT32"", (int32_t)fd,(int32_t)s->m_niceness); /* if(g_conf.m_profilingEnabled){ //profilerEnd =gettimeofdayInMillisecondsLocal(); if(!g_profiler.endTimer(address, __PRETTY_FUNCTION__)) log(LOG_WARN,"admin: Couldn't add the fn %"INT32"", (int32_t)address); } */ // . debug msg // . this is called a lot cuz we process all dgrams/whatever // in one clump so there's a lot of redundant signals //if ( g_conf.m_logDebugUdp && fd != 1024 ) // log("Loop::callCallbacks_ass: back"); // inc the flag numCalled++; // reset g_errno so all callbacks for this fd get same g_errno g_errno = saved_errno; // get the next n (will be -1 if no slot after it) s = s_callbacksNext; } s_callbacksNext = NULL; // int64_t now2 = gettimeofdayInMilliseconds(); // int64_t took = now2 - startTime; // if(g_conf.m_profilingEnabled && took > 10) { // g_stats.addStat_r ( 0 , // startTime, // now2, // 0 , // STAT_GENERIC, // __PRETTY_FUNCTION__,__LINE__); // if(g_conf.m_sequentialProfiling) { // log(LOG_TIMING, // "admin: loop time to do %"INT32" callbacks: %"INT64" ms", // numCalled,took); // } // } // log an error if nothing called //if ( ! called ) log("Loop::callCallbacks: no callback for signal"); // debug msg //if ( g_conf.m_logDebugUdp ) log("callCallbacks_ass end"); } Loop::Loop ( ) { // . default sig wait time to 10 ms (10,000,000 nanoseconds) // . 1 billion nanoseconds = 1 second setSigWaitTime ( 1000 /*ms*/ ); s_sigWaitTime2.tv_sec = 0; s_sigWaitTime2.tv_nsec = 0; s_sigWaitTimePtr = &s_sigWaitTime; m_inQuickPoll = false; m_needsToQuickPoll = false; m_canQuickPoll = false; m_isDoingLoop = false; // set all callbacks to NULL so we know they're empty for ( int32_t i = 0 ; i < MAX_NUM_FDS+2 ; i++ ) { m_readSlots [i] = NULL; //m_writeSlots[i] = NULL; } // the extra sleep slots //m_readSlots [ MAX_NUM_FDS ] = NULL; m_slots = NULL; } // free all slots from addSlots Loop::~Loop ( ) { reset(); } // returns NULL and sets g_errno if none are left Slot *Loop::getEmptySlot ( ) { Slot *s = m_head; if ( ! s ) { g_errno = EBUFTOOSMALL; log("loop: No empty slots available. " "Increase #define MAX_SLOTS."); return NULL; } m_head = s->m_nextAvail; return s; } void Loop::returnSlot ( Slot *s ) { s->m_nextAvail = m_head; m_head = s; } // . come here when we get a GB_SIGRTMIN+X signal etc. // . do not call anything from here because the purpose of this is to just // queue the signals up and DO DEDUPING which linux does not do causing // the sigqueue to overflow. // . we should break out of the sleep loop after the signal is handled // so we can handle/process the queued signals properly. 'man sleep' // states "sleep() makes the calling process sleep until seconds // seconds have elapsed or a signal arrives which is not ignored." void sigHandlerQueue_r ( int x , siginfo_t *info , void *v ) { // if we just needed to cleanup a thread if ( info->si_signo == SIGCHLD ) { g_numSigChlds++; // this has no fd really, Threads.cpp just sends it when // the thread is done g_threads.m_needsCleanup = true; return; } if ( info->si_code == SI_QUEUE ) { g_numSigQueues++; //log("admin: got sigqueue"); // the thread is done g_threads.m_needsCleanup = true; return; } // wtf is this? g_numSigOthers++; // the stuff below should no longer be used since we // do not use F_SETSIG now return; /* // extract the file descriptor that needs attention int fd = info->si_fd; if ( fd >= MAX_NUM_FDS ) { log("loop: CRITICAL ERROR. fd=%i > %i",fd,(int)MAX_NUM_FDS); return; } // set the right callback // info->si_band values: //#define POLLIN 0x0001 // There is data to read //#define POLLPRI 0x0002 // There is urgent data to read //#define POLLOUT 0x0004 // Writing now will not block //#define POLLERR 0x0008 // Error condition //#define POLLHUP 0x0010 // Hung up //#define POLLNVAL 0x0020 // Invalid request: fd not open int band = info->si_band; // translate SIGPIPE's to band of POLLHUP if ( info->si_signo == SIGPIPE ) { band = POLLHUP; //log("loop: Received SIGPIPE signal. Broken pipe."); } // . call the appropriate handler(s) // . TODO: bitch if no callback to handle the read!!!!!!! // . NOTE: when it's connected it sets both POLLIN and POLLOUT // . NOTE: or when a socket is trying to connect to it if it's listener //if ( band & (POLLIN | POLLOUT) == (POLLIN | POLLOUT) ) // g_loop.callCallbacks_ass ( true , fd ); // for reading if ( band & POLLIN ) { // keep stats on this now since some linuxes dont work right g_stats.m_readSignals++; //log("Loop: read %"INT64" fd=%i",gettimeofdayInMilliseconds(),fd); //g_loop.callCallbacks_ass ( true , fd ); g_fdReadBits[fd/32] = 1<<(fd%32); } else if ( band & POLLPRI ) { // keep stats on this now since some linuxes dont work right g_stats.m_readSignals++; //log("Loop: read %"INT64" fd=%i",gettimeofdayInMilliseconds(),fd); //g_loop.callCallbacks_ass ( true , fd ) ; g_fdReadBits[fd/32] = 1<<(fd%32); } else if ( band & POLLOUT ) { // keep stats on this now since some linuxes dont work right g_stats.m_writeSignals++; //log("Loop: write %"INT64" fd=%i",gettimeofdayInMilliseconds(),fd) //g_loop.callCallbacks_ass ( false , fd ); g_fdWriteBits[fd/32] = 1<<(fd%32); } // fix qainject1() test with this else if ( band & POLLERR ) { //log(LOG_INFO,"loop: got POLLERR on fd=%i.",fd); } //g_loop.callCallbacks_ass ( false , fd ); // this happens if the socket closes abruptly // or out of band data, etc... see "man 2 poll" for more info else if ( band & POLLHUP ) { // i see these all the time for fd == 0, so don't print it //if ( fd != 0 ) // log(LOG_INFO,"loop: Received hangup on fd=%i.",fd); // it is ready for writing i guess g_fdWriteBits[fd/32] = 1<<(fd%32); } */ } bool Loop::init ( ) { // clear this up here before using in doPoll() FD_ZERO(&s_selectMaskRead); FD_ZERO(&s_selectMaskWrite); FD_ZERO(&s_selectMaskExcept); // redhat 9's NPTL doesn't like our async signals if ( ! g_conf.m_allowAsyncSignals ) g_isHot = false; #ifdef _VALGRIND_ g_isHot = false; #endif // sighupHandler() will set this to true so we know when to shutdown m_shutdown = 0; // . reset this cuz we have no sleep callbacks right now // . sleep a min of 40ms so g_now is somewhat up to date m_minTick = 40; //0x7fffffff; // reset the need to poll flag m_needToPoll = false; // let 'em know if we're hot if ( g_isHot ) log ( LOG_INIT , "loop: Using asynchronous signals " "for udp server."); // make slots m_slots = (Slot *) mmalloc ( MAX_SLOTS * (int32_t)sizeof(Slot) , "Loop" ); if ( ! m_slots ) return false; // log it log(LOG_DEBUG,"loop: Allocated %"INT32" bytes for %"INT32" callbacks.", MAX_SLOTS * (int32_t)sizeof(Slot),(int32_t)MAX_SLOTS); // init link list ptr for ( int32_t i = 0 ; i < MAX_SLOTS - 1 ; i++ ) { m_slots[i].m_nextAvail = &m_slots[i+1]; } m_slots[MAX_SLOTS - 1].m_nextAvail = NULL; m_head = &m_slots[0]; m_tail = &m_slots[MAX_SLOTS - 1]; // an innocent log msg //log ( 0 , "Loop: starting the i/o loop"); // . when using threads GB_SIGRTMIN becomes 35, not 32 anymore // since threads use these signals to reactivate suspended threads // . debug msg //log("admin: GB_SIGRTMIN=%"INT32"", (int32_t)GB_SIGRTMIN ); // . block the GB_SIGRTMIN signal // . anytime this is raised it goes onto the signal queue // . we use sigtimedwait() to get signals off the queue // . sigtimedwait() selects the lowest signo first for handling // . therefore, GB_SIGRTMIN is higher priority than (GB_SIGRTMIN + 1) //sigfillset ( &sigs ); // set of signals to block sigset_t sigs; sigemptyset ( &sigs ); sigaddset ( &sigs , SIGPIPE ); //if we write to a close socket // #ifndef _VALGRIND_ // sigaddset ( &sigs , GB_SIGRTMIN ); // #endif // sigaddset ( &sigs , GB_SIGRTMIN + 1 ); // sigaddset ( &sigs , GB_SIGRTMIN + 2 ); // sigaddset ( &sigs , GB_SIGRTMIN + 3 ); sigaddset ( &sigs , SIGCHLD ); #ifdef PTHREADS // now since we took out SIGIO... (see below) // we should ignore this signal so it doesn't suddenly stop the gb // process since we took out the SIGIO handler because newer kernels // were throwing SIGIO signals ALL the time, on every datagram // send/receive it seemed and bogged us down. sigaddset ( &sigs , SIGIO ); #endif // . block on any signals in this set (in addition to current sigs) // . use SIG_UNBLOCK to remove signals from block list // . this returns -1 and sets g_errno on error // . we block a signal so it does not interrupt us, then we can // take it off using our call to sigtimedwait() // . allow it to interrupt us now and we will queue it ourselves // to prevent the linux queue from overflowing // . see 'cat /proc//status | grep SigQ' output to see if // overflow occurs. linux does not dedup the signals so when a // host cpu usage hits 100% it seems to miss a ton of signals. // i suspect the culprit is pthread_create() so we need to get // thread pools out soon. // . now we are handling the signals and queueing them ourselves // so comment out this sigprocmask() call // if ( sigprocmask ( SIG_BLOCK, &sigs, 0 ) < 0 ) { // g_errno = errno; // return log("loop: sigprocmask: %s.",strerror(g_errno)); // } struct sigaction sa2; // . sa_mask is the set of signals that should be blocked when // we're handling the GB_SIGRTMIN, make this empty // . GB_SIGRTMIN signals will be automatically blocked while we're // handling a SIGIO signal, so don't worry about that // . what sigs should be blocked when in our handler? the same // sigs we are handling i guess gbmemcpy ( &sa2.sa_mask , &sigs , sizeof(sigs) ); sa2.sa_flags = SA_SIGINFO ; //| SA_ONESHOT; // call this function sa2.sa_sigaction = sigHandlerQueue_r; g_errno = 0; if ( sigaction ( SIGPIPE, &sa2, 0 ) < 0 ) g_errno = errno; // if ( sigaction ( GB_SIGRTMIN , &sa2, 0 ) < 0 ) g_errno = errno; // if ( sigaction ( GB_SIGRTMIN + 1, &sa2, 0 ) < 0 ) g_errno = errno; // if ( sigaction ( GB_SIGRTMIN + 2, &sa2, 0 ) < 0 ) g_errno = errno; // if ( sigaction ( GB_SIGRTMIN + 3, &sa2, 0 ) < 0 ) g_errno = errno; if ( sigaction ( SIGCHLD, &sa2, 0 ) < 0 ) g_errno = errno; if ( sigaction ( SIGIO, &sa2, 0 ) < 0 ) g_errno = errno; if ( g_errno ) log("loop: sigaction(): %s.", mstrerror(g_errno) ); // . we turn this signal on/off to turn interrupts off/on // . clear all signals from the set //sigemptyset ( &m_sigrtmin ); // tmp debug hack, so we don't have real time signals now... //sigaddset ( &m_sigrtmin, GB_SIGRTMIN ); /* // now set up a signal handler to handle just/only SIGIO struct sigaction sa; // . sa_mask is the set of signals that should be blocked when // we're handling the GB_SIGRTMIN, make this empty // . GB_SIGRTMIN signals will be automatically blocked while we're // handling a SIGIO signal, so don't worry about that sigemptyset (&sa.sa_mask); // . these flags determine the signal handling process // . SA_SIGINFO means to use sa.sa_sigaction() as the sig handler // and not sa_.sa_handler() (added in Linux 2.1.86) // . this allows us to get the siginfo_t structure to get the fd // that generated the signal // . SA_ONESHOT restores the handler to the default state when // our sig handler is called so we don't get interuppted by another // signal when we're handling that one // . incoming GB_SIGRTMIN sigs should be queued (sigtimedwait()) sa.sa_flags = SA_SIGINFO ; //| SA_ONESHOT; // the handler for unblocked signals, same as other signals really sa.sa_sigaction = sigHandlerRT; // clear g_errno g_errno = 0; // now when we got an unblocked GB_SIGRTMIN signal go here right away // #ifndef _VALGRIND_ // if ( sigaction ( GB_SIGRTMIN, &sa, 0 ) < 0 ) g_errno = errno; // if ( g_errno)log("loop: sigaction GB_SIGRTMIN: %s.", mstrerror(errno)); // #endif // set it this way for SIGIO's sa.sa_flags = SA_SIGINFO ; // | SA_ONESHOT; // . define the actual routine that handles the SIGIO signal // . void (*sa_sigaction)(int, siginfo_t *, void *); sa.sa_sigaction = sigioHandler; // . register our sigHandler() to handle the GB_SIGRTMIN signal // . when a file/socket is made non-blocking it should have done a: // fcntl(fd,SET_SIG,GB_SIGRTMIN) so we're notified with that signal // . this returns -1 and sets g_errno on error // . TODO: is this the SOURCE signal or the altered signal? #ifndef PTHREADS // i think this was supposed to be sent when the signal queue was // overflowing so we needed to do a poll operation when we got this // signal, however, newer kernel seems to throw this signal all the // time when it just gets IO causing cpu to be 100% floored! // i'm afraid without this code we might miss data on a socket // or not read it promptly, but let's see how it goes. if ( sigaction ( SIGIO, &sa, 0 ) < 0 ) g_errno = errno; if ( g_errno ) log("loop: sigaction SIGIO: %s.", mstrerror(errno)); #endif */ struct sigaction sa; // . sa_mask is the set of signals that should be blocked when // we're handling the signal, make this empty // . GB_SIGRTMIN signals will be automatically blocked while we're // handling a SIGIO signal, so don't worry about that sigemptyset (&sa.sa_mask); sa.sa_flags = SA_SIGINFO ; // | SA_ONESHOT; // handle HUP signals gracefully by saving and shutting down sa.sa_sigaction = sighupHandler; if ( sigaction ( SIGHUP , &sa, 0 ) < 0 ) g_errno = errno; if ( g_errno ) log("loop: sigaction SIGHUP: %s.", mstrerror(errno)); if ( sigaction ( SIGTERM, &sa, 0 ) < 0 ) g_errno = errno; if ( g_errno ) log("loop: sigaction SIGTERM: %s.", mstrerror(errno)); // we should save our data on segv, sigill, sigfpe, sigbus sa.sa_sigaction = sigbadHandler; if ( sigaction ( SIGSEGV, &sa, 0 ) < 0 ) g_errno = errno; if ( g_errno ) log("loop: sigaction SIGSEGV: %s.", mstrerror(errno)); if ( sigaction ( SIGILL , &sa, 0 ) < 0 ) g_errno = errno; if ( g_errno ) log("loop: sigaction SIGILL: %s.", mstrerror(errno)); if ( sigaction ( SIGFPE , &sa, 0 ) < 0 ) g_errno = errno; if ( g_errno ) log("loop: sigaction SIGFPE: %s.", mstrerror(errno)); if ( sigaction ( SIGBUS , &sa, 0 ) < 0 ) g_errno = errno; if ( g_errno ) log("loop: sigaction SIGBUS: %s.", mstrerror(errno)); // if the UPS is about to go off it sends a SIGPWR sa.sa_sigaction = sigpwrHandler; if ( sigaction ( SIGPWR, &sa, 0 ) < 0 ) g_errno = errno; //now set up our alarm for quickpoll m_quickInterrupt.it_value.tv_sec = 0; m_quickInterrupt.it_value.tv_usec = QUICKPOLL_INTERVAL * 1000; m_quickInterrupt.it_interval.tv_sec = 0; m_quickInterrupt.it_interval.tv_usec = QUICKPOLL_INTERVAL * 1000; m_realInterrupt.it_value.tv_sec = 0; // 1000 microseconds in a millisecond m_realInterrupt.it_value.tv_usec = 1 * 1000; m_realInterrupt.it_interval.tv_sec = 0; m_realInterrupt.it_interval.tv_usec = 1 * 1000; m_noInterrupt.it_value.tv_sec = 0; m_noInterrupt.it_value.tv_usec = 0; m_noInterrupt.it_interval.tv_sec = 0; m_noInterrupt.it_interval.tv_usec = 0; //m_realInterrupt.it_value.tv_sec = 0; //m_realInterrupt.it_value.tv_usec = QUICKPOLL_INTERVAL * 1000; // set the interrupts to off for now //mdw:disableTimer(); // make this 10ms i guess setitimer(ITIMER_REAL, &m_realInterrupt, NULL); // this is 10ms setitimer(ITIMER_VIRTUAL, &m_quickInterrupt, NULL); sa.sa_sigaction = sigalrmHandler; // it's gotta be real time, not virtual cpu time now if ( sigaction ( SIGALRM, &sa, 0 ) < 0 ) g_errno = errno; if ( g_errno ) return log("loop: sigaction: %s.", mstrerror(errno)); // block sigvtalarm sa.sa_sigaction = sigvtalrmHandler; if ( sigaction ( SIGVTALRM, &sa, 0 ) < 0 ) g_errno = errno; if ( g_errno ) log("loop: sigaction SIGVTALRM: %s.", mstrerror(errno)); // success return true; } // TODO: if we get a segfault while saving, what then? void sigpwrHandler ( int x , siginfo_t *info , void *y ) { // let main process know to shutdown g_loop.m_shutdown = 3; } #include void printStackTrace ( int signum , siginfo_t *info , void *ptr ) { int arch = 32; if ( __WORDSIZE == 64 ) arch = 64; if ( __WORDSIZE == 128 ) arch = 128; // right now only works for 32 bit if ( arch != 32 ) return; logf(LOG_DEBUG,"gb: seg fault. printing stack trace. use " "addr2line to decode the hex below."); static void *s_bt[200]; int sz = backtrace(s_bt, 200); //char **strings = backtrace_symbols(s_bt, sz); for( int i = 0; i < sz; ++i) { //unsigned long long ba; //ba = g_profiler.getFuncBaseAddr((PTRTYPE)s_bt[i]); //sigsegv_outp("%s", strings[i]); //logf(LOG_DEBUG,"[0x%llx->0x%llx] %s" logf(LOG_DEBUG,"[0x%"XINT64"]" ,(uint64_t)s_bt[i] //,ba //,g_profiler.getFnName(ba,0)); ); } } // TODO: if we get a segfault while saving, what then? void sigbadHandler ( int x , siginfo_t *info , void *y ) { // thread should set it errno to 0x7fffffff which means that // Threads.cpp should not look for its ThreadEntry::m_isDone flag // to be set before calling waitpid() on it if ( g_threads.amThread() ) errno = 0x7fffffff; // turn off sigalarms g_loop.disableTimer(); log("loop: sigbadhandler. disabling handler from recall."); // . don't allow this handler to be called again // . does this work if we're in a thread? struct sigaction sa; sigemptyset (&sa.sa_mask); sa.sa_flags = SA_SIGINFO ; //| SA_ONESHOT; sa.sa_sigaction = NULL; sigaction ( SIGSEGV, &sa, 0 ) ; sigaction ( SIGILL , &sa, 0 ) ; sigaction ( SIGFPE , &sa, 0 ) ; sigaction ( SIGBUS , &sa, 0 ) ; //sigaction ( SIGALRM, &sa, 0 ) ; // if we've already been here, or don't need to be, then bail if ( g_loop.m_shutdown ) { log("loop: sigbadhandler. shutdown already called."); return; } // unwind printStackTrace( x , info , y ); // if we're a thread, let main process know to shutdown g_loop.m_shutdown = 2; log("loop: sigbadhandler. trying to save now. mode=%"INT32"", (int32_t)g_process.m_mode); // . this will save all Rdb's // . if "urgent" is true it will dump core // . if "urgent" is true it won't broadcast its shutdown to all hosts //#ifndef NO_MAIN // mainShutdown ( true ); // urgent? //#endif g_process.shutdown ( true ); } void sigvtalrmHandler ( int x , siginfo_t *info , void *y ) { // stats g_numVTAlarms++; // see if a niceness 0 algo is hogging the cpu if ( g_callSlot && g_niceness == 0 ) { // are we handling the same request or callback? if ( g_callSlot->m_transId == g_lastTransId ) g_transIdCount++; else g_transIdCount=1; // set it g_lastTransId = g_callSlot->m_transId; // sanity check //if ( g_transIdCount >= 10 ) { char *xx=NULL;*xx=0; } bool logIt = false; if ( g_transIdCount >= 4 ) logIt = true; // do not spam for msg99 handler so much if ( g_callSlot->m_msgType == 0x99 && g_transIdCount != 50 ) logIt = false; // it's not safe to call fprintf() even with // mutex locks for sig handlers with pthreads // going on!!! #ifdef PTHREADS logIt = false; #endif // panic if hogging if ( logIt ) { if ( g_callSlot->m_callback ) log("loop: msg type 0x%hhx reply callback " "hogging cpu for %"INT32" ticks", g_callSlot->m_msgType, g_transIdCount); else log("loop: msg type 0x%hhx handler " "hogging cpu for %"INT32" ticks", g_callSlot->m_msgType, g_transIdCount); } } g_nowApprox += QUICKPOLL_INTERVAL; // 10 ms // sanity check if ( g_loop.m_inQuickPoll && g_niceness != 0 && // seems to happen a lot when doing a qa test because we slow // things down a lot when that happens ! g_conf.m_testParserEnabled && ! g_conf.m_testSpiderEnabled && ! g_conf.m_testSearchEnabled && // likewise if doing a page parser test... ! g_inPageParser && ! g_inPageInject ) { #ifndef PTHREADS // i guess sometimes niceness 1 things call niceness 0 things? log("loop: crap crap crap!!!"); #endif //char *xx=NULL;*xx=0; } } // basically ignore this alarm if already in a quickpoll if ( g_loop.m_inQuickPoll ) return; if ( ! g_conf.m_useQuickpoll ) return; g_loop.m_needsToQuickPoll = true; //fprintf(stderr,"missed=%"INT32"\n",g_missedQuickPolls); // another missed quickpoll if ( g_niceness == 1 ) g_missedQuickPolls++; // reset if niceness is 0 else if ( g_niceness == 0 ) g_missedQuickPolls = 0; // if we missed to many, then dump core if ( g_niceness == 1 && g_missedQuickPolls >= 4 ) { //g_inSigHandler = true; // NOT SAFE for pthreads cuz we're in sig handler #ifndef PTHREADS log("loop: missed quickpoll"); #endif //g_inSigHandler = false; // seems to core a lot in gbcompress() we need to // put a quickpoll into zlib deflate() or // deflat_slot() or logest_match() function // for now do not dump core --- re-enable this later // mdw TODO //char *xx=NULL;*xx=0; } // if it has been a while since heartbeat (> 10000ms) dump core so // we can see where the process was... that is a missed quick poll? if ( g_process.m_lastHeartbeatApprox == 0 ) return; if ( g_conf.m_maxHeartbeatDelay <= 0 ) return; if ( g_nowApprox - g_process.m_lastHeartbeatApprox > g_conf.m_maxHeartbeatDelay ) { #ifndef PTHREADS logf(LOG_DEBUG,"gb: CPU seems blocked. Forcing core."); #endif //char *xx=NULL; *xx=0; } //logf(LOG_DEBUG, "xxx now: %"INT64"! approx: %"INT64"", g_now, g_nowApprox); } void sigalrmHandler ( int x , siginfo_t *info , void *y ) { // so we don't call gettimeofday() thousands of times a second... g_clockNeedsUpdate = true; // stats g_numAlarms++; // . see where we are in the code // . for computing cpu usage // . if idling we will be in sigtimedwait() at the lowest level Host *h = g_hostdb.m_myHost; // if doing injects... if ( ! h ) return; // . i guess this means we were doing something... (otherwise idle) // . this is KINDA like a 100 point sample, but it has crazy decay // logic built into it if ( ! g_inWaitState ) h->m_pingInfo.m_cpuUsage = .99 * h->m_pingInfo.m_cpuUsage + .01 * 100; else h->m_pingInfo.m_cpuUsage = .99 * h->m_pingInfo.m_cpuUsage + .01 * 000; if ( g_profiler.m_realTimeProfilerRunning ) g_profiler.getStackFrame(0); } static sigset_t s_rtmin; void maskSignals() { static bool s_init = false; if ( ! s_init ) { s_init = true; sigemptyset ( &s_rtmin ); // sigaddset ( &s_rtmin, GB_SIGRTMIN ); // sigaddset ( &s_rtmin, GB_SIGRTMIN + 1 ); // sigaddset ( &s_rtmin, GB_SIGRTMIN + 2 ); // sigaddset ( &s_rtmin, GB_SIGRTMIN + 3 ); sigaddset ( &s_rtmin, SIGCHLD ); sigaddset ( &s_rtmin, SIGIO ); sigaddset ( &s_rtmin, SIGPIPE ); } // block it if ( sigprocmask ( SIG_BLOCK , &s_rtmin, 0 ) < 0 ) { log("loop: maskSignals: sigprocmask: %s.", strerror(errno)); return; } } void unmaskSignals() { // unblock it if ( sigprocmask ( SIG_UNBLOCK , &s_rtmin, 0 ) < 0 ) { log("loop: unmaskSignals: sigprocmask: %s.", strerror(errno)); return; } } // shit, we can't make this realtime!! RdbClose() cannot be called by a // real time sig handler void sighupHandler ( int x , siginfo_t *info , void *y ) { // let main process know to shutdown g_loop.m_shutdown = 1; } // . keep a timestamp for the last time we called the sleep callbacks // . we have to call those every 1 second int64_t s_lastTime = 0; bool Loop::runLoop ( ) { #ifndef _POLLONLY_ // set of signals to watch for sigset_t sigs0; //sigset_t sigs1; // clear all signals from the set sigemptyset ( &sigs0 ); //sigemptyset ( &sigs1 ); // . set sigs on which sigtimedwait() listens for // . add this signal to our set of signals to watch (currently NONE) sigaddset ( &sigs0, SIGPIPE ); // #ifndef _VALGRIND_ // sigaddset ( &sigs0, GB_SIGRTMIN ); // #endif // sigaddset ( &sigs0, GB_SIGRTMIN + 1 ); // sigaddset ( &sigs0, GB_SIGRTMIN + 2 ); // sigaddset ( &sigs0, GB_SIGRTMIN + 3 ); sigaddset ( &sigs0, SIGCHLD ); //sigaddset ( &sigs0, SIGVTALRM ); // . TODO: do we need to mask SIGIO too? (sig queue overflow?) // . i would think so, because what if we tried to queue an important // handler to be called in the high priority UdpServer but the queue // was full? Then we would finish processing the signals on the queue // before we would address the excluded high priority signals by // calling doPoll() sigaddset ( &sigs0, SIGIO ); // . set up a time to block waiting for signals to be 1/2 a second // . 1 billion nanoseconds = 1 second //struct timespec t = { 0 /*seconds*/, 500000000 /*nanoseconds*/}; //struct timespec t = { 0 /*seconds*/, 10000000 /*nanoseconds*/}; // grab any high priority sig first //siginfo_t info ; //int32_t sigNum ; //= sigwaitinfo ( &sigs1, &info ); #endif s_lastTime = 0; // . allow us to be interrupted // . UNBLOCKs GB_SIGRTMIN // . makes g_udpServer2 quite jumpy g_loop.interruptsOn(); m_isDoingLoop = true; //mdw:enableTimer(); // . now loop forever waiting for signals // . but every second check for timer-based events BIGLOOP: g_now = gettimeofdayInMilliseconds(); //set the time back to its exact value and reset //the timer. g_nowApprox = g_now; // MDW: won't this hog cpu? just don't disable it in // Process::save2() any more and it should be ok //enableTimer(); m_lastPollTime = g_now; m_needsToQuickPoll = false; /* // test the heartbeat core... if ( g_numAlarms > 100 ) { goo: int32_t j; for ( int32_t k = 0 ; k < 2000000000 ; k++ ) { j=k *5; } goto goo; } */ g_errno = 0; if ( m_shutdown ) { // a msg if (m_shutdown==1) log(LOG_INIT,"loop: got SIGHUP or SIGTERM."); else if (m_shutdown==2) log(LOG_INIT,"loop: got SIGBAD in thread."); else log(LOG_INIT,"loop: got SIGPWR."); // . turn off interrupts here because it doesn't help to do // it in the thread // . TODO: turn off signals for sigbadhandler() interruptsOff(); // if thread got the signal, just wait for him to save all // Rdbs and then dump core if ( m_shutdown == 2 ) { //log(0,"Thread is saving & shutting down urgently."); //while ( 1 == 1 ) sleep (50000); log("loop: Resuming despite thread crash."); m_shutdown = 0; goto BIGLOOP; } // otherwise, thread did not save, so we must do it log ( LOG_INIT ,"loop: Saving and shutting down urgently."); // . this will save all Rdb's and dump core // . since "urgent" is true it won't broadcast its shutdown // to all hosts //#ifndef NO_MAIN //mainShutdown( true ); // urgent? //#endif g_process.shutdown ( true ); } // // // THE HEART OF GB. process events/signals on FDs. // // doPoll(); goto BIGLOOP; // make compiler happy return 0; //g_udpServer2.sendPoll_ass(true,g_now); //g_udpServer2.process_ass ( g_now ); // MDW: see if this works without this junk, if not then // put it back in // seems like never getting signals on redhat 16-core box. // we always process dgrams through this... wtf? try taking // it out and seeing what's happening //g_udpServer.sendPoll_ass (true,g_now); //g_udpServer.process_ass ( g_now ); // and dns now too //g_dns.m_udpServer.sendPoll_ass(true,g_now); //g_dns.m_udpServer.process_ass ( g_now ); // if there was a high niceness http request within a // quickpoll, we stored it and now we'll call it here. //g_httpServer.callQueuedPages(); //g_udpServer.printState ( ); // if ( g_someAreQueued ) { // // assume none are queued now, we may get interrupted // // and it may get set back to true // g_someAreQueued = false; // //g_udpServer2.makeCallbacks_ass ( 0 ); // //g_udpServer2.makeCallbacks_ass ( 1 ); // } // if ( g_threads.m_needsCleanup ) { // // bitch about // static bool s_bitched = false; // if ( ! s_bitched ) { // log(LOG_REMIND,"loop: Lost thread signal."); // s_bitched = true; // } // } //cleanup and launch threads: //g_threads.printState(); //g_threads.timedCleanUp(4, MAX_NICENESS ) ; // 4 ms // do it anyway // take this out as well to see if signals are coming in //doPoll(); //while ( m_needToPoll ) doPoll(); //#ifndef _POLLONLY_ // hack //char buffer[100]; //if ( recv(27,buffer,99,MSG_PEEK|MSG_DONTWAIT) == 0 ) { // logf(LOG_DEBUG,"CLOSED CLOSED!!"); //} //g_errno = 0; //check for pending signals, return right away if none. //then we'll do the low priority stuff while we were //supposed to be sleeping. //g_inWaitState = true; //sigNum = sigtimedwait (&sigs0, &info, s_sigWaitTimePtr ) ; //#undef usleep // now we just usleep(). an arriving signal will call // sigHandlerQueue_r() then break us out of this sleep. // 10000 microseconds is 10 milliseconds. it should break out // when a signal comes in just like the sleep() function. //usleep(1000 * 10); // reinstate the thing that prevents us from non-chalantly adding // usleeps() which could degrade performance //#define usleep(a) { char *xx=NULL;*xx=0; } // if no signal, we just waited 20 ms and nothing happened // why do we need this now? MDW //if ( sigNum == -1 ) // sigalrmHandler( 0,&info,NULL); //logf(LOG_DEBUG,"loop: sigNum=%"INT32" signo=%"INT32" alrm=%"INT32"", // (int32_t)sigNum,info.si_signo,(int32_t)SIGVTALRM); // no longer in a wait state... //g_inWaitState = false; // int32_t n = MAX_NUM_FDS / 32; // // process file descriptor callbacks for file descriptors // // we queued in sigHandlerQueue_r() function above. // // we use an array of 1024 bits like the poll function i guess. // for ( int32_t i = 0 ; i < n ; i++ ) { // // this is a 32-bit number // if ( ! g_fdReadBits[i] ) continue; // // scan the individual bits now // for ( int32_t j = 0 ; j < 32 ; j++ ) { // // mask mask // uint32_t mask = 1 << j; // // skip jth bit if not on // if ( ! g_fdReadBits[i] & mask ) continue; // // block signals for just a sec so we can // // clear it now that we've handled it // //maskSignals(); // // clear it // g_fdReadBits[i] &= ~mask; // // reinstate signals // //unmaskSignals(); // // construct the file descriptor // int32_t fd = i*32 + j; // // . call all callbacks registered on this fd // // . forReading = true // callCallbacks_ass ( true , fd , g_now ); // } // } // // do the same thing but for writing now // for ( int32_t i = 0 ; i < n ; i++ ) { // // this is a 32-bit number // if ( ! g_fdWriteBits[i] ) continue; // // scan the individual bits now // for ( int32_t j = 0 ; j < 32 ; j++ ) { // // mask mask // uint32_t mask = 1 << j; // // skip jth bit if not on // if ( ! g_fdWriteBits[i] & mask ) continue; // // block signals for just a sec so we can // // clear it now that we've handled it // //maskSignals(); // // clear it // g_fdWriteBits[i] &= ~mask; // // reinstate signals // //unmaskSignals(); // // construct the file descriptor // int32_t fd = i*32 + j; // // . call all callbacks registered on this fd. // // . forReading = false. // callCallbacks_ass ( false , fd , g_now ); // } // } // int64_t elapsed = g_now - s_lastTime; // // if someone changed the system clock on us, this could be negative // // so fix it! otherwise, times may NEVER get called in our lifetime // if ( elapsed < 0 ) elapsed = m_minTick; // // call this every (about) m_minTicks milliseconds // if ( elapsed >= m_minTick ) { // // MAX_NUM_FDS is the fd for sleep callbacks // callCallbacks_ass ( true , MAX_NUM_FDS , g_now ); // // note the last time we called them // //g_now = gettimeofdayInMilliseconds(); // s_lastTime = g_now; // } // // call remaining callbacks for udp msgs // if ( g_udpServer.needBottom() ) // g_udpServer.makeCallbacks_ass ( 2 ); //if(g_udpServer2.needBottom()) // g_udpServer2.makeCallbacks_ass ( 2 ); // if(gettimeofdayInMillisecondsLocal() - // startTime > 10) // goto notime; // if ( g_conf.m_sequentialProfiling ) // g_threads.printState(); // if ( g_threads.m_needsCleanup ) // // limit to 4ms. cleanup any niceness thread. // g_threads.timedCleanUp(4 ,MAX_NICENESS); // #endif /* if ( sigNum < 0 ) { if ( errno == EAGAIN || errno == EINTR || errno == EILSEQ || errno == 0 ) { sigNum = 0; errno = 0; } else if ( errno != ENOMEM ) { log("loop: sigtimedwait(): %s.", strerror(errno)); continue; } } if ( sigNum == 0 ) { //no signals pending, try to take care of anything // left undone: int64_t startTime =gettimeofdayInMillisecondsLocal(); if(g_now & 1) { if(g_udpServer.needBottom()) g_udpServer.makeCallbacks_ass ( 2 ); //if(g_udpServer2.needBottom()) // g_udpServer2.makeCallbacks_ass ( 2 ); if(gettimeofdayInMillisecondsLocal() - startTime > 10) goto notime; if(g_conf.m_sequentialProfiling) g_threads.printState(); if(g_threads.m_needsCleanup) g_threads.timedCleanUp(4 , // ms MAX_NICENESS); } else { if(g_conf.m_sequentialProfiling) g_threads.printState(); if(g_threads.m_needsCleanup) g_threads.timedCleanUp(4 , // ms MAX_NICENESS); if(gettimeofdayInMillisecondsLocal() - startTime > 10) goto notime; if(g_udpServer.needBottom()) g_udpServer.makeCallbacks_ass ( 2 ); //if(g_udpServer2.needBottom()) // g_udpServer2.makeCallbacks_ass ( 2 ); } notime: //if we still didn't get all of them cleaned up set //sleep time to none. if(g_udpServer.needBottom() ) { //g_udpServer2.needBottom()) { s_sigWaitTimePtr = &s_sigWaitTime2; } else { //otherwise set it to minTick s_sigWaitTimePtr = &s_sigWaitTime; } } // // we got a signal, process it // else { if ( info.si_code == SIGIO ) { log("loop: got sigio"); m_needToPoll = true; } // handle the signal else sigHandler_r ( 0 , &info , NULL ); } */ // } while(1) /* loop: g_now = gettimeofdayInMilliseconds(); g_errno = 0; // debug msg //if ( g_conf.m_logDebugUdp ) log("Loop: top of loop"); // shutdown if we got a critical signal if ( m_shutdown ) { // a msg if ( m_shutdown == 1 ) log("loop: got SIGHUP or SIGTERM."); else if ( m_shutdown == 2 ) log("loop: got SIGBAD in thread."); else log("loop: got SIGPWR."); // . turn off interrupts here because it doesn't help to do // it in the thread // . TODO: turn off signals for sigbadhandler() interruptsOff(); // if thread got the signal, just wait for him to save all // Rdbs and then dump core if ( m_shutdown == 2 ) { log("loop: Cored in thread."); //log ("Thread is saving and shutting down urgently."); //while ( 1 == 1 ) sleep (50000); //log("loop: Resuming despite thread crash."); //m_shutdown = 0; //goto resume; } // otherwise, thread did not save, so we must do it log ( "loop: Saving and shutting down urgently."); // sleep forver now so we can call findPtr() function // after calling g_mem.printBreeches(0) log("loop: sleeping forever."); sleep(1000000); // . this will save all Rdb's and dump core // . since "urgent" is true it won't broadcast its shutdown // to all hosts //#ifndef NO_MAIN //mainShutdown( true ); // urgent? //#endif // urgent = true g_process.shutdown ( true ); } // resume: // . get the time now, sigHandlerRT() can use this, cuz gettimeofday() // ain't async signal safe // . g_now only used in hot udpServer for doing time outs really g_now = gettimeofdayInMilliseconds(); // clear any g_errno before possibly calling sendPoll_ass() g_errno = 0; // occasionaly call to sendto() will not send a dgram and since we // don't count on receiving ready-to-write signals on our // UdpServer's fds we check them here... it sucks, but hopefully it // fixes the problem of requests not getting fully transmitted // and stagnating in the UdpServer. //if (g_udpServer2.m_needToSend) g_udpServer2.sendPoll_ass(true,g_now); //if (g_udpServer.m_needToSend ) g_udpServer.sendPoll_ass (true,g_now); // . well, sender may be choking even with m_needsToSend var // . i bet we're just losing signals // . TODO: do we lose them when in the handler? if so, send a signal // so loop will read/write after coming out of the async handler //g_udpServer2.sendPoll_ass(true,g_now); // and also read just in case, too //g_udpServer2.process_ass ( g_now ); // and the low priority guy g_udpServer.sendPoll_ass (true,g_now); g_udpServer.process_ass ( g_now ); // and dns now too g_dns.m_udpServer.sendPoll_ass(true,g_now); g_dns.m_udpServer.process_ass ( g_now ); // tcp server contained in http server has the same problem //if ( g_httpServer.m_tcp.m_numQueued > 0 ) // g_httpServer.m_tcp.writeSocketsInQueue(); // . likewise, we may have not got the SIGQUEUE signal from when // UdpServer wanted to call a callback but couldn't because it was // in an async signal handler, and then at SIGQUEUE sig got lost... // . this should clear those completed transactions we sometimes // see in the UdpServer socket table if ( g_someAreQueued ) { // assume none are queued now, we may get interrupted // and it may get set back to true g_someAreQueued = false; //g_udpServer2.makeCallbacks_ass ( 0 ); //g_udpServer2.makeCallbacks_ass ( 1 ); } // clean up threads in case signal got lost somehow if ( g_threads.m_needsCleanup ) { // bitch about static bool s_bitched = false; if ( ! s_bitched ) { log(LOG_REMIND,"loop: Lost thread signal."); s_bitched = true; } // assume not any more //g_threads.m_needsCleanup = false; // check thread queue for any threads that completed // so we can call their callbacks and remove them g_threads.cleanUp ( 0 , 1000) ; // max niceness // launch any threads in waiting since this sig was // from a terminating one g_threads.launchThreads(); } // clear any g_errno before calling sleep callbacks g_errno = 0; // get time difference elapsed = g_now - s_lastTime; // if someone changed the system clock on us, this could be negative // so fix it! otherwise, times may NEVER get called in our lifetime if ( elapsed < 0 ) elapsed = m_minTick; // print log msgs we accumulated while in a signal handler //if ( g_log.needsPrinting() ) g_log.printBuf(); // . poll if we need to // . make it a while loop incase a hot sig handler resets m_needToPoll // . CAUTION: WE CAN LOOP IN HERE FOR ~ A MINUTE! I've seen it happen // when RdbDump was going... while ( m_needToPoll ) doPoll(); // call this every (about) 1 second if ( elapsed >= m_minTick ) { // MAX_NUM_FDS is the fd for sleep callbacks callCallbacks_ass ( true , MAX_NUM_FDS , g_now ); // note the last time we called them s_lastTime = g_now; } // just do polls if this linux doesn't support this: #ifdef _POLLONLY_ doPoll(); #endif #ifndef _POLLONLY_ // cancel silly errors //if ( g_errno == EAGAIN ) { sigNum = 0; g_errno = 0; } //if ( g_errno == EINTR ) { sigNum = 0; g_errno = 0; } // if sigNum is valid then handle it //sigHandler_r ( 0 , &info , NULL ); // subloop: // debug msg //if ( g_conf.m_logDebugUdp ) log("Loop: entering sigwait"); // . this has a timer resolution of 20ms, I imagine due to how the // kernel time slices between processes // . this means UdpServer can not effectively have a wait between // resends of less than 20ms which makes it a little less zippy sigNum = sigtimedwait (&sigs0, &info, &s_sigWaitTime ) ; // cancel silly errors if ( sigNum < 0 && errno == EAGAIN ) { sigNum = 0; errno = 0; } if ( sigNum < 0 && errno == EINTR ) { sigNum = 0; errno = 0; } // a zero signal is no signal, just a wake up call if ( sigNum == 0 ) goto loop; // sigNum is < 0 on error if ( sigNum < 0 ) { // this error happens on the newer libc for some reason // so ignore it if ( errno != ENOMEM ) log("loop: sigtimedwait(): %s.",strerror(errno)); goto loop; } // sleep test //log("SLEEPING"); //sleep(10); //for ( int64_t i = 0 ; i < 1000000000000LL ; i++ ); // . we use g_now in UdpServer.cpp and should make it // as accurate as possible // . but it's main use is because gettimeofday() is not async sig safe g_now = gettimeofdayInMilliseconds(); // debug msg //if ( g_conf.m_logDebugUdp ) log("Loop: processing signal"); // call sighandler on other queued signals and continue looping //log("Loop::runLoop:got queued signum=%"INT32"",sigNum); // was it a SIGIO? if ( info.si_code == SIGIO ) doPoll(); // handle the signal else sigHandler_r ( 0 , &info , NULL ); #endif // don't call any time handlers until no more signals waiting //goto subloop; // we need to make g_now as accurate as possible for hot UdpServer... goto loop; */ } // . the kernel sends a SIGIO signal when the sig queue overflows // . we resort to polling the fd's when that happens // void sigioHandler ( int x , siginfo_t *info , void *y ) { // // set the m_needToPoll flag // g_loop.m_needToPoll = true; // return; // } //--- TODO: flush the signal queue after polling until done //--- are we getting stale signals resolved by flush so we get //--- read event on a socket that isnt in read mode??? // TODO: set signal handler to SIG_DFL to prevent signals from queuing up now // . this handles high priority fds first (lowest niceness) void Loop::doPoll ( ) { // set time //g_now = gettimeofdayInMilliseconds(); // debug msg //log("**************** GOT SIGIO *************"); // . turn it off here so it can be turned on again after we've // called select() so we don't lose any fd events through the cracks // . some callbacks we call make trigger another SIGIO, but if they // fail they should set Loop::g_needToPoll to true m_needToPoll = false; // debug msg //if ( g_conf.m_logDebugLoop ) log(LOG_DEBUG,"loop: Entered doPoll."); if ( g_conf.m_logDebugLoop) log(LOG_DEBUG,"loop: Entered doPoll."); // print log if ( g_log.needsPrinting() ) g_log.printBuf(); // sigqueue() might have been called from a hot udp server and // we queued some handlers to be called if ( g_someAreQueued ) { // assume none are queued now, we may get interrupted // and it may get set back to true g_someAreQueued = false; //g_udpServer2.makeCallbacks_ass ( 0 ); //g_udpServer2.makeCallbacks_ass ( 1 ); } if(g_udpServer.needBottom()) g_udpServer.makeCallbacks_ass ( 1 ); //if(g_udpServer2.needBottom()) g_udpServer2.makeCallbacks_ass ( 1 ); //bool processedOne; int32_t n; // int32_t repeats = 0; // skipLowerPriorities: // descriptor bits for calling select() // fd_set readfds; // fd_set writefds; // fd_set exceptfds; // clear fds for select() //FD_ZERO ( &readfds ); //FD_ZERO ( &writefds ); //FD_ZERO ( &exceptfds ); timeval v; v.tv_sec = 0; if ( m_inQuickPoll ) v.tv_usec = 0; // 10ms for sleepcallbacks so they can be called... // and we need this to be the same as sigalrmhandler() since we // keep track of cpu usage here too, since sigalrmhandler is "VT" // based it only goes off when that much "cpu time" has elapsed. else v.tv_usec = QUICKPOLL_INTERVAL * 1000; // set descriptors we should watch // MDW: no longer necessary since we have s_selectMaskRead, etc. // for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) { // if ( m_readSlots [i] ) { // FD_SET ( i , &readfds ); // FD_SET ( i , &exceptfds ); // } // if ( m_writeSlots[i] ) { // FD_SET ( i , &writefds ); // FD_SET ( i , &exceptfds ); // } // } again: // gotta copy to our own since bits get cleared by select() function fd_set readfds; fd_set writefds; fd_set exceptfds; gbmemcpy ( &readfds, &s_selectMaskRead , sizeof(fd_set) ); gbmemcpy ( &writefds, &s_selectMaskWrite , sizeof(fd_set) ); //gbmemcpy ( &exceptfds, &s_selectMaskExcept , sizeof(fd_set) ); // what is the point of fds for writing... its for when we // get a new socket via accept() it is read for writing... //FD_ZERO ( &writefds ); FD_ZERO ( &exceptfds ); if ( g_conf.m_logDebugLoop ) log("loop: in select"); // used to measure cpu usage. sigalarm needs to know if we are // sitting idle in select() or are actively doing something w/ the cpu g_inWaitState = true; // for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) { // // continue if not set for reading // if ( FD_ISSET ( i , &s_selectMaskRead ) || // FD_ISSET ( i , &writefds ) || // FD_ISSET ( i , &exceptfds ) ) // // debug // log("loop: fd %"INT32" is set",i); // // if niceness is not -1, handle it below // } // . poll the fd's searching for socket closes // . the sigalrms and sigvtalrms and SIGCHLDs knock us out of this // select() with n < 0 and errno equal to EINTR n = select (MAX_NUM_FDS, &readfds, &writefds, &exceptfds, &v ); g_inWaitState = false; if ( g_conf.m_logDebugLoop ) log("loop: out select n=%"INT32" errno=%"INT32" errnomsg=%s", (int32_t)n,(int32_t)errno,mstrerror(errno)); if ( n < 0 ) { // valgrind if ( errno == EINTR ) { // got it. if we get a sig alarm or vt alarm or // SIGCHLD (from Threads.cpp) we end up here. //log("loop: got errno=%"INT32"",(int32_t)errno); // if shutting own was it a sigterm ? if ( m_shutdown ) goto again; // handle returned threads for niceness 0 g_threads.timedCleanUp(-3,0); // 3 ms if ( m_inQuickPoll ) goto again; // high niceness threads g_threads.timedCleanUp(-4,MAX_NICENESS); // 3 ms goto again; } g_errno = errno; log("loop: select: %s.",strerror(g_errno)); return; } // if we wait for 10ms with nothing happening, fix cpu usage here too // if ( n == 0 ) { // Host *h = g_hostdb.m_myHost; // h->m_cpuUsage = .99 * h->m_cpuUsage + .01 * 000; // } // debug msg if ( g_conf.m_logDebugLoop) logf(LOG_DEBUG,"loop: Got %"INT32" fds waiting.",n); if ( g_conf.m_logDebugLoop) { for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) { // continue if not set for reading if ( FD_ISSET ( i , &readfds ) ) log("loop: fd %"INT32" is on for read",i); if ( FD_ISSET ( i , &writefds ) ) log("loop: fd %"INT32" is on for write",i); if ( FD_ISSET ( i , &exceptfds ) ) log("loop: fd %"INT32" is on for except",i); // debug // if niceness is not -1, handle it below } } // . reset the need to poll flag if everything is caught up now // . let's take this out for now ... won't this leave some // threads hanging, they do not always generate SIGIO's if // the sigqueue is full!! LET'S SEE... mdw //if ( n == 0 ) { // // deal with any threads before returning // g_threads.cleanUp ( NULL , 1000 /*max niceness*/ ); // g_threads.launchThreads(); // return; //} //processedOne = false; // a Slot ptr Slot *s; g_now = gettimeofdayInMilliseconds(); /* // call g_udpServer sig handlers for niceness -1 here for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) { // continue if not set for reading if ( ! FD_ISSET ( i , &readfds ) ) continue; // if niceness is not -1, handle it below s = m_readSlots [ i ]; if ( s && s->m_niceness != -1 ) continue; callCallbacks_ass (true,i, g_now); // for reading = true processedOne = true; } for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) { if ( ! FD_ISSET ( i , &writefds ) ) continue; // if niceness is not -1, handle it below s = m_writeSlots [ i ]; if ( s && s->m_niceness != -1 ) continue; callCallbacks_ass (false,i, g_now); // for reading = false processedOne = true; } */ // handle returned threads for niceness -1 //g_threads.timedCleanUp(2/*ms*/); bool calledOne = false; // now keep this fast, too. just check fds we need to. for ( int32_t i = 0 ; i < s_numReadFds ; i++ ) { if ( n == 0 ) break; int fd = s_readFds[i]; s = m_readSlots [ fd ]; // if niceness is not 0, handle it below if ( s && s->m_niceness > 0 ) continue; // must be set if ( FD_ISSET ( fd , &readfds ) ) { if ( g_conf.m_logDebugLoop ) log("loop: calling cback0 niceness=%"INT32" fd=%i" , s->m_niceness , fd ); calledOne = true; callCallbacks_ass (true/*forReading?*/,fd, g_now,0); } // fds are always ready for writing so take this out. // our read callbacks always try to do a write as well. if ( FD_ISSET ( fd , &writefds ) ) { if ( g_conf.m_logDebugLoop ) log("loop: calling cback0 niceness=%"INT32" fd=%i" , s->m_niceness , fd ); calledOne = true; callCallbacks_ass (false/*forReading?*/,fd, g_now,0); } } // for ( int32_t i = 0 ; i < s_numWriteFds ; i++ ) { // if ( n == 0 ) break; // int fd = s_writeFds[i]; // s = m_writeSlots [ fd ]; // // if niceness is not 0, handle it below // if ( s && s->m_niceness > 0 ) continue; // // must be set // if ( FD_ISSET ( fd , &writefds ) ) // callCallbacks_ass (false/*forReading?*/,fd, g_now,1); // } // handle returned threads for niceness 0 g_threads.timedCleanUp(-3,0); // 3 ms // // the stuff below is not super urgent, do not do if in quickpoll // if ( m_inQuickPoll ) return; // now for lower priority fds for ( int32_t i = 0 ; i < s_numReadFds ; i++ ) { if ( n == 0 ) break; int fd = s_readFds[i]; s = m_readSlots [ fd ]; // if niceness is not 0, handle it below if ( s && s->m_niceness <= 0 ) continue; // must be set if ( FD_ISSET ( fd , &readfds ) ) { if ( g_conf.m_logDebugLoop ) log("loop: calling cback1 niceness=%"INT32" fd=%i" , s->m_niceness , fd ); calledOne = true; callCallbacks_ass (true/*forReading?*/,fd, g_now,1); } // fds are always ready for writing so take this out. // our read callbacks always try to do a write as well. if ( FD_ISSET ( fd , &writefds ) ) { if ( g_conf.m_logDebugLoop ) log("loop: calling cback1 niceness=%"INT32" fd=%i" , s->m_niceness , fd ); calledOne = true; callCallbacks_ass (false/*forReading?*/,fd, g_now,1); } } // for ( int32_t i = 0 ; i < s_numWriteFds ; i++ ) { // if ( n == 0 ) break; // int fd = s_writeFds[i]; // s = m_writeSlots [ fd ]; // // if niceness is not 0, handle it below // if ( s && s->m_niceness <= 0 ) continue; // // must be set // if ( FD_ISSET ( fd , &writefds ) ) // callCallbacks_ass (false/*forReading?*/,fd, g_now,1); // } //if ( ! calledOne ) // log("loop: select returned n=%"INT32" but nothing called.",n); // . MDW: replaced this with more efficient logic above // . call the callback for each fd we got // . only call callbacks for fds that have a nice of 0 here // for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) { // // continue if not set for reading // if ( ! FD_ISSET ( i , &readfds ) ) continue; // // if niceness is not 0, handle it below // s = m_readSlots [ i /*fd*/ ]; // if ( s && s->m_niceness != 0 ) continue; // callCallbacks_ass (true/*forReading?*/,i, g_now); // processedOne = true; // } // // only call callbacks for fds that have a nice of 0 here // for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) { // if ( ! FD_ISSET ( i , &writefds ) ) continue; // // if niceness is not 0, handle it below // s = m_writeSlots [ i /*fd*/ ]; // if ( s && s->m_niceness != 0 ) continue; // callCallbacks_ass (false/*forReading?*/,i, g_now); // processedOne = true; // } // #if 0 // if(processedOne && repeats < QUERYPRIORITYWEIGHT) { // //m_needToPoll = true; // repeats++; // goto skipLowerPriorities; // } // // log(LOG_WARN, // // "Loop: repeated %"INT32" times before moving to lower priority threads", // // repeats); // #endif // // handle low priority fds here // for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) { // // continue if not set for reading // if ( ! FD_ISSET ( i , &readfds ) ) continue; // // if niceness is 0, we already handled it above // s = m_readSlots [ i /*fd*/ ]; // if ( s && s->m_niceness <= 0 ) continue; // callCallbacks_ass (true/*forReading?*/,i, g_now); // } // // only call callbacks for fds that have a nice of 0 here // for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) { // if ( ! FD_ISSET ( i , &writefds ) ) continue; // // if niceness is 0, we already handled it above // s = m_writeSlots [ i /*fd*/ ]; // if ( s && s->m_niceness <= 0 ) continue; // callCallbacks_ass (false/*forReading?*/,i, g_now); // } // handle returned threads for all other nicenesses g_threads.timedCleanUp(-4,MAX_NICENESS); // 4 ms // set time g_now = gettimeofdayInMilliseconds(); // call sleepers if they need it // call this every (about) 1 second int32_t elapsed = g_now - s_lastTime; // if someone changed the system clock on us, this could be negative // so fix it! otherwise, times may NEVER get called in our lifetime if ( elapsed < 0 ) elapsed = m_minTick; if ( elapsed >= m_minTick ) { // MAX_NUM_FDS is the fd for sleep callbacks callCallbacks_ass ( true , MAX_NUM_FDS , g_now ); // note the last time we called them s_lastTime = g_now; // handle returned threads for all other nicenesses g_threads.timedCleanUp(-4,MAX_NICENESS); // 4 ms } // debug msg if ( g_conf.m_logDebugLoop ) log(LOG_DEBUG,"loop: Exited doPoll."); } // for FileState class #include "BigFile.h" // call this when you don't want to be interrupted void Loop::interruptsOff ( ) { // . debug // . until we have our own malloc, don't turn them on if ( ! g_isHot ) return; // bail in already in a sig handler if ( g_inSigHandler ) return; // if interrupts already off bail if ( ! g_interruptsOn ) return; // looks like sigprocmask is destructive on our sigset sigset_t rtmin; sigemptyset ( &rtmin ); // tmp debug hack, so we don't have real time signals now... // #ifndef _VALGRIND_ // sigaddset ( &rtmin, GB_SIGRTMIN ); // #endif // block it if ( sigprocmask ( SIG_BLOCK , &rtmin, 0 ) < 0 ) { log("loop: interruptsOff: sigprocmask: %s.", strerror(errno)); return; } g_interruptsOn = false; // debug msg //log("interruptsOff"); } // and this to resume being interrupted void Loop::interruptsOn ( ) { // . debug // . until we have our own malloc, don't turn them on if ( ! g_isHot ) return; // bail in already in a sig handler if ( g_inSigHandler ) return; // if interrupts already on bail if ( g_interruptsOn ) return; // looks like sigprocmask is destructive on our sigset sigset_t rtmin; sigemptyset ( &rtmin ); // uncomment this next line to easily disable real time interrupts // #ifndef _VALGRIND_ // sigaddset ( &rtmin, GB_SIGRTMIN ); // #endif // debug msg //log("interruptsOn"); // let everyone know before we are vulnerable to an interrupt g_interruptsOn = true; // unblock it so interrupts flow if ( sigprocmask ( SIG_UNBLOCK, &rtmin, 0 ) < 0 ) { log("loop: interruptsOn: sigprocmask: %s.", strerror(errno)); return; } } /* // handle hot real time signals here void sigHandlerRT ( int x , siginfo_t *info , void *v ) { // if we're not hot, what are we doing here? if ( ! g_isHot ) { fprintf(stderr,"SHIT\n"); exit(-1); } // bitch if we shouldn't have gotten this // fprintf(stderr,"Hey! why are we here?\n"); if ( ! g_interruptsOn ) { log(LOG_LOGIC,"loop: Interrupts not on. Bad engineer."); return; } //fprintf (stderr,"in rt handler\n"); // let everyone know it // MDW: turn this off for now, how is it getting set? we dont use // real time signals any more. maybe a pthread is getting such // a signal? //g_inSigHandler = true; // debug msg //if ( g_conf.m_timingDebugEnabled ) // log("sigHandlerRT entered"); // save errno int32_t old_gerrno = g_errno; int old_errno = errno; // call normal handler sigHandler_r ( x , info , v ); // restore errno = old_errno; g_errno = old_gerrno; // debug msg //if ( g_conf.m_timingDebugEnabled ) // log("sigHandlerRT exited"); // revert g_inSigHandler = false; // debug msg //fprintf (stderr,"out of rt handler\n"); } */ /* // come here when we get a GB_SIGRTMIN+X signal void sigHandler_r ( int x , siginfo_t *info , void *v ) { // cygwin lacks the si_fd and si_band members #ifdef CYGWIN g_loop.doPoll(); #else // extract the file descriptor that needs attention int fd = info->si_fd; // debug note //if ( fd == g_httpServer.m_tcp.m_sock ) // logf(LOG_DEBUG,"loop: got http server activity"); // print signal number for debugging purposes (should be SIGPOLL/IO) // debug msg //fprintf(stderr,"got sigcode=%i\n",info->si_code ); //fprintf(stderr,"got signo =%i\n",info->si_signo); //if ( info->si_signo == SIGCHLD ) // fprintf(stderr,"got SIGCHLD\n"); //if ( info->si_code == SIGCHLD ) // fprintf(stderr,"got SIGCHLD2\n"); //fprintf(stderr,"got sigval =%i\n",(int)(info->si_value.sival_int) ); // clear g_errno before callling handlers g_errno = 0; // info->si_band values: //#define POLLIN 0x0001 // There is data to read //#define POLLPRI 0x0002 // There is urgent data to read //#define POLLOUT 0x0004 // Writing now will not block //#define POLLERR 0x0008 // Error condition //#define POLLHUP 0x0010 // Hung up //#define POLLNVAL 0x0020 // Invalid request: fd not open int band = info->si_band; // fprintf(stderr,"got fd = %i\n", fd ); // fprintf(stderr,"got band = %i\n", band ); // fprintf(stderr,"band & POLLIN = %i\n", band & POLLIN ); // fprintf(stderr,"band & POLLPRI = %i\n", band & POLLPRI ); // fprintf(stderr,"band & POLLOUT = %i\n", band & POLLOUT ); // fprintf(stderr,"band & POLLERR = %i\n", band & POLLERR ); // fprintf(stderr,"band & POLLHUP = %i\n", band & POLLHUP ); // translate SIGPIPE's to band of POLLHUP if ( info->si_signo == SIGPIPE ) { band = POLLHUP; log("loop: Received SIGPIPE signal. Broken pipe."); } // . SI_QUEUE signals used to just be from BigFile // . but now they're used by threads so we can call a callback // when the thread is completed if ( g_someAreQueued && ! g_inSigHandler ) { g_someAreQueued = false; //g_udpServer2.makeCallbacks_ass ( 0 ); //g_udpServer2.makeCallbacks_ass ( 1 ); } if ( g_threads.m_needsCleanup && ! g_inSigHandler ) { // assume not any more //g_threads.m_needsCleanup = false; // get the value //int val = (int)(info->si_value.sival_int); // debug msg //if ( g_conf.m_logDebugThreadEnabled ) // log("Loop: got thread done signal"); // check thread queue for any threads that completed // so we can call their callbacks and remove them g_threads.timedCleanUp(4,MAX_NICENESS); // 4 ms // // g_threads.cleanUp ( (ThreadEntry *)val , x);// max niceness // g_threads.cleanUp ( (ThreadEntry *)val , 1000);//max niceness // // launch any threads in waiting since this sig was // // from a terminating one // g_threads.launchThreads(); } // if we just needed to cleanup a thread if ( info->si_signo == SIGCHLD ) return; // if we don't got a signal for an fd, just a sigqueue() call, bail now if ( info->si_code == SI_QUEUE ) return; // . call the appropriate handler(s) // . TODO: bitch if no callback to handle the read!!!!!!! // . NOTE: when it's connected it sets both POLLIN and POLLOUT // . NOTE: or when a socket is trying to connect to it if it's listener //if ( band & (POLLIN | POLLOUT) == (POLLIN | POLLOUT) ) // g_loop.callCallbacks_ass ( true , fd ); // for reading if ( band & POLLIN ) { // keep stats on this now since some linuxes dont work right g_stats.m_readSignals++; //log("Loop: read %"INT64" fd=%i",gettimeofdayInMilliseconds(),fd); g_loop.callCallbacks_ass ( true , fd ); } else if ( band & POLLPRI ) { // keep stats on this now since some linuxes dont work right g_stats.m_readSignals++; //log("Loop: read %"INT64" fd=%i",gettimeofdayInMilliseconds(),fd); g_loop.callCallbacks_ass ( true , fd ) ; } else if ( band & POLLOUT ) { // keep stats on this now since some linuxes dont work right g_stats.m_writeSignals++; //log("Loop: write %"INT64" fd=%i",gettimeofdayInMilliseconds(),fd) g_loop.callCallbacks_ass ( false , fd ); } // fix qainject1() test with this else if ( band & POLLERR ) { log(LOG_INFO,"loop: got POLLERR on fd=%i.",fd); } //g_loop.callCallbacks_ass ( false , fd ); // this happens if the socket closes abruptly // or out of band data, etc... see "man 2 poll" for more info else if ( band & POLLHUP ) { // i see these all the time for fd == 0, so don't print it if ( fd != 0 ) log(LOG_INFO,"loop: Received hangup on fd=%i.",fd); g_errno = ESOCKETCLOSED; g_loop.callCallbacks_ass ( false , fd ); } // end ifdef CYGWIN #endif } */ /* #if 1 || (LINUX_VERSION_CODE < KERNEL_VERSION(2,3,31)) struct pollfd pfd; printf ("Trying fallback poll()\n"); pfd.fd = info.si_fd; pfd.events = POLLIN|POLLOUT|POLLHUP; if (poll (&pfd, 1, 0) < 0) { p_error ("poll(): %s", strerror (g_errno)); exit (1); } info.si_band = pfd.revents; #endif */ void Loop::startBlockedCpuTimer() { if(m_inQuickPoll) return; m_lastPollTime = gettimeofdayInMilliseconds(); g_profiler.resetLastQpoll(); } void Loop::quickPoll(int32_t niceness, const char* caller, int32_t lineno) { if ( ! g_conf.m_useQuickpoll ) return; // convert //if ( niceness > 1 ) niceness = 1; // sometimes we init HashTableX's with a niceness of 0 even though // g_niceness is 1. so prevent a core below. if ( niceness == 0 ) return; // sanity check if ( g_niceness > niceness ) { log(LOG_WARN,"loop: niceness mismatch!"); //char *xx=NULL;*xx=0; } } // sanity -- temporary -- no quickpoll in a thread!!! //if ( g_threads.amThread() ) { char *xx=NULL;*xx=0; } // if we are niceness 1 and not in a handler, make it niceness 2 // so the handlers can be answered and we don't slow other // spiders down and we don't slow turks' injections down as much if ( ! g_inHandler && niceness == 1 ) niceness = 2; // reset this g_missedQuickPolls = 0; if(m_inQuickPoll) { log(LOG_WARN, "admin: tried to quickpoll from inside quickpoll"); // this happens when handleRequest3f is called from // a quickpoll and it deletes a collection and BigFile::close // calls ThreadQueue::removeThreads and Msg3::doneScanning() // has niceness 2 and calls quickpoll again! return; //if(g_conf.m_quickpollCoreOnError) { char*xx=NULL;*xx=0; // } // else return; } int64_t now = g_now; //int64_t took = now - m_lastPollTime; int64_t now2 = g_now; int32_t gerrno = g_errno; /* if(g_conf.m_profilingEnabled){ now = gettimeofdayInMilliseconds(); took = now - m_lastPollTime; g_profiler.pause(caller, lineno, took); if(took > g_conf.m_minProfThreshold) { if(g_conf.m_dynamicPerfGraph) { g_stats.addStat_r ( 0 , m_lastPollTime, now, 0 , STAT_GENERIC, caller); } if(g_conf.m_sequentialProfiling) { log(LOG_TIMING, "admin: quickpolling from %s " "after %"INT64" ms", caller, took); } } } */ g_numQuickPolls++; m_inQuickPoll = true; // doPoll() will since we are in quickpoll and only call niceness 0 // callbacks for all the fds. and it will set the timer to 0. doPoll (); /* //g_udpServer2.process_ass ( g_now , 0 ); g_udpServer.process_ass ( g_now , 0 ); g_threads.timedCleanUp( 100 , 0 ); // ms ms, niceness 0 int32_t n; fd_set readfds; fd_set writefds; fd_set exceptfds; // clear fds for select() FD_ZERO ( &readfds ); FD_ZERO ( &writefds ); FD_ZERO ( &exceptfds ); timeval v; v.tv_sec = 0; v.tv_usec = 0; // set descriptors we should watch for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) { if ( m_readSlots [i] && m_readSlots[i]->m_niceness == 0 ) { FD_SET ( i , &readfds ); FD_SET ( i , &exceptfds ); } if ( m_writeSlots[i] && m_writeSlots[i]->m_niceness == 0 ) { FD_SET ( i , &writefds ); FD_SET ( i , &exceptfds ); } } // poll the fd's searching for socket closes // this is for httpServer, since we handled // udpserver and diskthreads above n = select (MAX_NUM_FDS, &readfds, &writefds, &exceptfds, &v); // a Slot ptr Slot *s; if ( n <= 0 ) goto theend; for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) { // continue if not set for reading if ( ! FD_ISSET ( i , &readfds ) ) continue; // if niceness is not -1, handle it below s = m_readSlots [ i ]; // i = fd // now we have niceness 2 if Sections.cpp if ( s && s->m_niceness >= niceness ) continue; callCallbacks_ass (true,i, now); // reading = true // sanity check if ( g_niceness > niceness ) { char*xx=NULL;*xx=0; } } for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) { if ( ! FD_ISSET ( i , &writefds ) ) continue; // if niceness is not -1, handle it below s = m_writeSlots [ i ]; // i = fd // now we have niceness 2 if Sections.cpp if ( s && s->m_niceness >= niceness ) continue; callCallbacks_ass (false,i, now); // forReading = false // sanity check if ( g_niceness > niceness ) { char*xx=NULL;*xx=0; } } // now we can have niceness 0 dns slots because of the niceness // conversion algorithm... g_dns.m_udpServer.sendPoll_ass(true,g_now); g_dns.m_udpServer.process_ass ( g_now ); g_dns.m_udpServer.makeCallbacks_ass(0); theend: */ // reset this again g_missedQuickPolls = 0; // . avoid quickpolls within a quickpoll // . was causing seg fault in diskHeartbeatWrapper() // which call Threads::bailOnReads() m_canQuickPoll = false; // . call sleepcallbacks, like the heartbeat in Process.cpp // . MAX_NUM_FDS is the fd for sleep callbacks // . specify a niceness of 0 so only niceness 0 sleep callbacks // will be called callCallbacks_ass ( true , MAX_NUM_FDS , now , 0 ); // sanity check if ( g_niceness > niceness ) { log("loop: niceness mismatch"); //char*xx=NULL;*xx=0; } } /* now2 = g_now; if(g_conf.m_profilingEnabled) { took = now2 - now; g_profiler.unpause(); } */ //log(LOG_WARN, "xx quickpolled took %"INT64", waited %"INT64" from %s", // now2 - now, now - m_lastPollTime, caller); m_lastPollTime = now2; m_inQuickPoll = false; m_needsToQuickPoll = false; m_canQuickPoll = true; g_errno = gerrno; // reset this again g_missedQuickPolls = 0; } void Loop::canQuickPoll(int32_t niceness) { if(niceness && !m_shutdown) m_canQuickPoll = true; else m_canQuickPoll = false; } void Loop::disableTimer() { //logf(LOG_WARN,"xxx disabling"); m_canQuickPoll = false; setitimer(ITIMER_VIRTUAL, &m_noInterrupt, NULL); setitimer(ITIMER_REAL, &m_noInterrupt, NULL); } int gbsystem(char *cmd ) { // if ( ! g_conf.m_runAsDaemon ) // setitimer(ITIMER_REAL, &g_loop.m_noInterrupt, NULL); g_loop.disableTimer(); log("gb: running system(\"%s\")",cmd); int ret = system(cmd); g_loop.enableTimer(); // if ( ! g_conf.m_runAsDaemon ) // setitimer(ITIMER_REAL, &g_loop.m_realInterrupt, NULL); return ret; } void Loop::enableTimer() { m_canQuickPoll = true; // logf(LOG_WARN, "xxx enabling"); setitimer(ITIMER_VIRTUAL, &m_quickInterrupt, NULL); setitimer(ITIMER_REAL, &m_realInterrupt, NULL); } //calling with a 0 niceness will turn off the timer interrupt // void Loop::setitimerInterval(int32_t niceness) { // if(niceness) { // setitimer(ITIMER_VIRTUAL, &m_quickInterrupt, NULL); // m_canQuickPoll = true; // } // else { // setitimer(ITIMER_VIRTUAL, &m_noInterrupt, NULL); // m_canQuickPoll = false; // } // }