diff --git a/Loop.cpp b/Loop.cpp index 34e41772..4cecbc80 100644 --- a/Loop.cpp +++ b/Loop.cpp @@ -131,7 +131,7 @@ static void sigalrmHandler( int x , siginfo_t *info , void *y ) ; static void sigvtalrmHandler( int x , siginfo_t *info , void *y ) ; //long g_fdWriteBits[MAX_NUM_FDS/32]; -long g_fdReadBits [MAX_NUM_FDS/32]; +//long g_fdReadBits [MAX_NUM_FDS/32]; void Loop::unregisterReadCallback ( int fd, void *state , void (* callback)(int fd,void *state), @@ -142,11 +142,11 @@ void Loop::unregisterReadCallback ( int fd, void *state , silent ); } -// void Loop::unregisterWriteCallback ( int fd, void *state , -// void (* callback)(int fd,void *state)){ -// // from writing -// unregisterCallback ( m_writeSlots , fd , state , callback ); -// } +void Loop::unregisterWriteCallback ( int fd, void *state , + void (* callback)(int fd,void *state)){ + // from writing + unregisterCallback ( m_writeSlots , fd , state , callback ); +} void Loop::unregisterSleepCallback ( void *state , void (* callback)(int fd,void *state)){ @@ -203,15 +203,15 @@ void Loop::unregisterCallback ( Slot **slots , int fd , void *state , FD_CLR(fd,&s_selectMaskRead ); break; } - // for (long i = 0; i < s_numWriteFds ; i++ ) { - // if ( s_writeFds[i] != fd ) continue; - // s_writeFds[i] = s_writeFds[s_numWriteFds-1]; - // s_numWriteFds--; - // // remove from select mask too - // FD_CLR(fd,&s_selectMaskWrite); + for (long i = 0; i < s_numWriteFds ; i++ ) { + if ( s_writeFds[i] != fd ) continue; + s_writeFds[i] = s_writeFds[s_numWriteFds-1]; + s_numWriteFds--; + // remove from select mask too + FD_CLR(fd,&s_selectMaskWrite); // FD_CLR(fd,&s_selectMaskExcept); - // break; - // } + break; + } } // debug msg //log("Loop::unregistered fd=%li state=%lu", fd, (long)state ); @@ -257,14 +257,14 @@ bool Loop::registerReadCallback ( int fd, } -// bool Loop::registerWriteCallback ( int fd, -// void *state, -// void (* callback)(int fd, void *state ) , -// long niceness ) { -// // the "false" answers the question "for reading?" -// if ( addSlot ( false, fd, state, callback, niceness ) )return true; -// return log("loop: Unable to register write callback."); -// } +bool Loop::registerWriteCallback ( int fd, + void *state, + void (* callback)(int fd, void *state ) , + long niceness ) { + // the "false" answers the question "for reading?" + if ( addSlot ( false, fd, state, callback, niceness ) )return true; + return log("loop: Unable to register write callback."); +} // tick is in milliseconds bool Loop::registerSleepCallback ( long tick , @@ -297,9 +297,8 @@ bool Loop::addSlot ( bool forReading , int fd, void *state, // . ensure fd not already registered with this callback/state // . prevent dups so you can keep calling register w/o fear Slot *s; - //if ( forReading ) s = m_readSlots [ fd ]; - //else s = m_writeSlots [ fd ]; - s = m_readSlots [ fd ]; + if ( forReading ) s = m_readSlots [ fd ]; + else s = m_writeSlots [ fd ]; while ( s ) { if ( s->m_callback == callback && s->m_state == state ) { @@ -318,34 +317,34 @@ bool Loop::addSlot ( bool forReading , int fd, void *state, // for pointing to slot already in position for fd Slot *next ; // store ourselves in the slot for this fd - //if ( forReading ) { - next = m_readSlots [ fd ]; - m_readSlots [ fd ] = s; - // if not already registered, add to list - if ( fdMAX_NUM_FDS){char *xx=NULL;*xx=0;} - } + if ( forReading ) { + next = m_readSlots [ fd ]; + m_readSlots [ fd ] = s; + // if not already registered, add to list + if ( fdMAX_NUM_FDS){char *xx=NULL;*xx=0;} + } // fd == MAX_NUM_FDS if it's a sleep callback //if ( fd < MAX_NUM_FDS ) { //FD_SET ( fd , &m_readfds ); //FD_SET ( fd , &m_exceptfds ); //} - // } - // else { - // next = m_writeSlots [ fd ]; - // m_writeSlots [ fd ] = s; - // //FD_SET ( fd , &m_writefds ); - // // if not already registered, add to list - // if ( fdMAX_NUM_FDS){char *xx=NULL;*xx=0;} - // } - // } + } + else { + next = m_writeSlots [ fd ]; + m_writeSlots [ fd ] = s; + //FD_SET ( fd , &m_writefds ); + // if not already registered, add to list + if ( fdMAX_NUM_FDS){char *xx=NULL;*xx=0;} + } + } // set our callback and state s->m_callback = callback; s->m_state = state; @@ -430,6 +429,7 @@ bool Loop::setNonBlocking ( int fd , long niceness ) { // . if fd is MAX_NUM_FDS and "forReading" is true call all sleepy callbacks void Loop::callCallbacks_ass ( bool forReading , int fd , long long now , long niceness ) { + // debug msg //if ( g_conf.m_logDebugUdp ) log("callCallbacks_ass start"); //if ( fd != 1024 ) { @@ -937,7 +937,7 @@ bool Loop::init ( ) { //mdw:disableTimer(); // make this 7ms i guess - setitimer(ITIMER_REAL, &m_realInterrupt, NULL); + //setitimer(ITIMER_REAL, &m_realInterrupt, NULL); // this is 10ms setitimer(ITIMER_VIRTUAL, &m_quickInterrupt, NULL); @@ -1811,8 +1811,7 @@ void Loop::doPoll ( ) { fd_set writefds; fd_set exceptfds; memcpy ( &readfds, &s_selectMaskRead , sizeof(fd_set) ); - memcpy ( &writefds, &s_selectMaskRead , sizeof(fd_set) ); - //memcpy ( &writefds, &s_selectMaskWrite , sizeof(fd_set) ); + memcpy ( &writefds, &s_selectMaskWrite , sizeof(fd_set) ); //memcpy ( &exceptfds, &s_selectMaskExcept , sizeof(fd_set) ); // what is the point of fds for writing... its for when we @@ -1882,15 +1881,18 @@ void Loop::doPoll ( ) { if ( g_conf.m_logDebugLoop) logf(LOG_DEBUG,"loop: Got %li fds waiting.",n); - // for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) { - // // continue if not set for reading - // if ( FD_ISSET ( i , &readfds ) || - // FD_ISSET ( i , &writefds ) || - // FD_ISSET ( i , &exceptfds ) ) - // // debug - // log("loop: fd %li is on",i); - // // if niceness is not -1, handle it below - // } + for ( long i = 0 ; i < MAX_NUM_FDS ; i++ ) { + // continue if not set for reading + if ( FD_ISSET ( i , &readfds ) ) + log("loop: fd %li is on for read",i); + if ( FD_ISSET ( i , &writefds ) ) + log("loop: fd %li is on for write",i); + if ( FD_ISSET ( i , &exceptfds ) ) + log("loop: fd %li is on for except",i); + // debug + + // if niceness is not -1, handle it below + } // . reset the need to poll flag if everything is caught up now // . let's take this out for now ... won't this leave some @@ -1947,6 +1949,8 @@ void Loop::doPoll ( ) { calledOne = true; callCallbacks_ass (true/*forReading?*/,fd, g_now,0); } + // fds are always ready for writing so take this out. + // our read callbacks always try to do a write as well. if ( FD_ISSET ( fd , &writefds ) ) { if ( g_conf.m_logDebugLoop ) log("loop: calling cback0 niceness=%li fd=%i" @@ -1990,6 +1994,8 @@ void Loop::doPoll ( ) { calledOne = true; callCallbacks_ass (true/*forReading?*/,fd, g_now,1); } + // fds are always ready for writing so take this out. + // our read callbacks always try to do a write as well. if ( FD_ISSET ( fd , &writefds ) ) { if ( g_conf.m_logDebugLoop ) log("loop: calling cback1 niceness=%li fd=%i" diff --git a/Loop.h b/Loop.h index ac98b32d..c1e82bd8 100644 --- a/Loop.h +++ b/Loop.h @@ -134,10 +134,10 @@ class Loop { // . register this "fd" with "callback" // . "callback" will be called when fd is ready for reading // . "callback" will be called when there is an error on fd - /* bool registerWriteCallback ( int fd , */ - /* void *state , */ - /* void (* callback)(int fd, void *state ) , */ - /* long niceness ); */ + bool registerWriteCallback ( int fd , + void *state , + void (* callback)(int fd, void *state ) , + long niceness ); // . register this callback to be called every second // . TODO: implement "seconds" parameter @@ -150,8 +150,8 @@ class Loop { void unregisterReadCallback ( int fd, void *state , void (* callback)(int fd,void *state), bool silent = false ); - /* void unregisterWriteCallback ( int fd, void *state , */ - /* void (* callback)(int fd,void *state)); */ + void unregisterWriteCallback ( int fd, void *state , + void (* callback)(int fd,void *state)); void unregisterSleepCallback ( void *state , void (* callback)(int fd,void *state)); diff --git a/TcpServer.cpp b/TcpServer.cpp index 008c9a9c..f9790ac9 100644 --- a/TcpServer.cpp +++ b/TcpServer.cpp @@ -1002,10 +1002,10 @@ TcpSocket *TcpServer::wrapSocket ( int sd , long niceness , bool isIncoming ) { goto hadError; // what does thie really mean? shouldn't we only register for write // if a write we did failed because the buffer was full? - // if(!g_loop.registerWriteCallback(sd,this,writeSocketWrapper,niceness)){ - // g_loop.unregisterReadCallback(sd,this , readSocketWrapper ); - // goto hadError; - // } + if(!g_loop.registerWriteCallback(sd,this,writeSocketWrapper,niceness)){ + g_loop.unregisterReadCallback(sd,this , readSocketWrapper ); + goto hadError; + } // return "s" on success return s; // otherwise, free "s" and return NULL @@ -1138,6 +1138,11 @@ void readSocketWrapper2 ( int sd , void *state ) ; // . g_errno will be set by Loop if there was a kinda socket reset error void readSocketWrapper ( int sd , void *state ) { readSocketWrapper2 ( sd , state ); + // since we got rid of waiting for writing on fds, since it only + // applies to freshly connected tcp sockets, we poll for ready- + // -for-write fds on the select() call in Loop.cpp on the same fds + // we are waiting for reads on. so if we get a signal it could really + // be a ready-for-write signal, so try this writing just in case. writeSocketWrapper ( sd , state ); } @@ -1506,6 +1511,7 @@ void writeSocketWrapper ( int sd , void *state ) { // debug msg //log("........... TcpServer::writeSocketWrapper sd=%li\n",sd); TcpServer *THIS = (TcpServer *)state; + // get the TcpSocket for this socket descriptor TcpSocket *s = THIS->getSocket ( sd ); if ( ! s ) { @@ -1613,6 +1619,12 @@ long TcpServer::writeSocket ( TcpSocket *s ) { //long status = readSocket ( s ); //return status; //-1; } + + // we only register write callback to see when it is connected so + // we can do a write, so we should not need this now + g_loop.unregisterWriteCallback(s->m_sd,this,writeSocketWrapper); + + loop: // send some stuff long toSend = s->m_sendBufUsed - s->m_sendOffset; @@ -1978,7 +1990,7 @@ void TcpServer::destroySocket ( TcpSocket *s ) { // always free the sendBuf if ( s->m_sendBuf ) mfree (s->m_sendBuf, s->m_sendBufSize,"TcpServer"); // unregister it with Loop so we don't get any calls about it - //g_loop.unregisterWriteCallback ( sd , this , writeSocketWrapper ); + g_loop.unregisterWriteCallback ( sd , this , writeSocketWrapper ); g_loop.unregisterReadCallback ( sd , this , readSocketWrapper ); // debug msg //log("unregistering sd=%li",sd);