#include "gb-include.h"

#include "Msg17.h"
#include "Msg40.h"
#include "UdpServer.h"
//#include "zlib/zlib.h"
//#include "TitleRec.h"
#include "XmlDoc.h" // Z_BUF_ERROR

static void gotReplyWrapper17  ( void *state , UdpSlot *slot ) ;
static void handleRequest17    ( UdpSlot *slot , int32_t niceness ) ;
static void gotReplyWrapper17b ( void *state , UdpSlot *slot ) ;

// one RdbCache per generic cache id
RdbCache g_genericCache[MAX_GENERIC_CACHES];

// . per-cache compression flag. a compressed cache hands the caller an
//   mmalloc'd uncompressed buffer which the caller is responsible for;
//   an uncompressed cache hands back a pointer to the record itself
// . only cache 0 (seoresults) is listed, the rest are zero-initialized
char g_genericCacheCompress[MAX_GENERIC_CACHES] = {
	1
};

// . per-cache max age, read through a pointer so config changes apply
// . entries not listed here are zero-initialized (NULL), yet
//   getFromCache() and handleRequest17() dereference the entry for the
//   requested cacheId
int32_t *g_genericCacheMaxAge[MAX_GENERIC_CACHES] = {
	&g_conf.m_searchResultsMaxCacheAge
};

Msg17::Msg17() {
	m_found = false;
	m_cbuf  = NULL;
}

Msg17::~Msg17() {
	reset();
}

// forget any previous result and free a reply buffer that gotReply() took
// over from the UdpServer, if we still hold one
void Msg17::reset() {
	m_found = false;
	if ( ! m_cbuf ) return;
	mfree ( m_cbuf , m_cbufSize , "Msg17" );
	m_cbuf = NULL;
}

// have g_udpServer route msgs of type 0x17 to handleRequest17()
bool Msg17::registerHandler ( ) {
	return g_udpServer.registerHandler ( 0x17, handleRequest17 );
}

