open-source-search-engine/Loop.cpp

2648 lines
84 KiB
C++
Raw Normal View History

2013-08-03 00:12:24 +04:00
#include "gb-include.h"
#include "Loop.h"
#include "Threads.h" // g_threads.launchThreads()
#include "UdpServer.h" // g_udpServer2.makeCallbacks()
#include "HttpServer.h" // g_httpServer.m_tcp.m_numQueued
#include "Profiler.h"
#include "Process.h"
#include "PageParser.h"
#include "Threads.h"
#include "Stats.h"
// raised from 5000 to 10000 because we have more UdpSlots now and Multicast
// will call g_loop.registerSleepCallback() if it fails to get a UdpSlot to
// send on.
#define MAX_SLOTS 10000
// apple mac os x does not have real-time signal support
// #ifdef __APPLE__
// #define _POLLONLY_
// #endif
2013-08-03 00:12:24 +04:00
// TODO: . if signal queue overflows another signal is sent
// . capture that signal and use poll or something???
// Tricky Gotchas:
// TODO: if an event happens on a TCP fd/socket before we fully accept it
// we should just register it then call the read callback in case
// we just missed a ready for reading signal!!!!!
// TODO: signals can be gotten off the queue after we've closed an fd
// in which case the handler should be removed from Loop's registry
// BEFORE being closed... so the handler will be NULL... ???
// NOTE: keep in mind that the signals might be delayed or be really fast!
// TODO: don't mask signals, catch them as they arrive? (like in phhttpd)
// . set this to false to disable async signal handling
// . that will make our udp servers less responsive
bool g_isHot = true;
// extern this for all to use
bool g_inSigHandler = false ;
// so we know if interrupts are supposed to be enabled/disabled
bool g_interruptsOn = false;
// are some signals to call g_udpServer2.makeCallbacks() queued?
bool g_someAreQueued = false;
2014-11-11 01:45:11 +03:00
int32_t g_numAlarms = 0;
int32_t g_numVTAlarms = 0;
int32_t g_numQuickPolls = 0;
int32_t g_missedQuickPolls = 0;
int32_t g_numSigChlds = 0;
int32_t g_numSigQueues = 0;
int32_t g_numSigOthers = 0;
2013-08-03 00:12:24 +04:00
// since we can't call gettimeofday() while in a sig handler, we use this
// and update it periodically to keep it somewhat accurate
2014-10-30 22:36:39 +03:00
int64_t g_now = 0;
//int64_t g_nowGlobal = 0;
int64_t g_nowApprox = 0;
2013-08-03 00:12:24 +04:00
char g_inWaitState = false;
// a global class extern'd in .h file
Loop g_loop;
// the global niceness
char g_niceness = 0;
// we make sure the same callback/handler is not hogging the cpu when it is
// niceness 0 and we do not interrupt it, so this is a critical check
class UdpSlot *g_callSlot = NULL;
2014-11-11 01:45:11 +03:00
int32_t g_lastTransId = 0;
int32_t g_transIdCount = 0;
2013-08-03 00:12:24 +04:00
// keep the sig wait time static so we can change it based on m_minTick
static struct timespec s_sigWaitTime ;
static struct timespec s_sigWaitTime2 ;
static struct timespec* s_sigWaitTimePtr ;
// use this in case we unregister the "next" callback
static Slot *s_callbacksNext;
// this is defined in main.cpp
//extern bool mainShutdown ( bool urgent );
// set it from milliseconds
2014-11-11 01:45:11 +03:00
void Loop::setSigWaitTime ( int32_t ms ) {
int32_t secs = ms / 1000;
2013-08-03 00:12:24 +04:00
ms -= secs * 1000;
s_sigWaitTime.tv_sec = secs;
s_sigWaitTime.tv_nsec = ms * 1000000;
}
// free up all our mem
void Loop::reset() {
if ( m_slots ) {
2013-12-17 00:20:13 +04:00
log(LOG_DEBUG,"db: resetting loop");
2013-08-03 00:12:24 +04:00
mfree ( m_slots , MAX_SLOTS * sizeof(Slot) , "Loop" );
}
m_slots = NULL;
/*
2014-11-11 01:45:11 +03:00
for ( int32_t i = 0 ; i < MAX_NUM_FDS+2 ; i++ ) {
2013-08-03 00:12:24 +04:00
Slot *s = m_readSlots [ i ];
while ( s ) {
Slot *next = s->m_next;
mfree ( s , sizeof(Slot) ,"Loop" );
s = next;
}
m_readSlots [ i ] = NULL;
s = m_writeSlots [ i ];
while ( s ) {
Slot *next = s->m_next;
mfree ( s , sizeof(Slot) , "Loop" );
s = next;
}
m_writeSlots [ i ] = NULL;
}
*/
}
//static void sigHandler_r ( int x , siginfo_t *info , void *v ) ;
//static void sigHandlerRT ( int x , siginfo_t *info , void *v ) ;
2013-08-03 00:12:24 +04:00
static void sigbadHandler ( int x , siginfo_t *info , void *y ) ;
static void sigpwrHandler ( int x , siginfo_t *info , void *y ) ;
static void sighupHandler ( int x , siginfo_t *info , void *y ) ;
2014-09-03 21:55:29 +04:00
//static void sigioHandler ( int x , siginfo_t *info , void *y ) ;
2013-08-03 00:12:24 +04:00
static void sigalrmHandler( int x , siginfo_t *info , void *y ) ;
2014-09-03 19:38:43 +04:00
static void sigvtalrmHandler( int x , siginfo_t *info , void *y ) ;
2013-08-03 00:12:24 +04:00
2014-11-11 01:45:11 +03:00
//int32_t g_fdWriteBits[MAX_NUM_FDS/32];
//int32_t g_fdReadBits [MAX_NUM_FDS/32];
2013-08-03 00:12:24 +04:00
void Loop::unregisterReadCallback ( int fd, void *state ,
void (* callback)(int fd,void *state),
bool silent ){
if ( fd < 0 ) return;
// from reading
unregisterCallback ( m_readSlots , fd , state , callback ,
silent );
}
2014-09-11 16:26:14 +04:00
void Loop::unregisterWriteCallback ( int fd, void *state ,
void (* callback)(int fd,void *state)){
// from writing
unregisterCallback ( m_writeSlots , fd , state , callback );
}
2013-08-03 00:12:24 +04:00
void Loop::unregisterSleepCallback ( void *state ,
void (* callback)(int fd,void *state)){
unregisterCallback (m_readSlots,MAX_NUM_FDS,state,callback);
}
static fd_set s_selectMaskRead;
static fd_set s_selectMaskWrite;
static fd_set s_selectMaskExcept;
static int s_readFds[MAX_NUM_FDS];
2014-11-11 01:45:11 +03:00
static int32_t s_numReadFds = 0;
2014-09-11 16:56:47 +04:00
static int s_writeFds[MAX_NUM_FDS];
2014-11-11 01:45:11 +03:00
static int32_t s_numWriteFds = 0;
2013-08-03 00:12:24 +04:00
void Loop::unregisterCallback ( Slot **slots , int fd , void *state ,
void (* callback)(int fd,void *state) ,
bool silent ) {
// bad fd
if ( fd < 0 ) {log(LOG_LOGIC,
"loop: fd to unregister is negative.");return;}
// set a flag if we found it
bool found = false;
// slots is m_readSlots OR m_writeSlots
Slot *s = slots [ fd ];
Slot *lastSlot = NULL;
// . keep track of new min tick for sleep callbacks
// . sleep a min of 40ms so g_now is somewhat up to date
2014-11-11 01:45:11 +03:00
int32_t min = 40; // 0x7fffffff;
int32_t lastMin = min;
2013-08-03 00:12:24 +04:00
// chain through all callbacks registerd with this fd
while ( s ) {
// get the next slot (NULL if no more)
Slot *next = s->m_next;
// if we're unregistering a sleep callback
// we might have to recalculate m_minTick
if ( s->m_tick < min ) { lastMin = min; min = s->m_tick; }
// skip this slot if callbacks don't match
if ( s->m_callback != callback ) { lastSlot = s; goto skip; }
// skip this slot if states don't match
if ( s->m_state != state ) { lastSlot = s; goto skip; }
// free this slot since it callback matches "callback"
//mfree ( s , sizeof(Slot) , "Loop" );
returnSlot ( s );
found = true;
// if the last one, then remove the FD from s_fdList
// so and clear a bit so doPoll() function is fast
if ( slots[fd] == s && s->m_next == NULL ) {
2014-11-11 01:45:11 +03:00
for (int32_t i = 0; i < s_numReadFds ; i++ ) {
if ( s_readFds[i] != fd ) continue;
s_readFds[i] = s_readFds[s_numReadFds-1];
s_numReadFds--;
// remove from select mask too
FD_CLR(fd,&s_selectMaskRead );
break;
}
2014-11-11 01:45:11 +03:00
for (int32_t i = 0; i < s_numWriteFds ; i++ ) {
2014-09-11 16:26:14 +04:00
if ( s_writeFds[i] != fd ) continue;
s_writeFds[i] = s_writeFds[s_numWriteFds-1];
s_numWriteFds--;
// remove from select mask too
FD_CLR(fd,&s_selectMaskWrite);
2014-09-11 16:56:47 +04:00
if ( g_conf.m_logDebugLoop )
2014-11-11 01:45:11 +03:00
log("loop: clearing fd=%"INT32" from "
"write #wrts=%"INT32""
,(int32_t)fd,(int32_t)s_numWriteFds);
// FD_CLR(fd,&s_selectMaskExcept);
2014-09-11 16:26:14 +04:00
break;
}
}
2013-08-03 00:12:24 +04:00
// debug msg
2014-11-11 01:45:11 +03:00
//log("Loop::unregistered fd=%"INT32" state=%"UINT32"", fd, (int32_t)state );
2013-08-03 00:12:24 +04:00
// revert back to old min if this is the Slot we're removing
min = lastMin;
// excise the previous slot from linked list
if ( lastSlot ) lastSlot->m_next = next;
else slots[fd] = next;
// watch out if we're in the previous callback, we need to
// fix the linked list in callCallbacks_ass
if ( s_callbacksNext == s ) s_callbacksNext = next;
skip:
// advance to the next slot
s = next;
}
// set our new minTick if we were unregistering a sleep callback
if ( fd == MAX_NUM_FDS ) {
m_minTick = min;
// . set s_sigWaitTime to m_minTick
// . 1 billion nanoseconds = 1 second
// . m_minTick is in milliseconds, 1000 ms in a second
// . multiply m_minTick in ms by 1 million to get nano
setSigWaitTime ( m_minTick );
}
// return now if found
if ( found ) return;
// . otherwise, bitch if we're not silent
// . HttpServer.cpp always calls this even if it did not register its
// File's fd just to make sure.
if ( silent ) return;
2014-09-11 16:56:47 +04:00
return;
// sometimes the socket is abruptly closed and that calls the
// unregisterWriteCallback() for us... so skip this
2013-08-03 00:12:24 +04:00
log(LOG_LOGIC,
"loop: unregisterCallback: callback not found (fd=%i).",fd);
}
bool Loop::registerReadCallback ( int fd,
void *state,
void (* callback)(int fd,void *state ) ,
2014-11-11 01:45:11 +03:00
int32_t niceness ) {
2013-08-03 00:12:24 +04:00
// the "true" answers the question "for reading?"
if ( addSlot ( true, fd, state, callback, niceness ) ) return true;
return log("loop: Unable to register read callback.");
}
2014-09-11 16:26:14 +04:00
bool Loop::registerWriteCallback ( int fd,
void *state,
void (* callback)(int fd, void *state ) ,
2014-11-11 01:45:11 +03:00
int32_t niceness ) {
2014-09-11 16:26:14 +04:00
// the "false" answers the question "for reading?"
if ( addSlot ( false, fd, state, callback, niceness ) )return true;
return log("loop: Unable to register write callback.");
}
2013-08-03 00:12:24 +04:00
// tick is in milliseconds
2014-11-11 01:45:11 +03:00
bool Loop::registerSleepCallback ( int32_t tick ,
2013-08-03 00:12:24 +04:00
void *state,
void (* callback)(int fd,void *state ) ,
2014-11-11 01:45:11 +03:00
int32_t niceness ) {
2013-08-03 00:12:24 +04:00
if ( ! addSlot ( true, MAX_NUM_FDS, state, callback , niceness ,tick) )
return log("loop: Unable to register sleep callback");
if ( tick < m_minTick ) m_minTick = tick;
2014-11-11 01:45:11 +03:00
// wait this int32_t in the sig wait loop
2013-08-03 00:12:24 +04:00
setSigWaitTime ( m_minTick );
return true;
}
// . returns false and sets g_errno on error
bool Loop::addSlot ( bool forReading , int fd, void *state,
2014-11-11 01:45:11 +03:00
void (* callback)(int fd, void *state), int32_t niceness ,
int32_t tick ) {
2013-08-03 00:12:24 +04:00
// ensure fd is >= 0
if ( fd < 0 ) {
g_errno = EBADENGINEER;
return log(LOG_LOGIC,"loop: fd to register is negative.");
}
// sanity
if ( fd > MAX_NUM_FDS ) {
2014-11-11 01:45:11 +03:00
log("loop: bad fd of %"INT32"",(int32_t)fd);
2013-08-03 00:12:24 +04:00
char *xx=NULL;*xx=0;
}
// . ensure fd not already registered with this callback/state
// . prevent dups so you can keep calling register w/o fear
Slot *s;
2014-09-11 16:26:14 +04:00
if ( forReading ) s = m_readSlots [ fd ];
else s = m_writeSlots [ fd ];
2013-08-03 00:12:24 +04:00
while ( s ) {
if ( s->m_callback == callback &&
s->m_state == state ) {
// don't set g_errno for this anymore, just bitch
//g_errno = EBADENGINEER;
log(LOG_LOGIC,"loop: fd %i is already registered.",fd);
return true;
}
s = s->m_next;
}
// . make a new slot
// . TODO: implement mprimealloc() to pre-alloc slots for us for speed
//s = (Slot *) mmalloc ( sizeof(Slot ) ,"Loop");
s = getEmptySlot ( );
if ( ! s ) return false;
// for pointing to slot already in position for fd
Slot *next ;
// store ourselves in the slot for this fd
2014-09-11 16:26:14 +04:00
if ( forReading ) {
next = m_readSlots [ fd ];
m_readSlots [ fd ] = s;
// if not already registered, add to list
if ( fd<MAX_NUM_FDS && ! FD_ISSET ( fd,&s_selectMaskRead ) ) {
s_readFds[s_numReadFds++] = fd;
FD_SET ( fd,&s_selectMaskRead );
// sanity
if ( s_numReadFds>MAX_NUM_FDS){char *xx=NULL;*xx=0;}
}
2013-08-03 00:12:24 +04:00
// fd == MAX_NUM_FDS if it's a sleep callback
//if ( fd < MAX_NUM_FDS ) {
//FD_SET ( fd , &m_readfds );
//FD_SET ( fd , &m_exceptfds );
//}
2014-09-11 16:26:14 +04:00
}
else {
next = m_writeSlots [ fd ];
m_writeSlots [ fd ] = s;
//FD_SET ( fd , &m_writefds );
// if not already registered, add to list
if ( fd<MAX_NUM_FDS && ! FD_ISSET ( fd,&s_selectMaskWrite ) ) {
s_writeFds[s_numWriteFds++] = fd;
FD_SET ( fd,&s_selectMaskWrite );
// sanity
if ( s_numWriteFds>MAX_NUM_FDS){char *xx=NULL;*xx=0;}
}
}
2013-08-03 00:12:24 +04:00
// set our callback and state
s->m_callback = callback;
s->m_state = state;
// point to the guy that was registered for fd before us
s->m_next = next;
// save our niceness for doPoll()
s->m_niceness = niceness;
// store the tick for sleep wrappers (should be max for others)
s->m_tick = tick;
// and the last called time for sleep wrappers only really
if ( fd == MAX_NUM_FDS ) s->m_lastCall = gettimeofdayInMilliseconds();
// debug msg
2014-11-11 01:45:11 +03:00
//log("Loop::registered fd=%i state=%"UINT32"",fd,state);
2013-08-03 00:12:24 +04:00
// if fd == MAX_NUM_FDS if it's a sleep callback
if ( fd == MAX_NUM_FDS ) return true;
// watch out for big bogus fds used for thread exit callbacks
if ( fd > MAX_NUM_FDS ) return true;
// set fd non-blocking
return setNonBlocking ( fd , niceness ) ;
}
// . now make sure we're listening for an interrupt on this fd
// . set it non-blocing and enable signal catching for it
// . listen for an interrupt for this fd
2014-11-11 01:45:11 +03:00
bool Loop::setNonBlocking ( int fd , int32_t niceness ) {
2013-08-03 00:12:24 +04:00
retry:
int flags = fcntl ( fd , F_GETFL ) ;
if ( flags < 0 ) {
// valgrind
if ( errno == EINTR ) goto retry;
g_errno = errno;
return log("loop: fcntl(F_GETFL): %s.",strerror(errno));
}
retry9:
if ( fcntl ( fd, F_SETFL, flags|O_NONBLOCK|O_ASYNC) < 0 ) {
// valgrind
if ( errno == EINTR ) goto retry9;
g_errno = errno;
return log("loop: fcntl(NONBLOCK): %s.",strerror(errno));
}
// we use select()/poll now so skip stuff below
return true;
/*
2013-08-03 00:12:24 +04:00
retry8:
// tell kernel to send the signal to us when fd is ready for read/write
if ( fcntl (fd, F_SETOWN , getpid() ) < 0 ) {
g_errno = errno;
// valgrind
if ( errno == EINTR ) goto retry8;
return log("loop: fcntl(F_SETOWN): %s.",strerror(errno));
}
// . tell kernel what signal we'd like to recieve when this happens
// . additional signal info (including fd) should be available
#ifdef _POLLONLY_
return true;
#endif
// trunc nicess cuz we only get GB_SIGRTMIN+1 to GB_SIGRTMIN+2 signals
2013-08-03 00:12:24 +04:00
if ( niceness < -1 ) niceness = -1;
if ( niceness > MAX_NICENESS ) niceness = MAX_NICENESS;
// debug msg
2014-11-11 01:45:11 +03:00
//log("fd on niceness = %"INT32" sig = %"INT32"",niceness,GB_SIGRTMIN +1+niceness);
2013-08-03 00:12:24 +04:00
retry6:
// . tell kernel to send this signal when fd is ready for read/write
// . reserve GB_SIGRTMIN for unmaskable interrupts (niceness = -1)
// as used by high priority udp server, g_udpServer2
if ( fcntl (fd, F_SETSIG , GB_SIGRTMIN + 1 + niceness ) < 0 ) {
2013-08-03 00:12:24 +04:00
// valgrind
if ( errno == EINTR ) goto retry6;
g_errno = errno;
return log("loop: fcntl(F_SETSIG): %s.",strerror(errno));
}
return true;
*/
2013-08-03 00:12:24 +04:00
}
// . if "forReading" is true call callbacks registered for reading on "fd"
// . if "forReading" is false call callbacks registered for writing on "fd"
// . if fd is MAX_NUM_FDS and "forReading" is true call all sleepy callbacks
2014-10-30 22:36:39 +03:00
void Loop::callCallbacks_ass ( bool forReading , int fd , int64_t now ,
2014-11-11 01:45:11 +03:00
int32_t niceness ) {
2014-09-11 16:26:14 +04:00
2013-08-03 00:12:24 +04:00
// debug msg
//if ( g_conf.m_logDebugUdp ) log("callCallbacks_ass start");
//if ( fd != 1024 ) {
2014-11-11 01:45:11 +03:00
//if (forReading) fprintf(stderr,"got read sig on fd=%"INT32"\n",(int32_t)fd);
//else fprintf(stderr,"got write sig on fd=%"INT32"\n",(int32_t)fd);
2013-08-03 00:12:24 +04:00
//}
// save the g_errno to send to all callbacks
int saved_errno = g_errno;
// get the first Slot in the chain that is waiting on this fd
Slot *s ;
//if ( forReading ) s = m_readSlots [ fd ];
//else s = m_writeSlots [ fd ];
s = m_readSlots [ fd ];
2013-08-03 00:12:24 +04:00
// ensure we called something
2014-11-11 01:45:11 +03:00
int32_t numCalled = 0;
2013-08-03 00:12:24 +04:00
// a hack fix
if ( niceness == -1 && m_inQuickPoll ) niceness = 0;
// . now call all the callbacks
// . most will re-register themselves (i.e. call registerCallback...()
2014-10-30 22:36:39 +03:00
//int64_t startTime = gettimeofdayInMilliseconds();
2013-08-03 00:12:24 +04:00
while ( s ) {
// skip this slot if he has no callback
if ( ! s->m_callback ) continue;
// NOTE: callback can unregister fd for Slot s, so get next
//Slot *next = s->m_next;
s_callbacksNext = s->m_next;
// watch out if clock was set back
if ( s->m_lastCall > now ) s->m_lastCall = now;
// if we're a sleep callback, check to make sure not premature
if ( fd == MAX_NUM_FDS && s->m_lastCall + s->m_tick > now ) {
s = s_callbacksNext; continue; }
// skip if not a niceness match
if ( niceness == 0 && s->m_niceness != 0 ) {
s = s_callbacksNext; continue; }
// update the lastCall timestamp for this slot
if ( fd == MAX_NUM_FDS ) s->m_lastCall = now;
// . debug msg
// . this is called a lot cuz we process all dgrams/whatever
// in one clump so there's a lot of redundant signals
//if ( g_conf.m_logDebugUdp && fd != 1024 )
2014-11-11 01:45:11 +03:00
// log("Loop::callCallbacks_ass: for fd=%"INT32" state=%"UINT32"",
// fd,(int32_t)s->m_state);
2013-08-03 00:12:24 +04:00
// do the callback
2014-11-11 01:45:11 +03:00
//int32_t address = 0;
// uint64_t profilerStart,profilerEnd;
// uint64_t statStart, statEnd;
2013-08-03 00:12:24 +04:00
/*
if(g_conf.m_profilingEnabled){
2014-11-11 01:45:11 +03:00
address=(int32_t)s->m_callback;
2013-08-03 00:12:24 +04:00
g_profiler.startTimer(address,
__PRETTY_FUNCTION__);
//profilerStart=gettimeofdayInMillisecondsLocal();
//statStart = gettimeofdayInMilliseconds();
}
*/
//startBlockedCpuTimer();
// log it now
if ( g_conf.m_logDebugLoop )
2014-11-11 01:45:11 +03:00
log(LOG_DEBUG,"loop: enter fd callback fd=%"INT32" "
"nice=%"INT32"",(int32_t)fd,(int32_t)s->m_niceness);
2013-08-03 00:12:24 +04:00
2014-11-18 05:13:36 +03:00
// sanity check. -1 no longer supported
2013-08-03 00:12:24 +04:00
if ( s->m_niceness < 0 ) { char *xx=NULL;*xx=0; }
// save it
2014-11-11 01:45:11 +03:00
int32_t saved = g_niceness;
2013-08-03 00:12:24 +04:00
// set the niceness
g_niceness = s->m_niceness;
// make sure not 2
if ( g_niceness >= 2 ) g_niceness = 1;
// sanity check -- need to be able to quickpoll!
//if ( s->m_niceness > 0 && ! g_loop.m_canQuickPoll ) {
// char *xx=NULL;*xx=0; }
s->m_callback ( fd , s->m_state );
// restore it
g_niceness = saved;
// log it now
if ( g_conf.m_logDebugLoop )
2014-11-11 01:45:11 +03:00
log(LOG_DEBUG,"loop: exit fd callback fd=%"INT32" "
"nice=%"INT32"", (int32_t)fd,(int32_t)s->m_niceness);
2013-08-03 00:12:24 +04:00
/*
if(g_conf.m_profilingEnabled){
//profilerEnd =gettimeofdayInMillisecondsLocal();
if(!g_profiler.endTimer(address, __PRETTY_FUNCTION__))
2014-11-11 01:45:11 +03:00
log(LOG_WARN,"admin: Couldn't add the fn %"INT32"",
(int32_t)address);
2013-08-03 00:12:24 +04:00
}
*/
// . debug msg
// . this is called a lot cuz we process all dgrams/whatever
// in one clump so there's a lot of redundant signals
//if ( g_conf.m_logDebugUdp && fd != 1024 )
// log("Loop::callCallbacks_ass: back");
// inc the flag
numCalled++;
// reset g_errno so all callbacks for this fd get same g_errno
g_errno = saved_errno;
// get the next n (will be -1 if no slot after it)
s = s_callbacksNext;
}
s_callbacksNext = NULL;
2014-10-30 22:36:39 +03:00
// int64_t now2 = gettimeofdayInMilliseconds();
// int64_t took = now2 - startTime;
2013-08-03 00:12:24 +04:00
// if(g_conf.m_profilingEnabled && took > 10) {
// g_stats.addStat_r ( 0 ,
// startTime,
// now2,
// 0 ,
// STAT_GENERIC,
// __PRETTY_FUNCTION__,__LINE__);
// if(g_conf.m_sequentialProfiling) {
// log(LOG_TIMING,
2014-11-11 01:45:11 +03:00
// "admin: loop time to do %"INT32" callbacks: %"INT64" ms",
2013-08-03 00:12:24 +04:00
// numCalled,took);
// }
// }
// log an error if nothing called
//if ( ! called ) log("Loop::callCallbacks: no callback for signal");
// debug msg
//if ( g_conf.m_logDebugUdp ) log("callCallbacks_ass end");
}
Loop::Loop ( ) {
// . default sig wait time to 10 ms (10,000,000 nanoseconds)
// . 1 billion nanoseconds = 1 second
setSigWaitTime ( 1000 /*ms*/ );
s_sigWaitTime2.tv_sec = 0;
s_sigWaitTime2.tv_nsec = 0;
s_sigWaitTimePtr = &s_sigWaitTime;
m_inQuickPoll = false;
m_needsToQuickPoll = false;
m_canQuickPoll = false;
m_isDoingLoop = false;
2013-08-03 00:12:24 +04:00
// set all callbacks to NULL so we know they're empty
2014-11-11 01:45:11 +03:00
for ( int32_t i = 0 ; i < MAX_NUM_FDS+2 ; i++ ) {
2013-08-03 00:12:24 +04:00
m_readSlots [i] = NULL;
//m_writeSlots[i] = NULL;
2013-08-03 00:12:24 +04:00
}
// the extra sleep slots
//m_readSlots [ MAX_NUM_FDS ] = NULL;
m_slots = NULL;
}
// free all slots from addSlots
Loop::~Loop ( ) {
reset();
}
// returns NULL and sets g_errno if none are left
Slot *Loop::getEmptySlot ( ) {
Slot *s = m_head;
if ( ! s ) {
g_errno = EBUFTOOSMALL;
log("loop: No empty slots available. "
"Increase #define MAX_SLOTS.");
return NULL;
}
m_head = s->m_nextAvail;
return s;
}
void Loop::returnSlot ( Slot *s ) {
s->m_nextAvail = m_head;
m_head = s;
}
// . come here when we get a GB_SIGRTMIN+X signal etc.
// . do not call anything from here because the purpose of this is to just
// queue the signals up and DO DEDUPING which linux does not do causing
// the sigqueue to overflow.
// . we should break out of the sleep loop after the signal is handled
// so we can handle/process the queued signals properly. 'man sleep'
// states "sleep() makes the calling process sleep until seconds
// seconds have elapsed or a signal arrives which is not ignored."
void sigHandlerQueue_r ( int x , siginfo_t *info , void *v ) {
// if we just needed to cleanup a thread
if ( info->si_signo == SIGCHLD ) {
2014-09-03 20:18:30 +04:00
g_numSigChlds++;
// this has no fd really, Threads.cpp just sends it when
// the thread is done
g_threads.m_needsCleanup = true;
return;
}
if ( info->si_code == SI_QUEUE ) {
2014-09-03 20:18:30 +04:00
g_numSigQueues++;
//log("admin: got sigqueue");
// the thread is done
g_threads.m_needsCleanup = true;
return;
}
2014-09-03 20:18:30 +04:00
// wtf is this?
g_numSigOthers++;
2014-11-18 05:13:36 +03:00
// the stuff below should no longer be used since we
// do not use F_SETSIG now
return;
/*
// extract the file descriptor that needs attention
int fd = info->si_fd;
if ( fd >= MAX_NUM_FDS ) {
log("loop: CRITICAL ERROR. fd=%i > %i",fd,(int)MAX_NUM_FDS);
return;
}
// set the right callback
// info->si_band values:
//#define POLLIN 0x0001 // There is data to read
//#define POLLPRI 0x0002 // There is urgent data to read
//#define POLLOUT 0x0004 // Writing now will not block
//#define POLLERR 0x0008 // Error condition
//#define POLLHUP 0x0010 // Hung up
//#define POLLNVAL 0x0020 // Invalid request: fd not open
int band = info->si_band;
// translate SIGPIPE's to band of POLLHUP
if ( info->si_signo == SIGPIPE ) {
band = POLLHUP;
//log("loop: Received SIGPIPE signal. Broken pipe.");
}
// . call the appropriate handler(s)
// . TODO: bitch if no callback to handle the read!!!!!!!
// . NOTE: when it's connected it sets both POLLIN and POLLOUT
// . NOTE: or when a socket is trying to connect to it if it's listener
//if ( band & (POLLIN | POLLOUT) == (POLLIN | POLLOUT) )
// g_loop.callCallbacks_ass ( true , fd ); // for reading
if ( band & POLLIN ) {
// keep stats on this now since some linuxes dont work right
g_stats.m_readSignals++;
2014-11-11 01:45:11 +03:00
//log("Loop: read %"INT64" fd=%i",gettimeofdayInMilliseconds(),fd);
//g_loop.callCallbacks_ass ( true , fd );
g_fdReadBits[fd/32] = 1<<(fd%32);
}
else if ( band & POLLPRI ) {
// keep stats on this now since some linuxes dont work right
g_stats.m_readSignals++;
2014-11-11 01:45:11 +03:00
//log("Loop: read %"INT64" fd=%i",gettimeofdayInMilliseconds(),fd);
//g_loop.callCallbacks_ass ( true , fd ) ;
g_fdReadBits[fd/32] = 1<<(fd%32);
}
else if ( band & POLLOUT ) {
// keep stats on this now since some linuxes dont work right
g_stats.m_writeSignals++;
2014-11-11 01:45:11 +03:00
//log("Loop: write %"INT64" fd=%i",gettimeofdayInMilliseconds(),fd)
//g_loop.callCallbacks_ass ( false , fd );
g_fdWriteBits[fd/32] = 1<<(fd%32);
}
// fix qainject1() test with this
else if ( band & POLLERR ) {
//log(LOG_INFO,"loop: got POLLERR on fd=%i.",fd);
}
//g_loop.callCallbacks_ass ( false , fd );
// this happens if the socket closes abruptly
// or out of band data, etc... see "man 2 poll" for more info
else if ( band & POLLHUP ) {
// i see these all the time for fd == 0, so don't print it
//if ( fd != 0 )
// log(LOG_INFO,"loop: Received hangup on fd=%i.",fd);
// it is ready for writing i guess
g_fdWriteBits[fd/32] = 1<<(fd%32);
}
*/
}
2013-08-03 00:12:24 +04:00
bool Loop::init ( ) {
// clear this up here before using in doPoll()
FD_ZERO(&s_selectMaskRead);
FD_ZERO(&s_selectMaskWrite);
FD_ZERO(&s_selectMaskExcept);
2013-08-03 00:12:24 +04:00
// redhat 9's NPTL doesn't like our async signals
if ( ! g_conf.m_allowAsyncSignals ) g_isHot = false;
#ifdef _VALGRIND_
g_isHot = false;
#endif
// sighupHandler() will set this to true so we know when to shutdown
m_shutdown = 0;
// . reset this cuz we have no sleep callbacks right now
// . sleep a min of 40ms so g_now is somewhat up to date
m_minTick = 40; //0x7fffffff;
// reset the need to poll flag
m_needToPoll = false;
// let 'em know if we're hot
if ( g_isHot ) log ( LOG_INIT , "loop: Using asynchronous signals "
"for udp server.");
// make slots
2014-11-11 01:45:11 +03:00
m_slots = (Slot *) mmalloc ( MAX_SLOTS * (int32_t)sizeof(Slot) , "Loop" );
2013-08-03 00:12:24 +04:00
if ( ! m_slots ) return false;
// log it
2014-11-11 01:45:11 +03:00
log(LOG_DEBUG,"loop: Allocated %"INT32" bytes for %"INT32" callbacks.",
MAX_SLOTS * (int32_t)sizeof(Slot),(int32_t)MAX_SLOTS);
2013-08-03 00:12:24 +04:00
// init link list ptr
2014-11-11 01:45:11 +03:00
for ( int32_t i = 0 ; i < MAX_SLOTS - 1 ; i++ ) {
2013-08-03 00:12:24 +04:00
m_slots[i].m_nextAvail = &m_slots[i+1];
}
m_slots[MAX_SLOTS - 1].m_nextAvail = NULL;
m_head = &m_slots[0];
m_tail = &m_slots[MAX_SLOTS - 1];
// an innocent log msg
//log ( 0 , "Loop: starting the i/o loop");
// . when using threads GB_SIGRTMIN becomes 35, not 32 anymore
// since threads use these signals to reactivate suspended threads
// . debug msg
2014-11-11 01:45:11 +03:00
//log("admin: GB_SIGRTMIN=%"INT32"", (int32_t)GB_SIGRTMIN );
2013-08-03 00:12:24 +04:00
// . block the GB_SIGRTMIN signal
// . anytime this is raised it goes onto the signal queue
// . we use sigtimedwait() to get signals off the queue
// . sigtimedwait() selects the lowest signo first for handling
// . therefore, GB_SIGRTMIN is higher priority than (GB_SIGRTMIN + 1)
//sigfillset ( &sigs );
// set of signals to block
sigset_t sigs;
sigemptyset ( &sigs );
sigaddset ( &sigs , SIGPIPE ); //if we write to a close socket
// #ifndef _VALGRIND_
// sigaddset ( &sigs , GB_SIGRTMIN );
// #endif
// sigaddset ( &sigs , GB_SIGRTMIN + 1 );
// sigaddset ( &sigs , GB_SIGRTMIN + 2 );
// sigaddset ( &sigs , GB_SIGRTMIN + 3 );
2013-08-03 00:12:24 +04:00
sigaddset ( &sigs , SIGCHLD );
2013-09-14 01:37:35 +04:00
#ifdef PTHREADS
// now since we took out SIGIO... (see below)
// we should ignore this signal so it doesn't suddenly stop the gb
// process since we took out the SIGIO handler because newer kernels
// were throwing SIGIO signals ALL the time, on every datagram
// send/receive it seemed and bogged us down.
sigaddset ( &sigs , SIGIO );
#endif
2013-08-03 00:12:24 +04:00
// . block on any signals in this set (in addition to current sigs)
// . use SIG_UNBLOCK to remove signals from block list
// . this returns -1 and sets g_errno on error
// . we block a signal so it does not interrupt us, then we can
// take it off using our call to sigtimedwait()
// . allow it to interrupt us now and we will queue it ourselves
// to prevent the linux queue from overflowing
// . see 'cat /proc/<pid>/status | grep SigQ' output to see if
// overflow occurs. linux does not dedup the signals so when a
// host cpu usage hits 100% it seems to miss a ton of signals.
// i suspect the culprit is pthread_create() so we need to get
// thread pools out soon.
// . now we are handling the signals and queueing them ourselves
// so comment out this sigprocmask() call
// if ( sigprocmask ( SIG_BLOCK, &sigs, 0 ) < 0 ) {
// g_errno = errno;
// return log("loop: sigprocmask: %s.",strerror(g_errno));
// }
struct sigaction sa2;
// . sa_mask is the set of signals that should be blocked when
// we're handling the GB_SIGRTMIN, make this empty
// . GB_SIGRTMIN signals will be automatically blocked while we're
// handling a SIGIO signal, so don't worry about that
// . what sigs should be blocked when in our handler? the same
// sigs we are handling i guess
gbmemcpy ( &sa2.sa_mask , &sigs , sizeof(sigs) );
sa2.sa_flags = SA_SIGINFO ; //| SA_ONESHOT;
// call this function
sa2.sa_sigaction = sigHandlerQueue_r;
g_errno = 0;
if ( sigaction ( SIGPIPE, &sa2, 0 ) < 0 ) g_errno = errno;
// if ( sigaction ( GB_SIGRTMIN , &sa2, 0 ) < 0 ) g_errno = errno;
// if ( sigaction ( GB_SIGRTMIN + 1, &sa2, 0 ) < 0 ) g_errno = errno;
// if ( sigaction ( GB_SIGRTMIN + 2, &sa2, 0 ) < 0 ) g_errno = errno;
// if ( sigaction ( GB_SIGRTMIN + 3, &sa2, 0 ) < 0 ) g_errno = errno;
if ( sigaction ( SIGCHLD, &sa2, 0 ) < 0 ) g_errno = errno;
if ( sigaction ( SIGIO, &sa2, 0 ) < 0 ) g_errno = errno;
if ( g_errno ) log("loop: sigaction(): %s.", mstrerror(g_errno) );
2013-08-03 00:12:24 +04:00
// . we turn this signal on/off to turn interrupts off/on
// . clear all signals from the set
//sigemptyset ( &m_sigrtmin );
// tmp debug hack, so we don't have real time signals now...
//sigaddset ( &m_sigrtmin, GB_SIGRTMIN );
/*
2013-08-03 00:12:24 +04:00
// now set up a signal handler to handle just/only SIGIO
struct sigaction sa;
// . sa_mask is the set of signals that should be blocked when
// we're handling the GB_SIGRTMIN, make this empty
// . GB_SIGRTMIN signals will be automatically blocked while we're
// handling a SIGIO signal, so don't worry about that
sigemptyset (&sa.sa_mask);
// . these flags determine the signal handling process
// . SA_SIGINFO means to use sa.sa_sigaction() as the sig handler
// and not sa_.sa_handler() (added in Linux 2.1.86)
// . this allows us to get the siginfo_t structure to get the fd
// that generated the signal
// . SA_ONESHOT restores the handler to the default state when
// our sig handler is called so we don't get interuppted by another
// signal when we're handling that one
// . incoming GB_SIGRTMIN sigs should be queued (sigtimedwait())
sa.sa_flags = SA_SIGINFO ; //| SA_ONESHOT;
// the handler for unblocked signals, same as other signals really
sa.sa_sigaction = sigHandlerRT;
// clear g_errno
g_errno = 0;
// now when we got an unblocked GB_SIGRTMIN signal go here right away
// #ifndef _VALGRIND_
// if ( sigaction ( GB_SIGRTMIN, &sa, 0 ) < 0 ) g_errno = errno;
// if ( g_errno)log("loop: sigaction GB_SIGRTMIN: %s.", mstrerror(errno));
// #endif
2013-08-03 00:12:24 +04:00
// set it this way for SIGIO's
sa.sa_flags = SA_SIGINFO ; // | SA_ONESHOT;
// . define the actual routine that handles the SIGIO signal
// . void (*sa_sigaction)(int, siginfo_t *, void *);
sa.sa_sigaction = sigioHandler;
// . register our sigHandler() to handle the GB_SIGRTMIN signal
// . when a file/socket is made non-blocking it should have done a:
// fcntl(fd,SET_SIG,GB_SIGRTMIN) so we're notified with that signal
// . this returns -1 and sets g_errno on error
// . TODO: is this the SOURCE signal or the altered signal?
2013-09-14 01:37:35 +04:00
#ifndef PTHREADS
// i think this was supposed to be sent when the signal queue was
// overflowing so we needed to do a poll operation when we got this
// signal, however, newer kernel seems to throw this signal all the
// time when it just gets IO causing cpu to be 100% floored!
// i'm afraid without this code we might miss data on a socket
// or not read it promptly, but let's see how it goes.
2013-08-03 00:12:24 +04:00
if ( sigaction ( SIGIO, &sa, 0 ) < 0 ) g_errno = errno;
if ( g_errno ) log("loop: sigaction SIGIO: %s.", mstrerror(errno));
#endif
*/
2013-08-03 00:12:24 +04:00
struct sigaction sa;
// . sa_mask is the set of signals that should be blocked when
// we're handling the signal, make this empty
// . GB_SIGRTMIN signals will be automatically blocked while we're
// handling a SIGIO signal, so don't worry about that
sigemptyset (&sa.sa_mask);
sa.sa_flags = SA_SIGINFO ; // | SA_ONESHOT;
2013-08-03 00:12:24 +04:00
// handle HUP signals gracefully by saving and shutting down
sa.sa_sigaction = sighupHandler;
if ( sigaction ( SIGHUP , &sa, 0 ) < 0 ) g_errno = errno;
if ( g_errno ) log("loop: sigaction SIGHUP: %s.", mstrerror(errno));
if ( sigaction ( SIGTERM, &sa, 0 ) < 0 ) g_errno = errno;
if ( g_errno ) log("loop: sigaction SIGTERM: %s.", mstrerror(errno));
// if ( sigaction ( SIGABRT, &sa, 0 ) < 0 ) g_errno = errno;
// if ( g_errno ) log("loop: sigaction SIGTERM: %s.",mstrerror(errno));
2013-08-03 00:12:24 +04:00
// we should save our data on segv, sigill, sigfpe, sigbus
sa.sa_sigaction = sigbadHandler;
if ( sigaction ( SIGSEGV, &sa, 0 ) < 0 ) g_errno = errno;
if ( g_errno ) log("loop: sigaction SIGSEGV: %s.", mstrerror(errno));
if ( sigaction ( SIGILL , &sa, 0 ) < 0 ) g_errno = errno;
if ( g_errno ) log("loop: sigaction SIGILL: %s.", mstrerror(errno));
if ( sigaction ( SIGFPE , &sa, 0 ) < 0 ) g_errno = errno;
if ( g_errno ) log("loop: sigaction SIGFPE: %s.", mstrerror(errno));
if ( sigaction ( SIGBUS , &sa, 0 ) < 0 ) g_errno = errno;
if ( g_errno ) log("loop: sigaction SIGBUS: %s.", mstrerror(errno));
// if ( sigaction ( SIGQUIT , &sa, 0 ) < 0 ) g_errno = errno;
// if ( g_errno ) log("loop: sigaction SIGBUS: %s.", mstrerror(errno));
// if ( sigaction ( SIGSYS , &sa, 0 ) < 0 ) g_errno = errno;
// if ( g_errno ) log("loop: sigaction SIGBUS: %s.", mstrerror(errno));
2013-08-03 00:12:24 +04:00
// if the UPS is about to go off it sends a SIGPWR
sa.sa_sigaction = sigpwrHandler;
if ( sigaction ( SIGPWR, &sa, 0 ) < 0 ) g_errno = errno;
//now set up our alarm for quickpoll
m_quickInterrupt.it_value.tv_sec = 0;
m_quickInterrupt.it_value.tv_usec = QUICKPOLL_INTERVAL * 1000;
m_quickInterrupt.it_interval.tv_sec = 0;
m_quickInterrupt.it_interval.tv_usec = QUICKPOLL_INTERVAL * 1000;
m_realInterrupt.it_value.tv_sec = 0;
// 1000 microseconds in a millisecond
m_realInterrupt.it_value.tv_usec = 1 * 1000;
m_realInterrupt.it_interval.tv_sec = 0;
m_realInterrupt.it_interval.tv_usec = 1 * 1000;
2013-08-03 00:12:24 +04:00
m_noInterrupt.it_value.tv_sec = 0;
m_noInterrupt.it_value.tv_usec = 0;
m_noInterrupt.it_interval.tv_sec = 0;
m_noInterrupt.it_interval.tv_usec = 0;
2014-09-03 19:38:43 +04:00
//m_realInterrupt.it_value.tv_sec = 0;
//m_realInterrupt.it_value.tv_usec = QUICKPOLL_INTERVAL * 1000;
2013-08-03 00:12:24 +04:00
// set the interrupts to off for now
2014-09-03 19:38:43 +04:00
//mdw:disableTimer();
2013-08-03 00:12:24 +04:00
2014-09-12 13:42:00 +04:00
// make this 10ms i guess
setitimer(ITIMER_REAL, &m_realInterrupt, NULL);
2014-09-03 19:38:43 +04:00
// this is 10ms
setitimer(ITIMER_VIRTUAL, &m_quickInterrupt, NULL);
2013-08-03 00:12:24 +04:00
sa.sa_sigaction = sigalrmHandler;
// it's gotta be real time, not virtual cpu time now
2014-09-03 19:38:43 +04:00
if ( sigaction ( SIGALRM, &sa, 0 ) < 0 ) g_errno = errno;
2013-08-03 00:12:24 +04:00
if ( g_errno ) return log("loop: sigaction: %s.", mstrerror(errno));
2014-09-03 19:38:43 +04:00
// block sigvtalarm
sa.sa_sigaction = sigvtalrmHandler;
if ( sigaction ( SIGVTALRM, &sa, 0 ) < 0 ) g_errno = errno;
if ( g_errno ) log("loop: sigaction SIGVTALRM: %s.", mstrerror(errno));
2013-08-03 00:12:24 +04:00
// success
return true;
}
// TODO: if we get a segfault while saving, what then?
void sigpwrHandler ( int x , siginfo_t *info , void *y ) {
// let main process know to shutdown
g_loop.m_shutdown = 3;
}
2015-01-10 23:05:39 +03:00
#include <execinfo.h>
void printStackTrace ( int signum , siginfo_t *info , void *ptr ) {
// int arch = 32;
// if ( __WORDSIZE == 64 ) arch = 64;
// if ( __WORDSIZE == 128 ) arch = 128;
// right now only works for 32 bit
//if ( arch != 32 ) return;
2015-01-25 18:46:27 +03:00
logf(LOG_DEBUG,"gb: seg fault. printing stack trace. use "
"'addr2line -e gb' to decode the hex below.");
2015-01-10 23:05:39 +03:00
static void *s_bt[200];
int sz = backtrace(s_bt, 200);
//char **strings = backtrace_symbols(s_bt, sz);
for( int i = 0; i < sz; ++i) {
2015-01-25 18:46:27 +03:00
//unsigned long long ba;
//ba = g_profiler.getFuncBaseAddr((PTRTYPE)s_bt[i]);
2015-01-10 23:05:39 +03:00
//sigsegv_outp("%s", strings[i]);
2015-01-25 18:46:27 +03:00
//logf(LOG_DEBUG,"[0x%llx->0x%llx] %s"
logf(LOG_DEBUG,"[0x%"XINT64"]"
,(uint64_t)s_bt[i]
//,ba
//,g_profiler.getFnName(ba,0));
);
2015-01-10 23:05:39 +03:00
}
}
2013-08-03 00:12:24 +04:00
// TODO: if we get a segfault while saving, what then?
void sigbadHandler ( int x , siginfo_t *info , void *y ) {
// thread should set it errno to 0x7fffffff which means that
// Threads.cpp should not look for its ThreadEntry::m_isDone flag
// to be set before calling waitpid() on it
if ( g_threads.amThread() ) errno = 0x7fffffff;
2014-09-12 13:42:00 +04:00
// turn off sigalarms
g_loop.disableTimer();
2013-08-03 00:12:24 +04:00
log("loop: sigbadhandler. disabling handler from recall.");
// . don't allow this handler to be called again
// . does this work if we're in a thread?
struct sigaction sa;
sigemptyset (&sa.sa_mask);
sa.sa_flags = SA_SIGINFO ; //| SA_ONESHOT;
sa.sa_sigaction = NULL;
sigaction ( SIGSEGV, &sa, 0 ) ;
sigaction ( SIGILL , &sa, 0 ) ;
sigaction ( SIGFPE , &sa, 0 ) ;
sigaction ( SIGBUS , &sa, 0 ) ;
sigaction ( SIGQUIT, &sa, 0 ) ;
sigaction ( SIGSYS , &sa, 0 ) ;
2014-09-12 13:42:00 +04:00
//sigaction ( SIGALRM, &sa, 0 ) ;
2013-08-03 00:12:24 +04:00
// if we've already been here, or don't need to be, then bail
if ( g_loop.m_shutdown ) {
log("loop: sigbadhandler. shutdown already called.");
return;
}
2015-01-10 23:05:39 +03:00
// unwind
printStackTrace( x , info , y );
2013-08-03 00:12:24 +04:00
// if we're a thread, let main process know to shutdown
g_loop.m_shutdown = 2;
2014-11-11 01:45:11 +03:00
log("loop: sigbadhandler. trying to save now. mode=%"INT32"",
(int32_t)g_process.m_mode);
2013-08-03 00:12:24 +04:00
// . this will save all Rdb's
// . if "urgent" is true it will dump core
// . if "urgent" is true it won't broadcast its shutdown to all hosts
//#ifndef NO_MAIN
// mainShutdown ( true ); // urgent?
//#endif
g_process.shutdown ( true );
}
2014-09-03 19:38:43 +04:00
void sigvtalrmHandler ( int x , siginfo_t *info , void *y ) {
2013-08-03 00:12:24 +04:00
#ifdef PTHREADS
// do not allow threads
// this call is very fast, can be called like 400M times per second
if ( g_threads.amThread() ) return;
#endif
2014-09-03 19:38:43 +04:00
// stats
g_numVTAlarms++;
2013-08-03 00:12:24 +04:00
// see if a niceness 0 algo is hogging the cpu
if ( g_callSlot && g_niceness == 0 ) {
// are we handling the same request or callback?
if ( g_callSlot->m_transId == g_lastTransId ) g_transIdCount++;
else g_transIdCount=1;
// set it
g_lastTransId = g_callSlot->m_transId;
// sanity check
//if ( g_transIdCount >= 10 ) { char *xx=NULL;*xx=0; }
bool logIt = false;
if ( g_transIdCount >= 4 ) logIt = true;
// do not spam for msg99 handler so much
if ( g_callSlot->m_msgType == 0x99 && g_transIdCount != 50 )
logIt = false;
// it's not safe to call fprintf() even with
// mutex locks for sig handlers with pthreads
// going on!!!
#ifdef PTHREADS
logIt = false;
#endif
2013-08-03 00:12:24 +04:00
// panic if hogging
if ( logIt ) {
if ( g_callSlot->m_callback )
log("loop: msg type 0x%hhx reply callback "
2014-11-11 01:45:11 +03:00
"hogging cpu for %"INT32" ticks",
2013-08-03 00:12:24 +04:00
g_callSlot->m_msgType,
g_transIdCount);
else
log("loop: msg type 0x%hhx handler "
2014-11-11 01:45:11 +03:00
"hogging cpu for %"INT32" ticks",
2013-08-03 00:12:24 +04:00
g_callSlot->m_msgType,
g_transIdCount);
}
}
g_nowApprox += QUICKPOLL_INTERVAL; // 10 ms
// sanity check
if ( g_loop.m_inQuickPoll &&
g_niceness != 0 &&
// seems to happen a lot when doing a qa test because we slow
// things down a lot when that happens
! g_conf.m_testParserEnabled &&
! g_conf.m_testSpiderEnabled &&
! g_conf.m_testSearchEnabled &&
// likewise if doing a page parser test...
! g_inPageParser &&
! g_inPageInject ) {
#ifndef PTHREADS
2013-08-03 00:12:24 +04:00
// i guess sometimes niceness 1 things call niceness 0 things?
log("loop: crap crap crap!!!");
#endif
2013-08-03 00:12:24 +04:00
//char *xx=NULL;*xx=0; }
}
// basically ignore this alarm if already in a quickpoll
if ( g_loop.m_inQuickPoll ) return;
if ( ! g_conf.m_useQuickpoll ) return;
g_loop.m_needsToQuickPoll = true;
2014-11-11 01:45:11 +03:00
//fprintf(stderr,"missed=%"INT32"\n",g_missedQuickPolls);
2013-08-03 00:12:24 +04:00
// another missed quickpoll
if ( g_niceness == 1 ) g_missedQuickPolls++;
// reset if niceness is 0
else if ( g_niceness == 0 ) g_missedQuickPolls = 0;
// if we missed to many, then dump core
if ( g_niceness == 1 && g_missedQuickPolls >= 4 ) {
//g_inSigHandler = true;
// NOT SAFE for pthreads cuz we're in sig handler
#ifndef PTHREADS
log("loop: missed quickpoll");
#endif
//g_inSigHandler = false;
2013-08-03 00:12:24 +04:00
// seems to core a lot in gbcompress() we need to
// put a quickpoll into zlib deflate() or
// deflat_slot() or logest_match() function
// for now do not dump core --- re-enable this later
// mdw TODO
//char *xx=NULL;*xx=0;
}
// if it has been a while since heartbeat (> 10000ms) dump core so
// we can see where the process was... that is a missed quick poll?
if ( g_process.m_lastHeartbeatApprox == 0 ) return;
if ( g_conf.m_maxHeartbeatDelay <= 0 ) return;
if ( g_nowApprox - g_process.m_lastHeartbeatApprox >
g_conf.m_maxHeartbeatDelay ) {
#ifndef PTHREADS
2013-08-03 00:12:24 +04:00
logf(LOG_DEBUG,"gb: CPU seems blocked. Forcing core.");
#endif
2013-08-03 00:12:24 +04:00
//char *xx=NULL; *xx=0;
}
2014-11-11 01:45:11 +03:00
//logf(LOG_DEBUG, "xxx now: %"INT64"! approx: %"INT64"", g_now, g_nowApprox);
2014-09-03 19:38:43 +04:00
}
void sigalrmHandler ( int x , siginfo_t *info , void *y ) {
#ifdef PTHREADS
// do not allow threads
// this call is very fast, can be called like 400M times per second
if ( g_threads.amThread() ) return;
#endif
// so we don't call gettimeofday() thousands of times a second...
g_clockNeedsUpdate = true;
2014-09-03 19:38:43 +04:00
// stats
g_numAlarms++;
// . see where we are in the code
// . for computing cpu usage
// . if idling we will be in sigtimedwait() at the lowest level
Host *h = g_hostdb.m_myHost;
// if doing injects...
if ( ! h ) return;
2014-09-03 19:38:43 +04:00
// . i guess this means we were doing something... (otherwise idle)
// . this is KINDA like a 100 point sample, but it has crazy decay
// logic built into it
if ( ! g_inWaitState )
h->m_pingInfo.m_cpuUsage =
.99 * h->m_pingInfo.m_cpuUsage + .01 * 100;
2014-09-03 19:38:43 +04:00
else
h->m_pingInfo.m_cpuUsage =
.99 * h->m_pingInfo.m_cpuUsage + .01 * 000;
2015-01-21 03:02:10 +03:00
if ( g_profiler.m_realTimeProfilerRunning )
g_profiler.getStackFrame(0);
2013-08-03 00:12:24 +04:00
}
static sigset_t s_rtmin;
void maskSignals() {
static bool s_init = false;
if ( ! s_init ) {
s_init = true;
sigemptyset ( &s_rtmin );
// sigaddset ( &s_rtmin, GB_SIGRTMIN );
// sigaddset ( &s_rtmin, GB_SIGRTMIN + 1 );
// sigaddset ( &s_rtmin, GB_SIGRTMIN + 2 );
// sigaddset ( &s_rtmin, GB_SIGRTMIN + 3 );
sigaddset ( &s_rtmin, SIGCHLD );
sigaddset ( &s_rtmin, SIGIO );
sigaddset ( &s_rtmin, SIGPIPE );
}
// block it
if ( sigprocmask ( SIG_BLOCK , &s_rtmin, 0 ) < 0 ) {
log("loop: maskSignals: sigprocmask: %s.", strerror(errno));
return;
}
}
void unmaskSignals() {
// unblock it
if ( sigprocmask ( SIG_UNBLOCK , &s_rtmin, 0 ) < 0 ) {
log("loop: unmaskSignals: sigprocmask: %s.", strerror(errno));
return;
}
}
2013-08-03 00:12:24 +04:00
// shit, we can't make this realtime!! RdbClose() cannot be called by a
// real time sig handler
void sighupHandler ( int x , siginfo_t *info , void *y ) {
// let main process know to shutdown
g_loop.m_shutdown = 1;
}
// . keep a timestamp for the last time we called the sleep callbacks
// . we have to call those every 1 second
2014-10-30 22:36:39 +03:00
int64_t s_lastTime = 0;
2013-08-03 00:12:24 +04:00
bool Loop::runLoop ( ) {
2013-08-03 00:12:24 +04:00
#ifndef _POLLONLY_
// set of signals to watch for
sigset_t sigs0;
//sigset_t sigs1;
// clear all signals from the set
sigemptyset ( &sigs0 );
//sigemptyset ( &sigs1 );
// . set sigs on which sigtimedwait() listens for
// . add this signal to our set of signals to watch (currently NONE)
sigaddset ( &sigs0, SIGPIPE );
// #ifndef _VALGRIND_
// sigaddset ( &sigs0, GB_SIGRTMIN );
// #endif
// sigaddset ( &sigs0, GB_SIGRTMIN + 1 );
// sigaddset ( &sigs0, GB_SIGRTMIN + 2 );
// sigaddset ( &sigs0, GB_SIGRTMIN + 3 );
2013-08-03 00:12:24 +04:00
sigaddset ( &sigs0, SIGCHLD );
//sigaddset ( &sigs0, SIGVTALRM );
// . TODO: do we need to mask SIGIO too? (sig queue overflow?)
// . i would think so, because what if we tried to queue an important
// handler to be called in the high priority UdpServer but the queue
// was full? Then we would finish processing the signals on the queue
// before we would address the excluded high priority signals by
// calling doPoll()
sigaddset ( &sigs0, SIGIO );
// . set up a time to block waiting for signals to be 1/2 a second
// . 1 billion nanoseconds = 1 second
//struct timespec t = { 0 /*seconds*/, 500000000 /*nanoseconds*/};
//struct timespec t = { 0 /*seconds*/, 10000000 /*nanoseconds*/};
// grab any high priority sig first
//siginfo_t info ;
2014-11-11 01:45:11 +03:00
//int32_t sigNum ; //= sigwaitinfo ( &sigs1, &info );
2013-08-03 00:12:24 +04:00
#endif
s_lastTime = 0;
// . allow us to be interrupted
// . UNBLOCKs GB_SIGRTMIN
// . makes g_udpServer2 quite jumpy
g_loop.interruptsOn();
m_isDoingLoop = true;
2014-09-03 19:38:43 +04:00
//mdw:enableTimer();
2013-08-03 00:12:24 +04:00
// . now loop forever waiting for signals
// . but every second check for timer-based events
BIGLOOP:
g_now = gettimeofdayInMilliseconds();
2013-08-03 00:12:24 +04:00
//set the time back to its exact value and reset
//the timer.
g_nowApprox = g_now;
// MDW: won't this hog cpu? just don't disable it in
// Process::save2() any more and it should be ok
//enableTimer();
m_lastPollTime = g_now;
m_needsToQuickPoll = false;
2013-08-03 00:12:24 +04:00
/*
// test the heartbeat core...
if ( g_numAlarms > 100 ) {
2013-08-03 00:12:24 +04:00
goo:
2014-11-11 01:45:11 +03:00
int32_t j;
for ( int32_t k = 0 ; k < 2000000000 ; k++ ) {
j=k *5;
}
goto goo;
}
*/
2013-08-03 00:12:24 +04:00
g_errno = 0;
2013-08-03 00:12:24 +04:00
if ( m_shutdown ) {
// a msg
if (m_shutdown==1)
log(LOG_INIT,"loop: got SIGHUP or SIGTERM.");
else if (m_shutdown==2)
log(LOG_INIT,"loop: got SIGBAD in thread.");
else
log(LOG_INIT,"loop: got SIGPWR.");
// . turn off interrupts here because it doesn't help to do
// it in the thread
// . TODO: turn off signals for sigbadhandler()
interruptsOff();
// if thread got the signal, just wait for him to save all
// Rdbs and then dump core
if ( m_shutdown == 2 ) {
//log(0,"Thread is saving & shutting down urgently.");
//while ( 1 == 1 ) sleep (50000);
log("loop: Resuming despite thread crash.");
m_shutdown = 0;
goto BIGLOOP;
2013-08-03 00:12:24 +04:00
}
// otherwise, thread did not save, so we must do it
log ( LOG_INIT ,"loop: Saving and shutting down urgently.");
// . this will save all Rdb's and dump core
// . since "urgent" is true it won't broadcast its shutdown
// to all hosts
//#ifndef NO_MAIN
//mainShutdown( true ); // urgent?
//#endif
g_process.shutdown ( true );
}
2013-08-03 00:12:24 +04:00
//
//
// THE HEART OF GB. process events/signals on FDs.
//
//
doPoll();
goto BIGLOOP;
// make compiler happy
return 0;
//g_udpServer2.sendPoll_ass(true,g_now);
//g_udpServer2.process_ass ( g_now );
// MDW: see if this works without this junk, if not then
// put it back in
// seems like never getting signals on redhat 16-core box.
// we always process dgrams through this... wtf? try taking
// it out and seeing what's happening
//g_udpServer.sendPoll_ass (true,g_now);
//g_udpServer.process_ass ( g_now );
// and dns now too
//g_dns.m_udpServer.sendPoll_ass(true,g_now);
//g_dns.m_udpServer.process_ass ( g_now );
// if there was a high niceness http request within a
// quickpoll, we stored it and now we'll call it here.
//g_httpServer.callQueuedPages();
//g_udpServer.printState ( );
// if ( g_someAreQueued ) {
// // assume none are queued now, we may get interrupted
// // and it may get set back to true
// g_someAreQueued = false;
// //g_udpServer2.makeCallbacks_ass ( 0 );
// //g_udpServer2.makeCallbacks_ass ( 1 );
// }
// if ( g_threads.m_needsCleanup ) {
// // bitch about
// static bool s_bitched = false;
// if ( ! s_bitched ) {
// log(LOG_REMIND,"loop: Lost thread signal.");
// s_bitched = true;
// }
2013-08-03 00:12:24 +04:00
// }
//cleanup and launch threads:
//g_threads.printState();
//g_threads.timedCleanUp(4, MAX_NICENESS ) ; // 4 ms
// do it anyway
// take this out as well to see if signals are coming in
//doPoll();
//while ( m_needToPoll ) doPoll();
2013-08-03 00:12:24 +04:00
//#ifndef _POLLONLY_
2013-08-03 00:12:24 +04:00
// hack
//char buffer[100];
//if ( recv(27,buffer,99,MSG_PEEK|MSG_DONTWAIT) == 0 ) {
// logf(LOG_DEBUG,"CLOSED CLOSED!!");
//}
//g_errno = 0;
//check for pending signals, return right away if none.
//then we'll do the low priority stuff while we were
//supposed to be sleeping.
//g_inWaitState = true;
//sigNum = sigtimedwait (&sigs0, &info, s_sigWaitTimePtr ) ;
//#undef usleep
// now we just usleep(). an arriving signal will call
// sigHandlerQueue_r() then break us out of this sleep.
// 10000 microseconds is 10 milliseconds. it should break out
// when a signal comes in just like the sleep() function.
//usleep(1000 * 10);
// reinstate the thing that prevents us from non-chalantly adding
// usleeps() which could degrade performance
//#define usleep(a) { char *xx=NULL;*xx=0; }
// if no signal, we just waited 20 ms and nothing happened
// why do we need this now? MDW
//if ( sigNum == -1 )
// sigalrmHandler( 0,&info,NULL);
2014-11-11 01:45:11 +03:00
//logf(LOG_DEBUG,"loop: sigNum=%"INT32" signo=%"INT32" alrm=%"INT32"",
// (int32_t)sigNum,info.si_signo,(int32_t)SIGVTALRM);
2014-11-18 05:13:36 +03:00
// no longer in a wait state...
//g_inWaitState = false;
2014-11-11 01:45:11 +03:00
// int32_t n = MAX_NUM_FDS / 32;
// // process file descriptor callbacks for file descriptors
// // we queued in sigHandlerQueue_r() function above.
// // we use an array of 1024 bits like the poll function i guess.
2014-11-11 01:45:11 +03:00
// for ( int32_t i = 0 ; i < n ; i++ ) {
// // this is a 32-bit number
// if ( ! g_fdReadBits[i] ) continue;
// // scan the individual bits now
2014-11-11 01:45:11 +03:00
// for ( int32_t j = 0 ; j < 32 ; j++ ) {
// // mask mask
2014-11-11 01:45:11 +03:00
// uint32_t mask = 1 << j;
// // skip jth bit if not on
// if ( ! g_fdReadBits[i] & mask ) continue;
// // block signals for just a sec so we can
// // clear it now that we've handled it
// //maskSignals();
// // clear it
// g_fdReadBits[i] &= ~mask;
// // reinstate signals
// //unmaskSignals();
// // construct the file descriptor
2014-11-11 01:45:11 +03:00
// int32_t fd = i*32 + j;
// // . call all callbacks registered on this fd
// // . forReading = true
// callCallbacks_ass ( true , fd , g_now );
// }
// }
// // do the same thing but for writing now
2014-11-11 01:45:11 +03:00
// for ( int32_t i = 0 ; i < n ; i++ ) {
// // this is a 32-bit number
// if ( ! g_fdWriteBits[i] ) continue;
// // scan the individual bits now
2014-11-11 01:45:11 +03:00
// for ( int32_t j = 0 ; j < 32 ; j++ ) {
// // mask mask
2014-11-11 01:45:11 +03:00
// uint32_t mask = 1 << j;
// // skip jth bit if not on
// if ( ! g_fdWriteBits[i] & mask ) continue;
// // block signals for just a sec so we can
// // clear it now that we've handled it
// //maskSignals();
// // clear it
// g_fdWriteBits[i] &= ~mask;
// // reinstate signals
// //unmaskSignals();
// // construct the file descriptor
2014-11-11 01:45:11 +03:00
// int32_t fd = i*32 + j;
// // . call all callbacks registered on this fd.
// // . forReading = false.
// callCallbacks_ass ( false , fd , g_now );
// }
// }
2014-10-30 22:36:39 +03:00
// int64_t elapsed = g_now - s_lastTime;
// // if someone changed the system clock on us, this could be negative
// // so fix it! otherwise, times may NEVER get called in our lifetime
// if ( elapsed < 0 ) elapsed = m_minTick;
// // call this every (about) m_minTicks milliseconds
// if ( elapsed >= m_minTick ) {
// // MAX_NUM_FDS is the fd for sleep callbacks
// callCallbacks_ass ( true , MAX_NUM_FDS , g_now );
// // note the last time we called them
// //g_now = gettimeofdayInMilliseconds();
// s_lastTime = g_now;
// }
// // call remaining callbacks for udp msgs
// if ( g_udpServer.needBottom() )
// g_udpServer.makeCallbacks_ass ( 2 );
//if(g_udpServer2.needBottom())
// g_udpServer2.makeCallbacks_ass ( 2 );
// if(gettimeofdayInMillisecondsLocal() -
// startTime > 10)
// goto notime;
// if ( g_conf.m_sequentialProfiling )
// g_threads.printState();
// if ( g_threads.m_needsCleanup )
// // limit to 4ms. cleanup any niceness thread.
// g_threads.timedCleanUp(4 ,MAX_NICENESS);
// #endif
/*
if ( sigNum < 0 ) {
if ( errno == EAGAIN || errno == EINTR ||
errno == EILSEQ || errno == 0 ) {
sigNum = 0;
errno = 0;
2013-08-03 00:12:24 +04:00
}
else if ( errno != ENOMEM ) {
log("loop: sigtimedwait(): %s.",
strerror(errno));
continue;
}
}
if ( sigNum == 0 ) {
//no signals pending, try to take care of anything
// left undone:
2014-10-30 22:36:39 +03:00
int64_t startTime =gettimeofdayInMillisecondsLocal();
if(g_now & 1) {
if(g_udpServer.needBottom())
g_udpServer.makeCallbacks_ass ( 2 );
//if(g_udpServer2.needBottom())
// g_udpServer2.makeCallbacks_ass ( 2 );
if(gettimeofdayInMillisecondsLocal() -
startTime > 10)
goto notime;
2013-08-03 00:12:24 +04:00
if(g_conf.m_sequentialProfiling)
g_threads.printState();
if(g_threads.m_needsCleanup)
g_threads.timedCleanUp(4 , // ms
MAX_NICENESS);
}
else {
if(g_conf.m_sequentialProfiling)
g_threads.printState();
if(g_threads.m_needsCleanup)
g_threads.timedCleanUp(4 , // ms
MAX_NICENESS);
if(gettimeofdayInMillisecondsLocal() -
startTime > 10)
goto notime;
if(g_udpServer.needBottom())
g_udpServer.makeCallbacks_ass ( 2 );
//if(g_udpServer2.needBottom())
// g_udpServer2.makeCallbacks_ass ( 2 );
}
2013-08-03 00:12:24 +04:00
notime:
//if we still didn't get all of them cleaned up set
//sleep time to none.
if(g_udpServer.needBottom() ) {
//g_udpServer2.needBottom()) {
s_sigWaitTimePtr = &s_sigWaitTime2;
2013-08-03 00:12:24 +04:00
}
else {
//otherwise set it to minTick
s_sigWaitTimePtr = &s_sigWaitTime;
2013-08-03 00:12:24 +04:00
}
}
//
// we got a signal, process it
//
else {
if ( info.si_code == SIGIO ) {
log("loop: got sigio");
m_needToPoll = true;
}
// handle the signal
else sigHandler_r ( 0 , &info , NULL );
}
*/
2013-08-03 00:12:24 +04:00
// } while(1)
2013-08-03 00:12:24 +04:00
/*
2013-08-03 00:12:24 +04:00
loop:
g_now = gettimeofdayInMilliseconds();
g_errno = 0;
// debug msg
//if ( g_conf.m_logDebugUdp ) log("Loop: top of loop");
// shutdown if we got a critical signal
if ( m_shutdown ) {
// a msg
if ( m_shutdown == 1 )
log("loop: got SIGHUP or SIGTERM.");
else if ( m_shutdown == 2 )
log("loop: got SIGBAD in thread.");
else
log("loop: got SIGPWR.");
// . turn off interrupts here because it doesn't help to do
// it in the thread
// . TODO: turn off signals for sigbadhandler()
interruptsOff();
// if thread got the signal, just wait for him to save all
// Rdbs and then dump core
if ( m_shutdown == 2 ) {
log("loop: Cored in thread.");
//log ("Thread is saving and shutting down urgently.");
//while ( 1 == 1 ) sleep (50000);
//log("loop: Resuming despite thread crash.");
//m_shutdown = 0;
//goto resume;
}
// otherwise, thread did not save, so we must do it
log ( "loop: Saving and shutting down urgently.");
// sleep forver now so we can call findPtr() function
// after calling g_mem.printBreeches(0)
log("loop: sleeping forever.");
sleep(1000000);
// . this will save all Rdb's and dump core
// . since "urgent" is true it won't broadcast its shutdown
// to all hosts
//#ifndef NO_MAIN
//mainShutdown( true ); // urgent?
//#endif
// urgent = true
g_process.shutdown ( true );
}
// resume:
// . get the time now, sigHandlerRT() can use this, cuz gettimeofday()
// ain't async signal safe
// . g_now only used in hot udpServer for doing time outs really
g_now = gettimeofdayInMilliseconds();
// clear any g_errno before possibly calling sendPoll_ass()
g_errno = 0;
// occasionaly call to sendto() will not send a dgram and since we
// don't count on receiving ready-to-write signals on our
// UdpServer's fds we check them here... it sucks, but hopefully it
// fixes the problem of requests not getting fully transmitted
// and stagnating in the UdpServer.
//if (g_udpServer2.m_needToSend) g_udpServer2.sendPoll_ass(true,g_now);
//if (g_udpServer.m_needToSend ) g_udpServer.sendPoll_ass (true,g_now);
// . well, sender may be choking even with m_needsToSend var
// . i bet we're just losing signals
// . TODO: do we lose them when in the handler? if so, send a signal
// so loop will read/write after coming out of the async handler
//g_udpServer2.sendPoll_ass(true,g_now);
// and also read just in case, too
//g_udpServer2.process_ass ( g_now );
// and the low priority guy
g_udpServer.sendPoll_ass (true,g_now);
g_udpServer.process_ass ( g_now );
// and dns now too
g_dns.m_udpServer.sendPoll_ass(true,g_now);
g_dns.m_udpServer.process_ass ( g_now );
// tcp server contained in http server has the same problem
//if ( g_httpServer.m_tcp.m_numQueued > 0 )
// g_httpServer.m_tcp.writeSocketsInQueue();
// . likewise, we may have not got the SIGQUEUE signal from when
// UdpServer wanted to call a callback but couldn't because it was
// in an async signal handler, and then at SIGQUEUE sig got lost...
// . this should clear those completed transactions we sometimes
// see in the UdpServer socket table
if ( g_someAreQueued ) {
// assume none are queued now, we may get interrupted
// and it may get set back to true
g_someAreQueued = false;
//g_udpServer2.makeCallbacks_ass ( 0 );
//g_udpServer2.makeCallbacks_ass ( 1 );
}
// clean up threads in case signal got lost somehow
if ( g_threads.m_needsCleanup ) {
// bitch about
static bool s_bitched = false;
if ( ! s_bitched ) {
log(LOG_REMIND,"loop: Lost thread signal.");
s_bitched = true;
}
// assume not any more
//g_threads.m_needsCleanup = false;
2013-08-03 00:12:24 +04:00
// check thread queue for any threads that completed
// so we can call their callbacks and remove them
g_threads.cleanUp ( 0 , 1000) ; // max niceness
2013-08-03 00:12:24 +04:00
// launch any threads in waiting since this sig was
// from a terminating one
g_threads.launchThreads();
}
// clear any g_errno before calling sleep callbacks
g_errno = 0;
// get time difference
elapsed = g_now - s_lastTime;
// if someone changed the system clock on us, this could be negative
// so fix it! otherwise, times may NEVER get called in our lifetime
if ( elapsed < 0 ) elapsed = m_minTick;
// print log msgs we accumulated while in a signal handler
//if ( g_log.needsPrinting() ) g_log.printBuf();
// . poll if we need to
// . make it a while loop incase a hot sig handler resets m_needToPoll
// . CAUTION: WE CAN LOOP IN HERE FOR ~ A MINUTE! I've seen it happen
// when RdbDump was going...
while ( m_needToPoll ) doPoll();
// call this every (about) 1 second
if ( elapsed >= m_minTick ) {
// MAX_NUM_FDS is the fd for sleep callbacks
callCallbacks_ass ( true , MAX_NUM_FDS , g_now );
// note the last time we called them
s_lastTime = g_now;
}
// just do polls if this linux doesn't support this:
#ifdef _POLLONLY_
doPoll();
#endif
#ifndef _POLLONLY_
// cancel silly errors
//if ( g_errno == EAGAIN ) { sigNum = 0; g_errno = 0; }
//if ( g_errno == EINTR ) { sigNum = 0; g_errno = 0; }
// if sigNum is valid then handle it
//sigHandler_r ( 0 , &info , NULL );
// subloop:
// debug msg
//if ( g_conf.m_logDebugUdp ) log("Loop: entering sigwait");
// . this has a timer resolution of 20ms, I imagine due to how the
// kernel time slices between processes
// . this means UdpServer can not effectively have a wait between
// resends of less than 20ms which makes it a little less zippy
sigNum = sigtimedwait (&sigs0, &info, &s_sigWaitTime ) ;
// cancel silly errors
if ( sigNum < 0 && errno == EAGAIN ) {
sigNum = 0; errno = 0; }
if ( sigNum < 0 && errno == EINTR ) {
sigNum = 0; errno = 0; }
// a zero signal is no signal, just a wake up call
if ( sigNum == 0 ) goto loop;
// sigNum is < 0 on error
if ( sigNum < 0 ) {
// this error happens on the newer libc for some reason
// so ignore it
if ( errno != ENOMEM )
log("loop: sigtimedwait(): %s.",strerror(errno));
goto loop;
}
// sleep test
//log("SLEEPING");
//sleep(10);
2014-10-30 22:36:39 +03:00
//for ( int64_t i = 0 ; i < 1000000000000LL ; i++ );
2013-08-03 00:12:24 +04:00
// . we use g_now in UdpServer.cpp and should make it
// as accurate as possible
// . but it's main use is because gettimeofday() is not async sig safe
g_now = gettimeofdayInMilliseconds();
// debug msg
//if ( g_conf.m_logDebugUdp ) log("Loop: processing signal");
// call sighandler on other queued signals and continue looping
2014-11-11 01:45:11 +03:00
//log("Loop::runLoop:got queued signum=%"INT32"",sigNum);
2013-08-03 00:12:24 +04:00
// was it a SIGIO?
if ( info.si_code == SIGIO ) doPoll();
// handle the signal
else sigHandler_r ( 0 , &info , NULL );
#endif
// don't call any time handlers until no more signals waiting
//goto subloop;
// we need to make g_now as accurate as possible for hot UdpServer...
goto loop;
*/
2013-08-03 00:12:24 +04:00
}
// . the kernel sends a SIGIO signal when the sig queue overflows
// . we resort to polling the fd's when that happens
2014-09-03 21:55:29 +04:00
// void sigioHandler ( int x , siginfo_t *info , void *y ) {
// // set the m_needToPoll flag
// g_loop.m_needToPoll = true;
// return;
// }
2013-08-03 00:12:24 +04:00
//--- TODO: flush the signal queue after polling until done
//--- are we getting stale signals resolved by flush so we get
//--- read event on a socket that isnt in read mode???
// TODO: set signal handler to SIG_DFL to prevent signals from queuing up now
// . this handles high priority fds first (lowest niceness)
void Loop::doPoll ( ) {
// set time
//g_now = gettimeofdayInMilliseconds();
2013-08-03 00:12:24 +04:00
// debug msg
//log("**************** GOT SIGIO *************");
// . turn it off here so it can be turned on again after we've
// called select() so we don't lose any fd events through the cracks
// . some callbacks we call make trigger another SIGIO, but if they
// fail they should set Loop::g_needToPoll to true
m_needToPoll = false;
// debug msg
//if ( g_conf.m_logDebugLoop ) log(LOG_DEBUG,"loop: Entered doPoll.");
if ( g_conf.m_logDebugLoop) log(LOG_DEBUG,"loop: Entered doPoll.");
2013-08-03 00:12:24 +04:00
// print log
if ( g_log.needsPrinting() ) g_log.printBuf();
// sigqueue() might have been called from a hot udp server and
// we queued some handlers to be called
if ( g_someAreQueued ) {
// assume none are queued now, we may get interrupted
// and it may get set back to true
g_someAreQueued = false;
//g_udpServer2.makeCallbacks_ass ( 0 );
//g_udpServer2.makeCallbacks_ass ( 1 );
}
if(g_udpServer.needBottom()) g_udpServer.makeCallbacks_ass ( 1 );
//if(g_udpServer2.needBottom()) g_udpServer2.makeCallbacks_ass ( 1 );
2014-09-03 19:38:43 +04:00
//bool processedOne;
2014-11-11 01:45:11 +03:00
int32_t n;
// int32_t repeats = 0;
2013-08-03 00:12:24 +04:00
// skipLowerPriorities:
// descriptor bits for calling select()
// fd_set readfds;
// fd_set writefds;
// fd_set exceptfds;
2013-08-03 00:12:24 +04:00
// clear fds for select()
//FD_ZERO ( &readfds );
//FD_ZERO ( &writefds );
//FD_ZERO ( &exceptfds );
2013-08-03 00:12:24 +04:00
timeval v;
v.tv_sec = 0;
if ( m_inQuickPoll ) v.tv_usec = 0;
2014-09-03 19:38:43 +04:00
// 10ms for sleepcallbacks so they can be called...
// and we need this to be the same as sigalrmhandler() since we
// keep track of cpu usage here too, since sigalrmhandler is "VT"
// based it only goes off when that much "cpu time" has elapsed.
else v.tv_usec = QUICKPOLL_INTERVAL * 1000;
2013-08-03 00:12:24 +04:00
// set descriptors we should watch
2014-11-18 05:13:36 +03:00
// MDW: no longer necessary since we have s_selectMaskRead, etc.
2014-11-11 01:45:11 +03:00
// for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) {
// if ( m_readSlots [i] ) {
// FD_SET ( i , &readfds );
// FD_SET ( i , &exceptfds );
// }
// if ( m_writeSlots[i] ) {
// FD_SET ( i , &writefds );
// FD_SET ( i , &exceptfds );
// }
// }
2013-08-03 00:12:24 +04:00
again:
// gotta copy to our own since bits get cleared by select() function
fd_set readfds;
fd_set writefds;
fd_set exceptfds;
gbmemcpy ( &readfds, &s_selectMaskRead , sizeof(fd_set) );
gbmemcpy ( &writefds, &s_selectMaskWrite , sizeof(fd_set) );
//gbmemcpy ( &exceptfds, &s_selectMaskExcept , sizeof(fd_set) );
// what is the point of fds for writing... its for when we
// get a new socket via accept() it is read for writing...
//FD_ZERO ( &writefds );
FD_ZERO ( &exceptfds );
if ( g_conf.m_logDebugLoop )
log("loop: in select");
2014-09-03 19:38:43 +04:00
// used to measure cpu usage. sigalarm needs to know if we are
// sitting idle in select() or are actively doing something w/ the cpu
g_inWaitState = true;
2014-11-11 01:45:11 +03:00
// for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) {
// // continue if not set for reading
// if ( FD_ISSET ( i , &s_selectMaskRead ) ||
// FD_ISSET ( i , &writefds ) ||
// FD_ISSET ( i , &exceptfds ) )
// // debug
2014-11-11 01:45:11 +03:00
// log("loop: fd %"INT32" is set",i);
// // if niceness is not -1, handle it below
// }
// . poll the fd's searching for socket closes
// . the sigalrms and sigvtalrms and SIGCHLDs knock us out of this
// select() with n < 0 and errno equal to EINTR
n = select (MAX_NUM_FDS,
&readfds,
&writefds,
&exceptfds,
&v );
g_inWaitState = false;
if ( g_conf.m_logDebugLoop )
2014-11-11 01:45:11 +03:00
log("loop: out select n=%"INT32" errno=%"INT32" errnomsg=%s",
(int32_t)n,(int32_t)errno,mstrerror(errno));
2013-08-03 00:12:24 +04:00
if ( n < 0 ) {
// valgrind
if ( errno == EINTR ) {
// got it. if we get a sig alarm or vt alarm or
// SIGCHLD (from Threads.cpp) we end up here.
2014-11-11 01:45:11 +03:00
//log("loop: got errno=%"INT32"",(int32_t)errno);
// if shutting own was it a sigterm ?
if ( m_shutdown ) goto again;
// handle returned threads for niceness 0
g_threads.timedCleanUp(-3,0); // 3 ms
if ( m_inQuickPoll ) goto again;
// high niceness threads
g_threads.timedCleanUp(-4,MAX_NICENESS); // 3 ms
goto again;
}
2013-08-03 00:12:24 +04:00
g_errno = errno;
log("loop: select: %s.",strerror(g_errno));
return;
}
2014-09-03 19:38:43 +04:00
// if we wait for 10ms with nothing happening, fix cpu usage here too
// if ( n == 0 ) {
// Host *h = g_hostdb.m_myHost;
// h->m_cpuUsage = .99 * h->m_cpuUsage + .01 * 000;
// }
2013-08-03 00:12:24 +04:00
// debug msg
if ( g_conf.m_logDebugLoop)
2014-11-11 01:45:11 +03:00
logf(LOG_DEBUG,"loop: Got %"INT32" fds waiting.",n);
2014-09-11 18:16:39 +04:00
if ( g_conf.m_logDebugLoop) {
2014-11-11 01:45:11 +03:00
for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) {
2014-09-11 18:16:39 +04:00
// continue if not set for reading
if ( FD_ISSET ( i , &readfds ) )
2014-11-11 01:45:11 +03:00
log("loop: fd %"INT32" is on for read",i);
2014-09-11 18:16:39 +04:00
if ( FD_ISSET ( i , &writefds ) )
2014-11-11 01:45:11 +03:00
log("loop: fd %"INT32" is on for write",i);
2014-09-11 18:16:39 +04:00
if ( FD_ISSET ( i , &exceptfds ) )
2014-11-11 01:45:11 +03:00
log("loop: fd %"INT32" is on for except",i);
2014-09-11 18:16:39 +04:00
// debug
// if niceness is not -1, handle it below
}
}
2013-08-03 00:12:24 +04:00
// . reset the need to poll flag if everything is caught up now
// . let's take this out for now ... won't this leave some
// threads hanging, they do not always generate SIGIO's if
// the sigqueue is full!! LET'S SEE... mdw
//if ( n == 0 ) {
// // deal with any threads before returning
// g_threads.cleanUp ( NULL , 1000 /*max niceness*/ );
// g_threads.launchThreads();
// return;
//}
2014-09-03 19:38:43 +04:00
//processedOne = false;
2013-08-03 00:12:24 +04:00
// a Slot ptr
Slot *s;
g_now = gettimeofdayInMilliseconds();
/*
// call g_udpServer sig handlers for niceness -1 here
2014-11-11 01:45:11 +03:00
for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) {
2013-08-03 00:12:24 +04:00
// continue if not set for reading
if ( ! FD_ISSET ( i , &readfds ) ) continue;
// if niceness is not -1, handle it below
s = m_readSlots [ i ];
if ( s && s->m_niceness != -1 ) continue;
callCallbacks_ass (true,i, g_now); // for reading = true
processedOne = true;
}
2014-11-11 01:45:11 +03:00
for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) {
2013-08-03 00:12:24 +04:00
if ( ! FD_ISSET ( i , &writefds ) ) continue;
// if niceness is not -1, handle it below
s = m_writeSlots [ i ];
if ( s && s->m_niceness != -1 ) continue;
callCallbacks_ass (false,i, g_now); // for reading = false
processedOne = true;
}
*/
// handle returned threads for niceness -1
//g_threads.timedCleanUp(2/*ms*/);
bool calledOne = false;
// now keep this fast, too. just check fds we need to.
2014-11-11 01:45:11 +03:00
for ( int32_t i = 0 ; i < s_numReadFds ; i++ ) {
2014-09-03 19:38:43 +04:00
if ( n == 0 ) break;
int fd = s_readFds[i];
s = m_readSlots [ fd ];
// if niceness is not 0, handle it below
if ( s && s->m_niceness > 0 ) continue;
// must be set
if ( FD_ISSET ( fd , &readfds ) ) {
if ( g_conf.m_logDebugLoop )
2014-11-11 01:45:11 +03:00
log("loop: calling cback0 niceness=%"INT32" fd=%i"
, s->m_niceness , fd );
calledOne = true;
callCallbacks_ass (true/*forReading?*/,fd, g_now,0);
}
2014-09-11 16:26:14 +04:00
// fds are always ready for writing so take this out.
// our read callbacks always try to do a write as well.
if ( FD_ISSET ( fd , &writefds ) ) {
if ( g_conf.m_logDebugLoop )
2014-11-11 01:45:11 +03:00
log("loop: calling cback0 niceness=%"INT32" fd=%i"
, s->m_niceness , fd );
calledOne = true;
callCallbacks_ass (false/*forReading?*/,fd, g_now,0);
}
}
2014-11-11 01:45:11 +03:00
// for ( int32_t i = 0 ; i < s_numWriteFds ; i++ ) {
2014-09-03 19:38:43 +04:00
// if ( n == 0 ) break;
// int fd = s_writeFds[i];
// s = m_writeSlots [ fd ];
// // if niceness is not 0, handle it below
// if ( s && s->m_niceness > 0 ) continue;
// // must be set
// if ( FD_ISSET ( fd , &writefds ) )
// callCallbacks_ass (false/*forReading?*/,fd, g_now,1);
// }
2013-08-03 00:12:24 +04:00
// handle returned threads for niceness 0
g_threads.timedCleanUp(-3,0); // 3 ms
2013-08-03 00:12:24 +04:00
//
// the stuff below is not super urgent, do not do if in quickpoll
//
if ( m_inQuickPoll ) return;
2013-08-03 00:12:24 +04:00
// now for lower priority fds
2014-11-11 01:45:11 +03:00
for ( int32_t i = 0 ; i < s_numReadFds ; i++ ) {
2014-09-03 19:38:43 +04:00
if ( n == 0 ) break;
int fd = s_readFds[i];
s = m_readSlots [ fd ];
// if niceness is not 0, handle it below
2013-08-03 00:12:24 +04:00
if ( s && s->m_niceness <= 0 ) continue;
// must be set
if ( FD_ISSET ( fd , &readfds ) ) {
if ( g_conf.m_logDebugLoop )
2014-11-11 01:45:11 +03:00
log("loop: calling cback1 niceness=%"INT32" fd=%i"
, s->m_niceness , fd );
calledOne = true;
callCallbacks_ass (true/*forReading?*/,fd, g_now,1);
}
2014-09-11 16:26:14 +04:00
// fds are always ready for writing so take this out.
// our read callbacks always try to do a write as well.
if ( FD_ISSET ( fd , &writefds ) ) {
if ( g_conf.m_logDebugLoop )
2014-11-11 01:45:11 +03:00
log("loop: calling cback1 niceness=%"INT32" fd=%i"
, s->m_niceness , fd );
calledOne = true;
callCallbacks_ass (false/*forReading?*/,fd, g_now,1);
}
}
2014-11-11 01:45:11 +03:00
// for ( int32_t i = 0 ; i < s_numWriteFds ; i++ ) {
2014-09-03 19:38:43 +04:00
// if ( n == 0 ) break;
// int fd = s_writeFds[i];
// s = m_writeSlots [ fd ];
// // if niceness is not 0, handle it below
// if ( s && s->m_niceness <= 0 ) continue;
// // must be set
// if ( FD_ISSET ( fd , &writefds ) )
// callCallbacks_ass (false/*forReading?*/,fd, g_now,1);
// }
//if ( ! calledOne )
2014-11-11 01:45:11 +03:00
// log("loop: select returned n=%"INT32" but nothing called.",n);
// . MDW: replaced this with more efficient logic above
// . call the callback for each fd we got
// . only call callbacks for fds that have a nice of 0 here
2014-11-11 01:45:11 +03:00
// for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) {
// // continue if not set for reading
// if ( ! FD_ISSET ( i , &readfds ) ) continue;
// // if niceness is not 0, handle it below
// s = m_readSlots [ i /*fd*/ ];
// if ( s && s->m_niceness != 0 ) continue;
// callCallbacks_ass (true/*forReading?*/,i, g_now);
// processedOne = true;
// }
// // only call callbacks for fds that have a nice of 0 here
2014-11-11 01:45:11 +03:00
// for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) {
// if ( ! FD_ISSET ( i , &writefds ) ) continue;
// // if niceness is not 0, handle it below
// s = m_writeSlots [ i /*fd*/ ];
// if ( s && s->m_niceness != 0 ) continue;
// callCallbacks_ass (false/*forReading?*/,i, g_now);
// processedOne = true;
// }
// #if 0
// if(processedOne && repeats < QUERYPRIORITYWEIGHT) {
// //m_needToPoll = true;
// repeats++;
// goto skipLowerPriorities;
// }
// // log(LOG_WARN,
2014-11-11 01:45:11 +03:00
// // "Loop: repeated %"INT32" times before moving to lower priority threads",
// // repeats);
// #endif
// // handle low priority fds here
2014-11-11 01:45:11 +03:00
// for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) {
// // continue if not set for reading
// if ( ! FD_ISSET ( i , &readfds ) ) continue;
// // if niceness is 0, we already handled it above
// s = m_readSlots [ i /*fd*/ ];
// if ( s && s->m_niceness <= 0 ) continue;
// callCallbacks_ass (true/*forReading?*/,i, g_now);
// }
// // only call callbacks for fds that have a nice of 0 here
2014-11-11 01:45:11 +03:00
// for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) {
// if ( ! FD_ISSET ( i , &writefds ) ) continue;
// // if niceness is 0, we already handled it above
// s = m_writeSlots [ i /*fd*/ ];
// if ( s && s->m_niceness <= 0 ) continue;
// callCallbacks_ass (false/*forReading?*/,i, g_now);
// }
2013-08-03 00:12:24 +04:00
// handle returned threads for all other nicenesses
g_threads.timedCleanUp(-4,MAX_NICENESS); // 4 ms
2013-08-03 00:12:24 +04:00
// set time
g_now = gettimeofdayInMilliseconds();
// call sleepers if they need it
// call this every (about) 1 second
2014-11-11 01:45:11 +03:00
int32_t elapsed = g_now - s_lastTime;
2013-08-03 00:12:24 +04:00
// if someone changed the system clock on us, this could be negative
// so fix it! otherwise, times may NEVER get called in our lifetime
if ( elapsed < 0 ) elapsed = m_minTick;
if ( elapsed >= m_minTick ) {
// MAX_NUM_FDS is the fd for sleep callbacks
callCallbacks_ass ( true , MAX_NUM_FDS , g_now );
// note the last time we called them
s_lastTime = g_now;
// handle returned threads for all other nicenesses
g_threads.timedCleanUp(-4,MAX_NICENESS); // 4 ms
2013-08-03 00:12:24 +04:00
}
// debug msg
if ( g_conf.m_logDebugLoop ) log(LOG_DEBUG,"loop: Exited doPoll.");
}
// for FileState class
#include "BigFile.h"
// call this when you don't want to be interrupted
void Loop::interruptsOff ( ) {
// . debug
// . until we have our own malloc, don't turn them on
if ( ! g_isHot ) return;
// bail in already in a sig handler
if ( g_inSigHandler ) return;
// if interrupts already off bail
if ( ! g_interruptsOn ) return;
// looks like sigprocmask is destructive on our sigset
sigset_t rtmin;
sigemptyset ( &rtmin );
// tmp debug hack, so we don't have real time signals now...
// #ifndef _VALGRIND_
// sigaddset ( &rtmin, GB_SIGRTMIN );
// #endif
2013-08-03 00:12:24 +04:00
// block it
if ( sigprocmask ( SIG_BLOCK , &rtmin, 0 ) < 0 ) {
log("loop: interruptsOff: sigprocmask: %s.", strerror(errno));
return;
}
g_interruptsOn = false;
// debug msg
//log("interruptsOff");
}
// and this to resume being interrupted
void Loop::interruptsOn ( ) {
// . debug
// . until we have our own malloc, don't turn them on
if ( ! g_isHot ) return;
// bail in already in a sig handler
if ( g_inSigHandler ) return;
// if interrupts already on bail
if ( g_interruptsOn ) return;
// looks like sigprocmask is destructive on our sigset
sigset_t rtmin;
sigemptyset ( &rtmin );
// uncomment this next line to easily disable real time interrupts
// #ifndef _VALGRIND_
// sigaddset ( &rtmin, GB_SIGRTMIN );
// #endif
2013-08-03 00:12:24 +04:00
// debug msg
//log("interruptsOn");
// let everyone know before we are vulnerable to an interrupt
g_interruptsOn = true;
// unblock it so interrupts flow
if ( sigprocmask ( SIG_UNBLOCK, &rtmin, 0 ) < 0 ) {
log("loop: interruptsOn: sigprocmask: %s.", strerror(errno));
return;
}
}
/*
2013-08-03 00:12:24 +04:00
// handle hot real time signals here
void sigHandlerRT ( int x , siginfo_t *info , void *v ) {
// if we're not hot, what are we doing here?
if ( ! g_isHot ) {
fprintf(stderr,"SHIT\n");
exit(-1);
}
// bitch if we shouldn't have gotten this
// fprintf(stderr,"Hey! why are we here?\n");
if ( ! g_interruptsOn ) {
log(LOG_LOGIC,"loop: Interrupts not on. Bad engineer.");
return;
}
//fprintf (stderr,"in rt handler\n");
// let everyone know it
// MDW: turn this off for now, how is it getting set? we dont use
// real time signals any more. maybe a pthread is getting such
// a signal?
//g_inSigHandler = true;
2013-08-03 00:12:24 +04:00
// debug msg
//if ( g_conf.m_timingDebugEnabled )
// log("sigHandlerRT entered");
// save errno
2014-11-11 01:45:11 +03:00
int32_t old_gerrno = g_errno;
2013-08-03 00:12:24 +04:00
int old_errno = errno;
// call normal handler
sigHandler_r ( x , info , v );
// restore
errno = old_errno;
g_errno = old_gerrno;
// debug msg
//if ( g_conf.m_timingDebugEnabled )
// log("sigHandlerRT exited");
// revert
g_inSigHandler = false;
// debug msg
//fprintf (stderr,"out of rt handler\n");
}
*/
2013-08-03 00:12:24 +04:00
/*
2013-08-03 00:12:24 +04:00
// come here when we get a GB_SIGRTMIN+X signal
void sigHandler_r ( int x , siginfo_t *info , void *v ) {
2014-06-08 01:37:21 +04:00
// cygwin lacks the si_fd and si_band members
#ifdef CYGWIN
g_loop.doPoll();
#else
2013-08-03 00:12:24 +04:00
// extract the file descriptor that needs attention
int fd = info->si_fd;
// debug note
//if ( fd == g_httpServer.m_tcp.m_sock )
// logf(LOG_DEBUG,"loop: got http server activity");
// print signal number for debugging purposes (should be SIGPOLL/IO)
// debug msg
//fprintf(stderr,"got sigcode=%i\n",info->si_code );
//fprintf(stderr,"got signo =%i\n",info->si_signo);
//if ( info->si_signo == SIGCHLD )
// fprintf(stderr,"got SIGCHLD\n");
//if ( info->si_code == SIGCHLD )
// fprintf(stderr,"got SIGCHLD2\n");
//fprintf(stderr,"got sigval =%i\n",(int)(info->si_value.sival_int) );
// clear g_errno before callling handlers
g_errno = 0;
// info->si_band values:
//#define POLLIN 0x0001 // There is data to read
//#define POLLPRI 0x0002 // There is urgent data to read
//#define POLLOUT 0x0004 // Writing now will not block
//#define POLLERR 0x0008 // Error condition
//#define POLLHUP 0x0010 // Hung up
//#define POLLNVAL 0x0020 // Invalid request: fd not open
2013-08-03 00:12:24 +04:00
int band = info->si_band;
// fprintf(stderr,"got fd = %i\n", fd );
// fprintf(stderr,"got band = %i\n", band );
// fprintf(stderr,"band & POLLIN = %i\n", band & POLLIN );
// fprintf(stderr,"band & POLLPRI = %i\n", band & POLLPRI );
// fprintf(stderr,"band & POLLOUT = %i\n", band & POLLOUT );
// fprintf(stderr,"band & POLLERR = %i\n", band & POLLERR );
// fprintf(stderr,"band & POLLHUP = %i\n", band & POLLHUP );
2013-08-03 00:12:24 +04:00
// translate SIGPIPE's to band of POLLHUP
if ( info->si_signo == SIGPIPE ) {
band = POLLHUP;
log("loop: Received SIGPIPE signal. Broken pipe.");
}
// . SI_QUEUE signals used to just be from BigFile
// . but now they're used by threads so we can call a callback
// when the thread is completed
if ( g_someAreQueued && ! g_inSigHandler ) {
g_someAreQueued = false;
//g_udpServer2.makeCallbacks_ass ( 0 );
//g_udpServer2.makeCallbacks_ass ( 1 );
}
if ( g_threads.m_needsCleanup && ! g_inSigHandler ) {
// assume not any more
//g_threads.m_needsCleanup = false;
// get the value
//int val = (int)(info->si_value.sival_int);
// debug msg
//if ( g_conf.m_logDebugThreadEnabled )
// log("Loop: got thread done signal");
// check thread queue for any threads that completed
// so we can call their callbacks and remove them
g_threads.timedCleanUp(4,MAX_NICENESS); // 4 ms
// // g_threads.cleanUp ( (ThreadEntry *)val , x);// max niceness
// g_threads.cleanUp ( (ThreadEntry *)val , 1000);//max niceness
2013-08-03 00:12:24 +04:00
// // launch any threads in waiting since this sig was
// // from a terminating one
// g_threads.launchThreads();
}
// if we just needed to cleanup a thread
2013-08-03 00:12:24 +04:00
if ( info->si_signo == SIGCHLD ) return;
// if we don't got a signal for an fd, just a sigqueue() call, bail now
if ( info->si_code == SI_QUEUE ) return;
// . call the appropriate handler(s)
// . TODO: bitch if no callback to handle the read!!!!!!!
// . NOTE: when it's connected it sets both POLLIN and POLLOUT
// . NOTE: or when a socket is trying to connect to it if it's listener
//if ( band & (POLLIN | POLLOUT) == (POLLIN | POLLOUT) )
// g_loop.callCallbacks_ass ( true , fd ); // for reading
2013-08-03 00:12:24 +04:00
if ( band & POLLIN ) {
// keep stats on this now since some linuxes dont work right
g_stats.m_readSignals++;
2014-11-11 01:45:11 +03:00
//log("Loop: read %"INT64" fd=%i",gettimeofdayInMilliseconds(),fd);
2013-08-03 00:12:24 +04:00
g_loop.callCallbacks_ass ( true , fd );
}
else if ( band & POLLPRI ) {
// keep stats on this now since some linuxes dont work right
g_stats.m_readSignals++;
2014-11-11 01:45:11 +03:00
//log("Loop: read %"INT64" fd=%i",gettimeofdayInMilliseconds(),fd);
2013-08-03 00:12:24 +04:00
g_loop.callCallbacks_ass ( true , fd ) ;
}
else if ( band & POLLOUT ) {
// keep stats on this now since some linuxes dont work right
g_stats.m_writeSignals++;
2014-11-11 01:45:11 +03:00
//log("Loop: write %"INT64" fd=%i",gettimeofdayInMilliseconds(),fd)
2013-08-03 00:12:24 +04:00
g_loop.callCallbacks_ass ( false , fd );
}
2014-07-29 23:21:22 +04:00
// fix qainject1() test with this
else if ( band & POLLERR ) {
log(LOG_INFO,"loop: got POLLERR on fd=%i.",fd);
}
2013-08-03 00:12:24 +04:00
//g_loop.callCallbacks_ass ( false , fd );
// this happens if the socket closes abruptly
// or out of band data, etc... see "man 2 poll" for more info
2014-07-29 23:21:22 +04:00
else if ( band & POLLHUP ) {
2013-08-03 00:12:24 +04:00
// i see these all the time for fd == 0, so don't print it
if ( fd != 0 )
log(LOG_INFO,"loop: Received hangup on fd=%i.",fd);
g_errno = ESOCKETCLOSED;
g_loop.callCallbacks_ass ( false , fd );
}
2014-06-08 01:37:21 +04:00
// end ifdef CYGWIN
#endif
2013-08-03 00:12:24 +04:00
}
*/
2013-08-03 00:12:24 +04:00
/*
#if 1 || (LINUX_VERSION_CODE < KERNEL_VERSION(2,3,31))
struct pollfd pfd;
printf ("Trying fallback poll()\n");
pfd.fd = info.si_fd;
pfd.events = POLLIN|POLLOUT|POLLHUP;
if (poll (&pfd, 1, 0) < 0) {
p_error ("poll(): %s", strerror (g_errno));
exit (1);
}
info.si_band = pfd.revents;
#endif
*/
void Loop::startBlockedCpuTimer() {
if(m_inQuickPoll) return;
m_lastPollTime = gettimeofdayInMilliseconds();
g_profiler.resetLastQpoll();
}
2014-11-11 01:45:11 +03:00
void Loop::quickPoll(int32_t niceness, const char* caller, int32_t lineno) {
2013-08-03 00:12:24 +04:00
if ( ! g_conf.m_useQuickpoll ) return;
// convert
//if ( niceness > 1 ) niceness = 1;
// sometimes we init HashTableX's with a niceness of 0 even though
// g_niceness is 1. so prevent a core below.
if ( niceness == 0 ) return;
// sanity check
if ( g_niceness > niceness ) {
log(LOG_WARN,"loop: niceness mismatch!");
//char *xx=NULL;*xx=0; }
}
// sanity -- temporary -- no quickpoll in a thread!!!
//if ( g_threads.amThread() ) { char *xx=NULL;*xx=0; }
// if we are niceness 1 and not in a handler, make it niceness 2
// so the handlers can be answered and we don't slow other
// spiders down and we don't slow turks' injections down as much
if ( ! g_inHandler && niceness == 1 ) niceness = 2;
// reset this
g_missedQuickPolls = 0;
if(m_inQuickPoll) {
log(LOG_WARN,
"admin: tried to quickpoll from inside quickpoll");
// this happens when handleRequest3f is called from
// a quickpoll and it deletes a collection and BigFile::close
// calls ThreadQueue::removeThreads and Msg3::doneScanning()
// has niceness 2 and calls quickpoll again!
return;
2013-08-03 00:12:24 +04:00
//if(g_conf.m_quickpollCoreOnError) {
char*xx=NULL;*xx=0;
// }
// else return;
}
2014-10-30 22:36:39 +03:00
int64_t now = g_now;
//int64_t took = now - m_lastPollTime;
int64_t now2 = g_now;
2014-11-11 01:45:11 +03:00
int32_t gerrno = g_errno;
2013-08-03 00:12:24 +04:00
/*
if(g_conf.m_profilingEnabled){
now = gettimeofdayInMilliseconds();
took = now - m_lastPollTime;
g_profiler.pause(caller, lineno, took);
if(took > g_conf.m_minProfThreshold) {
if(g_conf.m_dynamicPerfGraph) {
g_stats.addStat_r ( 0 ,
m_lastPollTime,
now,
0 ,
STAT_GENERIC,
caller);
}
if(g_conf.m_sequentialProfiling) {
log(LOG_TIMING, "admin: quickpolling from %s "
2014-11-11 01:45:11 +03:00
"after %"INT64" ms", caller, took);
2013-08-03 00:12:24 +04:00
}
}
}
*/
g_numQuickPolls++;
m_inQuickPoll = true;
// doPoll() will since we are in quickpoll and only call niceness 0
// callbacks for all the fds. and it will set the timer to 0.
doPoll ();
/*
2013-08-03 00:12:24 +04:00
//g_udpServer2.process_ass ( g_now , 0 );
g_udpServer.process_ass ( g_now , 0 );
g_threads.timedCleanUp( 100 , 0 ); // ms ms, niceness 0
2014-11-11 01:45:11 +03:00
int32_t n;
2013-08-03 00:12:24 +04:00
fd_set readfds;
fd_set writefds;
fd_set exceptfds;
// clear fds for select()
FD_ZERO ( &readfds );
FD_ZERO ( &writefds );
FD_ZERO ( &exceptfds );
timeval v;
v.tv_sec = 0;
v.tv_usec = 0;
// set descriptors we should watch
2014-11-11 01:45:11 +03:00
for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) {
2013-08-03 00:12:24 +04:00
if ( m_readSlots [i] && m_readSlots[i]->m_niceness == 0 ) {
FD_SET ( i , &readfds );
FD_SET ( i , &exceptfds );
}
if ( m_writeSlots[i] && m_writeSlots[i]->m_niceness == 0 ) {
FD_SET ( i , &writefds );
FD_SET ( i , &exceptfds );
}
}
// poll the fd's searching for socket closes
// this is for httpServer, since we handled
// udpserver and diskthreads above
n = select (MAX_NUM_FDS, &readfds, &writefds, &exceptfds, &v);
// a Slot ptr
Slot *s;
if ( n <= 0 ) goto theend;
2014-11-11 01:45:11 +03:00
for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) {
2013-08-03 00:12:24 +04:00
// continue if not set for reading
if ( ! FD_ISSET ( i , &readfds ) ) continue;
// if niceness is not -1, handle it below
s = m_readSlots [ i ]; // i = fd
2013-08-03 00:12:24 +04:00
// now we have niceness 2 if Sections.cpp
if ( s && s->m_niceness >= niceness ) continue;
callCallbacks_ass (true,i, now); // reading = true
2013-08-03 00:12:24 +04:00
// sanity check
if ( g_niceness > niceness ) { char*xx=NULL;*xx=0; }
}
2014-11-11 01:45:11 +03:00
for ( int32_t i = 0 ; i < MAX_NUM_FDS ; i++ ) {
2013-08-03 00:12:24 +04:00
if ( ! FD_ISSET ( i , &writefds ) ) continue;
// if niceness is not -1, handle it below
s = m_writeSlots [ i ]; // i = fd
2013-08-03 00:12:24 +04:00
// now we have niceness 2 if Sections.cpp
if ( s && s->m_niceness >= niceness ) continue;
callCallbacks_ass (false,i, now); // forReading = false
2013-08-03 00:12:24 +04:00
// sanity check
if ( g_niceness > niceness ) { char*xx=NULL;*xx=0; }
}
// now we can have niceness 0 dns slots because of the niceness
// conversion algorithm...
g_dns.m_udpServer.sendPoll_ass(true,g_now);
g_dns.m_udpServer.process_ass ( g_now );
g_dns.m_udpServer.makeCallbacks_ass(0);
theend:
*/
2013-08-03 00:12:24 +04:00
// reset this again
g_missedQuickPolls = 0;
// . avoid quickpolls within a quickpoll
// . was causing seg fault in diskHeartbeatWrapper()
// which call Threads::bailOnReads()
m_canQuickPoll = false;
// . call sleepcallbacks, like the heartbeat in Process.cpp
// . MAX_NUM_FDS is the fd for sleep callbacks
// . specify a niceness of 0 so only niceness 0 sleep callbacks
// will be called
callCallbacks_ass ( true , MAX_NUM_FDS , now , 0 );
// sanity check
if ( g_niceness > niceness ) {
log("loop: niceness mismatch");
//char*xx=NULL;*xx=0; }
}
/*
now2 = g_now;
if(g_conf.m_profilingEnabled) {
took = now2 - now;
g_profiler.unpause();
}
*/
2014-11-11 01:45:11 +03:00
//log(LOG_WARN, "xx quickpolled took %"INT64", waited %"INT64" from %s",
2013-08-03 00:12:24 +04:00
// now2 - now, now - m_lastPollTime, caller);
m_lastPollTime = now2;
m_inQuickPoll = false;
m_needsToQuickPoll = false;
m_canQuickPoll = true;
g_errno = gerrno;
// reset this again
g_missedQuickPolls = 0;
}
2014-11-11 01:45:11 +03:00
void Loop::canQuickPoll(int32_t niceness) {
2013-08-03 00:12:24 +04:00
if(niceness && !m_shutdown) m_canQuickPoll = true;
else m_canQuickPoll = false;
}
void Loop::disableTimer() {
//logf(LOG_WARN,"xxx disabling");
m_canQuickPoll = false;
setitimer(ITIMER_VIRTUAL, &m_noInterrupt, NULL);
2014-09-12 13:42:00 +04:00
setitimer(ITIMER_REAL, &m_noInterrupt, NULL);
2013-08-03 00:12:24 +04:00
}
int gbsystem(char *cmd ) {
// if ( ! g_conf.m_runAsDaemon )
// setitimer(ITIMER_REAL, &g_loop.m_noInterrupt, NULL);
g_loop.disableTimer();
log("gb: running system(\"%s\")",cmd);
int ret = system(cmd);
g_loop.enableTimer();
// if ( ! g_conf.m_runAsDaemon )
// setitimer(ITIMER_REAL, &g_loop.m_realInterrupt, NULL);
return ret;
}
2013-08-03 00:12:24 +04:00
void Loop::enableTimer() {
m_canQuickPoll = true;
// logf(LOG_WARN, "xxx enabling");
setitimer(ITIMER_VIRTUAL, &m_quickInterrupt, NULL);
setitimer(ITIMER_REAL, &m_realInterrupt, NULL);
}
2013-08-03 00:12:24 +04:00
//calling with a 0 niceness will turn off the timer interrupt
2014-11-11 01:45:11 +03:00
// void Loop::setitimerInterval(int32_t niceness) {
2014-09-03 19:38:43 +04:00
// if(niceness) {
// setitimer(ITIMER_VIRTUAL, &m_quickInterrupt, NULL);
// m_canQuickPoll = true;
// }
// else {
// setitimer(ITIMER_VIRTUAL, &m_noInterrupt, NULL);
// m_canQuickPoll = false;
// }
// }