try to fix request rate limiting jam up in udpserver.cpp.

pretty up Loop.cpp some more which now uses select().
This commit is contained in:
mwells 2014-09-04 10:28:40 -07:00
parent 5931fb5ff1
commit ea30e8d9b2
2 changed files with 250 additions and 148 deletions

348
Loop.cpp
View File

@ -16,9 +16,9 @@
#define MAX_SLOTS 10000
// apple mac os x does not have real-time signal support
#ifdef __APPLE__
#define _POLLONLY_
#endif
// #ifdef __APPLE__
// #define _POLLONLY_
// #endif
// TODO: . if signal queue overflows another signal is sent
// . capture that signal and use poll or something???
@ -48,8 +48,12 @@ bool g_interruptsOn = false;
bool g_someAreQueued = false;
long g_numAlarms = 0;
long g_numVTAlarms = 0;
long g_numQuickPolls = 0;
long g_missedQuickPolls = 0;
long g_numSigChlds = 0;
long g_numSigQueues = 0;
long g_numSigOthers = 0;
// since we can't call gettimeofday() while in a sig handler, we use this
// and update it periodically to keep it somewhat accurate
@ -117,13 +121,14 @@ void Loop::reset() {
*/
}
static void sigHandler_r ( int x , siginfo_t *info , void *v ) ;
static void sigHandlerRT ( int x , siginfo_t *info , void *v ) ;
//static void sigHandler_r ( int x , siginfo_t *info , void *v ) ;
//static void sigHandlerRT ( int x , siginfo_t *info , void *v ) ;
static void sigbadHandler ( int x , siginfo_t *info , void *y ) ;
static void sigpwrHandler ( int x , siginfo_t *info , void *y ) ;
static void sighupHandler ( int x , siginfo_t *info , void *y ) ;
static void sigioHandler ( int x , siginfo_t *info , void *y ) ;
//static void sigioHandler ( int x , siginfo_t *info , void *y ) ;
static void sigalrmHandler( int x , siginfo_t *info , void *y ) ;
static void sigvtalrmHandler( int x , siginfo_t *info , void *y ) ;
long g_fdWriteBits[MAX_NUM_FDS/32];
long g_fdReadBits [MAX_NUM_FDS/32];
@ -380,6 +385,12 @@ bool Loop::setNonBlocking ( int fd , long niceness ) {
g_errno = errno;
return log("loop: fcntl(NONBLOCK): %s.",strerror(errno));
}
// we use select()/poll now so skip stuff below
return true;
/*
retry8:
// tell kernel to send the signal to us when fd is ready for read/write
if ( fcntl (fd, F_SETOWN , getpid() ) < 0 ) {
@ -403,13 +414,14 @@ bool Loop::setNonBlocking ( int fd , long niceness ) {
// . tell kernel to send this signal when fd is ready for read/write
// . reserve GB_SIGRTMIN for unmaskable interrupts (niceness = -1)
// as used by high priority udp server, g_udpServer2
if ( fcntl (fd, F_SETSIG , GB_SIGRTMIN/*32?*/ + 1 + niceness ) < 0 ) {
if ( fcntl (fd, F_SETSIG , GB_SIGRTMIN + 1 + niceness ) < 0 ) {
// valgrind
if ( errno == EINTR ) goto retry6;
g_errno = errno;
return log("loop: fcntl(F_SETSIG): %s.",strerror(errno));
}
return true;
*/
}
// . if "forReading" is true call callbacks registered for reading on "fd"
@ -610,6 +622,7 @@ void sigHandlerQueue_r ( int x , siginfo_t *info , void *v ) {
// if we just needed to cleanup a thread
if ( info->si_signo == SIGCHLD ) {
g_numSigChlds++;
// this has no fd really, Threads.cpp just sends it when
// the thread is done
g_threads.m_needsCleanup = true;
@ -617,12 +630,21 @@ void sigHandlerQueue_r ( int x , siginfo_t *info , void *v ) {
}
if ( info->si_code == SI_QUEUE ) {
g_numSigQueues++;
//log("admin: got sigqueue");
// the thread is done
g_threads.m_needsCleanup = true;
return;
}
// wtf is this?
g_numSigOthers++;
// the stuff below should no longer be used since we
// do not use F_SETSIG now
return;
/*
// extract the file descriptor that needs attention
int fd = info->si_fd;
@ -634,12 +656,12 @@ void sigHandlerQueue_r ( int x , siginfo_t *info , void *v ) {
// set the right callback
// info->si_band values:
//#define POLLIN 0x0001 /* There is data to read */
//#define POLLPRI 0x0002 /* There is urgent data to read */
//#define POLLOUT 0x0004 /* Writing now will not block */
//#define POLLERR 0x0008 /* Error condition */
//#define POLLHUP 0x0010 /* Hung up */
//#define POLLNVAL 0x0020 /* Invalid request: fd not open */
//#define POLLIN 0x0001 // There is data to read
//#define POLLPRI 0x0002 // There is urgent data to read
//#define POLLOUT 0x0004 // Writing now will not block
//#define POLLERR 0x0008 // Error condition
//#define POLLHUP 0x0010 // Hung up
//#define POLLNVAL 0x0020 // Invalid request: fd not open
int band = info->si_band;
// translate SIGPIPE's to band of POLLHUP
if ( info->si_signo == SIGPIPE ) {
@ -652,7 +674,7 @@ void sigHandlerQueue_r ( int x , siginfo_t *info , void *v ) {
// . NOTE: when it's connected it sets both POLLIN and POLLOUT
// . NOTE: or when a socket is trying to connect to it if it's listener
//if ( band & (POLLIN | POLLOUT) == (POLLIN | POLLOUT) )
// g_loop.callCallbacks_ass ( true/*forReading?*/ , fd );
// g_loop.callCallbacks_ass ( true , fd ); // for reading
if ( band & POLLIN ) {
// keep stats on this now since some linuxes dont work right
g_stats.m_readSignals++;
@ -688,6 +710,7 @@ void sigHandlerQueue_r ( int x , siginfo_t *info , void *v ) {
// it is ready for writing i guess
g_fdWriteBits[fd/32] = 1<<(fd%32);
}
*/
}
@ -743,12 +766,12 @@ bool Loop::init ( ) {
sigset_t sigs;
sigemptyset ( &sigs );
sigaddset ( &sigs , SIGPIPE ); //if we write to a close socket
#ifndef _VALGRIND_
sigaddset ( &sigs , GB_SIGRTMIN );
#endif
sigaddset ( &sigs , GB_SIGRTMIN + 1 );
sigaddset ( &sigs , GB_SIGRTMIN + 2 );
sigaddset ( &sigs , GB_SIGRTMIN + 3 );
// #ifndef _VALGRIND_
// sigaddset ( &sigs , GB_SIGRTMIN );
// #endif
// sigaddset ( &sigs , GB_SIGRTMIN + 1 );
// sigaddset ( &sigs , GB_SIGRTMIN + 2 );
// sigaddset ( &sigs , GB_SIGRTMIN + 3 );
sigaddset ( &sigs , SIGCHLD );
#ifdef PTHREADS
@ -790,10 +813,10 @@ bool Loop::init ( ) {
sa2.sa_sigaction = sigHandlerQueue_r;
g_errno = 0;
if ( sigaction ( SIGPIPE, &sa2, 0 ) < 0 ) g_errno = errno;
if ( sigaction ( GB_SIGRTMIN , &sa2, 0 ) < 0 ) g_errno = errno;
if ( sigaction ( GB_SIGRTMIN + 1, &sa2, 0 ) < 0 ) g_errno = errno;
if ( sigaction ( GB_SIGRTMIN + 2, &sa2, 0 ) < 0 ) g_errno = errno;
if ( sigaction ( GB_SIGRTMIN + 3, &sa2, 0 ) < 0 ) g_errno = errno;
// if ( sigaction ( GB_SIGRTMIN , &sa2, 0 ) < 0 ) g_errno = errno;
// if ( sigaction ( GB_SIGRTMIN + 1, &sa2, 0 ) < 0 ) g_errno = errno;
// if ( sigaction ( GB_SIGRTMIN + 2, &sa2, 0 ) < 0 ) g_errno = errno;
// if ( sigaction ( GB_SIGRTMIN + 3, &sa2, 0 ) < 0 ) g_errno = errno;
if ( sigaction ( SIGCHLD, &sa2, 0 ) < 0 ) g_errno = errno;
if ( sigaction ( SIGIO, &sa2, 0 ) < 0 ) g_errno = errno;
if ( g_errno ) log("loop: sigaction(): %s.", mstrerror(g_errno) );
@ -804,6 +827,8 @@ bool Loop::init ( ) {
//sigemptyset ( &m_sigrtmin );
// tmp debug hack, so we don't have real time signals now...
//sigaddset ( &m_sigrtmin, GB_SIGRTMIN );
/*
// now set up a signal handler to handle just/only SIGIO
struct sigaction sa;
// . sa_mask is the set of signals that should be blocked when
@ -826,10 +851,10 @@ bool Loop::init ( ) {
// clear g_errno
g_errno = 0;
// now when we got an unblocked GB_SIGRTMIN signal go here right away
#ifndef _VALGRIND_
if ( sigaction ( GB_SIGRTMIN, &sa, 0 ) < 0 ) g_errno = errno;
if ( g_errno)log("loop: sigaction GB_SIGRTMIN: %s.", mstrerror(errno));
#endif
// #ifndef _VALGRIND_
// if ( sigaction ( GB_SIGRTMIN, &sa, 0 ) < 0 ) g_errno = errno;
// if ( g_errno)log("loop: sigaction GB_SIGRTMIN: %s.", mstrerror(errno));
// #endif
// set it this way for SIGIO's
sa.sa_flags = SA_SIGINFO ; // | SA_ONESHOT;
@ -852,7 +877,15 @@ bool Loop::init ( ) {
if ( sigaction ( SIGIO, &sa, 0 ) < 0 ) g_errno = errno;
if ( g_errno ) log("loop: sigaction SIGIO: %s.", mstrerror(errno));
#endif
*/
struct sigaction sa;
// . sa_mask is the set of signals that should be blocked when
// we're handling the signal, make this empty
// . GB_SIGRTMIN signals will be automatically blocked while we're
// handling a SIGIO signal, so don't worry about that
sigemptyset (&sa.sa_mask);
sa.sa_flags = SA_SIGINFO ; // | SA_ONESHOT;
// handle HUP signals gracefully by saving and shutting down
sa.sa_sigaction = sighupHandler;
@ -881,28 +914,33 @@ bool Loop::init ( ) {
//now set up our alarm for quickpoll
m_quickInterrupt.it_value.tv_sec = 0;
m_quickInterrupt.it_value.tv_usec = QUICKPOLL_INTERVAL * 1000;
m_quickInterrupt.it_interval.tv_sec = 0;
m_quickInterrupt.it_interval.tv_usec = QUICKPOLL_INTERVAL * 1000;
m_realInterrupt.it_interval.tv_sec = 0;
m_realInterrupt.it_interval.tv_usec = 7 * 1000;
m_noInterrupt.it_value.tv_sec = 0;
m_noInterrupt.it_value.tv_usec = 0;
m_noInterrupt.it_interval.tv_sec = 0;
m_noInterrupt.it_interval.tv_usec = 0;
//m_realInterrupt.it_value.tv_sec = 0;
//m_realInterrupt.it_value.tv_usec = QUICKPOLL_INTERVAL * 1000;
// set the interrupts to off for now
disableTimer();
//mdw:disableTimer();
//setitimer(ITIMER_REAL, &m_quickInterrupt, NULL);
//setitimer(ITIMER_VIRTUAL, &m_quickInterrupt, NULL);
// make this 7ms i guess
setitimer(ITIMER_REAL, &m_realInterrupt, NULL);
// this is 10ms
setitimer(ITIMER_VIRTUAL, &m_quickInterrupt, NULL);
sa.sa_sigaction = sigalrmHandler;
// it's gotta be real time, not virtual cpu time now
//if ( sigaction ( SIGALRM, &sa, 0 ) < 0 ) g_errno = errno;
if ( sigaction ( SIGVTALRM, &sa, 0 ) < 0 ) g_errno = errno;
if ( g_errno ) log("loop: sigaction SIGBUS: %s.", mstrerror(errno));
if ( sigaction ( SIGALRM, &sa, 0 ) < 0 ) g_errno = errno;
if ( g_errno ) return log("loop: sigaction: %s.", mstrerror(errno));
// block sigvtalarm
sa.sa_sigaction = sigvtalrmHandler;
if ( sigaction ( SIGVTALRM, &sa, 0 ) < 0 ) g_errno = errno;
if ( g_errno ) log("loop: sigaction SIGVTALRM: %s.", mstrerror(errno));
// success
return true;
}
@ -951,8 +989,10 @@ void sigbadHandler ( int x , siginfo_t *info , void *y ) {
g_process.shutdown ( true );
}
void sigvtalrmHandler ( int x , siginfo_t *info , void *y ) {
void sigalrmHandler ( int x , siginfo_t *info , void *y ) {
// stats
g_numVTAlarms++;
// see if a niceness 0 algo is hogging the cpu
if ( g_callSlot && g_niceness == 0 ) {
@ -990,8 +1030,6 @@ void sigalrmHandler ( int x , siginfo_t *info , void *y ) {
}
g_nowApprox += QUICKPOLL_INTERVAL; // 10 ms
// stats
g_numAlarms++;
// sanity check
if ( g_loop.m_inQuickPoll &&
@ -1040,20 +1078,6 @@ void sigalrmHandler ( int x , siginfo_t *info , void *y ) {
//char *xx=NULL;*xx=0;
}
// . see where we are in the code
// . for computing cpu usage
// . if idling we will be in sigtimedwait() at the lowest level
Host *h = g_hostdb.m_myHost;
// . i guess this means we were doing something... (otherwise idle)
// . this is KINDA like a 100 point sample, but it has crazy decay
// logic built into it
if (h) {
if ( ! g_inWaitState )
h->m_cpuUsage = .99 * h->m_cpuUsage + .01 * 100;
else
h->m_cpuUsage = .99 * h->m_cpuUsage + .01 * 000;
}
// if it has been a while since heartbeat (> 10000ms) dump core so
// we can see where the process was... that is a missed quick poll?
if ( g_process.m_lastHeartbeatApprox == 0 ) return;
@ -1067,6 +1091,24 @@ void sigalrmHandler ( int x , siginfo_t *info , void *y ) {
}
//logf(LOG_DEBUG, "xxx now: %lli! approx: %lli", g_now, g_nowApprox);
}
void sigalrmHandler ( int x , siginfo_t *info , void *y ) {
// stats
g_numAlarms++;
// . see where we are in the code
// . for computing cpu usage
// . if idling we will be in sigtimedwait() at the lowest level
Host *h = g_hostdb.m_myHost;
// . i guess this means we were doing something... (otherwise idle)
// . this is KINDA like a 100 point sample, but it has crazy decay
// logic built into it
if ( ! g_inWaitState )
h->m_cpuUsage = .99 * h->m_cpuUsage + .01 * 100;
else
h->m_cpuUsage = .99 * h->m_cpuUsage + .01 * 000;
}
static sigset_t s_rtmin;
@ -1077,10 +1119,10 @@ void maskSignals() {
if ( ! s_init ) {
s_init = true;
sigemptyset ( &s_rtmin );
sigaddset ( &s_rtmin, GB_SIGRTMIN );
sigaddset ( &s_rtmin, GB_SIGRTMIN + 1 );
sigaddset ( &s_rtmin, GB_SIGRTMIN + 2 );
sigaddset ( &s_rtmin, GB_SIGRTMIN + 3 );
// sigaddset ( &s_rtmin, GB_SIGRTMIN );
// sigaddset ( &s_rtmin, GB_SIGRTMIN + 1 );
// sigaddset ( &s_rtmin, GB_SIGRTMIN + 2 );
// sigaddset ( &s_rtmin, GB_SIGRTMIN + 3 );
sigaddset ( &s_rtmin, SIGCHLD );
sigaddset ( &s_rtmin, SIGIO );
sigaddset ( &s_rtmin, SIGPIPE );
@ -1125,12 +1167,12 @@ bool Loop::runLoop ( ) {
// . set sigs on which sigtimedwait() listens for
// . add this signal to our set of signals to watch (currently NONE)
sigaddset ( &sigs0, SIGPIPE );
#ifndef _VALGRIND_
sigaddset ( &sigs0, GB_SIGRTMIN );
#endif
sigaddset ( &sigs0, GB_SIGRTMIN + 1 );
sigaddset ( &sigs0, GB_SIGRTMIN + 2 );
sigaddset ( &sigs0, GB_SIGRTMIN + 3 );
// #ifndef _VALGRIND_
// sigaddset ( &sigs0, GB_SIGRTMIN );
// #endif
// sigaddset ( &sigs0, GB_SIGRTMIN + 1 );
// sigaddset ( &sigs0, GB_SIGRTMIN + 2 );
// sigaddset ( &sigs0, GB_SIGRTMIN + 3 );
sigaddset ( &sigs0, SIGCHLD );
//sigaddset ( &sigs0, SIGVTALRM );
// . TODO: do we need to mask SIGIO too? (sig queue overflow?)
@ -1157,7 +1199,7 @@ bool Loop::runLoop ( ) {
// . makes g_udpServer2 quite jumpy
g_loop.interruptsOn();
enableTimer();
//mdw:enableTimer();
// . now loop forever waiting for signals
// . but every second check for timer-based events
@ -1679,11 +1721,11 @@ bool Loop::runLoop ( ) {
// . the kernel sends a SIGIO signal when the sig queue overflows
// . we resort to polling the fd's when that happens
void sigioHandler ( int x , siginfo_t *info , void *y ) {
// set the m_needToPoll flag
g_loop.m_needToPoll = true;
return;
}
// void sigioHandler ( int x , siginfo_t *info , void *y ) {
// // set the m_needToPoll flag
// g_loop.m_needToPoll = true;
// return;
// }
//--- TODO: flush the signal queue after polling until done
//--- are we getting stale signals resolved by flush so we get
@ -1719,7 +1761,7 @@ void Loop::doPoll ( ) {
//if(g_udpServer2.needBottom()) g_udpServer2.makeCallbacks_ass ( 1 );
bool processedOne;
//bool processedOne;
long n;
// long repeats = 0;
// skipLowerPriorities:
@ -1734,7 +1776,11 @@ void Loop::doPoll ( ) {
timeval v;
v.tv_sec = 0;
if ( m_inQuickPoll ) v.tv_usec = 0;
else v.tv_usec = 10 * 1000; // 10ms for sleepcallbacks
// 10ms for sleepcallbacks so they can be called...
// and we need this to be the same as sigalrmhandler() since we
// keep track of cpu usage here too, since sigalrmhandler is "VT"
// based it only goes off when that much "cpu time" has elapsed.
else v.tv_usec = QUICKPOLL_INTERVAL * 1000;
// set descriptors we should watch
// MDW: no longer necessary since we have s_selectMaskRead, etc.
@ -1750,10 +1796,6 @@ void Loop::doPoll ( ) {
// }
again:
// used to measure cpu usage. sigalarm needs to know if we are
// sitting idle in select() or are actively doing something w/ the cpu
g_inWaitState = true;
// gotta copy to our own since bits get cleared by select() function
fd_set readfds;
fd_set writefds;
@ -1766,6 +1808,13 @@ void Loop::doPoll ( ) {
FD_ZERO ( &writefds );
FD_ZERO ( &exceptfds );
if ( g_conf.m_logDebugLoop )
log("loop: in select");
// used to measure cpu usage. sigalarm needs to know if we are
// sitting idle in select() or are actively doing something w/ the cpu
g_inWaitState = true;
// for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
// // continue if not set for reading
// if ( FD_ISSET ( i , &s_selectMaskRead ) ||
@ -1776,7 +1825,9 @@ void Loop::doPoll ( ) {
// // if niceness is not -1, handle it below
// }
// poll the fd's searching for socket closes
// . poll the fd's searching for socket closes
// . the sigalrms and sigvtalrms and SIGCHLDs knock us out of this
// select() with n < 0 and errno equal to EINTR
n = select (MAX_NUM_FDS,
&readfds,
&writefds,
@ -1785,14 +1836,36 @@ void Loop::doPoll ( ) {
g_inWaitState = false;
if ( g_conf.m_logDebugLoop )
log("loop: out select n=%li errno=%li errnomsg=%s",
(long)n,(long)errno,mstrerror(errno));
if ( n < 0 ) {
// valgrind
if ( errno == EINTR ) goto again;
if ( errno == EINTR ) {
// got it. if we get a sig alarm or vt alarm or
// SIGCHLD (from Threads.cpp) we end up here.
//log("loop: got errno=%li",(long)errno);
// if shutting own was it a sigterm ?
if ( m_shutdown ) goto again;
// handle returned threads for niceness 0
g_threads.timedCleanUp(-3,0); // 3 ms
if ( m_inQuickPoll ) goto again;
// high niceness threads
g_threads.timedCleanUp(-4,MAX_NICENESS); // 3 ms
goto again;
}
g_errno = errno;
log("loop: select: %s.",strerror(g_errno));
return;
}
// if we wait for 10ms with nothing happening, fix cpu usage here too
// if ( n == 0 ) {
// Host *h = g_hostdb.m_myHost;
// h->m_cpuUsage = .99 * h->m_cpuUsage + .01 * 000;
// }
// debug msg
if ( g_conf.m_logDebugLoop)
logf(LOG_DEBUG,"loop: Got %li fds waiting.",n);
@ -1817,7 +1890,7 @@ void Loop::doPoll ( ) {
// g_threads.launchThreads();
// return;
//}
processedOne = false;
//processedOne = false;
// a Slot ptr
Slot *s;
@ -1845,17 +1918,26 @@ void Loop::doPoll ( ) {
// handle returned threads for niceness -1
//g_threads.timedCleanUp(2/*ms*/);
bool calledOne = false;
// now keep this fast, too. just check fds we need to.
for ( long i = 0 ; i < s_numReadFds ; i++ ) {
if ( n == 0 ) break;
int fd = s_readFds[i];
s = m_readSlots [ fd ];
// if niceness is not 0, handle it below
if ( s && s->m_niceness > 0 ) continue;
// must be set
if ( FD_ISSET ( fd , &readfds ) )
if ( FD_ISSET ( fd , &readfds ) ) {
if ( g_conf.m_logDebugLoop )
log("loop: calling cback0 niceness=%li fd=%i"
, s->m_niceness , fd );
calledOne = true;
callCallbacks_ass (true/*forReading?*/,fd, g_now,0);
}
}
// for ( long i = 0 ; i < s_numWriteFds ; i++ ) {
// if ( n == 0 ) break;
// int fd = s_writeFds[i];
// s = m_writeSlots [ fd ];
// // if niceness is not 0, handle it below
@ -1867,7 +1949,7 @@ void Loop::doPoll ( ) {
// handle returned threads for niceness 0
g_threads.timedCleanUp(3,0); // 3 ms
g_threads.timedCleanUp(-3,0); // 3 ms
//
// the stuff below is not super urgent, do not do if in quickpoll
@ -1876,15 +1958,22 @@ void Loop::doPoll ( ) {
// now for lower priority fds
for ( long i = 0 ; i < s_numReadFds ; i++ ) {
if ( n == 0 ) break;
int fd = s_readFds[i];
s = m_readSlots [ fd ];
// if niceness is not 0, handle it below
if ( s && s->m_niceness <= 0 ) continue;
// must be set
if ( FD_ISSET ( fd , &readfds ) )
if ( FD_ISSET ( fd , &readfds ) ) {
if ( g_conf.m_logDebugLoop )
log("loop: calling cback1 niceness=%li fd=%i"
, s->m_niceness , fd );
calledOne = true;
callCallbacks_ass (true/*forReading?*/,fd, g_now,1);
}
}
// for ( long i = 0 ; i < s_numWriteFds ; i++ ) {
// if ( n == 0 ) break;
// int fd = s_writeFds[i];
// s = m_writeSlots [ fd ];
// // if niceness is not 0, handle it below
@ -1894,7 +1983,9 @@ void Loop::doPoll ( ) {
// callCallbacks_ass (false/*forReading?*/,fd, g_now,1);
// }
//if ( ! calledOne )
// log("loop: select returned n=%li but nothing called.",n);
// . MDW: replaced this with more efficient logic above
// . call the callback for each fd we got
@ -1949,13 +2040,12 @@ void Loop::doPoll ( ) {
// }
// handle returned threads for all other nicenesses
g_threads.timedCleanUp(4,MAX_NICENESS); // 4 ms
g_threads.timedCleanUp(-4,MAX_NICENESS); // 4 ms
// set time
g_now = gettimeofdayInMilliseconds();
// call sleepers if they need it
// call this every (about) 1 second
{
long elapsed = g_now - s_lastTime;
// if someone changed the system clock on us, this could be negative
// so fix it! otherwise, times may NEVER get called in our lifetime
@ -1966,8 +2056,7 @@ void Loop::doPoll ( ) {
// note the last time we called them
s_lastTime = g_now;
// handle returned threads for all other nicenesses
g_threads.timedCleanUp(4,MAX_NICENESS); // 4 ms
}
g_threads.timedCleanUp(-4,MAX_NICENESS); // 4 ms
}
// debug msg
if ( g_conf.m_logDebugLoop ) log(LOG_DEBUG,"loop: Exited doPoll.");
@ -1989,9 +2078,9 @@ void Loop::interruptsOff ( ) {
sigset_t rtmin;
sigemptyset ( &rtmin );
// tmp debug hack, so we don't have real time signals now...
#ifndef _VALGRIND_
sigaddset ( &rtmin, GB_SIGRTMIN );
#endif
// #ifndef _VALGRIND_
// sigaddset ( &rtmin, GB_SIGRTMIN );
// #endif
// block it
if ( sigprocmask ( SIG_BLOCK , &rtmin, 0 ) < 0 ) {
log("loop: interruptsOff: sigprocmask: %s.", strerror(errno));
@ -2014,9 +2103,9 @@ void Loop::interruptsOn ( ) {
sigset_t rtmin;
sigemptyset ( &rtmin );
// uncomment this next line to easily disable real time interrupts
#ifndef _VALGRIND_
sigaddset ( &rtmin, GB_SIGRTMIN );
#endif
// #ifndef _VALGRIND_
// sigaddset ( &rtmin, GB_SIGRTMIN );
// #endif
// debug msg
//log("interruptsOn");
// let everyone know before we are vulnerable to an interrupt
@ -2028,6 +2117,7 @@ void Loop::interruptsOn ( ) {
}
}
/*
// handle hot real time signals here
void sigHandlerRT ( int x , siginfo_t *info , void *v ) {
// if we're not hot, what are we doing here?
@ -2066,7 +2156,9 @@ void sigHandlerRT ( int x , siginfo_t *info , void *v ) {
// debug msg
//fprintf (stderr,"out of rt handler\n");
}
*/
/*
// come here when we get a GB_SIGRTMIN+X signal
void sigHandler_r ( int x , siginfo_t *info , void *v ) {
@ -2092,22 +2184,20 @@ void sigHandler_r ( int x , siginfo_t *info , void *v ) {
// clear g_errno before callling handlers
g_errno = 0;
// info->si_band values:
//#define POLLIN 0x0001 /* There is data to read */
//#define POLLPRI 0x0002 /* There is urgent data to read */
//#define POLLOUT 0x0004 /* Writing now will not block */
//#define POLLERR 0x0008 /* Error condition */
//#define POLLHUP 0x0010 /* Hung up */
//#define POLLNVAL 0x0020 /* Invalid request: fd not open */
//#define POLLIN 0x0001 // There is data to read
//#define POLLPRI 0x0002 // There is urgent data to read
//#define POLLOUT 0x0004 // Writing now will not block
//#define POLLERR 0x0008 // Error condition
//#define POLLHUP 0x0010 // Hung up
//#define POLLNVAL 0x0020 // Invalid request: fd not open
int band = info->si_band;
/*
fprintf(stderr,"got fd = %i\n", fd );
fprintf(stderr,"got band = %i\n", band );
fprintf(stderr,"band & POLLIN = %i\n", band & POLLIN );
fprintf(stderr,"band & POLLPRI = %i\n", band & POLLPRI );
fprintf(stderr,"band & POLLOUT = %i\n", band & POLLOUT );
fprintf(stderr,"band & POLLERR = %i\n", band & POLLERR );
fprintf(stderr,"band & POLLHUP = %i\n", band & POLLHUP );
*/
// fprintf(stderr,"got fd = %i\n", fd );
// fprintf(stderr,"got band = %i\n", band );
// fprintf(stderr,"band & POLLIN = %i\n", band & POLLIN );
// fprintf(stderr,"band & POLLPRI = %i\n", band & POLLPRI );
// fprintf(stderr,"band & POLLOUT = %i\n", band & POLLOUT );
// fprintf(stderr,"band & POLLERR = %i\n", band & POLLERR );
// fprintf(stderr,"band & POLLHUP = %i\n", band & POLLHUP );
// translate SIGPIPE's to band of POLLHUP
if ( info->si_signo == SIGPIPE ) {
band = POLLHUP;
@ -2134,8 +2224,8 @@ void sigHandler_r ( int x , siginfo_t *info , void *v ) {
// so we can call their callbacks and remove them
g_threads.timedCleanUp(4,MAX_NICENESS); // 4 ms
// // g_threads.cleanUp ( (ThreadEntry *)val , x/*max niceness*/);
// g_threads.cleanUp ( (ThreadEntry *)val , 1000/*max niceness*/);
// // g_threads.cleanUp ( (ThreadEntry *)val , x);// max niceness
// g_threads.cleanUp ( (ThreadEntry *)val , 1000);//max niceness
// // launch any threads in waiting since this sig was
// // from a terminating one
@ -2153,7 +2243,7 @@ void sigHandler_r ( int x , siginfo_t *info , void *v ) {
// . NOTE: when it's connected it sets both POLLIN and POLLOUT
// . NOTE: or when a socket is trying to connect to it if it's listener
//if ( band & (POLLIN | POLLOUT) == (POLLIN | POLLOUT) )
// g_loop.callCallbacks_ass ( true/*forReading?*/ , fd );
// g_loop.callCallbacks_ass ( true , fd ); // for reading
if ( band & POLLIN ) {
// keep stats on this now since some linuxes dont work right
g_stats.m_readSignals++;
@ -2189,7 +2279,7 @@ void sigHandler_r ( int x , siginfo_t *info , void *v ) {
// end ifdef CYGWIN
#endif
}
*/
/*
#if 1 || (LINUX_VERSION_CODE < KERNEL_VERSION(2,3,31))
@ -2409,24 +2499,24 @@ void Loop::disableTimer() {
}
void Loop::enableTimer() {
m_canQuickPoll = true;
// logf(LOG_WARN, "xxx enabling");
setitimer(ITIMER_VIRTUAL, &m_quickInterrupt, NULL);
//setitimer(ITIMER_REAL, &m_quickInterrupt, NULL);
}
// void Loop::enableTimer() {
// m_canQuickPoll = true;
// // logf(LOG_WARN, "xxx enabling");
// setitimer(ITIMER_VIRTUAL, &m_quickInterrupt, NULL);
// //setitimer(ITIMER_REAL, &m_quickInterrupt, NULL);
// }
//calling with a 0 niceness will turn off the timer interrupt
void Loop::setitimerInterval(long niceness) {
if(niceness) {
setitimer(ITIMER_VIRTUAL, &m_quickInterrupt, NULL);
m_canQuickPoll = true;
}
else {
setitimer(ITIMER_VIRTUAL, &m_noInterrupt, NULL);
m_canQuickPoll = false;
}
}
// void Loop::setitimerInterval(long niceness) {
// if(niceness) {
// setitimer(ITIMER_VIRTUAL, &m_quickInterrupt, NULL);
// m_canQuickPoll = true;
// }
// else {
// setitimer(ITIMER_VIRTUAL, &m_noInterrupt, NULL);
// m_canQuickPoll = false;
// }
// }

View File

@ -358,11 +358,14 @@ bool UdpServer::init ( unsigned short port, UdpProtocol *proto, long niceness,
return false;
// . also register for writing to the socket, when it's ready
// . use the original niceness for this, too
if ( ! g_loop.registerWriteCallback ( m_sock,
this,
sendPollWrapper_ass,
0 )) // niceness ) )
return false;
// . what does this really mean? shouldn't we only use it
// when we try to write but the write buf is full so we have
// to try again later when it becomes unfull?
// if ( ! g_loop.registerWriteCallback ( m_sock,
// this,
// sendPollWrapper_ass,
// 0 )) // niceness ) )
// return false;
// . also register for 30 ms tix (was 15ms)
// but we aren't using tokens any more so I raised it
// . it's low so we can claim any unclaimed tokens!
@ -867,11 +870,11 @@ bool UdpServer::doSending_ass (UdpSlot *slot,bool allowResends,long long now) {
// . this wrapper is called when m_sock is ready for writing
// . should only be called by Loop.cpp since it calls callbacks
// . should only be called if in an interrupt or interrupts are off!!
void sendPollWrapper_ass ( int fd , void *state ) {
UdpServer *THIS = (UdpServer *)state;
// begin the read/send/callback loop
THIS->process_ass ( g_now );
}
// void sendPollWrapper_ass ( int fd , void *state ) {
// UdpServer *THIS = (UdpServer *)state;
// // begin the read/send/callback loop
// THIS->process_ass ( g_now );
// }
// . should only be called from process_ass() since this is not re-entrant
// . sends all the awaiting dgrams it can
@ -1198,6 +1201,12 @@ long UdpServer::readSock_ass ( UdpSlot **slotPtr , long long now ) {
MSG_PEEK ,
(sockaddr *)&from ,
&fromLen );
// note it
if ( g_conf.m_logDebugLoop )
log("loop: readsock_ass: peekSize=%i m_sock/fd=%i",
peekSize,m_sock);
// cancel silly g_errnos and return 0 since we blocked
if ( peekSize < 0 ) {
g_errno = errno;
@ -1448,7 +1457,7 @@ long UdpServer::readSock_ass ( UdpSlot **slotPtr , long long now ) {
getSlot = false;
// try to prevent another lockup condition of msg20 spawing
// a msg22 request to self but failing...
if ( msgType == 0x20 && m_msg20sInWaiting >= 100 && niceness )
if ( msgType == 0x20 && m_msg20sInWaiting >= 50 && niceness )
getSlot = false;
// if running short on mem, do not accept any more requests
// because we can lock up from that, too
@ -1485,7 +1494,10 @@ long UdpServer::readSock_ass ( UdpSlot **slotPtr , long long now ) {
// in loop, the proper way is to throttle back the # of
// outstanding tagdb lookups or whatever at the source
// otherwise we jam up
if ( msgType == 0x00 && m_numUsedSlots > 500 && niceness )
// . tagdb lookups were being dropped because of this being
// 500 so i raised to 900. a lot of them were from
// 'get outlink tag recs' or 'get link info' (0x20)
if ( msgType == 0x00 && m_numUsedSlots > 1000 && niceness )
getSlot = false;
// added this because host #14 was clogging on
@ -2631,13 +2643,13 @@ bool UdpServer::makeCallback_ass ( UdpSlot *slot ) {
// happen since we're already in an interrupt handler, so we have
// to let g_loop know to poll
// . TODO: won't he have to wakeup before he'll poll?????
#ifndef _POLLONLY_
if ( ! g_loop.m_needToPoll &&
sigqueue ( s_pid, GB_SIGRTMIN + 1 , svt ) < 0 )
g_loop.m_needToPoll = true;
#else
g_loop.m_needToPoll = true;
#endif
// #ifndef _POLLONLY_
// if ( ! g_loop.m_needToPoll &&
// sigqueue ( s_pid, GB_SIGRTMIN + 1 , svt ) < 0 )
// g_loop.m_needToPoll = true;
// #else
// g_loop.m_needToPoll = true;
// #endif
// . tell g_loop that we did a queue
// . he sets this to false before calling our makeCallbacks_ass()
g_someAreQueued = true;