mirror of
https://github.com/gigablast/open-source-search-engine.git
synced 2024-10-05 04:37:39 +03:00
4e803210ee
lots of core fixes. took out ppthtml powerpoint convert, it hangs. dynamic rdbmap to save memory per coll. fixed disk page cache logic and brought it back.
1974 lines
62 KiB
C++
1974 lines
62 KiB
C++
#include "gb-include.h"
|
|
|
|
#include "Loop.h"
|
|
#include "Threads.h" // g_threads.launchThreads()
|
|
#include "UdpServer.h" // g_udpServer2.makeCallbacks()
|
|
#include "HttpServer.h" // g_httpServer.m_tcp.m_numQueued
|
|
#include "Profiler.h"
|
|
#include "Process.h"
|
|
#include "PageParser.h"
|
|
#include "Threads.h"
|
|
|
|
#include "Stats.h"
|
|
// Capacity of the Slot pool that Loop::init() preallocates; every read,
// write and sleep registration consumes one Slot from it.
// raised from 5000 to 10000 because we have more UdpSlots now and Multicast
// will call g_loop.registerSleepCallback() if it fails to get a UdpSlot to
// send on.
#define MAX_SLOTS 10000

// When defined, Loop::setNonBlocking() returns before the F_SETSIG step and
// runLoop() skips building its signal set (presumably a poll-only mode).
//#define _POLLONLY_

// TODO: . if signal queue overflows another signal is sent
//       . capture that signal and use poll or something???

// Tricky Gotchas:
// TODO: if an event happens on a TCP fd/socket before we fully accept it
// we should just register it then call the read callback in case
// we just missed a ready for reading signal!!!!!
// TODO: signals can be gotten off the queue after we've closed an fd
// in which case the handler should be removed from Loop's registry
// BEFORE being closed... so the handler will be NULL... ???
// NOTE: keep in mind that the signals might be delayed or be really fast!

// TODO: don't mask signals, catch them as they arrive? (like in phhttpd)

// . set this to false to disable async signal handling
// . that will make our udp servers less responsive
// . Loop::init() clears it when g_conf.m_allowAsyncSignals is off and in
//   _VALGRIND_ builds
bool g_isHot = true;

// extern this for all to use
bool g_inSigHandler = false ;

// so we know if interrupts are supposed to be enabled/disabled
bool g_interruptsOn = false;

// are some signals to call g_udpServer2.makeCallbacks() queued?
bool g_someAreQueued = false;

// number of times sigalrmHandler() has run
long g_numAlarms = 0;
long g_numQuickPolls = 0;
// sigalrmHandler() increments this for alarms that land at niceness 1 and
// resets it to 0 for alarms that land at niceness 0
long g_missedQuickPolls = 0;

// since we can't call gettimeofday() while in a sig handler, we use this
// and update it periodically to keep it somewhat accurate
long long g_now = 0;
//long long g_nowGlobal = 0;
// advanced by QUICKPOLL_INTERVAL every time sigalrmHandler() runs
long long g_nowApprox = 0;

// sigalrmHandler() counts an alarm as idle time (cpu usage sampling) when
// this is set
char g_inWaitState = false;

// a global class extern'd in .h file
Loop g_loop;

// the global niceness
// (callCallbacks_ass() sets it to the running slot's niceness, capped at 1,
// for the duration of each callback and restores it afterwards)
char g_niceness = 0;

// we make sure the same callback/handler is not hogging the cpu when it is
// niceness 0 and we do not interrupt it, so this is a critical check
// (sigalrmHandler() counts, in g_transIdCount, consecutive alarms that see
// the same g_callSlot->m_transId, remembered in g_lastTransId)
class UdpSlot *g_callSlot = NULL;
long g_lastTransId = 0;
long g_transIdCount = 0;

// keep the sig wait time static so we can change it based on m_minTick
// (written by Loop::setSigWaitTime())
static struct timespec s_sigWaitTime ;
// zeroed by Loop::Loop()
static struct timespec s_sigWaitTime2 ;
// Loop::Loop() points this at s_sigWaitTime
static struct timespec* s_sigWaitTimePtr ;

// use this in case we unregister the "next" callback
// (callCallbacks_ass() stores the successor of the slot it is about to call
// here; unregisterCallback() advances it when that successor is removed)
static Slot *s_callbacksNext;

