mirror of
https://github.com/gigablast/open-source-search-engine.git
synced 2024-10-04 04:07:13 +03:00
try to fix inner loop logic some more
This commit is contained in:
parent
10f897e5be
commit
ee070d9378
126
Loop.cpp
126
Loop.cpp
@ -131,7 +131,7 @@ static void sigalrmHandler( int x , siginfo_t *info , void *y ) ;
|
||||
static void sigvtalrmHandler( int x , siginfo_t *info , void *y ) ;
|
||||
|
||||
//long g_fdWriteBits[MAX_NUM_FDS/32];
|
||||
long g_fdReadBits [MAX_NUM_FDS/32];
|
||||
//long g_fdReadBits [MAX_NUM_FDS/32];
|
||||
|
||||
void Loop::unregisterReadCallback ( int fd, void *state ,
|
||||
void (* callback)(int fd,void *state),
|
||||
@ -142,11 +142,11 @@ void Loop::unregisterReadCallback ( int fd, void *state ,
|
||||
silent );
|
||||
}
|
||||
|
||||
// void Loop::unregisterWriteCallback ( int fd, void *state ,
|
||||
// void (* callback)(int fd,void *state)){
|
||||
// // from writing
|
||||
// unregisterCallback ( m_writeSlots , fd , state , callback );
|
||||
// }
|
||||
void Loop::unregisterWriteCallback ( int fd, void *state ,
|
||||
void (* callback)(int fd,void *state)){
|
||||
// from writing
|
||||
unregisterCallback ( m_writeSlots , fd , state , callback );
|
||||
}
|
||||
|
||||
void Loop::unregisterSleepCallback ( void *state ,
|
||||
void (* callback)(int fd,void *state)){
|
||||
@ -203,15 +203,15 @@ void Loop::unregisterCallback ( Slot **slots , int fd , void *state ,
|
||||
FD_CLR(fd,&s_selectMaskRead );
|
||||
break;
|
||||
}
|
||||
// for (long i = 0; i < s_numWriteFds ; i++ ) {
|
||||
// if ( s_writeFds[i] != fd ) continue;
|
||||
// s_writeFds[i] = s_writeFds[s_numWriteFds-1];
|
||||
// s_numWriteFds--;
|
||||
// // remove from select mask too
|
||||
// FD_CLR(fd,&s_selectMaskWrite);
|
||||
for (long i = 0; i < s_numWriteFds ; i++ ) {
|
||||
if ( s_writeFds[i] != fd ) continue;
|
||||
s_writeFds[i] = s_writeFds[s_numWriteFds-1];
|
||||
s_numWriteFds--;
|
||||
// remove from select mask too
|
||||
FD_CLR(fd,&s_selectMaskWrite);
|
||||
// FD_CLR(fd,&s_selectMaskExcept);
|
||||
// break;
|
||||
// }
|
||||
break;
|
||||
}
|
||||
}
|
||||
// debug msg
|
||||
//log("Loop::unregistered fd=%li state=%lu", fd, (long)state );
|
||||
@ -257,14 +257,14 @@ bool Loop::registerReadCallback ( int fd,
|
||||
}
|
||||
|
||||
|
||||
// bool Loop::registerWriteCallback ( int fd,
|
||||
// void *state,
|
||||
// void (* callback)(int fd, void *state ) ,
|
||||
// long niceness ) {
|
||||
// // the "false" answers the question "for reading?"
|
||||
// if ( addSlot ( false, fd, state, callback, niceness ) )return true;
|
||||
// return log("loop: Unable to register write callback.");
|
||||
// }
|
||||
bool Loop::registerWriteCallback ( int fd,
|
||||
void *state,
|
||||
void (* callback)(int fd, void *state ) ,
|
||||
long niceness ) {
|
||||
// the "false" answers the question "for reading?"
|
||||
if ( addSlot ( false, fd, state, callback, niceness ) )return true;
|
||||
return log("loop: Unable to register write callback.");
|
||||
}
|
||||
|
||||
// tick is in milliseconds
|
||||
bool Loop::registerSleepCallback ( long tick ,
|
||||
@ -297,9 +297,8 @@ bool Loop::addSlot ( bool forReading , int fd, void *state,
|
||||
// . ensure fd not already registered with this callback/state
|
||||
// . prevent dups so you can keep calling register w/o fear
|
||||
Slot *s;
|
||||
//if ( forReading ) s = m_readSlots [ fd ];
|
||||
//else s = m_writeSlots [ fd ];
|
||||
s = m_readSlots [ fd ];
|
||||
if ( forReading ) s = m_readSlots [ fd ];
|
||||
else s = m_writeSlots [ fd ];
|
||||
while ( s ) {
|
||||
if ( s->m_callback == callback &&
|
||||
s->m_state == state ) {
|
||||
@ -318,34 +317,34 @@ bool Loop::addSlot ( bool forReading , int fd, void *state,
|
||||
// for pointing to slot already in position for fd
|
||||
Slot *next ;
|
||||
// store ourselves in the slot for this fd
|
||||
//if ( forReading ) {
|
||||
next = m_readSlots [ fd ];
|
||||
m_readSlots [ fd ] = s;
|
||||
// if not already registered, add to list
|
||||
if ( fd<MAX_NUM_FDS && ! FD_ISSET ( fd,&s_selectMaskRead ) ) {
|
||||
s_readFds[s_numReadFds++] = fd;
|
||||
FD_SET ( fd,&s_selectMaskRead );
|
||||
// sanity
|
||||
if ( s_numReadFds>MAX_NUM_FDS){char *xx=NULL;*xx=0;}
|
||||
}
|
||||
if ( forReading ) {
|
||||
next = m_readSlots [ fd ];
|
||||
m_readSlots [ fd ] = s;
|
||||
// if not already registered, add to list
|
||||
if ( fd<MAX_NUM_FDS && ! FD_ISSET ( fd,&s_selectMaskRead ) ) {
|
||||
s_readFds[s_numReadFds++] = fd;
|
||||
FD_SET ( fd,&s_selectMaskRead );
|
||||
// sanity
|
||||
if ( s_numReadFds>MAX_NUM_FDS){char *xx=NULL;*xx=0;}
|
||||
}
|
||||
// fd == MAX_NUM_FDS if it's a sleep callback
|
||||
//if ( fd < MAX_NUM_FDS ) {
|
||||
//FD_SET ( fd , &m_readfds );
|
||||
//FD_SET ( fd , &m_exceptfds );
|
||||
//}
|
||||
// }
|
||||
// else {
|
||||
// next = m_writeSlots [ fd ];
|
||||
// m_writeSlots [ fd ] = s;
|
||||
// //FD_SET ( fd , &m_writefds );
|
||||
// // if not already registered, add to list
|
||||
// if ( fd<MAX_NUM_FDS && ! FD_ISSET ( fd,&s_selectMaskWrite ) ) {
|
||||
// s_writeFds[s_numWriteFds++] = fd;
|
||||
// FD_SET ( fd,&s_selectMaskWrite );
|
||||
// // sanity
|
||||
// if ( s_numWriteFds>MAX_NUM_FDS){char *xx=NULL;*xx=0;}
|
||||
// }
|
||||
// }
|
||||
}
|
||||
else {
|
||||
next = m_writeSlots [ fd ];
|
||||
m_writeSlots [ fd ] = s;
|
||||
//FD_SET ( fd , &m_writefds );
|
||||
// if not already registered, add to list
|
||||
if ( fd<MAX_NUM_FDS && ! FD_ISSET ( fd,&s_selectMaskWrite ) ) {
|
||||
s_writeFds[s_numWriteFds++] = fd;
|
||||
FD_SET ( fd,&s_selectMaskWrite );
|
||||
// sanity
|
||||
if ( s_numWriteFds>MAX_NUM_FDS){char *xx=NULL;*xx=0;}
|
||||
}
|
||||
}
|
||||
// set our callback and state
|
||||
s->m_callback = callback;
|
||||
s->m_state = state;
|
||||
@ -430,6 +429,7 @@ bool Loop::setNonBlocking ( int fd , long niceness ) {
|
||||
// . if fd is MAX_NUM_FDS and "forReading" is true call all sleepy callbacks
|
||||
void Loop::callCallbacks_ass ( bool forReading , int fd , long long now ,
|
||||
long niceness ) {
|
||||
|
||||
// debug msg
|
||||
//if ( g_conf.m_logDebugUdp ) log("callCallbacks_ass start");
|
||||
//if ( fd != 1024 ) {
|
||||
@ -937,7 +937,7 @@ bool Loop::init ( ) {
|
||||
//mdw:disableTimer();
|
||||
|
||||
// make this 7ms i guess
|
||||
setitimer(ITIMER_REAL, &m_realInterrupt, NULL);
|
||||
//setitimer(ITIMER_REAL, &m_realInterrupt, NULL);
|
||||
// this is 10ms
|
||||
setitimer(ITIMER_VIRTUAL, &m_quickInterrupt, NULL);
|
||||
|
||||
@ -1811,8 +1811,7 @@ void Loop::doPoll ( ) {
|
||||
fd_set writefds;
|
||||
fd_set exceptfds;
|
||||
memcpy ( &readfds, &s_selectMaskRead , sizeof(fd_set) );
|
||||
memcpy ( &writefds, &s_selectMaskRead , sizeof(fd_set) );
|
||||
//memcpy ( &writefds, &s_selectMaskWrite , sizeof(fd_set) );
|
||||
memcpy ( &writefds, &s_selectMaskWrite , sizeof(fd_set) );
|
||||
//memcpy ( &exceptfds, &s_selectMaskExcept , sizeof(fd_set) );
|
||||
|
||||
// what is the point of fds for writing... its for when we
|
||||
@ -1882,15 +1881,18 @@ void Loop::doPoll ( ) {
|
||||
if ( g_conf.m_logDebugLoop)
|
||||
logf(LOG_DEBUG,"loop: Got %li fds waiting.",n);
|
||||
|
||||
// for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
|
||||
// // continue if not set for reading
|
||||
// if ( FD_ISSET ( i , &readfds ) ||
|
||||
// FD_ISSET ( i , &writefds ) ||
|
||||
// FD_ISSET ( i , &exceptfds ) )
|
||||
// // debug
|
||||
// log("loop: fd %li is on",i);
|
||||
// // if niceness is not -1, handle it below
|
||||
// }
|
||||
for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) {
|
||||
// continue if not set for reading
|
||||
if ( FD_ISSET ( i , &readfds ) )
|
||||
log("loop: fd %li is on for read",i);
|
||||
if ( FD_ISSET ( i , &writefds ) )
|
||||
log("loop: fd %li is on for write",i);
|
||||
if ( FD_ISSET ( i , &exceptfds ) )
|
||||
log("loop: fd %li is on for except",i);
|
||||
// debug
|
||||
|
||||
// if niceness is not -1, handle it below
|
||||
}
|
||||
|
||||
// . reset the need to poll flag if everything is caught up now
|
||||
// . let's take this out for now ... won't this leave some
|
||||
@ -1947,6 +1949,8 @@ void Loop::doPoll ( ) {
|
||||
calledOne = true;
|
||||
callCallbacks_ass (true/*forReading?*/,fd, g_now,0);
|
||||
}
|
||||
// fds are always ready for writing so take this out.
|
||||
// our read callbacks always try to do a write as well.
|
||||
if ( FD_ISSET ( fd , &writefds ) ) {
|
||||
if ( g_conf.m_logDebugLoop )
|
||||
log("loop: calling cback0 niceness=%li fd=%i"
|
||||
@ -1990,6 +1994,8 @@ void Loop::doPoll ( ) {
|
||||
calledOne = true;
|
||||
callCallbacks_ass (true/*forReading?*/,fd, g_now,1);
|
||||
}
|
||||
// fds are always ready for writing so take this out.
|
||||
// our read callbacks always try to do a write as well.
|
||||
if ( FD_ISSET ( fd , &writefds ) ) {
|
||||
if ( g_conf.m_logDebugLoop )
|
||||
log("loop: calling cback1 niceness=%li fd=%i"
|
||||
|
12
Loop.h
12
Loop.h
@ -134,10 +134,10 @@ class Loop {
|
||||
// . register this "fd" with "callback"
|
||||
// . "callback" will be called when fd is ready for reading
|
||||
// . "callback" will be called when there is an error on fd
|
||||
/* bool registerWriteCallback ( int fd , */
|
||||
/* void *state , */
|
||||
/* void (* callback)(int fd, void *state ) , */
|
||||
/* long niceness ); */
|
||||
bool registerWriteCallback ( int fd ,
|
||||
void *state ,
|
||||
void (* callback)(int fd, void *state ) ,
|
||||
long niceness );
|
||||
|
||||
// . register this callback to be called every second
|
||||
// . TODO: implement "seconds" parameter
|
||||
@ -150,8 +150,8 @@ class Loop {
|
||||
void unregisterReadCallback ( int fd, void *state ,
|
||||
void (* callback)(int fd,void *state),
|
||||
bool silent = false );
|
||||
/* void unregisterWriteCallback ( int fd, void *state , */
|
||||
/* void (* callback)(int fd,void *state)); */
|
||||
void unregisterWriteCallback ( int fd, void *state ,
|
||||
void (* callback)(int fd,void *state));
|
||||
void unregisterSleepCallback ( void *state ,
|
||||
void (* callback)(int fd,void *state));
|
||||
|
||||
|
@ -1002,10 +1002,10 @@ TcpSocket *TcpServer::wrapSocket ( int sd , long niceness , bool isIncoming ) {
|
||||
goto hadError;
|
||||
// what does thie really mean? shouldn't we only register for write
|
||||
// if a write we did failed because the buffer was full?
|
||||
// if(!g_loop.registerWriteCallback(sd,this,writeSocketWrapper,niceness)){
|
||||
// g_loop.unregisterReadCallback(sd,this , readSocketWrapper );
|
||||
// goto hadError;
|
||||
// }
|
||||
if(!g_loop.registerWriteCallback(sd,this,writeSocketWrapper,niceness)){
|
||||
g_loop.unregisterReadCallback(sd,this , readSocketWrapper );
|
||||
goto hadError;
|
||||
}
|
||||
// return "s" on success
|
||||
return s;
|
||||
// otherwise, free "s" and return NULL
|
||||
@ -1138,6 +1138,11 @@ void readSocketWrapper2 ( int sd , void *state ) ;
|
||||
// . g_errno will be set by Loop if there was a kinda socket reset error
|
||||
void readSocketWrapper ( int sd , void *state ) {
|
||||
readSocketWrapper2 ( sd , state );
|
||||
// since we got rid of waiting for writing on fds, since it only
|
||||
// applies to freshly connected tcp sockets, we poll for ready-
|
||||
// -for-write fds on the select() call in Loop.cpp on the same fds
|
||||
// we are waiting for reads on. so if we get a signal it could really
|
||||
// be a ready-for-write signal, so try this writing just in case.
|
||||
writeSocketWrapper ( sd , state );
|
||||
}
|
||||
|
||||
@ -1506,6 +1511,7 @@ void writeSocketWrapper ( int sd , void *state ) {
|
||||
// debug msg
|
||||
//log("........... TcpServer::writeSocketWrapper sd=%li\n",sd);
|
||||
TcpServer *THIS = (TcpServer *)state;
|
||||
|
||||
// get the TcpSocket for this socket descriptor
|
||||
TcpSocket *s = THIS->getSocket ( sd );
|
||||
if ( ! s ) {
|
||||
@ -1613,6 +1619,12 @@ long TcpServer::writeSocket ( TcpSocket *s ) {
|
||||
//long status = readSocket ( s );
|
||||
//return status; //-1;
|
||||
}
|
||||
|
||||
// we only register write callback to see when it is connected so
|
||||
// we can do a write, so we should not need this now
|
||||
g_loop.unregisterWriteCallback(s->m_sd,this,writeSocketWrapper);
|
||||
|
||||
|
||||
loop:
|
||||
// send some stuff
|
||||
long toSend = s->m_sendBufUsed - s->m_sendOffset;
|
||||
@ -1978,7 +1990,7 @@ void TcpServer::destroySocket ( TcpSocket *s ) {
|
||||
// always free the sendBuf
|
||||
if ( s->m_sendBuf ) mfree (s->m_sendBuf, s->m_sendBufSize,"TcpServer");
|
||||
// unregister it with Loop so we don't get any calls about it
|
||||
//g_loop.unregisterWriteCallback ( sd , this , writeSocketWrapper );
|
||||
g_loop.unregisterWriteCallback ( sd , this , writeSocketWrapper );
|
||||
g_loop.unregisterReadCallback ( sd , this , readSocketWrapper );
|
||||
// debug msg
|
||||
//log("unregistering sd=%li",sd);
|
||||
|
Loading…
Reference in New Issue
Block a user