// . returns false if blocked, true otherwise
sets errno on error bool Msg17::getFromCache ( char cacheId, key_t key, char **recPtr, int32_t *recSize, //char *coll , collnum_t collnum , void *state , void (*callback) (void *state) , int32_t niceness , int32_t timeout ) { // assume not in cache m_found = false; // use a fake recSize if we should if ( ! recSize ) recSize = &m_tmpRecSize; // save ptr to msg40 so we can deserialize it if we block //m_msg40 = msg40; // make the key based on the query and other input parms in msg40 //SearchInput *si = msg40->m_si; //m_key = si->makeKey ( ); m_cacheId = cacheId; m_key = key; m_recPtr = recPtr; m_recSize = recSize; // . only one machine will cache the query results // . if it goes down we are SOL //int32_t hostId = key.n0 % g_hostdb.getNumHosts(); Host *host = getResponsibleHost ( key , &g_hostdb ) ; // if all in group are dead, we can't do it if ( ! host ) return true; //char *coll = si->m_coll; // . if we are the host responsible for caching this Msg40 check it //!g_conf.m_interfaceMachine ) { // look in our local cache RdbCache *c = NULL; if ( host->m_hostId == g_hostdb.m_hostId ) c = &g_genericCache[(int)m_cacheId]; // otherwise, try the small local cache to save network //else // c = &g_genericCacheSmallLocal[m_cacheId]; // is it in there? if ( c ) { time_t cachedTime; // return true if not found in our local cache if ( ! c->getRecord ( collnum , m_key , recPtr , recSize , false , // do copy? *g_genericCacheMaxAge[(int)cacheId], true , // keep stats &cachedTime )) return true; // set the delta m_cachedTimeDelta = getTime() - cachedTime; // uncompress it if we should gotReply( NULL , *recPtr , *recSize , false ); return true; } // if we were the man, we are done, it was not in the cache //if ( host->m_hostId == g_hostdb.m_hostId ) { // // . set the msg40 from the list's data buf // // . 
process it as a reply // gotReply ( NULL , recPtr2 , recSize2 , false ); // // we did not block, so return true // return true; //} // otherwise, we have to send a request over the network m_state = state; m_callback = callback; m_niceness = niceness; // . skip if his ping is too high // . this is a sanity check now because we should never be returned // the hostId of a dead host //if ( g_hostdb.isDead ( h ) ) return true; if ( g_hostdb.isDead ( host ) ) { char *xx = NULL; *xx = 0; } // make request char *p = m_request; *(key_t *)p = m_key; p += sizeof(key_t); // store the id *p++ = m_cacheId; // the flag (0 means read request, 1 means store request) *p++ = 0; gbmemcpy ( p , &collnum, sizeof(collnum_t)); p += sizeof(collnum_t); //strcpy ( p , coll ); p += gbstrlen ( coll ) + 1; // . send the request to the key host // . this returns false and sets g_errno on error // . now wait for 1 sec before timing out // . TODO: change timeout to 50ms instead of 1 second if ( ! g_udpServer.sendRequest ( m_request , p - m_request , 0x17 , // msgType 0x17 host->m_ip , host->m_port , host->m_hostId , NULL , this , // state data gotReplyWrapper17 , timeout , // timeout -1 , // backoff -1 , // maxWait NULL , // m_buf 0 , // MSG17_BUF_SIZE niceness ) ) { // cback nice log("query: Had error sending request for cache cacheId=%i: " "%s.",cacheId, mstrerror(g_errno)); return true; } // otherwise we blocked return false; } void gotReplyWrapper17 ( void *state , UdpSlot *slot ) { Msg17 *THIS = (Msg17 *)state; // don't let udpserver free the request, it's our m_key slot->m_sendBufAlloc = NULL; // don't let UdpServer free the reply buffer, it is our m_buf //slot->m_readBuf = NULL; // gotReply() does not block, it may set g_errno //THIS->gotReply ( slot , THIS->m_buf , slot->m_readBufSize , true ) ; THIS->gotReply ( slot, slot->m_readBuf, slot->m_readBufSize, true ) ; // call callback since we blocked, since we're here THIS->m_callback ( THIS->m_state ); } // . 
reply should hold our new docId // . returns false on error and sets g_errno bool Msg17::gotReply ( UdpSlot *slot , char *cbuf , int32_t cbufSize , bool includesCachedTime ) { // bitch about any error we got if ( g_errno ) return log("query: Reply for cache cacheId=%i " "(niceness=%"INT32") had error: %s.", m_cacheId,m_niceness,mstrerror(g_errno)); // assume we were not able to get the cached Msg40 m_found = false; // we were not found if reply was size 0 if ( cbufSize <= 0 ) return false; // first 4 bytes is cached time if ( includesCachedTime ) { m_cachedTimeDelta = *(int32_t *)cbuf; cbuf += 4; cbufSize -= 4; } // to save network, try to cache this in the small local cache /* if ( slot ) { char *coll = m_request + sizeof(key_t) + 2; RdbCache *c = &g_genericCacheSmallLocal[m_cacheId]; if ( c && ! c->addRecord ( coll , k , p , pend - p ) ) log("query: Had error storing cache cacheId=%"INT32": " "%s.", cacheId, mstrerror(g_errno)); } */ // set the buf size to its max for call to uncompress() //char ubuf [ 32*1024 ]; if ( g_genericCacheCompress[(int)m_cacheId] ) { // the uncompressed size is always preceeds the compressed data int32_t recSize = *(int32_t *)cbuf; // sanity check if ( recSize < 0 ) { log("query: got bad cached rec size=%"INT32" cacheid=%"INT32"", recSize,(int32_t)m_cacheId); return false; } cbuf += 4; cbufSize -= 4; //int32_t ubufMaxSize = 32*1024; //int32_t ubufSize = ubufMaxSize; int32_t ubufSize = recSize; char *ubuf = (char *)mmalloc ( ubufSize , "Msg17" ); if ( ! ubuf ) return log("query: Could not allocate %"INT32" bytes for " "uncompressing cache cacheId=%i: " "%s.", ubufSize,m_cacheId,mstrerror(g_errno)); // uncompress the reply int err = gbuncompress ( (unsigned char *) ubuf , (uint32_t *) &ubufSize , (unsigned char *) cbuf , (uint32_t ) cbufSize ); // hmmmm... 
if ( err == Z_BUF_ERROR ) { mfree ( ubuf , ubufSize , "Msg17"); return log("query: Allocated buffer space was not " "enough to hold uncompressed cache " "cacheId=%i.", m_cacheId); } // set g_errno and return false on error if ( err != Z_OK ) { mfree ( ubuf , ubufSize , "Msg17"); g_errno = EUNCOMPRESSERROR; return log("query: Got error in zlib when " "uncompressing cache cacheId=%i: " "ZG_ERRNO=%i", m_cacheId, err); } // sanity check if ( ubufSize != recSize ) { char *xx = NULL; *xx = 0; } if ( m_recPtr ) *m_recPtr = ubuf; if ( m_recSize ) *m_recSize = ubufSize; } // can be called after a local call to RdbCache::getRecord() in order // to just uncompress the data, so ignore this part else if ( slot ) { // . in case we haven't reset, free any buffer we've stolen // before we overwrite it if ( m_cbuf ) mfree ( m_cbuf , m_cbufSize , "Msg17" ); // . we need to free it though i guess, so remember it // oopsy, readBufSize is not the allocation size! //m_cbuf = cbuf; //m_cbufSize = cbufSize; m_cbuf = slot->m_readBuf; m_cbufSize = slot->m_readBufMaxSize; if ( m_recPtr ) *m_recPtr = cbuf; if ( m_recSize ) *m_recSize = cbufSize; // do not free the buf, we will steal it at this point slot->m_readBuf = NULL; } // we got it m_found = true; // return true on success return true; } // . only return false if you want slot to be nuked w/o replying // . 
MUST always call g_udpServer::sendReply() or sendErrorReply() void handleRequest17 ( UdpSlot *slot , int32_t niceness ) { // get the request, should be a full url char *request = slot->m_readBuf; int32_t requestSize = slot->m_readBufSize; UdpServer *us = &g_udpServer; //if ( niceness == 0 ) us = &g_udpServer2; // need at least a key in the request if ( requestSize < (int32_t)sizeof(key_t) ) { log("query: Request size for cache (%"INT32") " "is too small for some reason.", (int32_t)sizeof(key_t)); us->sendErrorReply ( slot , EBADREQUESTSIZE ); return; } char *p = request; char *pend = request + requestSize; // get the key key_t k = *(key_t *)p; p += sizeof(key_t); // id char cacheId = *p++; // then 1-byte flag (0 means read request, 1 means store request) char flag = *p++; // NULL terminated collection name follows //char *coll = p; p += gbstrlen ( coll ) + 1 ; collnum_t collnum = *(collnum_t *)p; p += sizeof(collnum_t); RdbCache *c = &g_genericCache[(int)cacheId]; // if flag is 1 then it is a request to store a compressed Msg40 if ( flag == 1 ) { if ( ! c->addRecord ( collnum , k, p, pend - p ) ) log("query: Had error storing cache cacheId=%i: " "%s.", cacheId, mstrerror(g_errno)); // send an empty reply us->sendReply_ass ( NULL , 0 , NULL , 0 , slot ); return; } char *rec; int32_t recSize; time_t cachedTime; // send back nothing if not in cache if ( ! c->getRecord ( collnum , k , &rec , &recSize , false , // do copy? *g_genericCacheMaxAge[(int)cacheId], true ,// keep stats &cachedTime)) { us->sendReply_ass ( NULL , 0 , NULL , 0 , slot ); return; } // alloc a buf to hold reply and 4 bytes for cachedTime int32_t bufSize = 4 + recSize; char *buf = (char *)mmalloc ( bufSize , "Msg17"); if ( ! 
buf ) { us->sendReply_ass ( NULL , 0 , NULL , 0 , slot ); return; } // make the cached time into a delta because all hosts in the // cluster may not be synced time_t cachedTimeDelta = getTime() - cachedTime; char *x = buf; *(int32_t *)x = cachedTimeDelta; x += 4; gbmemcpy ( x , rec , recSize ); // . set the msg40 from the cached record // . UdpServer should free "rec" when he's done sending it us->sendReply_ass ( buf , bufSize , buf , // alloc bufSize , // allocSize slot , 2 ); // timeout in 2 secs } static int32_t s_numInProgress = 0; // . if you had to make your own Msg40 class because it wasn't cached // then you should store it in the cache here // . returns false if blocked, true otherwise // . MAY set g_errno on error //bool Msg17::storeInCache ( Msg40 *msg40 ) { bool Msg17::storeInCache ( char cacheId , key_t key , char *recPtr , int32_t recSize , collnum_t collnum, // char *coll , int32_t niceness , int32_t timeout ) { // only allow 200 launched in progress stores at a time to // save UdpSlots if ( s_numInProgress >= 200 ) { log("query: Unable to launch Msg17 request, already have 200 " "in progress. May affect performance."); return true; } // // how much room? // int32_t tmpSize = msg40->getStoredSize(); // // store in this buffer // char tmp [ 32 * 1024 ]; // // bail if too much // if ( tmpSize > 32*1024 ) { // log(LOG_LIMIT, // "query: Size of cached search results page (and all " // "associated data) is %"INT32" bytes. Max is %"INT32". " // "Page not cached.",tmpSize,32*1024); // return true; // } // // serialize into tmp // int32_t nb = msg40->serialize ( tmp , tmpSize ); // // it must fit exactly // if ( nb != tmpSize || nb == 0 ) { // log(LOG_LOGIC, // "query: Size of cached search results page (%"INT32") does not " // "match what it should be. 
(%"INT32")" , nb , tmpSize ); // return true; // } // // make key // SearchInput *si = msg40->m_si; // key_t k = si->makeKey ( ); m_key = key; m_cacheId = cacheId; // get this host responsible for holding this //int32_t hostId = key.n0 % g_hostdb.getNumHosts(); Host *host = getResponsibleHost ( key , &g_hostdb ) ; // if all in the group are dead, can't do it if ( ! host ) return true; // . skip if his ping is too high // . NO, because if we are caching something that is repeated // a lot, and has a high computation value, like the root // quality computed in Msg50.cpp, we can really hurt performance // so, let's cache it here now if its twin(s) are dead!! //if ( g_hostdb.isDead ( h ) && hostId!=g_hostdb.m_hostId) return true; // make a buffer to hold the request char buf [ 200000 ]; // MSG17_BUF_SIZE; char *p = buf; char *pend = buf + 200000; // MSG17_BUF_SIZE; *(key_t *)p = key ; p += sizeof(key_t); // id *p++ = m_cacheId; // use "1" for a store request *p++ = 1; //char *coll = si->m_coll; //strcpy ( p , coll ); p += gbstrlen(coll) + 1; // includes '\0' gbmemcpy ( p ,&collnum ,sizeof(collnum_t)); p += sizeof(collnum_t); QUICKPOLL(niceness); // now start the rec that will go into the cache char *cacheRec = p; // debug point //if ( key.n0 == 0x6ff1ee0116d9cfebLL ) // log("hey"); if ( g_genericCacheCompress[(int)m_cacheId] ) { // uncompressed size *(int32_t *)p = recSize; p += 4; // sanity check if ( recSize < 0 ) { char *xx = NULL; *xx = 0; } // how much left over int32_t avail = pend - p; // save it int32_t saved = avail; //int32_t clen = gbstrlen(coll); // compress "tmp" into m_buf, but leave leading bytes // for the key int err = gbcompress ( (unsigned char *)p , (uint32_t *)&avail , (unsigned char *)recPtr , (uint32_t )recSize ); // advance p by how many bytes we stored into "p" p += avail; // check for error if ( err != Z_OK ) { g_errno = ECOMPRESSFAILED; log("query: Compression of cache cacheId=%i " "failed err=%"INT32" avail=%"INT32" collnum=%"INT32" " 
"recSize=%"INT32".", cacheId , (int32_t)err , saved , (int32_t)collnum , recSize ); return true; } } else { // bail if not enough room! if ( recSize > pend - p ) return true; // otheriwse, store it gbmemcpy ( p, recPtr, recSize ); // advance p by how many bytes we stored into "p" p += recSize; } // . size of whole request, key and the serialized/compressed Msg40 // . "size" is set by call to ::compress() above int32_t requestSize = p - buf; // p - buf + size // size of the part of the request that goes into the cache int32_t cacheRecSize = p - cacheRec ; // store the key *(key_t *) buf = key; // if we are that host, store it ourselves right now if ( host->m_hostId == g_hostdb.m_hostId ) { RdbCache *c = &g_genericCache[(int)m_cacheId]; if ( ! c->addRecord ( collnum , key , cacheRec , cacheRecSize ) ) log("query: Failed to add compressed search results " "page to cache cacheId=%i: %s.", cacheId, mstrerror(g_errno)); return true; } // make a request to hold it char *request = (char *) mdup ( buf , requestSize , "Msg17" ); if ( ! request ) { log("query: Failed to allocate %i bytes to hold cache " "cacheId=%"INT32" for transmission to caching host.", cacheId, requestSize); return true; } QUICKPOLL(niceness); // . send it as a request to the appropriate machine // . this returns false and sets g_errno on error // . returns true if it blocks if ( ! g_udpServer.sendRequest ( request , // request requestSize , // request size 0x17 , // msgType 0x17 host->m_ip , host->m_port , // low priority host->m_hostId , NULL , this , // state data gotReplyWrapper17b , timeout ,// timeout in secs -1 , -1 , NULL , 0 , niceness )){// cback nice log("query: Had error sending request to cache cacheid=%i: " " %s.",cacheId,mstrerror(g_errno)); mfree ( request , requestSize , "Msg17" ); return true; } // count as in progress s_numInProgress++; // we blocked return false; } // got reply from a store request void gotReplyWrapper17b ( void *state , UdpSlot *slot ) { // . 
don't let udp server free m_buf, we own it // . not anymore, we don't want caller to delay just to store this! //slot->m_sendBufAlloc = NULL; // dec the count s_numInProgress--; } // . Dns.cpp uses key.n1 but we keep using key.n0 here so we can re-use old // saved caches Host *Msg17::getResponsibleHost ( key_t key , Hostdb * hostdb ) { // get the hostNum that should handle this int32_t hostId = key.n0 % hostdb->getNumHosts(); // return it if it is alive if ( ! hostdb->isDead ( hostId ) ) return hostdb->getHost ( hostId ); // how many hosts are up? int32_t numAlive = hostdb->getNumHostsAlive(); // if all dead return NULL if ( numAlive == 0 ) return NULL; // try another hostNum int32_t hostNum = key.n0 % numAlive; // otherwise, chain to him int32_t count = 0; for ( int32_t i = 0 ; i < g_hostdb.m_numHosts ; i++ ) { // get the ith host Host *host = &hostdb->m_hosts[i]; // skip him if he is dead if ( hostdb->isDead ( host ) ) continue; // count it if alive, continue if not our number if ( count++ != hostNum ) continue; // we got a match, we cannot use hostNum as the hostId now // because the host with that hostId might be dead return host; } // Msg17 does not need to set this //g_errno = EDEADHOST; return NULL; }