// this is defined in main.cpp
//extern bool mainShutdown ( bool urgent );
|
|
|
|
// set it from milliseconds
|
|
void Loop::setSigWaitTime ( long ms ) {
|
|
long secs = ms / 1000;
|
|
ms -= secs * 1000;
|
|
s_sigWaitTime.tv_sec = secs;
|
|
s_sigWaitTime.tv_nsec = ms * 1000000;
|
|
}
|
|
|
|
// free up all our mem
|
|
void Loop::reset() {
|
|
if ( m_slots ) {
|
|
log(LOG_DEBUG,"db: resetting loop");
|
|
mfree ( m_slots , MAX_SLOTS * sizeof(Slot) , "Loop" );
|
|
}
|
|
m_slots = NULL;
|
|
/*
|
|
for ( long i = 0 ; i < MAX_NUM_FDS+2 ; i++ ) {
|
|
Slot *s = m_readSlots [ i ];
|
|
while ( s ) {
|
|
Slot *next = s->m_next;
|
|
mfree ( s , sizeof(Slot) ,"Loop" );
|
|
s = next;
|
|
}
|
|
m_readSlots [ i ] = NULL;
|
|
s = m_writeSlots [ i ];
|
|
while ( s ) {
|
|
Slot *next = s->m_next;
|
|
mfree ( s , sizeof(Slot) , "Loop" );
|
|
s = next;
|
|
}
|
|
m_writeSlots [ i ] = NULL;
|
|
}
|
|
*/
|
|
}
|
|
|
|
// Signal handlers for this file. All use the three-argument SA_SIGINFO form.
// Loop::init() installs sigHandlerRT (GB_SIGRTMIN), sigioHandler (SIGIO),
// sighupHandler (SIGHUP, SIGTERM), sigbadHandler (SIGSEGV, SIGILL, SIGFPE,
// SIGBUS), sigpwrHandler (SIGPWR) and sigalrmHandler (SIGVTALRM).
// NOTE(review): sigHandler_r is not referenced in this part of the file;
// confirm whether it is still needed.
static void sigHandler_r ( int x , siginfo_t *info , void *v ) ;
static void sigHandlerRT ( int x , siginfo_t *info , void *v ) ;
static void sigbadHandler ( int x , siginfo_t *info , void *y ) ;
static void sigpwrHandler ( int x , siginfo_t *info , void *y ) ;
static void sighupHandler ( int x , siginfo_t *info , void *y ) ;
static void sigioHandler  ( int x , siginfo_t *info , void *y ) ;
static void sigalrmHandler( int x , siginfo_t *info , void *y ) ;
|
|
|
|
// Remove the read callback registered on "fd" with this callback/state pair.
// Negative fds are ignored without any logging; "silent" suppresses the
// "callback not found" complaint when nothing matched.
void Loop::unregisterReadCallback ( int fd, void *state ,
				    void (* callback)(int fd,void *state),
				    bool silent ){
	if ( fd >= 0 )
		unregisterCallback ( m_readSlots , fd , state , callback ,
				     silent );
}
|
|
|
|
void Loop::unregisterWriteCallback ( int fd, void *state ,
|
|
void (* callback)(int fd,void *state)){
|
|
// from writing
|
|
unregisterCallback ( m_writeSlots , fd , state , callback );
|
|
}
|
|
|
|
// Sleep callbacks live on the read chain at the pseudo-fd MAX_NUM_FDS, so
// removing one is a read-chain unregister at that index.
void Loop::unregisterSleepCallback ( void *state ,
				     void (* callback)(int fd,void *state)){
	const int sleepFd = MAX_NUM_FDS;
	unregisterCallback ( m_readSlots , sleepFd , state , callback );
}
|
|
|
|
// Unlink and recycle every Slot on slots[fd] whose callback AND state match.
// . "slots" is m_readSlots OR m_writeSlots
// . fd == MAX_NUM_FDS is the sleep-callback chain; for it m_minTick (and the
//   sig wait time) is recomputed from the sleep callbacks that REMAIN,
//   never more than 40ms so g_now is somewhat up to date
// . logs a LOG_LOGIC complaint when nothing matched unless "silent"
//   (HttpServer.cpp always calls this even if it did not register its
//   File's fd just to make sure)
void Loop::unregisterCallback ( Slot **slots , int fd , void *state ,
				void (* callback)(int fd,void *state) ,
				bool silent ) {
	// bad fd
	if ( fd < 0 ) {
		log(LOG_LOGIC,"loop: fd to unregister is negative.");
		return;
	}
	// the slot arrays hold MAX_NUM_FDS+2 entries (see Loop::Loop()); a
	// bigger fd can never have been registered and would index past them
	if ( fd > MAX_NUM_FDS + 1 ) {
		if ( ! silent )
			log(LOG_LOGIC,"loop: fd %i to unregister is too big.",
			    fd);
		return;
	}
	// set a flag if we found it
	bool found = false;
	// the last slot we KEPT, so we can splice around removed ones
	Slot *lastSlot = NULL;
	// smallest tick among the slots we keep (only used for sleep
	// callbacks); starts at the 40ms ceiling
	long min = 40;
	// chain through all callbacks registered with this fd
	Slot *s = slots [ fd ];
	while ( s ) {
		// get the next slot (NULL if no more)
		Slot *next = s->m_next;
		if ( s->m_callback != callback || s->m_state != state ) {
			// keep this slot; it counts toward the new min tick
			if ( s->m_tick < min ) min = s->m_tick;
			lastSlot = s;
			s = next;
			continue;
		}
		// callback and state match: recycle the slot
		returnSlot ( s );
		found = true;
		// excise it from the linked list
		if ( lastSlot ) lastSlot->m_next = next;
		else            slots[fd] = next;
		// watch out if we're in the previous callback, we need to
		// fix the linked list in callCallbacks_ass
		if ( s_callbacksNext == s ) s_callbacksNext = next;
		s = next;
	}
	// set our new minTick if we were unregistering a sleep callback
	if ( fd == MAX_NUM_FDS ) {
		m_minTick = min;
		// m_minTick is in milliseconds
		setSigWaitTime ( m_minTick );
	}
	// return now if found
	if ( found ) return;
	// otherwise, bitch if we're not silent
	if ( silent ) return;
	log(LOG_LOGIC,
	    "loop: unregisterCallback: callback not found (fd=%i).",fd);
}
|
|
|
|
bool Loop::registerReadCallback ( int fd,
|
|
void *state,
|
|
void (* callback)(int fd,void *state ) ,
|
|
long niceness ) {
|
|
// the "true" answers the question "for reading?"
|
|
if ( addSlot ( true, fd, state, callback, niceness ) ) return true;
|
|
return log("loop: Unable to register read callback.");
|
|
}
|
|
|
|
|
|
bool Loop::registerWriteCallback ( int fd,
|
|
void *state,
|
|
void (* callback)(int fd, void *state ) ,
|
|
long niceness ) {
|
|
// the "false" answers the question "for reading?"
|
|
if ( addSlot ( false, fd, state, callback, niceness ) )return true;
|
|
return log("loop: Unable to register write callback.");
|
|
}
|
|
|
|
// tick is in milliseconds
|
|
bool Loop::registerSleepCallback ( long tick ,
|
|
void *state,
|
|
void (* callback)(int fd,void *state ) ,
|
|
long niceness ) {
|
|
if ( ! addSlot ( true, MAX_NUM_FDS, state, callback , niceness ,tick) )
|
|
return log("loop: Unable to register sleep callback");
|
|
if ( tick < m_minTick ) m_minTick = tick;
|
|
// wait this long in the sig wait loop
|
|
setSigWaitTime ( m_minTick );
|
|
return true;
|
|
}
|
|
|
|
// . returns false and sets g_errno on error
|
|
bool Loop::addSlot ( bool forReading , int fd, void *state,
|
|
void (* callback)(int fd, void *state), long niceness ,
|
|
long tick ) {
|
|
|
|
// ensure fd is >= 0
|
|
if ( fd < 0 ) {
|
|
g_errno = EBADENGINEER;
|
|
return log(LOG_LOGIC,"loop: fd to register is negative.");
|
|
}
|
|
// sanity
|
|
if ( fd > MAX_NUM_FDS ) {
|
|
log("loop: bad fd of %li",(long)fd);
|
|
char *xx=NULL;*xx=0;
|
|
}
|
|
// . ensure fd not already registered with this callback/state
|
|
// . prevent dups so you can keep calling register w/o fear
|
|
Slot *s;
|
|
if ( forReading ) s = m_readSlots [ fd ];
|
|
else s = m_writeSlots [ fd ];
|
|
while ( s ) {
|
|
if ( s->m_callback == callback &&
|
|
s->m_state == state ) {
|
|
// don't set g_errno for this anymore, just bitch
|
|
//g_errno = EBADENGINEER;
|
|
log(LOG_LOGIC,"loop: fd %i is already registered.",fd);
|
|
return true;
|
|
}
|
|
s = s->m_next;
|
|
}
|
|
// . make a new slot
|
|
// . TODO: implement mprimealloc() to pre-alloc slots for us for speed
|
|
//s = (Slot *) mmalloc ( sizeof(Slot ) ,"Loop");
|
|
s = getEmptySlot ( );
|
|
if ( ! s ) return false;
|
|
// for pointing to slot already in position for fd
|
|
Slot *next ;
|
|
// store ourselves in the slot for this fd
|
|
if ( forReading ) {
|
|
next = m_readSlots [ fd ];
|
|
m_readSlots [ fd ] = s;
|
|
// fd == MAX_NUM_FDS if it's a sleep callback
|
|
//if ( fd < MAX_NUM_FDS ) {
|
|
//FD_SET ( fd , &m_readfds );
|
|
//FD_SET ( fd , &m_exceptfds );
|
|
//}
|
|
}
|
|
else {
|
|
next = m_writeSlots [ fd ];
|
|
m_writeSlots [ fd ] = s;
|
|
//FD_SET ( fd , &m_writefds );
|
|
}
|
|
// set our callback and state
|
|
s->m_callback = callback;
|
|
s->m_state = state;
|
|
// point to the guy that was registered for fd before us
|
|
s->m_next = next;
|
|
// save our niceness for doPoll()
|
|
s->m_niceness = niceness;
|
|
// store the tick for sleep wrappers (should be max for others)
|
|
s->m_tick = tick;
|
|
// and the last called time for sleep wrappers only really
|
|
if ( fd == MAX_NUM_FDS ) s->m_lastCall = gettimeofdayInMilliseconds();
|
|
// debug msg
|
|
//log("Loop::registered fd=%i state=%lu",fd,state);
|
|
// if fd == MAX_NUM_FDS if it's a sleep callback
|
|
if ( fd == MAX_NUM_FDS ) return true;
|
|
// watch out for big bogus fds used for thread exit callbacks
|
|
if ( fd > MAX_NUM_FDS ) return true;
|
|
// set fd non-blocking
|
|
return setNonBlocking ( fd , niceness ) ;
|
|
}
|
|
|
|
// . now make sure we're listening for an interrupt on this fd
|
|
// . set it non-blocing and enable signal catching for it
|
|
// . listen for an interrupt for this fd
|
|
bool Loop::setNonBlocking ( int fd , long niceness ) {
|
|
retry:
|
|
int flags = fcntl ( fd , F_GETFL ) ;
|
|
if ( flags < 0 ) {
|
|
// valgrind
|
|
if ( errno == EINTR ) goto retry;
|
|
g_errno = errno;
|
|
return log("loop: fcntl(F_GETFL): %s.",strerror(errno));
|
|
}
|
|
retry9:
|
|
if ( fcntl ( fd, F_SETFL, flags|O_NONBLOCK|O_ASYNC) < 0 ) {
|
|
// valgrind
|
|
if ( errno == EINTR ) goto retry9;
|
|
g_errno = errno;
|
|
return log("loop: fcntl(NONBLOCK): %s.",strerror(errno));
|
|
}
|
|
retry8:
|
|
// tell kernel to send the signal to us when fd is ready for read/write
|
|
if ( fcntl (fd, F_SETOWN , getpid() ) < 0 ) {
|
|
g_errno = errno;
|
|
// valgrind
|
|
if ( errno == EINTR ) goto retry8;
|
|
return log("loop: fcntl(F_SETOWN): %s.",strerror(errno));
|
|
}
|
|
|
|
// . tell kernel what signal we'd like to recieve when this happens
|
|
// . additional signal info (including fd) should be available
|
|
#ifdef _POLLONLY_
|
|
return true;
|
|
#endif
|
|
// truncate nicess cuz we only get GB_SIGRTMIN+1 to GB_SIGRTMIN+2 signals
|
|
if ( niceness < -1 ) niceness = -1;
|
|
if ( niceness > MAX_NICENESS ) niceness = MAX_NICENESS;
|
|
// debug msg
|
|
//log("fd on niceness = %li sig = %li",niceness,GB_SIGRTMIN +1+niceness);
|
|
retry6:
|
|
// . tell kernel to send this signal when fd is ready for read/write
|
|
// . reserve GB_SIGRTMIN for unmaskable interrupts (niceness = -1)
|
|
// as used by high priority udp server, g_udpServer2
|
|
if ( fcntl (fd, F_SETSIG , GB_SIGRTMIN/*32?*/ + 1 + niceness ) < 0 ) {
|
|
// valgrind
|
|
if ( errno == EINTR ) goto retry6;
|
|
g_errno = errno;
|
|
return log("loop: fcntl(F_SETSIG): %s.",strerror(errno));
|
|
}
|
|
return true;
|
|
}
|
|
|
|
// . if "forReading" is true call callbacks registered for reading on "fd"
|
|
// . if "forReading" is false call callbacks registered for writing on "fd"
|
|
// . if fd is MAX_NUM_FDS and "forReading" is true call all sleepy callbacks
|
|
void Loop::callCallbacks_ass ( bool forReading , int fd , long long now ,
|
|
long niceness ) {
|
|
// debug msg
|
|
//if ( g_conf.m_logDebugUdp ) log("callCallbacks_ass start");
|
|
//if ( fd != 1024 ) {
|
|
//if (forReading) fprintf(stderr,"got read sig on fd=%li\n",(long)fd);
|
|
//else fprintf(stderr,"got write sig on fd=%li\n",(long)fd);
|
|
//}
|
|
// save the g_errno to send to all callbacks
|
|
int saved_errno = g_errno;
|
|
// get the first Slot in the chain that is waiting on this fd
|
|
Slot *s ;
|
|
if ( forReading ) s = m_readSlots [ fd ];
|
|
else s = m_writeSlots [ fd ];
|
|
// ensure we called something
|
|
long numCalled = 0;
|
|
|
|
// a hack fix
|
|
if ( niceness == -1 && m_inQuickPoll ) niceness = 0;
|
|
|
|
// . now call all the callbacks
|
|
// . most will re-register themselves (i.e. call registerCallback...()
|
|
//long long startTime = gettimeofdayInMilliseconds();
|
|
while ( s ) {
|
|
// skip this slot if he has no callback
|
|
if ( ! s->m_callback ) continue;
|
|
// NOTE: callback can unregister fd for Slot s, so get next
|
|
//Slot *next = s->m_next;
|
|
s_callbacksNext = s->m_next;
|
|
// watch out if clock was set back
|
|
if ( s->m_lastCall > now ) s->m_lastCall = now;
|
|
// if we're a sleep callback, check to make sure not premature
|
|
if ( fd == MAX_NUM_FDS && s->m_lastCall + s->m_tick > now ) {
|
|
s = s_callbacksNext; continue; }
|
|
// skip if not a niceness match
|
|
if ( niceness == 0 && s->m_niceness != 0 ) {
|
|
s = s_callbacksNext; continue; }
|
|
// update the lastCall timestamp for this slot
|
|
if ( fd == MAX_NUM_FDS ) s->m_lastCall = now;
|
|
// . debug msg
|
|
// . this is called a lot cuz we process all dgrams/whatever
|
|
// in one clump so there's a lot of redundant signals
|
|
//if ( g_conf.m_logDebugUdp && fd != 1024 )
|
|
// log("Loop::callCallbacks_ass: for fd=%li state=%lu",
|
|
// fd,(long)s->m_state);
|
|
// do the callback
|
|
//long address = 0;
|
|
// unsigned long long profilerStart,profilerEnd;
|
|
// unsigned long long statStart, statEnd;
|
|
/*
|
|
if(g_conf.m_profilingEnabled){
|
|
address=(long)s->m_callback;
|
|
g_profiler.startTimer(address,
|
|
__PRETTY_FUNCTION__);
|
|
//profilerStart=gettimeofdayInMillisecondsLocal();
|
|
//statStart = gettimeofdayInMilliseconds();
|
|
}
|
|
*/
|
|
//startBlockedCpuTimer();
|
|
|
|
// log it now
|
|
if ( g_conf.m_logDebugLoop )
|
|
log(LOG_DEBUG,"loop: enter fd callback fd=%li "
|
|
"nice=%li",(long)fd,(long)s->m_niceness);
|
|
|
|
// sanity check. -1 no longer supported
|
|
if ( s->m_niceness < 0 ) { char *xx=NULL;*xx=0; }
|
|
|
|
// save it
|
|
long saved = g_niceness;
|
|
// set the niceness
|
|
g_niceness = s->m_niceness;
|
|
// make sure not 2
|
|
if ( g_niceness >= 2 ) g_niceness = 1;
|
|
|
|
// sanity check -- need to be able to quickpoll!
|
|
//if ( s->m_niceness > 0 && ! g_loop.m_canQuickPoll ) {
|
|
// char *xx=NULL;*xx=0; }
|
|
|
|
s->m_callback ( fd , s->m_state );
|
|
|
|
// restore it
|
|
g_niceness = saved;
|
|
|
|
// log it now
|
|
if ( g_conf.m_logDebugLoop )
|
|
log(LOG_DEBUG,"loop: exit fd callback fd=%li "
|
|
"nice=%li", (long)fd,(long)s->m_niceness);
|
|
|
|
/*
|
|
if(g_conf.m_profilingEnabled){
|
|
//profilerEnd =gettimeofdayInMillisecondsLocal();
|
|
if(!g_profiler.endTimer(address, __PRETTY_FUNCTION__))
|
|
log(LOG_WARN,"admin: Couldn't add the fn %li",
|
|
(long)address);
|
|
}
|
|
*/
|
|
// . debug msg
|
|
// . this is called a lot cuz we process all dgrams/whatever
|
|
// in one clump so there's a lot of redundant signals
|
|
//if ( g_conf.m_logDebugUdp && fd != 1024 )
|
|
// log("Loop::callCallbacks_ass: back");
|
|
// inc the flag
|
|
numCalled++;
|
|
// reset g_errno so all callbacks for this fd get same g_errno
|
|
g_errno = saved_errno;
|
|
// get the next n (will be -1 if no slot after it)
|
|
s = s_callbacksNext;
|
|
}
|
|
s_callbacksNext = NULL;
|
|
// long long now2 = gettimeofdayInMilliseconds();
|
|
// long long took = now2 - startTime;
|
|
|
|
// if(g_conf.m_profilingEnabled && took > 10) {
|
|
// g_stats.addStat_r ( 0 ,
|
|
// startTime,
|
|
// now2,
|
|
// 0 ,
|
|
// STAT_GENERIC,
|
|
// __PRETTY_FUNCTION__,__LINE__);
|
|
|
|
|
|
// if(g_conf.m_sequentialProfiling) {
|
|
// log(LOG_TIMING,
|
|
// "admin: loop time to do %li callbacks: %lli ms",
|
|
// numCalled,took);
|
|
// }
|
|
// }
|
|
|
|
// log an error if nothing called
|
|
//if ( ! called ) log("Loop::callCallbacks: no callback for signal");
|
|
// debug msg
|
|
//if ( g_conf.m_logDebugUdp ) log("callCallbacks_ass end");
|
|
}
|
|
|
|
// Put the loop in a known empty state. The Slot pool itself is allocated
// later by init(); until then there is no pool and no free list.
Loop::Loop ( ) {
	// default sig wait time to 1000 ms (1 second)
	setSigWaitTime ( 1000 /*ms*/ );

	s_sigWaitTime2.tv_sec  = 0;
	s_sigWaitTime2.tv_nsec = 0;
	s_sigWaitTimePtr = &s_sigWaitTime;

	m_inQuickPoll      = false;
	m_needsToQuickPoll = false;
	m_canQuickPoll     = false;

	// set all callbacks to NULL so we know they're empty (the extra
	// entries past MAX_NUM_FDS include the sleep-callback chain)
	for ( long i = 0 ; i < MAX_NUM_FDS+2 ; i++ ) {
		m_readSlots [i] = NULL;
		m_writeSlots[i] = NULL;
	}
	// no Slot pool until init(), so the free list must be empty too;
	// otherwise getEmptySlot() could follow a garbage m_head
	m_slots = NULL;
	m_head  = NULL;
	m_tail  = NULL;
}
|
|
|
|
// free all slots from addSlots
|
|
Loop::~Loop ( ) {
|
|
reset();
|
|
}
|
|
|
|
// returns NULL and sets g_errno if none are left
|
|
Slot *Loop::getEmptySlot ( ) {
|
|
Slot *s = m_head;
|
|
if ( ! s ) {
|
|
g_errno = EBUFTOOSMALL;
|
|
log("loop: No empty slots available. "
|
|
"Increase #define MAX_SLOTS.");
|
|
return NULL;
|
|
}
|
|
m_head = s->m_nextAvail;
|
|
return s;
|
|
}
|
|
|
|
// Push "s" back onto the front of the free list (LIFO). m_tail is not
// maintained here.
void Loop::returnSlot ( Slot *s ) {
	Slot *oldHead  = m_head;
	s->m_nextAvail = oldHead;
	m_head         = s;
}
|
|
|
|
bool Loop::init ( ) {
|
|
// redhat 9's NPTL doesn't like our async signals
|
|
if ( ! g_conf.m_allowAsyncSignals ) g_isHot = false;
|
|
#ifdef _VALGRIND_
|
|
g_isHot = false;
|
|
#endif
|
|
// sighupHandler() will set this to true so we know when to shutdown
|
|
m_shutdown = 0;
|
|
// . reset this cuz we have no sleep callbacks right now
|
|
// . sleep a min of 40ms so g_now is somewhat up to date
|
|
m_minTick = 40; //0x7fffffff;
|
|
// reset the need to poll flag
|
|
m_needToPoll = false;
|
|
// let 'em know if we're hot
|
|
if ( g_isHot ) log ( LOG_INIT , "loop: Using asynchronous signals "
|
|
"for udp server.");
|
|
// make slots
|
|
m_slots = (Slot *) mmalloc ( MAX_SLOTS * (long)sizeof(Slot) , "Loop" );
|
|
if ( ! m_slots ) return false;
|
|
// log it
|
|
log(LOG_DEBUG,"loop: Allocated %li bytes for %li callbacks.",
|
|
MAX_SLOTS * (long)sizeof(Slot),(long)MAX_SLOTS);
|
|
// init link list ptr
|
|
for ( long i = 0 ; i < MAX_SLOTS - 1 ; i++ ) {
|
|
m_slots[i].m_nextAvail = &m_slots[i+1];
|
|
}
|
|
m_slots[MAX_SLOTS - 1].m_nextAvail = NULL;
|
|
m_head = &m_slots[0];
|
|
m_tail = &m_slots[MAX_SLOTS - 1];
|
|
// an innocent log msg
|
|
//log ( 0 , "Loop: starting the i/o loop");
|
|
// . when using threads GB_SIGRTMIN becomes 35, not 32 anymore
|
|
// since threads use these signals to reactivate suspended threads
|
|
// . debug msg
|
|
//log("GB_SIGRTMIN=%li", GB_SIGRTMIN );
|
|
// . block the GB_SIGRTMIN signal
|
|
// . anytime this is raised it goes onto the signal queue
|
|
// . we use sigtimedwait() to get signals off the queue
|
|
// . sigtimedwait() selects the lowest signo first for handling
|
|
// . therefore, GB_SIGRTMIN is higher priority than (GB_SIGRTMIN + 1)
|
|
//sigfillset ( &sigs );
|
|
// set of signals to block
|
|
sigset_t sigs;
|
|
sigemptyset ( &sigs );
|
|
sigaddset ( &sigs , SIGPIPE ); //if we write to a close socket
|
|
#ifndef _VALGRIND_
|
|
sigaddset ( &sigs , GB_SIGRTMIN );
|
|
#endif
|
|
sigaddset ( &sigs , GB_SIGRTMIN + 1 );
|
|
sigaddset ( &sigs , GB_SIGRTMIN + 2 );
|
|
sigaddset ( &sigs , GB_SIGRTMIN + 3 );
|
|
sigaddset ( &sigs , SIGCHLD );
|
|
|
|
#ifdef PTHREADS
|
|
// now since we took out SIGIO... (see below)
|
|
// we should ignore this signal so it doesn't suddenly stop the gb
|
|
// process since we took out the SIGIO handler because newer kernels
|
|
// were throwing SIGIO signals ALL the time, on every datagram
|
|
// send/receive it seemed and bogged us down.
|
|
sigaddset ( &sigs , SIGIO );
|
|
#endif
|
|
// . block on any signals in this set (in addition to current sigs)
|
|
// . use SIG_UNBLOCK to remove signals from block list
|
|
// . this returns -1 and sets g_errno on error
|
|
if ( sigprocmask ( SIG_BLOCK, &sigs, 0 ) < 0 ) {
|
|
g_errno = errno;
|
|
return log("loop: sigprocmask: %s.",strerror(g_errno));
|
|
}
|
|
// . we turn this signal on/off to turn interrupts off/on
|
|
// . clear all signals from the set
|
|
//sigemptyset ( &m_sigrtmin );
|
|
// tmp debug hack, so we don't have real time signals now...
|
|
//sigaddset ( &m_sigrtmin, GB_SIGRTMIN );
|
|
// now set up a signal handler to handle just/only SIGIO
|
|
struct sigaction sa;
|
|
// . sa_mask is the set of signals that should be blocked when
|
|
// we're handling the GB_SIGRTMIN, make this empty
|
|
// . GB_SIGRTMIN signals will be automatically blocked while we're
|
|
// handling a SIGIO signal, so don't worry about that
|
|
sigemptyset (&sa.sa_mask);
|
|
// . these flags determine the signal handling process
|
|
// . SA_SIGINFO means to use sa.sa_sigaction() as the sig handler
|
|
// and not sa_.sa_handler() (added in Linux 2.1.86)
|
|
// . this allows us to get the siginfo_t structure to get the fd
|
|
// that generated the signal
|
|
// . SA_ONESHOT restores the handler to the default state when
|
|
// our sig handler is called so we don't get interuppted by another
|
|
// signal when we're handling that one
|
|
// . incoming GB_SIGRTMIN sigs should be queued (sigtimedwait())
|
|
sa.sa_flags = SA_SIGINFO ; //| SA_ONESHOT;
|
|
// the handler for unblocked signals, same as other signals really
|
|
sa.sa_sigaction = sigHandlerRT;
|
|
// clear g_errno
|
|
g_errno = 0;
|
|
// now when we got an unblocked GB_SIGRTMIN signal go here right away
|
|
#ifndef _VALGRIND_
|
|
if ( sigaction ( GB_SIGRTMIN, &sa, 0 ) < 0 ) g_errno = errno;
|
|
if ( g_errno ) log("loop: sigaction GB_SIGRTMIN: %s.", mstrerror(errno));
|
|
#endif
|
|
|
|
// set it this way for SIGIO's
|
|
sa.sa_flags = SA_SIGINFO ; // | SA_ONESHOT;
|
|
// . define the actual routine that handles the SIGIO signal
|
|
// . void (*sa_sigaction)(int, siginfo_t *, void *);
|
|
sa.sa_sigaction = sigioHandler;
|
|
// . register our sigHandler() to handle the GB_SIGRTMIN signal
|
|
// . when a file/socket is made non-blocking it should have done a:
|
|
// fcntl(fd,SET_SIG,GB_SIGRTMIN) so we're notified with that signal
|
|
// . this returns -1 and sets g_errno on error
|
|
// . TODO: is this the SOURCE signal or the altered signal?
|
|
|
|
#ifndef PTHREADS
|
|
// i think this was supposed to be sent when the signal queue was
|
|
// overflowing so we needed to do a poll operation when we got this
|
|
// signal, however, newer kernel seems to throw this signal all the
|
|
// time when it just gets IO causing cpu to be 100% floored!
|
|
// i'm afraid without this code we might miss data on a socket
|
|
// or not read it promptly, but let's see how it goes.
|
|
if ( sigaction ( SIGIO, &sa, 0 ) < 0 ) g_errno = errno;
|
|
if ( g_errno ) log("loop: sigaction SIGIO: %s.", mstrerror(errno));
|
|
#endif
|
|
|
|
|
|
// handle HUP signals gracefully by saving and shutting down
|
|
sa.sa_sigaction = sighupHandler;
|
|
if ( sigaction ( SIGHUP , &sa, 0 ) < 0 ) g_errno = errno;
|
|
if ( g_errno ) log("loop: sigaction SIGHUP: %s.", mstrerror(errno));
|
|
if ( sigaction ( SIGTERM, &sa, 0 ) < 0 ) g_errno = errno;
|
|
if ( g_errno ) log("loop: sigaction SIGTERM: %s.", mstrerror(errno));
|
|
|
|
// we should save our data on segv, sigill, sigfpe, sigbus
|
|
sa.sa_sigaction = sigbadHandler;
|
|
if ( sigaction ( SIGSEGV, &sa, 0 ) < 0 ) g_errno = errno;
|
|
if ( g_errno ) log("loop: sigaction SIGSEGV: %s.", mstrerror(errno));
|
|
if ( sigaction ( SIGILL , &sa, 0 ) < 0 ) g_errno = errno;
|
|
if ( g_errno ) log("loop: sigaction SIGILL: %s.", mstrerror(errno));
|
|
if ( sigaction ( SIGFPE , &sa, 0 ) < 0 ) g_errno = errno;
|
|
if ( g_errno ) log("loop: sigaction SIGFPE: %s.", mstrerror(errno));
|
|
if ( sigaction ( SIGBUS , &sa, 0 ) < 0 ) g_errno = errno;
|
|
if ( g_errno ) log("loop: sigaction SIGBUS: %s.", mstrerror(errno));
|
|
|
|
|
|
// if the UPS is about to go off it sends a SIGPWR
|
|
sa.sa_sigaction = sigpwrHandler;
|
|
if ( sigaction ( SIGPWR, &sa, 0 ) < 0 ) g_errno = errno;
|
|
|
|
|
|
//now set up our alarm for quickpoll
|
|
m_quickInterrupt.it_value.tv_sec = 0;
|
|
m_quickInterrupt.it_value.tv_usec = QUICKPOLL_INTERVAL * 1000;
|
|
m_quickInterrupt.it_interval.tv_sec = 0;
|
|
m_quickInterrupt.it_interval.tv_usec = QUICKPOLL_INTERVAL * 1000;
|
|
m_noInterrupt.it_value.tv_sec = 0;
|
|
m_noInterrupt.it_value.tv_usec = 0;
|
|
m_noInterrupt.it_interval.tv_sec = 0;
|
|
m_noInterrupt.it_interval.tv_usec = 0;
|
|
|
|
// set the interrupts to off for now
|
|
disableTimer();
|
|
|
|
//setitimer(ITIMER_REAL, &m_quickInterrupt, NULL);
|
|
//setitimer(ITIMER_VIRTUAL, &m_quickInterrupt, NULL);
|
|
|
|
sa.sa_sigaction = sigalrmHandler;
|
|
//if ( sigaction ( SIGALRM, &sa, 0 ) < 0 ) g_errno = errno;
|
|
if ( sigaction ( SIGVTALRM, &sa, 0 ) < 0 ) g_errno = errno;
|
|
if ( g_errno ) log("loop: sigaction SIGBUS: %s.", mstrerror(errno));
|
|
|
|
|
|
if ( g_errno ) return log("loop: sigaction: %s.", mstrerror(errno));
|
|
|
|
// success
|
|
return true;
|
|
}
|
|
|
|
// TODO: if we get a segfault while saving, what then?
|
|
void sigpwrHandler ( int x , siginfo_t *info , void *y ) {
|
|
// let main process know to shutdown
|
|
g_loop.m_shutdown = 3;
|
|
}
|
|
|
|
// TODO: if we get a segfault while saving, what then?
// Handler for SIGSEGV, SIGILL, SIGFPE and SIGBUS (installed by Loop::init()).
// . first sets errno to 0x7fffffff when running in a thread
// . then disables itself for those signals (and resets SIGALRM)
// . then, unless g_loop.m_shutdown is already non-zero, sets it to 2 and
//   calls g_process.shutdown ( true )
// . NOTE(review): log() and the shutdown path run inside a signal handler
//   and are not known to be async-signal-safe
void sigbadHandler ( int x , siginfo_t *info , void *y ) {

	// thread should set it errno to 0x7fffffff which means that
	// Threads.cpp should not look for its ThreadEntry::m_isDone flag
	// to be set before calling waitpid() on it
	if ( g_threads.amThread() ) errno = 0x7fffffff;

	log("loop: sigbadhandler. disabling handler from recall.");
	// . don't allow this handler to be called again
	// . does this work if we're in a thread?
	// . NOTE(review): sa_sigaction = NULL presumably acts as SIG_DFL
	//   because it shares storage with sa_handler on Linux; confirm
	struct sigaction sa;
	sigemptyset (&sa.sa_mask);
	sa.sa_flags = SA_SIGINFO ; //| SA_ONESHOT;
	sa.sa_sigaction = NULL;
	sigaction ( SIGSEGV, &sa, 0 ) ;
	sigaction ( SIGILL , &sa, 0 ) ;
	sigaction ( SIGFPE , &sa, 0 ) ;
	sigaction ( SIGBUS , &sa, 0 ) ;
	// NOTE(review): Loop::init() installs sigalrmHandler on SIGVTALRM,
	// not SIGALRM, so this line does not stop the quickpoll alarm handler
	sigaction ( SIGALRM, &sa, 0 ) ;
	// if we've already been here, or don't need to be, then bail
	if ( g_loop.m_shutdown ) {
		log("loop: sigbadhandler. shutdown already called.");
		return;
	}
	// if we're a thread, let main process know to shutdown
	g_loop.m_shutdown = 2;
	log("loop: sigbadhandler. trying to save now. mode=%li",
	    (long)g_process.m_mode);
	// . this will save all Rdb's
	// . if "urgent" is true it will dump core
	// . if "urgent" is true it won't broadcast its shutdown to all hosts
	//#ifndef NO_MAIN
	//	mainShutdown ( true ); // urgent?
	//#endif
	g_process.shutdown ( true );
}
|
|
|
|
|
|
// Quickpoll alarm handler; Loop::init() installs it for SIGVTALRM.
// On every alarm it:
// . counts consecutive alarms that see the same niceness-0 g_callSlot
//   transaction, logging a "hogging cpu" warning from the 4th on (msg 0x99
//   only on the 50th; never in PTHREADS builds)
// . advances g_nowApprox by QUICKPOLL_INTERVAL and bumps g_numAlarms
// . unless already in a quickpoll or quickpoll is disabled:
//   - sets g_loop.m_needsToQuickPoll
//   - tracks g_missedQuickPolls
//   - samples cpu usage into the local host's m_cpuUsage
//   - warns when the process heartbeat is older than m_maxHeartbeatDelay
void sigalrmHandler ( int x , siginfo_t *info , void *y ) {

	// see if a niceness 0 algo is hogging the cpu
	if ( g_callSlot && g_niceness == 0 ) {
		// are we handling the same request or callback?
		if ( g_callSlot->m_transId == g_lastTransId ) g_transIdCount++;
		else g_transIdCount=1;
		// set it
		g_lastTransId = g_callSlot->m_transId;
		// sanity check
		//if ( g_transIdCount >= 10 ) { char *xx=NULL;*xx=0; }
		bool logIt = false;
		if ( g_transIdCount >= 4 ) logIt = true;
		// do not spam for msg99 handler so much
		// (it is only logged on exactly the 50th consecutive alarm)
		if ( g_callSlot->m_msgType == 0x99 && g_transIdCount != 50 )
			logIt = false;
		// it's not safe to call fprintf() even with
		// mutex locks for sig handlers with pthreads
		// going on!!!
#ifdef PTHREADS
		logIt = false;
#endif
		// panic if hogging
		// (a slot with m_callback set is reported as a reply
		// callback, otherwise as a request handler)
		if ( logIt ) {
			if ( g_callSlot->m_callback )
				log("loop: msg type 0x%hhx reply callback "
				    "hogging cpu for %li ticks",
				    g_callSlot->m_msgType,
				    g_transIdCount);
			else
				log("loop: msg type 0x%hhx handler "
				    "hogging cpu for %li ticks",
				    g_callSlot->m_msgType,
				    g_transIdCount);
		}
	}

	g_nowApprox += QUICKPOLL_INTERVAL; // 10 ms
	// stats
	g_numAlarms++;

	// sanity check
	if ( g_loop.m_inQuickPoll &&
	     g_niceness != 0 &&
	     // seems to happen a lot when doing a qa test because we slow
	     // things down a lot when that happens
	     ! g_conf.m_testParserEnabled &&
	     ! g_conf.m_testSpiderEnabled &&
	     ! g_conf.m_testSearchEnabled &&
	     // likewise if doing a page parser test...
	     ! g_inPageParser &&
	     ! g_inPageInject ) {
#ifndef PTHREADS
		// i guess sometimes niceness 1 things call niceness 0 things?
		log("loop: crap crap crap!!!");
#endif
		//char *xx=NULL;*xx=0; }
	}
	// basically ignore this alarm if already in a quickpoll
	if ( g_loop.m_inQuickPoll ) return;

	if ( ! g_conf.m_useQuickpoll ) return;

	// ask the main code to quickpoll at its next opportunity
	g_loop.m_needsToQuickPoll = true;

	//fprintf(stderr,"missed=%li\n",g_missedQuickPolls);

	// another missed quickpoll
	if ( g_niceness == 1 ) g_missedQuickPolls++;
	// reset if niceness is 0
	else if ( g_niceness == 0 ) g_missedQuickPolls = 0;

	// if we missed to many, then dump core
	// (currently only logged, and only in non-PTHREADS builds; the core
	// dump below is commented out)
	if ( g_niceness == 1 && g_missedQuickPolls >= 4 ) {
		//g_inSigHandler = true;
		// NOT SAFE for pthreads cuz we're in sig handler
#ifndef PTHREADS
		log("loop: missed quickpoll");
#endif
		//g_inSigHandler = false;
		// seems to core a lot in gbcompress() we need to
		// put a quickpoll into zlib deflate() or
		// deflat_slot() or logest_match() function
		// for now do not dump core --- re-enable this later
		// mdw TODO
		//char *xx=NULL;*xx=0;
	}

	// . see where we are in the code
	// . for computing cpu usage
	// . if idling we will be in sigtimedwait() at the lowest level
	Host *h = g_hostdb.m_myHost;
	// . i guess this means we were doing something... (otherwise idle)
	// . this is KINDA like a 100 point sample, but it has crazy decay
	//   logic built into it
	// . each alarm moves m_cpuUsage 1% of the way toward 100 (busy) or
	//   0 (g_inWaitState set)
	if (h) {
		if ( ! g_inWaitState )
			h->m_cpuUsage = .99 * h->m_cpuUsage + .01 * 100;
		else
			h->m_cpuUsage = .99 * h->m_cpuUsage + .01 * 000;
	}

	// if it has been a while since heartbeat (> 10000ms) dump core so
	// we can see where the process was... that is a missed quick poll?
	// (a zero heartbeat or a non-positive m_maxHeartbeatDelay disables
	// the check; the core dump itself is commented out, so this only logs
	// -- NOTE(review): it logs on every alarm while the heartbeat stays
	// stale)
	if ( g_process.m_lastHeartbeatApprox == 0 ) return;
	if ( g_conf.m_maxHeartbeatDelay <= 0 ) return;
	if ( g_nowApprox - g_process.m_lastHeartbeatApprox >
	     g_conf.m_maxHeartbeatDelay ) {
#ifndef PTHREADS
		logf(LOG_DEBUG,"gb: CPU seems blocked. Forcing core.");
#endif
		//char *xx=NULL; *xx=0;
	}

	//logf(LOG_DEBUG, "xxx now: %lli! approx: %lli", g_now, g_nowApprox);
}
|
|
|
|
|
|
// shit, we can't make this realtime!! RdbClose() cannot be called by a
|
|
// real time sig handler
|
|
void sighupHandler ( int x , siginfo_t *info , void *y ) {
|
|
// let main process know to shutdown
|
|
g_loop.m_shutdown = 1;
|
|
}
|
|
|
|
// . millisecond timestamp of the most recent run of the sleep callbacks
// . runLoop() and doPoll() run them again once m_minTick ms have elapsed
long long s_lastTime = 0LL;
|
|
|
|
bool Loop::runLoop ( ) {
|
|
#ifndef _POLLONLY_
|
|
// set of signals to watch for
|
|
sigset_t sigs0;
|
|
//sigset_t sigs1;
|
|
// clear all signals from the set
|
|
sigemptyset ( &sigs0 );
|
|
//sigemptyset ( &sigs1 );
|
|
// . set sigs on which sigtimedwait() listens for
|
|
// . add this signal to our set of signals to watch (currently NONE)
|
|
sigaddset ( &sigs0, SIGPIPE );
|
|
#ifndef _VALGRIND_
|
|
sigaddset ( &sigs0, GB_SIGRTMIN );
|
|
#endif
|
|
sigaddset ( &sigs0, GB_SIGRTMIN + 1 );
|
|
sigaddset ( &sigs0, GB_SIGRTMIN + 2 );
|
|
sigaddset ( &sigs0, GB_SIGRTMIN + 3 );
|
|
sigaddset ( &sigs0, SIGCHLD );
|
|
//sigaddset ( &sigs0, SIGVTALRM );
|
|
// . TODO: do we need to mask SIGIO too? (sig queue overflow?)
|
|
// . i would think so, because what if we tried to queue an important
|
|
// handler to be called in the high priority UdpServer but the queue
|
|
// was full? Then we would finish processing the signals on the queue
|
|
// before we would address the excluded high priority signals by
|
|
// calling doPoll()
|
|
sigaddset ( &sigs0, SIGIO );
|
|
// . set up a time to block waiting for signals to be 1/2 a second
|
|
// . 1 billion nanoseconds = 1 second
|
|
//struct timespec t = { 0 /*seconds*/, 500000000 /*nanoseconds*/};
|
|
//struct timespec t = { 0 /*seconds*/, 10000000 /*nanoseconds*/};
|
|
|
|
// grab any high priority sig first
|
|
siginfo_t info ;
|
|
long sigNum ; //= sigwaitinfo ( &sigs1, &info );
|
|
#endif
|
|
s_lastTime = 0;
|
|
|
|
// . allow us to be interrupted
|
|
// . UNBLOCKs GB_SIGRTMIN
|
|
// . makes g_udpServer2 quite jumpy
|
|
g_loop.interruptsOn();
|
|
|
|
enableTimer();
|
|
|
|
long long elapsed;
|
|
|
|
// . now loop forever waiting for signals
|
|
// . but every second check for timer-based events
|
|
do {
|
|
|
|
g_now = gettimeofdayInMilliseconds();
|
|
|
|
//set the time back to its exact value and reset
|
|
//the timer.
|
|
g_nowApprox = g_now;
|
|
// MDW: won't this hog cpu? just don't disable it in
|
|
// Process::save2() any more and it should be ok
|
|
//enableTimer();
|
|
m_lastPollTime = g_now;
|
|
m_needsToQuickPoll = false;
|
|
|
|
|
|
/*
|
|
// test the heartbeat core...
|
|
if ( g_numAlarms > 100 ) {
|
|
goo:
|
|
long j;
|
|
for ( long k = 0 ; k < 2000000000 ; k++ ) {
|
|
j=k *5;
|
|
}
|
|
goto goo;
|
|
}
|
|
*/
|
|
|
|
g_errno = 0;
|
|
|
|
if ( m_shutdown ) {
|
|
// a msg
|
|
if (m_shutdown==1)
|
|
log(LOG_INIT,"loop: got SIGHUP or SIGTERM.");
|
|
else if (m_shutdown==2)
|
|
log(LOG_INIT,"loop: got SIGBAD in thread.");
|
|
else
|
|
log(LOG_INIT,"loop: got SIGPWR.");
|
|
// . turn off interrupts here because it doesn't help to do
|
|
// it in the thread
|
|
// . TODO: turn off signals for sigbadhandler()
|
|
interruptsOff();
|
|
// if thread got the signal, just wait for him to save all
|
|
// Rdbs and then dump core
|
|
if ( m_shutdown == 2 ) {
|
|
//log (0,"Thread is saving and shutting down urgently.");
|
|
//while ( 1 == 1 ) sleep (50000);
|
|
log("loop: Resuming despite thread crash.");
|
|
m_shutdown = 0;
|
|
continue;
|
|
}
|
|
// otherwise, thread did not save, so we must do it
|
|
log ( LOG_INIT ,"loop: Saving and shutting down urgently.");
|
|
// . this will save all Rdb's and dump core
|
|
// . since "urgent" is true it won't broadcast its shutdown
|
|
// to all hosts
|
|
//#ifndef NO_MAIN
|
|
//mainShutdown( true ); // urgent?
|
|
//#endif
|
|
g_process.shutdown ( true );
|
|
}
|
|
|
|
//g_udpServer2.sendPoll_ass(true,g_now);
|
|
//g_udpServer2.process_ass ( g_now );
|
|
// MDW: see if this works without this junk, if not then
|
|
// put it back in
|
|
g_udpServer.sendPoll_ass (true,g_now);
|
|
g_udpServer.process_ass ( g_now );
|
|
// and dns now too
|
|
g_dns.m_udpServer.sendPoll_ass(true,g_now);
|
|
g_dns.m_udpServer.process_ass ( g_now );
|
|
|
|
// if there was a high niceness http request within a
|
|
// quickpoll, we stored it and now we'll call it here.
|
|
//g_httpServer.callQueuedPages();
|
|
|
|
//g_udpServer.printState ( );
|
|
|
|
if ( g_someAreQueued ) {
|
|
// assume none are queued now, we may get interrupted
|
|
// and it may get set back to true
|
|
g_someAreQueued = false;
|
|
//g_udpServer2.makeCallbacks_ass ( 0 );
|
|
//g_udpServer2.makeCallbacks_ass ( 1 );
|
|
}
|
|
|
|
// if ( g_threads.m_needsCleanup ) {
|
|
// // bitch about
|
|
// static bool s_bitched = false;
|
|
// if ( ! s_bitched ) {
|
|
// log(LOG_REMIND,"loop: Lost thread signal.");
|
|
// s_bitched = true;
|
|
// }
|
|
|
|
|
|
// }
|
|
//cleanup and launch threads:
|
|
//g_threads.printState();
|
|
g_threads.timedCleanUp(4, MAX_NICENESS ) ; // 4 ms
|
|
|
|
// do it anyway
|
|
doPoll();
|
|
|
|
while ( m_needToPoll ) doPoll();
|
|
|
|
long elapsed = g_now - s_lastTime;
|
|
// if someone changed the system clock on us, this could be negative
|
|
// so fix it! otherwise, times may NEVER get called in our lifetime
|
|
if ( elapsed < 0 ) elapsed = m_minTick;
|
|
// call this every (about) 1 second
|
|
if ( elapsed >= m_minTick ) {
|
|
// MAX_NUM_FDS is the fd for sleep callbacks
|
|
callCallbacks_ass ( true , MAX_NUM_FDS , g_now );
|
|
// note the last time we called them
|
|
//g_now = gettimeofdayInMilliseconds();
|
|
s_lastTime = g_now;
|
|
}
|
|
|
|
#ifndef _POLLONLY_
|
|
|
|
// hack
|
|
//char buffer[100];
|
|
//if ( recv(27,buffer,99,MSG_PEEK|MSG_DONTWAIT) == 0 ) {
|
|
// logf(LOG_DEBUG,"CLOSED CLOSED!!");
|
|
//}
|
|
//g_errno = 0;
|
|
|
|
//check for pending signals, return right away if none.
|
|
//then we'll do the low priority stuff while we were
|
|
//supposed to be sleeping.
|
|
g_inWaitState = true;
|
|
sigNum = sigtimedwait (&sigs0, &info, s_sigWaitTimePtr ) ;
|
|
|
|
// if no signal, we just waited 20 ms and nothing happened
|
|
if ( sigNum == -1 )
|
|
sigalrmHandler( 0,&info,NULL);
|
|
//logf(LOG_DEBUG,"loop: sigNum=%li signo=%li alrm=%li",
|
|
// (long)sigNum,info.si_signo,(long)SIGVTALRM);
|
|
// no longer in a wait state...
|
|
g_inWaitState = false;
|
|
|
|
if ( sigNum < 0 ) {
|
|
if ( errno == EAGAIN || errno == EINTR ||
|
|
errno == EILSEQ || errno == 0 ) {
|
|
sigNum = 0;
|
|
errno = 0;
|
|
}
|
|
else if ( errno != ENOMEM ) {
|
|
log("loop: sigtimedwait(): %s.",strerror(errno));
|
|
continue;
|
|
}
|
|
}
|
|
if ( sigNum == 0 ) {
|
|
//no signals pending, try to take care of anything left undone:
|
|
|
|
long long startTime = gettimeofdayInMillisecondsLocal();
|
|
if(g_now & 1) {
|
|
if(g_udpServer.needBottom())
|
|
g_udpServer.makeCallbacks_ass ( 2 );
|
|
//if(g_udpServer2.needBottom())
|
|
// g_udpServer2.makeCallbacks_ass ( 2 );
|
|
|
|
if(gettimeofdayInMillisecondsLocal() -
|
|
startTime > 10)
|
|
goto notime;
|
|
|
|
if(g_conf.m_sequentialProfiling)
|
|
g_threads.printState();
|
|
if(g_threads.m_needsCleanup)
|
|
g_threads.timedCleanUp(4 , // ms
|
|
MAX_NICENESS);
|
|
}
|
|
else {
|
|
if(g_conf.m_sequentialProfiling)
|
|
g_threads.printState();
|
|
if(g_threads.m_needsCleanup)
|
|
g_threads.timedCleanUp(4 , // ms
|
|
MAX_NICENESS);
|
|
|
|
if(gettimeofdayInMillisecondsLocal() -
|
|
startTime > 10)
|
|
goto notime;
|
|
|
|
if(g_udpServer.needBottom())
|
|
g_udpServer.makeCallbacks_ass ( 2 );
|
|
//if(g_udpServer2.needBottom())
|
|
// g_udpServer2.makeCallbacks_ass ( 2 );
|
|
}
|
|
|
|
notime:
|
|
//if we still didn't get all of them cleaned up set
|
|
//sleep time to none.
|
|
if(g_udpServer.needBottom() ) {
|
|
//g_udpServer2.needBottom()) {
|
|
s_sigWaitTimePtr = &s_sigWaitTime2;
|
|
}
|
|
else {
|
|
//otherwise set it to minTick
|
|
s_sigWaitTimePtr = &s_sigWaitTime;
|
|
}
|
|
}
|
|
else {
|
|
if ( info.si_code == SIGIO ) {
|
|
log("loop: got sigio");
|
|
m_needToPoll = true;
|
|
}
|
|
// handle the signal
|
|
else sigHandler_r ( 0 , &info , NULL );
|
|
}
|
|
#endif
|
|
} while (1);
|
|
|
|
|
|
|
|
loop:
|
|
g_now = gettimeofdayInMilliseconds();
|
|
g_errno = 0;
|
|
|
|
// debug msg
|
|
//if ( g_conf.m_logDebugUdp ) log("Loop: top of loop");
|
|
// shutdown if we got a critical signal
|
|
if ( m_shutdown ) {
|
|
// a msg
|
|
if ( m_shutdown == 1 )
|
|
log("loop: got SIGHUP or SIGTERM.");
|
|
else if ( m_shutdown == 2 )
|
|
log("loop: got SIGBAD in thread.");
|
|
else
|
|
log("loop: got SIGPWR.");
|
|
// . turn off interrupts here because it doesn't help to do
|
|
// it in the thread
|
|
// . TODO: turn off signals for sigbadhandler()
|
|
interruptsOff();
|
|
// if thread got the signal, just wait for him to save all
|
|
// Rdbs and then dump core
|
|
if ( m_shutdown == 2 ) {
|
|
log("loop: Cored in thread.");
|
|
//log ("Thread is saving and shutting down urgently.");
|
|
//while ( 1 == 1 ) sleep (50000);
|
|
//log("loop: Resuming despite thread crash.");
|
|
//m_shutdown = 0;
|
|
//goto resume;
|
|
}
|
|
// otherwise, thread did not save, so we must do it
|
|
log ( "loop: Saving and shutting down urgently.");
|
|
// sleep forver now so we can call findPtr() function
|
|
// after calling g_mem.printBreeches(0)
|
|
log("loop: sleeping forever.");
|
|
sleep(1000000);
|
|
// . this will save all Rdb's and dump core
|
|
// . since "urgent" is true it won't broadcast its shutdown
|
|
// to all hosts
|
|
//#ifndef NO_MAIN
|
|
//mainShutdown( true ); // urgent?
|
|
//#endif
|
|
// urgent = true
|
|
g_process.shutdown ( true );
|
|
}
|
|
// resume:
|
|
// . get the time now, sigHandlerRT() can use this, cuz gettimeofday()
|
|
// ain't async signal safe
|
|
// . g_now only used in hot udpServer for doing time outs really
|
|
g_now = gettimeofdayInMilliseconds();
|
|
// clear any g_errno before possibly calling sendPoll_ass()
|
|
g_errno = 0;
|
|
// occasionaly call to sendto() will not send a dgram and since we
|
|
// don't count on receiving ready-to-write signals on our
|
|
// UdpServer's fds we check them here... it sucks, but hopefully it
|
|
// fixes the problem of requests not getting fully transmitted
|
|
// and stagnating in the UdpServer.
|
|
//if (g_udpServer2.m_needToSend) g_udpServer2.sendPoll_ass(true,g_now);
|
|
//if (g_udpServer.m_needToSend ) g_udpServer.sendPoll_ass (true,g_now);
|
|
// . well, sender may be choking even with m_needsToSend var
|
|
// . i bet we're just losing signals
|
|
// . TODO: do we lose them when in the handler? if so, send a signal
|
|
// so loop will read/write after coming out of the async handler
|
|
//g_udpServer2.sendPoll_ass(true,g_now);
|
|
// and also read just in case, too
|
|
//g_udpServer2.process_ass ( g_now );
|
|
// and the low priority guy
|
|
g_udpServer.sendPoll_ass (true,g_now);
|
|
g_udpServer.process_ass ( g_now );
|
|
// and dns now too
|
|
g_dns.m_udpServer.sendPoll_ass(true,g_now);
|
|
g_dns.m_udpServer.process_ass ( g_now );
|
|
// tcp server contained in http server has the same problem
|
|
//if ( g_httpServer.m_tcp.m_numQueued > 0 )
|
|
// g_httpServer.m_tcp.writeSocketsInQueue();
|
|
|
|
// . likewise, we may have not got the SIGQUEUE signal from when
|
|
// UdpServer wanted to call a callback but couldn't because it was
|
|
// in an async signal handler, and then at SIGQUEUE sig got lost...
|
|
// . this should clear those completed transactions we sometimes
|
|
// see in the UdpServer socket table
|
|
if ( g_someAreQueued ) {
|
|
// assume none are queued now, we may get interrupted
|
|
// and it may get set back to true
|
|
g_someAreQueued = false;
|
|
//g_udpServer2.makeCallbacks_ass ( 0 );
|
|
//g_udpServer2.makeCallbacks_ass ( 1 );
|
|
}
|
|
// clean up threads in case signal got lost somehow
|
|
if ( g_threads.m_needsCleanup ) {
|
|
// bitch about
|
|
static bool s_bitched = false;
|
|
if ( ! s_bitched ) {
|
|
log(LOG_REMIND,"loop: Lost thread signal.");
|
|
s_bitched = true;
|
|
}
|
|
// assume not any more
|
|
g_threads.m_needsCleanup = false;
|
|
// check thread queue for any threads that completed
|
|
// so we can call their callbacks and remove them
|
|
g_threads.cleanUp ( 0 , 1000/*max niceness*/);
|
|
// launch any threads in waiting since this sig was
|
|
// from a terminating one
|
|
g_threads.launchThreads();
|
|
}
|
|
// clear any g_errno before calling sleep callbacks
|
|
g_errno = 0;
|
|
// get time difference
|
|
elapsed = g_now - s_lastTime;
|
|
// if someone changed the system clock on us, this could be negative
|
|
// so fix it! otherwise, times may NEVER get called in our lifetime
|
|
if ( elapsed < 0 ) elapsed = m_minTick;
|
|
// print log msgs we accumulated while in a signal handler
|
|
//if ( g_log.needsPrinting() ) g_log.printBuf();
|
|
// . poll if we need to
|
|
// . make it a while loop incase a hot sig handler resets m_needToPoll
|
|
// . CAUTION: WE CAN LOOP IN HERE FOR ~ A MINUTE! I've seen it happen
|
|
// when RdbDump was going...
|
|
while ( m_needToPoll ) doPoll();
|
|
// call this every (about) 1 second
|
|
if ( elapsed >= m_minTick ) {
|
|
// MAX_NUM_FDS is the fd for sleep callbacks
|
|
callCallbacks_ass ( true , MAX_NUM_FDS , g_now );
|
|
// note the last time we called them
|
|
s_lastTime = g_now;
|
|
}
|
|
// just do polls if this linux doesn't support this:
|
|
#ifdef _POLLONLY_
|
|
doPoll();
|
|
#endif
|
|
#ifndef _POLLONLY_
|
|
// cancel silly errors
|
|
//if ( g_errno == EAGAIN ) { sigNum = 0; g_errno = 0; }
|
|
//if ( g_errno == EINTR ) { sigNum = 0; g_errno = 0; }
|
|
// if sigNum is valid then handle it
|
|
//sigHandler_r ( 0 , &info , NULL );
|
|
// subloop:
|
|
// debug msg
|
|
|
|
|
|
//if ( g_conf.m_logDebugUdp ) log("Loop: entering sigwait");
|
|
// . this has a timer resolution of 20ms, I imagine due to how the
|
|
// kernel time slices between processes
|
|
// . this means UdpServer can not effectively have a wait between
|
|
// resends of less than 20ms which makes it a little less zippy
|
|
sigNum = sigtimedwait (&sigs0, &info, &s_sigWaitTime ) ;
|
|
// cancel silly errors
|
|
if ( sigNum < 0 && errno == EAGAIN ) {
|
|
sigNum = 0; errno = 0; }
|
|
if ( sigNum < 0 && errno == EINTR ) {
|
|
sigNum = 0; errno = 0; }
|
|
// a zero signal is no signal, just a wake up call
|
|
if ( sigNum == 0 ) goto loop;
|
|
// sigNum is < 0 on error
|
|
if ( sigNum < 0 ) {
|
|
// this error happens on the newer libc for some reason
|
|
// so ignore it
|
|
if ( errno != ENOMEM )
|
|
log("loop: sigtimedwait(): %s.",strerror(errno));
|
|
goto loop;
|
|
}
|
|
// sleep test
|
|
//log("SLEEPING");
|
|
//sleep(10);
|
|
//for ( long long i = 0 ; i < 1000000000000LL ; i++ );
|
|
// . we use g_now in UdpServer.cpp and should make it
|
|
// as accurate as possible
|
|
// . but it's main use is because gettimeofday() is not async sig safe
|
|
g_now = gettimeofdayInMilliseconds();
|
|
// debug msg
|
|
//if ( g_conf.m_logDebugUdp ) log("Loop: processing signal");
|
|
// call sighandler on other queued signals and continue looping
|
|
//log("Loop::runLoop:got queued signum=%li",sigNum);
|
|
// was it a SIGIO?
|
|
if ( info.si_code == SIGIO ) doPoll();
|
|
// handle the signal
|
|
else sigHandler_r ( 0 , &info , NULL );
|
|
#endif
|
|
// don't call any time handlers until no more signals waiting
|
|
//goto subloop;
|
|
// we need to make g_now as accurate as possible for hot UdpServer...
|
|
goto loop;
|
|
// make compiler happy
|
|
return 0;
|
|
}
|
|
|
|
// . the kernel sends a SIGIO signal when the sig queue overflows
|
|
// . we resort to polling the fd's when that happens
|
|
void sigioHandler ( int x , siginfo_t *info , void *y ) {
|
|
// set the m_needToPoll flag
|
|
g_loop.m_needToPoll = true;
|
|
return;
|
|
}
|
|
|
|
//--- TODO: flush the signal queue after polling until done
|
|
//--- are we getting stale signals resolved by flush so we get
|
|
//--- read event on a socket that isnt in read mode???
|
|
// TODO: set signal handler to SIG_DFL to prevent signals from queuing up now
|
|
// . this handles high priority fds first (lowest niceness)
|
|
void Loop::doPoll ( ) {
|
|
// set time
|
|
g_now = gettimeofdayInMilliseconds();
|
|
// debug msg
|
|
//log("**************** GOT SIGIO *************");
|
|
// . turn it off here so it can be turned on again after we've
|
|
// called select() so we don't lose any fd events through the cracks
|
|
// . some callbacks we call make trigger another SIGIO, but if they
|
|
// fail they should set Loop::g_needToPoll to true
|
|
m_needToPoll = false;
|
|
// debug msg
|
|
//if ( g_conf.m_logDebugLoop ) log(LOG_DEBUG,"loop: Entered doPoll.");
|
|
log(LOG_DEBUG,"loop: Entered doPoll.");
|
|
// print log
|
|
if ( g_log.needsPrinting() ) g_log.printBuf();
|
|
|
|
// sigqueue() might have been called from a hot udp server and
|
|
// we queued some handlers to be called
|
|
if ( g_someAreQueued ) {
|
|
// assume none are queued now, we may get interrupted
|
|
// and it may get set back to true
|
|
g_someAreQueued = false;
|
|
//g_udpServer2.makeCallbacks_ass ( 0 );
|
|
//g_udpServer2.makeCallbacks_ass ( 1 );
|
|
}
|
|
if(g_udpServer.needBottom()) g_udpServer.makeCallbacks_ass ( 1 );
|
|
//if(g_udpServer2.needBottom()) g_udpServer2.makeCallbacks_ass ( 1 );
|
|
|
|
|
|
bool processedOne;
|
|
long n;
|
|
// long repeats = 0;
|
|
// skipLowerPriorities:
|
|
// descriptor bits for calling select()
|
|
fd_set readfds;
|
|
fd_set writefds;
|
|
fd_set exceptfds;
|
|
// clear fds for select()
|
|
FD_ZERO ( &readfds );
|
|
FD_ZERO ( &writefds );
|
|
FD_ZERO ( &exceptfds );
|
|
timeval v;
|
|
v.tv_sec = 0;
|
|
v.tv_usec = 0;
|
|
// set descriptors we should watch
|
|
for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
|
|
if ( m_readSlots [i] ) {
|
|
FD_SET ( i , &readfds );
|
|
FD_SET ( i , &exceptfds );
|
|
}
|
|
if ( m_writeSlots[i] ) {
|
|
FD_SET ( i , &writefds );
|
|
FD_SET ( i , &exceptfds );
|
|
}
|
|
}
|
|
again:
|
|
// poll the fd's searching for socket closes
|
|
n = select (MAX_NUM_FDS, &readfds, &writefds, &exceptfds, &v);
|
|
if ( n < 0 ) {
|
|
// valgrind
|
|
if ( errno == EINTR ) goto again;
|
|
g_errno = errno;
|
|
log("loop: select: %s.",strerror(g_errno));
|
|
return;
|
|
}
|
|
// debug msg
|
|
if ( g_conf.m_logDebugLoop)
|
|
log(LOG_DEBUG,"loop: Got %li fds waiting.",n);
|
|
// . reset the need to poll flag if everything is caught up now
|
|
// . let's take this out for now ... won't this leave some
|
|
// threads hanging, they do not always generate SIGIO's if
|
|
// the sigqueue is full!! LET'S SEE... mdw
|
|
//if ( n == 0 ) {
|
|
// // deal with any threads before returning
|
|
// g_threads.cleanUp ( NULL , 1000 /*max niceness*/ );
|
|
// g_threads.launchThreads();
|
|
// return;
|
|
//}
|
|
processedOne = false;
|
|
|
|
// a Slot ptr
|
|
Slot *s;
|
|
g_now = gettimeofdayInMilliseconds();
|
|
/*
|
|
// call g_udpServer sig handlers for niceness -1 here
|
|
for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
|
|
// continue if not set for reading
|
|
if ( ! FD_ISSET ( i , &readfds ) ) continue;
|
|
// if niceness is not -1, handle it below
|
|
s = m_readSlots [ i ];
|
|
if ( s && s->m_niceness != -1 ) continue;
|
|
callCallbacks_ass (true,i, g_now); // for reading = true
|
|
processedOne = true;
|
|
}
|
|
for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
|
|
if ( ! FD_ISSET ( i , &writefds ) ) continue;
|
|
// if niceness is not -1, handle it below
|
|
s = m_writeSlots [ i ];
|
|
if ( s && s->m_niceness != -1 ) continue;
|
|
callCallbacks_ass (false,i, g_now); // for reading = false
|
|
processedOne = true;
|
|
}
|
|
*/
|
|
// handle returned threads for niceness -1
|
|
//g_threads.timedCleanUp(2/*ms*/);
|
|
|
|
// . call the callback for each fd we got
|
|
// . only call callbacks for fds that have a nice of 0 here
|
|
for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
|
|
// continue if not set for reading
|
|
if ( ! FD_ISSET ( i , &readfds ) ) continue;
|
|
// if niceness is not 0, handle it below
|
|
s = m_readSlots [ i /*fd*/ ];
|
|
if ( s && s->m_niceness != 0 ) continue;
|
|
callCallbacks_ass (true/*forReading?*/,i, g_now);
|
|
processedOne = true;
|
|
}
|
|
// only call callbacks for fds that have a nice of 0 here
|
|
for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
|
|
if ( ! FD_ISSET ( i , &writefds ) ) continue;
|
|
// if niceness is not 0, handle it below
|
|
s = m_writeSlots [ i /*fd*/ ];
|
|
if ( s && s->m_niceness != 0 ) continue;
|
|
callCallbacks_ass (false/*forReading?*/,i, g_now);
|
|
processedOne = true;
|
|
}
|
|
// handle returned threads for niceness 0
|
|
g_threads.timedCleanUp(3,0); // 3 ms
|
|
|
|
#if 0
|
|
if(processedOne && repeats < QUERYPRIORITYWEIGHT) {
|
|
//m_needToPoll = true;
|
|
repeats++;
|
|
goto skipLowerPriorities;
|
|
}
|
|
|
|
// log(LOG_WARN,
|
|
// "Loop: repeated %li times before moving to lower priority threads",
|
|
// repeats);
|
|
#endif
|
|
|
|
// handle low priority fds here
|
|
for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
|
|
// continue if not set for reading
|
|
if ( ! FD_ISSET ( i , &readfds ) ) continue;
|
|
// if niceness is 0, we already handled it above
|
|
s = m_readSlots [ i /*fd*/ ];
|
|
if ( s && s->m_niceness <= 0 ) continue;
|
|
callCallbacks_ass (true/*forReading?*/,i, g_now);
|
|
}
|
|
// only call callbacks for fds that have a nice of 0 here
|
|
for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
|
|
if ( ! FD_ISSET ( i , &writefds ) ) continue;
|
|
// if niceness is 0, we already handled it above
|
|
s = m_writeSlots [ i /*fd*/ ];
|
|
if ( s && s->m_niceness <= 0 ) continue;
|
|
callCallbacks_ass (false/*forReading?*/,i, g_now);
|
|
}
|
|
// handle returned threads for all other nicenesses
|
|
g_threads.timedCleanUp(4,MAX_NICENESS); // 4 ms
|
|
|
|
// set time
|
|
g_now = gettimeofdayInMilliseconds();
|
|
// call sleepers if they need it
|
|
// call this every (about) 1 second
|
|
{
|
|
long elapsed = g_now - s_lastTime;
|
|
// if someone changed the system clock on us, this could be negative
|
|
// so fix it! otherwise, times may NEVER get called in our lifetime
|
|
if ( elapsed < 0 ) elapsed = m_minTick;
|
|
if ( elapsed >= m_minTick ) {
|
|
// MAX_NUM_FDS is the fd for sleep callbacks
|
|
callCallbacks_ass ( true , MAX_NUM_FDS , g_now );
|
|
// note the last time we called them
|
|
s_lastTime = g_now;
|
|
// handle returned threads for all other nicenesses
|
|
g_threads.timedCleanUp(4,MAX_NICENESS); // 4 ms
|
|
}
|
|
}
|
|
// debug msg
|
|
if ( g_conf.m_logDebugLoop ) log(LOG_DEBUG,"loop: Exited doPoll.");
|
|
}
|
|
|
|
// for FileState class
|
|
#include "BigFile.h"
|
|
|
|
// call this when you don't want to be interrupted
|
|
void Loop::interruptsOff ( ) {
|
|
// . debug
|
|
// . until we have our own malloc, don't turn them on
|
|
if ( ! g_isHot ) return;
|
|
// bail in already in a sig handler
|
|
if ( g_inSigHandler ) return;
|
|
// if interrupts already off bail
|
|
if ( ! g_interruptsOn ) return;
|
|
// looks like sigprocmask is destructive on our sigset
|
|
sigset_t rtmin;
|
|
sigemptyset ( &rtmin );
|
|
// tmp debug hack, so we don't have real time signals now...
|
|
#ifndef _VALGRIND_
|
|
sigaddset ( &rtmin, GB_SIGRTMIN );
|
|
#endif
|
|
// block it
|
|
if ( sigprocmask ( SIG_BLOCK , &rtmin, 0 ) < 0 ) {
|
|
log("loop: interruptsOff: sigprocmask: %s.", strerror(errno));
|
|
return;
|
|
}
|
|
g_interruptsOn = false;
|
|
// debug msg
|
|
//log("interruptsOff");
|
|
}
|
|
// and this to resume being interrupted
|
|
void Loop::interruptsOn ( ) {
|
|
// . debug
|
|
// . until we have our own malloc, don't turn them on
|
|
if ( ! g_isHot ) return;
|
|
// bail in already in a sig handler
|
|
if ( g_inSigHandler ) return;
|
|
// if interrupts already on bail
|
|
if ( g_interruptsOn ) return;
|
|
// looks like sigprocmask is destructive on our sigset
|
|
sigset_t rtmin;
|
|
sigemptyset ( &rtmin );
|
|
// uncomment this next line to easily disable real time interrupts
|
|
#ifndef _VALGRIND_
|
|
sigaddset ( &rtmin, GB_SIGRTMIN );
|
|
#endif
|
|
// debug msg
|
|
//log("interruptsOn");
|
|
// let everyone know before we are vulnerable to an interrupt
|
|
g_interruptsOn = true;
|
|
// unblock it so interrupts flow
|
|
if ( sigprocmask ( SIG_UNBLOCK, &rtmin, 0 ) < 0 ) {
|
|
log("loop: interruptsOn: sigprocmask: %s.", strerror(errno));
|
|
return;
|
|
}
|
|
}
|
|
|
|
// handle hot real time signals here
|
|
void sigHandlerRT ( int x , siginfo_t *info , void *v ) {
|
|
// if we're not hot, what are we doing here?
|
|
if ( ! g_isHot ) {
|
|
fprintf(stderr,"SHIT\n");
|
|
exit(-1);
|
|
}
|
|
// bitch if we shouldn't have gotten this
|
|
// fprintf(stderr,"Hey! why are we here?\n");
|
|
if ( ! g_interruptsOn ) {
|
|
log(LOG_LOGIC,"loop: Interrupts not on. Bad engineer.");
|
|
return;
|
|
}
|
|
//fprintf (stderr,"in rt handler\n");
|
|
// let everyone know it
|
|
// MDW: turn this off for now, how is it getting set? we dont use
|
|
// real time signals any more. maybe a pthread is getting such
|
|
// a signal?
|
|
//g_inSigHandler = true;
|
|
// debug msg
|
|
//if ( g_conf.m_timingDebugEnabled )
|
|
// log("sigHandlerRT entered");
|
|
// save errno
|
|
long old_gerrno = g_errno;
|
|
int old_errno = errno;
|
|
// call normal handler
|
|
sigHandler_r ( x , info , v );
|
|
// restore
|
|
errno = old_errno;
|
|
g_errno = old_gerrno;
|
|
// debug msg
|
|
//if ( g_conf.m_timingDebugEnabled )
|
|
// log("sigHandlerRT exited");
|
|
// revert
|
|
g_inSigHandler = false;
|
|
// debug msg
|
|
//fprintf (stderr,"out of rt handler\n");
|
|
}
|
|
|
|
// come here when we get a GB_SIGRTMIN+X signal
|
|
void sigHandler_r ( int x , siginfo_t *info , void *v ) {
|
|
// extract the file descriptor that needs attention
|
|
int fd = info->si_fd;
|
|
// debug note
|
|
//if ( fd == g_httpServer.m_tcp.m_sock )
|
|
// logf(LOG_DEBUG,"loop: got http server activity");
|
|
// print signal number for debugging purposes (should be SIGPOLL/IO)
|
|
// debug msg
|
|
//fprintf(stderr,"got sigcode=%i\n",info->si_code );
|
|
//fprintf(stderr,"got signo =%i\n",info->si_signo);
|
|
//if ( info->si_signo == SIGCHLD )
|
|
// fprintf(stderr,"got SIGCHLD\n");
|
|
//if ( info->si_code == SIGCHLD )
|
|
// fprintf(stderr,"got SIGCHLD2\n");
|
|
//fprintf(stderr,"got sigval =%i\n",(int)(info->si_value.sival_int) );
|
|
// clear g_errno before callling handlers
|
|
g_errno = 0;
|
|
// info->si_band values:
|
|
//#define POLLIN 0x0001 /* There is data to read */
|
|
//#define POLLPRI 0x0002 /* There is urgent data to read */
|
|
//#define POLLOUT 0x0004 /* Writing now will not block */
|
|
//#define POLLERR 0x0008 /* Error condition */
|
|
//#define POLLHUP 0x0010 /* Hung up */
|
|
//#define POLLNVAL 0x0020 /* Invalid request: fd not open */
|
|
int band = info->si_band;
|
|
/*
|
|
fprintf(stderr,"got fd = %i\n", fd );
|
|
fprintf(stderr,"got band = %i\n", band );
|
|
fprintf(stderr,"band & POLLIN = %i\n", band & POLLIN );
|
|
fprintf(stderr,"band & POLLPRI = %i\n", band & POLLPRI );
|
|
fprintf(stderr,"band & POLLOUT = %i\n", band & POLLOUT );
|
|
fprintf(stderr,"band & POLLERR = %i\n", band & POLLERR );
|
|
fprintf(stderr,"band & POLLHUP = %i\n", band & POLLHUP );
|
|
*/
|
|
// translate SIGPIPE's to band of POLLHUP
|
|
if ( info->si_signo == SIGPIPE ) {
|
|
band = POLLHUP;
|
|
log("loop: Received SIGPIPE signal. Broken pipe.");
|
|
}
|
|
// . SI_QUEUE signals used to just be from BigFile
|
|
// . but now they're used by threads so we can call a callback
|
|
// when the thread is completed
|
|
if ( g_someAreQueued && ! g_inSigHandler ) {
|
|
g_someAreQueued = false;
|
|
//g_udpServer2.makeCallbacks_ass ( 0 );
|
|
//g_udpServer2.makeCallbacks_ass ( 1 );
|
|
}
|
|
|
|
if ( g_threads.m_needsCleanup && ! g_inSigHandler ) {
|
|
// assume not any more
|
|
//g_threads.m_needsCleanup = false;
|
|
// get the value
|
|
//int val = (int)(info->si_value.sival_int);
|
|
// debug msg
|
|
//if ( g_conf.m_logDebugThreadEnabled )
|
|
// log("Loop: got thread done signal");
|
|
// check thread queue for any threads that completed
|
|
// so we can call their callbacks and remove them
|
|
g_threads.timedCleanUp(4,MAX_NICENESS); // 4 ms
|
|
|
|
// // g_threads.cleanUp ( (ThreadEntry *)val , x/*max niceness*/);
|
|
// g_threads.cleanUp ( (ThreadEntry *)val , 1000/*max niceness*/);
|
|
|
|
// // launch any threads in waiting since this sig was
|
|
// // from a terminating one
|
|
// g_threads.launchThreads();
|
|
}
|
|
|
|
// if we just needed a cleanup
|
|
if ( info->si_signo == SIGCHLD ) return;
|
|
|
|
// if we don't got a signal for an fd, just a sigqueue() call, bail now
|
|
if ( info->si_code == SI_QUEUE ) return;
|
|
|
|
// . call the appropriate handler(s)
|
|
// . TODO: bitch if no callback to handle the read!!!!!!!
|
|
// . NOTE: when it's connected it sets both POLLIN and POLLOUT
|
|
// . NOTE: or when a socket is trying to connect to it if it's listener
|
|
//if ( band & (POLLIN | POLLOUT) == (POLLIN | POLLOUT) )
|
|
// g_loop.callCallbacks_ass ( true/*forReading?*/ , fd );
|
|
if ( band & POLLIN ) {
|
|
//log("Loop: read %lli",gettimeofdayInMilliseconds());
|
|
g_loop.callCallbacks_ass ( true , fd );
|
|
}
|
|
else if ( band & POLLPRI ) {
|
|
//log("Loop: read %lli",gettimeofdayInMilliseconds());
|
|
g_loop.callCallbacks_ass ( true , fd ) ;
|
|
}
|
|
else if ( band & POLLOUT ) {
|
|
//log("Loop: read %lli",gettimeofdayInMilliseconds());
|
|
g_loop.callCallbacks_ass ( false , fd );
|
|
}
|
|
//else if ( band & POLLERR )
|
|
//g_loop.callCallbacks_ass ( false , fd );
|
|
// this happens if the socket closes abruptly
|
|
// or out of band data, etc... see "man 2 poll" for more info
|
|
else {
|
|
// i see these all the time for fd == 0, so don't print it
|
|
if ( fd != 0 )
|
|
log(LOG_INFO,"loop: Received hangup on fd=%i.",fd);
|
|
g_errno = ESOCKETCLOSED;
|
|
g_loop.callCallbacks_ass ( false , fd );
|
|
}
|
|
}
|
|
|
|
|
|
/*
|
|
#if 1 || (LINUX_VERSION_CODE < KERNEL_VERSION(2,3,31))
|
|
struct pollfd pfd;
|
|
printf ("Trying fallback poll()\n");
|
|
pfd.fd = info.si_fd;
|
|
pfd.events = POLLIN|POLLOUT|POLLHUP;
|
|
if (poll (&pfd, 1, 0) < 0) {
|
|
p_error ("poll(): %s", strerror (g_errno));
|
|
exit (1);
|
|
}
|
|
info.si_band = pfd.revents;
|
|
#endif
|
|
*/
|
|
|
|
|
|
void Loop::startBlockedCpuTimer() {
|
|
|
|
if(m_inQuickPoll) return;
|
|
m_lastPollTime = gettimeofdayInMilliseconds();
|
|
g_profiler.resetLastQpoll();
|
|
}
|
|
|
|
|
|
void Loop::quickPoll(long niceness, const char* caller, long lineno) {
|
|
if ( ! g_conf.m_useQuickpoll ) return;
|
|
|
|
// convert
|
|
//if ( niceness > 1 ) niceness = 1;
|
|
|
|
// sometimes we init HashTableX's with a niceness of 0 even though
|
|
// g_niceness is 1. so prevent a core below.
|
|
if ( niceness == 0 ) return;
|
|
|
|
// sanity check
|
|
if ( g_niceness > niceness ) {
|
|
log(LOG_WARN,"loop: niceness mismatch!");
|
|
//char *xx=NULL;*xx=0; }
|
|
}
|
|
|
|
// sanity -- temporary -- no quickpoll in a thread!!!
|
|
//if ( g_threads.amThread() ) { char *xx=NULL;*xx=0; }
|
|
|
|
// if we are niceness 1 and not in a handler, make it niceness 2
|
|
// so the handlers can be answered and we don't slow other
|
|
// spiders down and we don't slow turks' injections down as much
|
|
if ( ! g_inHandler && niceness == 1 ) niceness = 2;
|
|
|
|
// reset this
|
|
g_missedQuickPolls = 0;
|
|
|
|
if(m_inQuickPoll) {
|
|
log(LOG_WARN,
|
|
"admin: tried to quickpoll from inside quickpoll");
|
|
// this happens when handleRequest3f is called from
|
|
// a quickpoll and it deletes a collection and BigFile::close
|
|
// calls ThreadQueue::removeThreads and Msg3::doneScanning()
|
|
// has niceness 2 and calls quickpoll again!
|
|
return;
|
|
//if(g_conf.m_quickpollCoreOnError) {
|
|
char*xx=NULL;*xx=0;
|
|
// }
|
|
// else return;
|
|
}
|
|
long long now = g_now;
|
|
//long long took = now - m_lastPollTime;
|
|
long long now2 = g_now;
|
|
long gerrno = g_errno;
|
|
|
|
/*
|
|
if(g_conf.m_profilingEnabled){
|
|
now = gettimeofdayInMilliseconds();
|
|
took = now - m_lastPollTime;
|
|
|
|
g_profiler.pause(caller, lineno, took);
|
|
|
|
if(took > g_conf.m_minProfThreshold) {
|
|
if(g_conf.m_dynamicPerfGraph) {
|
|
g_stats.addStat_r ( 0 ,
|
|
m_lastPollTime,
|
|
now,
|
|
0 ,
|
|
STAT_GENERIC,
|
|
caller);
|
|
}
|
|
if(g_conf.m_sequentialProfiling) {
|
|
log(LOG_TIMING, "admin: quickpolling from %s "
|
|
"after %lli ms", caller, took);
|
|
}
|
|
}
|
|
}
|
|
*/
|
|
|
|
g_numQuickPolls++;
|
|
|
|
m_inQuickPoll = true;
|
|
|
|
//g_udpServer2.process_ass ( g_now , 0 );
|
|
g_udpServer.process_ass ( g_now , 0 );
|
|
g_threads.timedCleanUp( 100 , 0 ); // ms ms, niceness 0
|
|
long n;
|
|
|
|
fd_set readfds;
|
|
fd_set writefds;
|
|
fd_set exceptfds;
|
|
// clear fds for select()
|
|
FD_ZERO ( &readfds );
|
|
FD_ZERO ( &writefds );
|
|
FD_ZERO ( &exceptfds );
|
|
timeval v;
|
|
v.tv_sec = 0;
|
|
v.tv_usec = 0;
|
|
// set descriptors we should watch
|
|
for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
|
|
if ( m_readSlots [i] && m_readSlots[i]->m_niceness == 0 ) {
|
|
FD_SET ( i , &readfds );
|
|
FD_SET ( i , &exceptfds );
|
|
}
|
|
if ( m_writeSlots[i] && m_writeSlots[i]->m_niceness == 0 ) {
|
|
FD_SET ( i , &writefds );
|
|
FD_SET ( i , &exceptfds );
|
|
}
|
|
}
|
|
// poll the fd's searching for socket closes
|
|
// this is for httpServer, since we handled
|
|
// udpserver and diskthreads above
|
|
n = select (MAX_NUM_FDS, &readfds, &writefds, &exceptfds, &v);
|
|
// a Slot ptr
|
|
Slot *s;
|
|
if ( n <= 0 ) goto theend;
|
|
|
|
for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
|
|
// continue if not set for reading
|
|
if ( ! FD_ISSET ( i , &readfds ) ) continue;
|
|
// if niceness is not -1, handle it below
|
|
s = m_readSlots [ i /*fd*/ ];
|
|
// now we have niceness 2 if Sections.cpp
|
|
if ( s && s->m_niceness >= niceness ) continue;
|
|
callCallbacks_ass (true/*forReading?*/,i, now);
|
|
// sanity check
|
|
if ( g_niceness > niceness ) { char*xx=NULL;*xx=0; }
|
|
}
|
|
for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
|
|
if ( ! FD_ISSET ( i , &writefds ) ) continue;
|
|
// if niceness is not -1, handle it below
|
|
s = m_writeSlots [ i /*fd*/ ];
|
|
// now we have niceness 2 if Sections.cpp
|
|
if ( s && s->m_niceness >= niceness ) continue;
|
|
callCallbacks_ass (false/*forReading?*/,i, now);
|
|
// sanity check
|
|
if ( g_niceness > niceness ) { char*xx=NULL;*xx=0; }
|
|
}
|
|
|
|
|
|
// now we can have niceness 0 dns slots because of the niceness
|
|
// conversion algorithm...
|
|
g_dns.m_udpServer.sendPoll_ass(true,g_now);
|
|
g_dns.m_udpServer.process_ass ( g_now );
|
|
g_dns.m_udpServer.makeCallbacks_ass(0);
|
|
|
|
theend:
|
|
|
|
// reset this again
|
|
g_missedQuickPolls = 0;
|
|
|
|
// . avoid quickpolls within a quickpoll
|
|
// . was causing seg fault in diskHeartbeatWrapper()
|
|
// which call Threads::bailOnReads()
|
|
m_canQuickPoll = false;
|
|
|
|
// . call sleepcallbacks, like the heartbeat in Process.cpp
|
|
// . MAX_NUM_FDS is the fd for sleep callbacks
|
|
// . specify a niceness of 0 so only niceness 0 sleep callbacks
|
|
// will be called
|
|
callCallbacks_ass ( true , MAX_NUM_FDS , now , 0 );
|
|
// sanity check
|
|
if ( g_niceness > niceness ) {
|
|
log("loop: niceness mismatch");
|
|
//char*xx=NULL;*xx=0; }
|
|
}
|
|
|
|
/*
|
|
now2 = g_now;
|
|
if(g_conf.m_profilingEnabled) {
|
|
took = now2 - now;
|
|
g_profiler.unpause();
|
|
}
|
|
*/
|
|
//log(LOG_WARN, "xx quickpolled took %lli, waited %lli from %s",
|
|
// now2 - now, now - m_lastPollTime, caller);
|
|
m_lastPollTime = now2;
|
|
m_inQuickPoll = false;
|
|
m_needsToQuickPoll = false;
|
|
m_canQuickPoll = true;
|
|
g_errno = gerrno;
|
|
|
|
// reset this again
|
|
g_missedQuickPolls = 0;
|
|
}
|
|
|
|
|
|
void Loop::canQuickPoll(long niceness) {
|
|
if(niceness && !m_shutdown) m_canQuickPoll = true;
|
|
else m_canQuickPoll = false;
|
|
}
|
|
|
|
// Forbid quickpolling and re-arm the ITIMER_VIRTUAL interval timer with
// m_noInterrupt. Presumably m_noInterrupt is a zeroed itimerval, which
// disarms the timer; confirm where it is initialized.
void Loop::disableTimer() {
	// clear the flag first, before touching the timer
	m_canQuickPoll = false;
	setitimer ( ITIMER_VIRTUAL , &m_noInterrupt , NULL );
}
|
|
|
|
|
|
// Permit quickpolling and arm the ITIMER_VIRTUAL interval timer with
// m_quickInterrupt, whose interval is configured elsewhere.
void Loop::enableTimer() {
	// set the flag first, before arming the timer
	m_canQuickPoll = true;
	setitimer ( ITIMER_VIRTUAL , &m_quickInterrupt , NULL );
}
|
|
|
|
|
|
|
|
|
|
//calling with a 0 niceness will turn off the timer interrupt
|
|
void Loop::setitimerInterval(long niceness) {
|
|
if(niceness) {
|
|
setitimer(ITIMER_VIRTUAL, &m_quickInterrupt, NULL);
|
|
m_canQuickPoll = true;
|
|
}
|
|
else {
|
|
setitimer(ITIMER_VIRTUAL, &m_noInterrupt, NULL);
|
|
m_canQuickPoll = false;
|
|
}
|
|
}
|