try to ameliorate the udp slot jamming issue.

This commit is contained in:
Matt 2015-04-03 10:43:11 -06:00
parent c2567ad244
commit c991a2dcdd
4 changed files with 121 additions and 21 deletions

View File

@ -349,11 +349,16 @@ void printUdpTable ( SafeBuf *p, char *title, UdpServer *server ,
"<td><b>hostname</b></td>";
}
UdpSlot *slot = server->m_head3;
int32_t callbackReadyCount = 0;
for ( ; slot ; slot = slot->m_next3 , callbackReadyCount++ );
p->safePrintf ( "<table %s>"
"<tr class=hdrow><td colspan=19>"
"<center>"
//"<font size=+1>"
"<b>%s</b> (%"INT32" transactions)"
"(%"INT32" replies ready)"
//"</font>"
"</td></tr>"
"<tr bgcolor=#%s>"
@ -380,6 +385,7 @@ void printUdpTable ( SafeBuf *p, char *title, UdpServer *server ,
"</tr>\n" ,
TABLE_STYLE,
title , server->getNumUsedSlots() ,
callbackReadyCount ,
DARK_BLUE ,
dd );

View File

@ -251,7 +251,7 @@ bool UdpServer::init ( uint16_t port, UdpProtocol *proto, int32_t niceness,
m_head2 = NULL;
m_tail2 = NULL;
// linked list of callback candidates
//m_head3 = NULL;
m_head3 = NULL;
// . set up hash table that converts key (ip/port/transId) to a slot
// . m_numBuckets must be power of 2
m_numBuckets = getHighestLitBitValue ( m_maxSlots * 6 );
@ -1693,8 +1693,45 @@ int32_t UdpServer::readSock_ass ( UdpSlot **slotPtr , int64_t now ) {
// we we could not allocate a read buffer to hold the request/reply
// just send a cancel ack so the send will call its callback with
// g_errno set
// MDW: it won't make it into the m_head3 callback linked list with
// this logic.... maybe it just times out or resends later...
if ( ! status && g_errno == ENOMEM ) goto cancelTrans;
// if it is now a complete REPLY, callback will need to be called
// so insert into the callback linked list, m_head3.
// we have to put slots with NULL callbacks in here since they
// are incoming requests to handle.
if ( //slot->m_callback &&
// if we got an error reading the reply (or sending req?) then
// consider it completed too?
// ( slot->isTransactionComplete() || slot->m_errno ) &&
( slot->isDoneReading() || slot->m_errno ) &&
// must not be in there already, lest we double add it
! isInCallbackLinkedList ( slot ) ) {
// debug log
if ( slot->m_errno )
log("udp: adding slot with err = %s to callback list"
, mstrerror(slot->m_errno) );
if ( g_conf.m_logDebugUdp )
log("udp: adding slot=%"PTRFMT" to callback list"
,(PTRTYPE)slot);
// prepare to call the callback by adding it to this
// special linked list
slot->m_next3 = NULL;
slot->m_prev3 = NULL;
if ( ! m_tail3 ) {
m_head3 = slot;
m_tail3 = slot;
}
else {
// insert at end of linked list otherwise
m_tail3->m_next3 = slot;
slot->m_prev3 = m_tail3;
m_tail3 = slot;
}
}
// if(g_conf.m_sequentialProfiling) {
// if(slot->isDoneReading())
// log(LOG_TIMING, "admin: read last dgram: "
@ -1886,6 +1923,10 @@ void UdpServer::resume ( ) {
// . the problem is when we call this with niceness 1 and we convert
// a niceness 1 callback to 0...
bool UdpServer::makeCallbacks_ass ( int32_t niceness ) {
// if nothing to call, forget it
if ( ! m_head3 ) return true;
if ( g_conf.m_logDebugUdp )
log(LOG_DEBUG,"udp: makeCallbacks_ass: start. nice=%"INT32" "
"inquickpoll=%"INT32"",
@ -1893,6 +1934,7 @@ bool UdpServer::makeCallbacks_ass ( int32_t niceness ) {
// bail if suspended
if ( m_isSuspended ) return false;
// . if there are active high priority threads, do not
// call low priority callbacks. in that case
// . This seems to block things up to much?
@ -1938,9 +1980,13 @@ bool UdpServer::makeCallbacks_ass ( int32_t niceness ) {
nextPass:
UdpSlot *nextSlot = NULL;
// only scan those slots that are ready
//for ( UdpSlot *slot = m_head3 ; slot ; slot = slot->m_next3 )
for ( UdpSlot *slot = m_head2 ; slot ; slot = slot->m_next2 ) {
//for ( UdpSlot *slot = m_head2 ; slot ; slot = slot->m_next2 ) {
for ( UdpSlot *slot = m_head3 ; slot ; slot = nextSlot ) {
// because makeCallback_ass() can delete the slot, use this
nextSlot = slot->m_next3;
// call quick handlers in pass 0, they do not take any time
// and if they do not get called right away can cause this host
// to bottleneck many hosts
@ -2097,12 +2143,15 @@ bool UdpServer::makeCallbacks_ass ( int32_t niceness ) {
//UdpSlot *next3 = slot->m_next2;
// . crap, this can alter the linked list we are scanning
// if it deletes the slot!
// if it deletes the slot! yes, but now we use "nextSlot"
// . return false on error and sets g_errno, true otherwise
// . return true if we called one
// . skip to next slot if did not call callback/handler
if ( ! makeCallback_ass ( slot ) ) continue;
// remove it from the callback list to avoid re-call
removeFromCallbackLinkedList ( slot );
int64_t took = 0;
if ( logIt )
took = gettimeofdayInMillisecondsLocal()-start2;
@ -2245,9 +2294,13 @@ bool UdpServer::makeCallback_ass ( UdpSlot *slot ) {
start = gettimeofdayInMillisecondsLocal();
// callback is non-NULL if we initiated the transaction
if ( slot->m_callback ) {
// . if transaction has not fully completed, bail
// . unless there was an error
if ( ! g_errno && ! slot->isTransactionComplete())return false;
if ( ! g_errno && ! slot->isTransactionComplete()) {
log("udp: why calling callback when not ready???");
return false;
}
/*
#ifdef _UDPDEBUG_
// if we had the token, give it up so others can send with it
@ -2389,7 +2442,10 @@ bool UdpServer::makeCallback_ass ( UdpSlot *slot ) {
if ( slot->m_calledHandler ) {
// . if transaction has not fully completed, keep sending
// . unless there was an error
if ( ! g_errno && ! slot->isTransactionComplete())return false;
if ( ! g_errno && ! slot->isTransactionComplete()) {
log("udp: why calling handler when not ready?");
return false;
}
// we should not destroy the slot here on ENOMEM error,
// because handler might be referencing the slot's read buffer
// still. that is what Msg20 does... the first dgram was
@ -2468,6 +2524,7 @@ bool UdpServer::makeCallback_ass ( UdpSlot *slot ) {
if ( g_inSigHandler ) goto queueSig;
// nuke the slot, we gave them a reply...
destroySlot ( slot );
//log("udp: why double calling handler?");
// this kind of callback doesn't count
return false;
}
@ -3126,7 +3183,7 @@ bool UdpServer::shutdown ( bool urgent ) {
time_t now = getTime();
int32_t count = 0;
if(!urgent) {
//if ( m_head && m_head2->m_next2 ) return false;
//if ( m_head && m_head2->m_next2 ) return false;
for ( UdpSlot *slot = m_head2 ; slot ; slot = slot->m_next2 ) {
// if we initiated, then don't count it
if ( slot->m_callback ) continue;
@ -3244,12 +3301,12 @@ UdpSlot *UdpServer::getEmptyUdpSlot_ass ( key_t k ) {
m_tail2 = slot;
}
// also to callback candidates if we should
//if ( hasCallback ) {
// slot->m_next3 = m_head3;
// slot->m_prev3 = NULL;
// if ( m_head3 ) m_head3->m_prev3 = slot;
// m_head3 = slot;
//}
// if ( hasCallback ) {
// slot->m_next3 = m_head3;
// slot->m_prev3 = NULL;
// if ( m_head3 ) m_head3->m_prev3 = slot;
// m_head3 = slot;
// }
// count it
m_numUsedSlots++;
// now store ptr in hash table
@ -3281,6 +3338,42 @@ UdpSlot *UdpServer::getUdpSlot ( key_t k ) {
return m_ptrs[i];
}
bool UdpServer::isInCallbackLinkedList ( UdpSlot *slot ) {
// return if not in the linked list
if ( slot->m_prev3 ) return true;
if ( slot->m_next3 ) return true;
if ( m_head3 == slot ) return true;
return false;
}
void UdpServer::removeFromCallbackLinkedList ( UdpSlot *slot ) {
if ( g_conf.m_logDebugUdp )
log("udp: removing slot=%"PTRFMT" from callback list"
,(PTRTYPE)slot);
// return if not in the linked list
if ( slot->m_prev3 == NULL &&
slot->m_next3 == NULL &&
m_head3 != slot )
return;
// excise from linked list otherwise
if ( m_head3 == slot )
m_head3 = slot->m_next3;
if ( m_tail3 == slot )
m_tail3 = slot->m_prev3;
if ( slot->m_prev3 )
slot->m_prev3->m_next3 = slot->m_next3;
if ( slot->m_next3 )
slot->m_next3->m_prev3 = slot->m_prev3;
// and so we do not try to re-excise it
slot->m_prev3 = NULL;
slot->m_next3 = NULL;
}
// verified that this is not interruptible
void UdpServer::freeUdpSlot_ass ( UdpSlot *slot ) {
bool flipped = interruptsOff();
@ -3291,11 +3384,7 @@ void UdpServer::freeUdpSlot_ass ( UdpSlot *slot ) {
if ( slot->m_prev2 ) slot->m_prev2->m_next2 = slot->m_next2;
if ( slot->m_next2 ) slot->m_next2->m_prev2 = slot->m_prev2;
// also from callback candidates if we should
//if ( slot->m_callback ) {
// if ( slot->m_prev3 ) slot->m_prev3->m_next3 = slot->m_next3;
// else m_head3 = slot->m_next3;
// if ( slot->m_next3 ) slot->m_next3->m_prev3 = slot->m_prev3;
//}
removeFromCallbackLinkedList ( slot );
// discount it
m_numUsedSlots--;
// add to linked list of available slots

View File

@ -282,6 +282,10 @@ class UdpServer {
UdpSlot *getActiveHead ( ) { return m_head2; };
// callback linked list functions (m_head3)
bool isInCallbackLinkedList ( UdpSlot *slot );
void removeFromCallbackLinkedList ( UdpSlot *slot ) ;
// cancel a transaction
void cancel ( void *state , unsigned char msgType ) ;
@ -434,7 +438,8 @@ class UdpServer {
UdpSlot *m_head2;
UdpSlot *m_tail2;
// linked list of callback candidates
//UdpSlot *m_head3;
UdpSlot *m_head3;
UdpSlot *m_tail3;
int32_t m_numUsedSlots;

View File

@ -426,8 +426,8 @@ class UdpSlot {
class UdpSlot *m_next2;
class UdpSlot *m_prev2;
// and for doubly linked list of callback candidates
//class UdpSlot *m_next3;
//class UdpSlot *m_prev3;
class UdpSlot *m_next3;
class UdpSlot *m_prev3;
// store the key so when returning slot we can remove from hash table
key_t m_key;