try polling fds now with select(). real-time signals

were somewhat of an artifact from bygone days and the
select() call when done right seems to not use any more
cpu. plus it should work well with apple os x and cygwin etc.
This commit is contained in:
mwells 2014-09-02 22:05:15 -07:00
parent 2c81157031
commit 048aa60fd9

476
Loop.cpp
View File

@ -148,6 +148,15 @@ void Loop::unregisterSleepCallback ( void *state ,
unregisterCallback (m_readSlots,MAX_NUM_FDS,state,callback);
}
static fd_set s_selectMaskRead;
static fd_set s_selectMaskWrite;
static fd_set s_selectMaskExcept;
static int s_readFds[MAX_NUM_FDS];
static int s_writeFds[MAX_NUM_FDS];
static long s_numReadFds = 0;
static long s_numWriteFds = 0;
void Loop::unregisterCallback ( Slot **slots , int fd , void *state ,
void (* callback)(int fd,void *state) ,
bool silent ) {
@ -178,6 +187,27 @@ void Loop::unregisterCallback ( Slot **slots , int fd , void *state ,
//mfree ( s , sizeof(Slot) , "Loop" );
returnSlot ( s );
found = true;
// if the last one, then remove the FD from s_fdList
// so and clear a bit so doPoll() function is fast
if ( slots[fd] == s && s->m_next == NULL ) {
for (long i = 0; i < s_numReadFds ; i++ ) {
if ( s_readFds[i] != fd ) continue;
s_readFds[i] = s_readFds[s_numReadFds-1];
s_numReadFds--;
// remove from select mask too
FD_CLR(fd,&s_selectMaskRead );
break;
}
for (long i = 0; i < s_numWriteFds ; i++ ) {
if ( s_writeFds[i] != fd ) continue;
s_writeFds[i] = s_writeFds[s_numWriteFds-1];
s_numWriteFds--;
// remove from select mask too
FD_CLR(fd,&s_selectMaskWrite);
FD_CLR(fd,&s_selectMaskExcept);
break;
}
}
// debug msg
//log("Loop::unregistered fd=%li state=%lu", fd, (long)state );
// revert back to old min if this is the Slot we're removing
@ -285,6 +315,13 @@ bool Loop::addSlot ( bool forReading , int fd, void *state,
if ( forReading ) {
next = m_readSlots [ fd ];
m_readSlots [ fd ] = s;
// if not already registered, add to list
if ( fd<MAX_NUM_FDS && ! FD_ISSET ( fd,&s_selectMaskRead ) ) {
s_readFds[s_numReadFds++] = fd;
FD_SET ( fd,&s_selectMaskRead );
// sanity
if ( s_numReadFds>MAX_NUM_FDS){char *xx=NULL;*xx=0;}
}
// fd == MAX_NUM_FDS if it's a sleep callback
//if ( fd < MAX_NUM_FDS ) {
//FD_SET ( fd , &m_readfds );
@ -295,6 +332,13 @@ bool Loop::addSlot ( bool forReading , int fd, void *state,
next = m_writeSlots [ fd ];
m_writeSlots [ fd ] = s;
//FD_SET ( fd , &m_writefds );
// if not already registered, add to list
if ( fd<MAX_NUM_FDS && ! FD_ISSET ( fd,&s_selectMaskWrite ) ) {
s_writeFds[s_numWriteFds++] = fd;
FD_SET ( fd,&s_selectMaskWrite );
// sanity
if ( s_numWriteFds>MAX_NUM_FDS){char *xx=NULL;*xx=0;}
}
}
// set our callback and state
s->m_callback = callback;
@ -649,6 +693,12 @@ void sigHandlerQueue_r ( int x , siginfo_t *info , void *v ) {
bool Loop::init ( ) {
// clear this up here before using in doPoll()
FD_ZERO(&s_selectMaskRead);
FD_ZERO(&s_selectMaskWrite);
FD_ZERO(&s_selectMaskExcept);
// redhat 9's NPTL doesn't like our async signals
if ( ! g_conf.m_allowAsyncSignals ) g_isHot = false;
#ifdef _VALGRIND_
@ -1064,6 +1114,7 @@ void sighupHandler ( int x , siginfo_t *info , void *y ) {
long long s_lastTime = 0;
bool Loop::runLoop ( ) {
#ifndef _POLLONLY_
// set of signals to watch for
sigset_t sigs0;
@ -1171,6 +1222,21 @@ bool Loop::runLoop ( ) {
g_process.shutdown ( true );
}
//
//
// THE HEART OF GB. process events/signals on FDs.
//
//
doPoll();
goto BIGLOOP;
// make compiler happy
return 0;
//g_udpServer2.sendPoll_ass(true,g_now);
//g_udpServer2.process_ass ( g_now );
// MDW: see if this works without this junk, if not then
@ -1218,7 +1284,7 @@ bool Loop::runLoop ( ) {
//while ( m_needToPoll ) doPoll();
#ifndef _POLLONLY_
//#ifndef _POLLONLY_
// hack
//char buffer[100];
@ -1230,21 +1296,21 @@ bool Loop::runLoop ( ) {
//check for pending signals, return right away if none.
//then we'll do the low priority stuff while we were
//supposed to be sleeping.
g_inWaitState = true;
//g_inWaitState = true;
//sigNum = sigtimedwait (&sigs0, &info, s_sigWaitTimePtr ) ;
#undef usleep
//#undef usleep
// now we just usleep(). an arriving signal will call
// sigHandlerQueue_r() then break us out of this sleep.
// 10000 microseconds is 10 milliseconds. it should break out
// when a signal comes in just like the sleep() function.
usleep(1000 * 10);
//usleep(1000 * 10);
// reinstate the thing that prevents us from non-chalantly adding
// usleeps() which could degrade performance
#define usleep(a) { char *xx=NULL;*xx=0; }
//#define usleep(a) { char *xx=NULL;*xx=0; }
// if no signal, we just waited 20 ms and nothing happened
// why do we need this now? MDW
@ -1254,79 +1320,79 @@ bool Loop::runLoop ( ) {
//logf(LOG_DEBUG,"loop: sigNum=%li signo=%li alrm=%li",
// (long)sigNum,info.si_signo,(long)SIGVTALRM);
// no longer in a wait state...
g_inWaitState = false;
//g_inWaitState = false;
long n = MAX_NUM_FDS / 32;
// long n = MAX_NUM_FDS / 32;
// process file descriptor callbacks for file descriptors
// we queued in sigHandlerQueue_r() function above.
// we use an array of 1024 bits like the poll function i guess.
for ( long i = 0 ; i < n ; i++ ) {
// this is a 32-bit number
if ( ! g_fdReadBits[i] ) continue;
// scan the individual bits now
for ( long j = 0 ; j < 32 ; j++ ) {
// mask mask
unsigned long mask = 1 << j;
// skip jth bit if not on
if ( ! g_fdReadBits[i] & mask ) continue;
// block signals for just a sec so we can
// clear it now that we've handled it
//maskSignals();
// clear it
g_fdReadBits[i] &= ~mask;
// reinstate signals
//unmaskSignals();
// construct the file descriptor
long fd = i*32 + j;
// . call all callbacks registered on this fd
// . forReading = true
callCallbacks_ass ( true , fd , g_now );
}
}
// // process file descriptor callbacks for file descriptors
// // we queued in sigHandlerQueue_r() function above.
// // we use an array of 1024 bits like the poll function i guess.
// for ( long i = 0 ; i < n ; i++ ) {
// // this is a 32-bit number
// if ( ! g_fdReadBits[i] ) continue;
// // scan the individual bits now
// for ( long j = 0 ; j < 32 ; j++ ) {
// // mask mask
// unsigned long mask = 1 << j;
// // skip jth bit if not on
// if ( ! g_fdReadBits[i] & mask ) continue;
// // block signals for just a sec so we can
// // clear it now that we've handled it
// //maskSignals();
// // clear it
// g_fdReadBits[i] &= ~mask;
// // reinstate signals
// //unmaskSignals();
// // construct the file descriptor
// long fd = i*32 + j;
// // . call all callbacks registered on this fd
// // . forReading = true
// callCallbacks_ass ( true , fd , g_now );
// }
// }
// do the same thing but for writing now
for ( long i = 0 ; i < n ; i++ ) {
// this is a 32-bit number
if ( ! g_fdWriteBits[i] ) continue;
// scan the individual bits now
for ( long j = 0 ; j < 32 ; j++ ) {
// mask mask
unsigned long mask = 1 << j;
// skip jth bit if not on
if ( ! g_fdWriteBits[i] & mask ) continue;
// block signals for just a sec so we can
// clear it now that we've handled it
//maskSignals();
// clear it
g_fdWriteBits[i] &= ~mask;
// reinstate signals
//unmaskSignals();
// construct the file descriptor
long fd = i*32 + j;
// . call all callbacks registered on this fd.
// . forReading = false.
callCallbacks_ass ( false , fd , g_now );
}
}
// // do the same thing but for writing now
// for ( long i = 0 ; i < n ; i++ ) {
// // this is a 32-bit number
// if ( ! g_fdWriteBits[i] ) continue;
// // scan the individual bits now
// for ( long j = 0 ; j < 32 ; j++ ) {
// // mask mask
// unsigned long mask = 1 << j;
// // skip jth bit if not on
// if ( ! g_fdWriteBits[i] & mask ) continue;
// // block signals for just a sec so we can
// // clear it now that we've handled it
// //maskSignals();
// // clear it
// g_fdWriteBits[i] &= ~mask;
// // reinstate signals
// //unmaskSignals();
// // construct the file descriptor
// long fd = i*32 + j;
// // . call all callbacks registered on this fd.
// // . forReading = false.
// callCallbacks_ass ( false , fd , g_now );
// }
// }
long long elapsed = g_now - s_lastTime;
// if someone changed the system clock on us, this could be negative
// so fix it! otherwise, times may NEVER get called in our lifetime
if ( elapsed < 0 ) elapsed = m_minTick;
// call this every (about) m_minTicks milliseconds
if ( elapsed >= m_minTick ) {
// MAX_NUM_FDS is the fd for sleep callbacks
callCallbacks_ass ( true , MAX_NUM_FDS , g_now );
// note the last time we called them
//g_now = gettimeofdayInMilliseconds();
s_lastTime = g_now;
}
// long long elapsed = g_now - s_lastTime;
// // if someone changed the system clock on us, this could be negative
// // so fix it! otherwise, times may NEVER get called in our lifetime
// if ( elapsed < 0 ) elapsed = m_minTick;
// // call this every (about) m_minTicks milliseconds
// if ( elapsed >= m_minTick ) {
// // MAX_NUM_FDS is the fd for sleep callbacks
// callCallbacks_ass ( true , MAX_NUM_FDS , g_now );
// // note the last time we called them
// //g_now = gettimeofdayInMilliseconds();
// s_lastTime = g_now;
// }
// call remaining callbacks for udp msgs
if ( g_udpServer.needBottom() )
g_udpServer.makeCallbacks_ass ( 2 );
// // call remaining callbacks for udp msgs
// if ( g_udpServer.needBottom() )
// g_udpServer.makeCallbacks_ass ( 2 );
//if(g_udpServer2.needBottom())
// g_udpServer2.makeCallbacks_ass ( 2 );
@ -1335,19 +1401,15 @@ bool Loop::runLoop ( ) {
// startTime > 10)
// goto notime;
if ( g_conf.m_sequentialProfiling )
g_threads.printState();
// if ( g_conf.m_sequentialProfiling )
// g_threads.printState();
if ( g_threads.m_needsCleanup )
// limit to 4ms. cleanup any niceness thread.
g_threads.timedCleanUp(4 ,MAX_NICENESS);
// if ( g_threads.m_needsCleanup )
// // limit to 4ms. cleanup any niceness thread.
// g_threads.timedCleanUp(4 ,MAX_NICENESS);
#endif
// #endif
goto BIGLOOP;
// make compiler happy
return 0;
/*
@ -1630,7 +1692,7 @@ void sigioHandler ( int x , siginfo_t *info , void *y ) {
// . this handles high priority fds first (lowest niceness)
void Loop::doPoll ( ) {
// set time
g_now = gettimeofdayInMilliseconds();
//g_now = gettimeofdayInMilliseconds();
// debug msg
//log("**************** GOT SIGIO *************");
// . turn it off here so it can be turned on again after we've
@ -1640,7 +1702,7 @@ void Loop::doPoll ( ) {
m_needToPoll = false;
// debug msg
//if ( g_conf.m_logDebugLoop ) log(LOG_DEBUG,"loop: Entered doPoll.");
log(LOG_DEBUG,"loop: Entered doPoll.");
if ( g_conf.m_logDebugLoop) log(LOG_DEBUG,"loop: Entered doPoll.");
// print log
if ( g_log.needsPrinting() ) g_log.printBuf();
@ -1662,30 +1724,67 @@ void Loop::doPoll ( ) {
// long repeats = 0;
// skipLowerPriorities:
// descriptor bits for calling select()
// fd_set readfds;
// fd_set writefds;
// fd_set exceptfds;
// clear fds for select()
//FD_ZERO ( &readfds );
//FD_ZERO ( &writefds );
//FD_ZERO ( &exceptfds );
timeval v;
v.tv_sec = 0;
if ( m_inQuickPoll ) v.tv_usec = 0;
else v.tv_usec = 10 * 1000; // 10ms for sleepcallbacks
// set descriptors we should watch
// MDW: no longer necessary since we have s_selectMaskRead, etc.
// for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
// if ( m_readSlots [i] ) {
// FD_SET ( i , &readfds );
// FD_SET ( i , &exceptfds );
// }
// if ( m_writeSlots[i] ) {
// FD_SET ( i , &writefds );
// FD_SET ( i , &exceptfds );
// }
// }
again:
// used to measure cpu usage. sigalarm needs to know if we are
// sitting idle in select() or are actively doing something w/ the cpu
g_inWaitState = true;
// gotta copy to our own since bits get cleared by select() function
fd_set readfds;
fd_set writefds;
fd_set exceptfds;
// clear fds for select()
FD_ZERO ( &readfds );
FD_ZERO ( &writefds );
memcpy ( &readfds, &s_selectMaskRead , sizeof(fd_set) );
//memcpy ( &writefds, &s_selectMaskWrite , sizeof(fd_set) );
//memcpy ( &exceptfds, &s_selectMaskExcept , sizeof(fd_set) );
// what is the point of fds for writing... skip it
FD_ZERO ( &writefds );
FD_ZERO ( &exceptfds );
timeval v;
v.tv_sec = 0;
v.tv_usec = 0;
// set descriptors we should watch
for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
if ( m_readSlots [i] ) {
FD_SET ( i , &readfds );
FD_SET ( i , &exceptfds );
}
if ( m_writeSlots[i] ) {
FD_SET ( i , &writefds );
FD_SET ( i , &exceptfds );
}
}
again:
// for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
// // continue if not set for reading
// if ( FD_ISSET ( i , &s_selectMaskRead ) ||
// FD_ISSET ( i , &writefds ) ||
// FD_ISSET ( i , &exceptfds ) )
// // debug
// log("loop: fd %li is set",i);
// // if niceness is not -1, handle it below
// }
// poll the fd's searching for socket closes
n = select (MAX_NUM_FDS, &readfds, &writefds, &exceptfds, &v);
n = select (MAX_NUM_FDS,
&readfds,
&writefds,
&exceptfds,
&v );
g_inWaitState = false;
if ( n < 0 ) {
// valgrind
if ( errno == EINTR ) goto again;
@ -1693,9 +1792,21 @@ void Loop::doPoll ( ) {
log("loop: select: %s.",strerror(g_errno));
return;
}
// debug msg
if ( g_conf.m_logDebugLoop)
log(LOG_DEBUG,"loop: Got %li fds waiting.",n);
logf(LOG_DEBUG,"loop: Got %li fds waiting.",n);
// for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
// // continue if not set for reading
// if ( FD_ISSET ( i , &readfds ) ||
// FD_ISSET ( i , &writefds ) ||
// FD_ISSET ( i , &exceptfds ) )
// // debug
// log("loop: fd %li is on",i);
// // if niceness is not -1, handle it below
// }
// . reset the need to poll flag if everything is caught up now
// . let's take this out for now ... won't this leave some
// threads hanging, they do not always generate SIGIO's if
@ -1734,58 +1845,109 @@ void Loop::doPoll ( ) {
// handle returned threads for niceness -1
//g_threads.timedCleanUp(2/*ms*/);
// . call the callback for each fd we got
// . only call callbacks for fds that have a nice of 0 here
for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
// continue if not set for reading
if ( ! FD_ISSET ( i , &readfds ) ) continue;
// if niceness is not 0, handle it below
s = m_readSlots [ i /*fd*/ ];
if ( s && s->m_niceness != 0 ) continue;
callCallbacks_ass (true/*forReading?*/,i, g_now);
processedOne = true;
}
// only call callbacks for fds that have a nice of 0 here
for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
if ( ! FD_ISSET ( i , &writefds ) ) continue;
// if niceness is not 0, handle it below
s = m_writeSlots [ i /*fd*/ ];
if ( s && s->m_niceness != 0 ) continue;
callCallbacks_ass (false/*forReading?*/,i, g_now);
processedOne = true;
// now keep this fast, too. just check fds we need to.
for ( long i = 0 ; i < s_numReadFds ; i++ ) {
int fd = s_readFds[i];
s = m_readSlots [ fd ];
// if niceness is not 0, handle it below
if ( s && s->m_niceness > 0 ) continue;
// must be set
if ( FD_ISSET ( fd , &readfds ) )
callCallbacks_ass (true/*forReading?*/,fd, g_now,0);
}
// for ( long i = 0 ; i < s_numWriteFds ; i++ ) {
// int fd = s_writeFds[i];
// s = m_writeSlots [ fd ];
// // if niceness is not 0, handle it below
// if ( s && s->m_niceness > 0 ) continue;
// // must be set
// if ( FD_ISSET ( fd , &writefds ) )
// callCallbacks_ass (false/*forReading?*/,fd, g_now,1);
// }
// handle returned threads for niceness 0
g_threads.timedCleanUp(3,0); // 3 ms
#if 0
if(processedOne && repeats < QUERYPRIORITYWEIGHT) {
//m_needToPoll = true;
repeats++;
goto skipLowerPriorities;
}
//
// the stuff below is not super urgent, do not do if in quickpoll
//
if ( m_inQuickPoll ) return;
// log(LOG_WARN,
// "Loop: repeated %li times before moving to lower priority threads",
// repeats);
#endif
// now for lower priority fds
for ( long i = 0 ; i < s_numReadFds ; i++ ) {
int fd = s_readFds[i];
s = m_readSlots [ fd ];
// if niceness is not 0, handle it below
if ( s && s->m_niceness <= 0 ) continue;
// must be set
if ( FD_ISSET ( fd , &readfds ) )
callCallbacks_ass (true/*forReading?*/,fd, g_now,1);
}
// for ( long i = 0 ; i < s_numWriteFds ; i++ ) {
// int fd = s_writeFds[i];
// s = m_writeSlots [ fd ];
// // if niceness is not 0, handle it below
// if ( s && s->m_niceness <= 0 ) continue;
// // must be set
// if ( FD_ISSET ( fd , &writefds ) )
// callCallbacks_ass (false/*forReading?*/,fd, g_now,1);
// }
// . MDW: replaced this with more efficient logic above
// . call the callback for each fd we got
// . only call callbacks for fds that have a nice of 0 here
// for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
// // continue if not set for reading
// if ( ! FD_ISSET ( i , &readfds ) ) continue;
// // if niceness is not 0, handle it below
// s = m_readSlots [ i /*fd*/ ];
// if ( s && s->m_niceness != 0 ) continue;
// callCallbacks_ass (true/*forReading?*/,i, g_now);
// processedOne = true;
// }
// // only call callbacks for fds that have a nice of 0 here
// for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
// if ( ! FD_ISSET ( i , &writefds ) ) continue;
// // if niceness is not 0, handle it below
// s = m_writeSlots [ i /*fd*/ ];
// if ( s && s->m_niceness != 0 ) continue;
// callCallbacks_ass (false/*forReading?*/,i, g_now);
// processedOne = true;
// }
// #if 0
// if(processedOne && repeats < QUERYPRIORITYWEIGHT) {
// //m_needToPoll = true;
// repeats++;
// goto skipLowerPriorities;
// }
// // log(LOG_WARN,
// // "Loop: repeated %li times before moving to lower priority threads",
// // repeats);
// #endif
// // handle low priority fds here
// for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
// // continue if not set for reading
// if ( ! FD_ISSET ( i , &readfds ) ) continue;
// // if niceness is 0, we already handled it above
// s = m_readSlots [ i /*fd*/ ];
// if ( s && s->m_niceness <= 0 ) continue;
// callCallbacks_ass (true/*forReading?*/,i, g_now);
// }
// // only call callbacks for fds that have a nice of 0 here
// for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
// if ( ! FD_ISSET ( i , &writefds ) ) continue;
// // if niceness is 0, we already handled it above
// s = m_writeSlots [ i /*fd*/ ];
// if ( s && s->m_niceness <= 0 ) continue;
// callCallbacks_ass (false/*forReading?*/,i, g_now);
// }
// handle low priority fds here
for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
// continue if not set for reading
if ( ! FD_ISSET ( i , &readfds ) ) continue;
// if niceness is 0, we already handled it above
s = m_readSlots [ i /*fd*/ ];
if ( s && s->m_niceness <= 0 ) continue;
callCallbacks_ass (true/*forReading?*/,i, g_now);
}
// only call callbacks for fds that have a nice of 0 here
for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
if ( ! FD_ISSET ( i , &writefds ) ) continue;
// if niceness is 0, we already handled it above
s = m_writeSlots [ i /*fd*/ ];
if ( s && s->m_niceness <= 0 ) continue;
callCallbacks_ass (false/*forReading?*/,i, g_now);
}
// handle returned threads for all other nicenesses
g_threads.timedCleanUp(4,MAX_NICENESS); // 4 ms
@ -2125,6 +2287,11 @@ void Loop::quickPoll(long niceness, const char* caller, long lineno) {
m_inQuickPoll = true;
// doPoll() will since we are in quickpoll and only call niceness 0
// callbacks for all the fds. and it will set the timer to 0.
doPoll ();
/*
//g_udpServer2.process_ass ( g_now , 0 );
g_udpServer.process_ass ( g_now , 0 );
g_threads.timedCleanUp( 100 , 0 ); // ms ms, niceness 0
@ -2163,20 +2330,20 @@ void Loop::quickPoll(long niceness, const char* caller, long lineno) {
// continue if not set for reading
if ( ! FD_ISSET ( i , &readfds ) ) continue;
// if niceness is not -1, handle it below
s = m_readSlots [ i /*fd*/ ];
s = m_readSlots [ i ]; // i = fd
// now we have niceness 2 if Sections.cpp
if ( s && s->m_niceness >= niceness ) continue;
callCallbacks_ass (true/*forReading?*/,i, now);
callCallbacks_ass (true,i, now); // reading = true
// sanity check
if ( g_niceness > niceness ) { char*xx=NULL;*xx=0; }
}
for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
if ( ! FD_ISSET ( i , &writefds ) ) continue;
// if niceness is not -1, handle it below
s = m_writeSlots [ i /*fd*/ ];
s = m_writeSlots [ i ]; // i = fd
// now we have niceness 2 if Sections.cpp
if ( s && s->m_niceness >= niceness ) continue;
callCallbacks_ass (false/*forReading?*/,i, now);
callCallbacks_ass (false,i, now); // forReading = false
// sanity check
if ( g_niceness > niceness ) { char*xx=NULL;*xx=0; }
}
@ -2189,6 +2356,7 @@ void Loop::quickPoll(long niceness, const char* caller, long lineno) {
g_dns.m_udpServer.makeCallbacks_ass(0);
theend:
*/
// reset this again
g_missedQuickPolls = 0;