open-source-search-engine/Msg17.cpp
2021-05-06 01:52:55 +10:00

600 lines
19 KiB
C++

#include "gb-include.h"
#include "Msg17.h"
#include "Msg40.h"
#include "UdpServer.h"
//#include "zlib/zlib.h"
//#include "TitleRec.h"
#include "XmlDoc.h" // Z_BUF_ERROR
// callbacks/handlers defined later in this file:
// . gotReplyWrapper17  is called when a cache READ  request gets its reply
// . handleRequest17    services incoming 0x17 requests (read or store)
// . gotReplyWrapper17b is called when a cache STORE request gets its reply
static void gotReplyWrapper17 ( void *state , UdpSlot *slot ) ;
static void handleRequest17 ( UdpSlot *slot , int32_t niceness ) ;
static void gotReplyWrapper17b ( void *state , UdpSlot *slot ) ;
// . our caches, one per cache id
// . the three arrays below are all indexed by the same cache id
RdbCache g_genericCache[MAX_GENERIC_CACHES];
// for saving network
//RdbCache g_genericCacheSmallLocal[MAX_GENERIC_CACHES];
// . compress option array, indexed by cache id
// . a non-zero entry means records of that cache are stored compressed;
//   reading one allocates an uncompressed buffer that the caller is
//   responsible for freeing
// . uncompressed data is handed back without an extra allocation
// . NOTE: only entry 0 is explicitly initialized (to 1), the remaining
//   entries are zero-initialized, i.e. "not compressed"
char g_genericCacheCompress[MAX_GENERIC_CACHES] = { 1 };
// 0 }; // seoresults
//static int32_t s_noMax = -1;
//static int32_t s_oneMonth = 86400*30;
// . pointer to the max age used when reading from each cache, indexed by
//   cache id (passed dereferenced to RdbCache::getRecord())
// . NOTE: only entry 0 is explicitly initialized, any remaining entries are
//   NULL and must not be dereferenced
int32_t *g_genericCacheMaxAge[MAX_GENERIC_CACHES] = {
&g_conf.m_searchResultsMaxCacheAge
//&s_oneMonth // seoresultscache
//&g_conf.m_siteLinkInfoMaxCacheAge ,
// Msg50.cpp now has a dynamic max cache age which is higher
// for higher qualities, since those take longer to recompute and
// are usually much more stable...
//&s_noMax // &g_conf.m_siteQualityMaxCacheAge
};
// . start out owning no stolen reply buffer and with nothing found
Msg17::Msg17() {
	// nothing has been looked up yet
	m_found = false;
	// no reply buffer to free in reset()
	m_cbuf  = NULL;
}
// . release any reply buffer we are still holding on to
Msg17::~Msg17() {
	this->reset();
}
// . free the reply buffer we stole from the UdpSlot, if any
// . puts us back into the "not found" state
void Msg17::reset() {
	if ( m_cbuf ) {
		mfree ( m_cbuf , m_cbufSize , "Msg17" );
		m_cbuf = NULL;
	}
	m_found = false;
}
// . register handleRequest17() with the udp server as the handler for
//   msgs of type 0x17
// . returns false if the registration failed, true otherwise
bool Msg17::registerHandler ( ) {
	bool registered = g_udpServer.registerHandler ( 0x17, handleRequest17 );
	return registered ? true : false;
}
// . look up the record stored under "key" in generic cache #cacheId
// . returns false if blocked, true otherwise
// . if we block, "callback(state)" is called once the reply has been
//   processed
// . m_found is true afterwards only if the record was found, in which case
//   *recPtr/*recSize describe it
// . "recSize" may be NULL, "recPtr" must not be
// . sets g_errno on error
bool Msg17::getFromCache ( char        cacheId,
			   key_t       key,
			   char      **recPtr,
			   int32_t    *recSize,
			   collnum_t   collnum ,
			   void       *state ,
			   void      (*callback) (void *state) ,
			   int32_t     niceness ,
			   int32_t     timeout ) {
	// assume not in cache
	m_found = false;
	// use a fake recSize if we should
	if ( ! recSize ) recSize = &m_tmpRecSize;
	// remember what we are looking for and where to store it
	m_cacheId = cacheId;
	m_key     = key;
	m_recPtr  = recPtr;
	m_recSize = recSize;
	// . gotReply() logs m_niceness if g_errno is set, and we call it for
	//   the local cache path below as well
	// . so set it now rather than only on the network path, otherwise it
	//   could be read while still uninitialized
	m_niceness = niceness;
	// . only one machine will cache the record for a given key
	// . if it goes down another live host is picked instead
	Host *host = getResponsibleHost ( key , &g_hostdb ) ;
	// if all hosts are dead, we can't do it
	if ( ! host ) return true;
	// if we are the host responsible for this key, look in our own cache
	if ( host->m_hostId == g_hostdb.m_hostId ) {
		RdbCache *c = &g_genericCache[(int)m_cacheId];
		time_t cachedTime;
		// return true if not found in our local cache
		if ( ! c->getRecord ( collnum ,
				      m_key   ,
				      recPtr  ,
				      recSize ,
				      false   , // do copy?
				      *g_genericCacheMaxAge[(int)cacheId],
				      true    , // keep stats
				      &cachedTime ))
			return true;
		// set the delta
		m_cachedTimeDelta = getTime() - cachedTime;
		// uncompress it if we should
		gotReply( NULL , *recPtr , *recSize , false );
		// we did not block
		return true;
	}
	// otherwise, we have to send a request over the network
	m_state    = state;
	m_callback = callback;
	// . this is a sanity check because getResponsibleHost() should never
	//   return a dead host
	// . deliberately core if it did
	if ( g_hostdb.isDead ( host ) ) { char *xx = NULL; *xx = 0; }
	// make the request: key, cache id, flag, collnum
	char *p = m_request;
	*(key_t *)p = m_key; p += sizeof(key_t);
	// store the id
	*p++ = m_cacheId;
	// the flag (0 means read request, 1 means store request)
	*p++ = 0;
	gbmemcpy ( p , &collnum, sizeof(collnum_t)); p += sizeof(collnum_t);
	// . send the request to the key host
	// . this returns false and sets g_errno on error
	// . the request buffer is our own m_request, gotReplyWrapper17()
	//   makes sure the udp server does not free it
	if ( ! g_udpServer.sendRequest ( m_request         ,
					 p - m_request     ,
					 0x17              , // msgType 0x17
					 host->m_ip        ,
					 host->m_port      ,
					 host->m_hostId    ,
					 NULL              ,
					 this              , // state data
					 gotReplyWrapper17 ,
					 timeout           , // timeout
					 -1                , // backoff
					 -1                , // maxWait
					 NULL              , // m_buf
					 0                 , // MSG17_BUF_SIZE
					 niceness          ) ) { // cback nice
		log("query: Had error sending request for cache cacheId=%i: "
		    "%s.",cacheId, mstrerror(g_errno));
		return true;
	}
	// otherwise we blocked
	return false;
}
void gotReplyWrapper17 ( void *state , UdpSlot *slot ) {
Msg17 *THIS = (Msg17 *)state;
// don't let udpserver free the request, it's our m_key
slot->m_sendBufAlloc = NULL;
// don't let UdpServer free the reply buffer, it is our m_buf
//slot->m_readBuf = NULL;
// gotReply() does not block, it may set g_errno
//THIS->gotReply ( slot , THIS->m_buf , slot->m_readBufSize , true ) ;
THIS->gotReply ( slot, slot->m_readBuf, slot->m_readBufSize, true ) ;
// call callback since we blocked, since we're here
THIS->m_callback ( THIS->m_state );
}
// . process a cached record, either a network reply ("slot" is non-NULL)
//   or a record fetched from our local RdbCache ("slot" is NULL)
// . "cbuf"/"cbufSize" is the record, preceded by a 4-byte cached time delta
//   if "includesCachedTime" is true
// . if the cache is a compressed one the record is uncompressed into a newly
//   allocated buffer which the caller must free
// . otherwise, for a network reply, we steal the slot's read buffer and
//   free it ourselves in reset()
// . on success sets m_found, *m_recPtr and *m_recSize and returns true
// . returns false if the record was not found or on error, and may set
//   g_errno
bool Msg17::gotReply ( UdpSlot *slot , char *cbuf , int32_t cbufSize ,
		       bool includesCachedTime ) {
	// complain about any error we got
	if ( g_errno ) return log("query: Reply for cache cacheId=%i "
				  "(niceness=%"INT32") had error: %s.",
				  m_cacheId,m_niceness,mstrerror(g_errno));
	// assume we were not able to get the cached record
	m_found = false;
	// we were not found if reply was size 0
	if ( cbufSize <= 0 ) return false;
	// first 4 bytes is cached time
	if ( includesCachedTime ) {
		// do not read past the end of a truncated reply
		if ( cbufSize < 4 ) {
			log("query: got truncated cache reply size=%"INT32" "
			    "cacheid=%"INT32"",
			    cbufSize,(int32_t)m_cacheId);
			return false;
		}
		m_cachedTimeDelta = *(int32_t *)cbuf;
		cbuf     += 4;
		cbufSize -= 4;
	}
	if ( g_genericCacheCompress[(int)m_cacheId] ) {
		// the uncompressed size always precedes the compressed data,
		// make sure it is really there before reading it
		if ( cbufSize < 4 ) {
			log("query: got truncated cached rec size=%"INT32" "
			    "cacheid=%"INT32"",
			    cbufSize,(int32_t)m_cacheId);
			return false;
		}
		int32_t recSize = *(int32_t *)cbuf;
		// sanity check
		if ( recSize < 0 ) {
			log("query: got bad cached rec size=%"INT32" cacheid=%"INT32"",
			    recSize,(int32_t)m_cacheId);
			return false;
		}
		cbuf     += 4;
		cbufSize -= 4;
		// . recSize is how much we allocate and must later free
		// . ubufSize is passed by pointer to gbuncompress() below, so
		//   it may no longer equal the allocation size afterwards
		int32_t ubufSize = recSize;
		char *ubuf = (char *)mmalloc ( recSize , "Msg17" );
		if ( ! ubuf )
			return log("query: Could not allocate %"INT32" bytes for "
				   "uncompressing cache cacheId=%i: "
				   "%s.",
				   ubufSize,m_cacheId,mstrerror(g_errno));
		// uncompress the reply
		int err = gbuncompress ( (unsigned char *)  ubuf     ,
					 (uint32_t      *) &ubufSize ,
					 (unsigned char *)  cbuf     ,
					 (uint32_t       )  cbufSize );
		// the size stored in the record was too small
		if ( err == Z_BUF_ERROR ) {
			// free with the size we allocated, not ubufSize
			mfree ( ubuf , recSize , "Msg17");
			return log("query: Allocated buffer space was not "
				   "enough to hold uncompressed cache "
				   "cacheId=%i.", m_cacheId);
		}
		// set g_errno and return false on error
		if ( err != Z_OK ) {
			// free with the size we allocated, not ubufSize
			mfree ( ubuf , recSize , "Msg17");
			g_errno = EUNCOMPRESSERROR;
			return log("query: Got error in zlib when "
				   "uncompressing cache cacheId=%i: "
				   "ZG_ERRNO=%i", m_cacheId, err);
		}
		// sanity check, deliberately core if the sizes disagree
		if ( ubufSize != recSize ) { char *xx = NULL; *xx = 0; }
		// hand the uncompressed buffer to the caller, he frees it
		if ( m_recPtr  ) *m_recPtr  = ubuf;
		if ( m_recSize ) *m_recSize = ubufSize;
	}
	// . not compressed
	// . if "slot" is NULL we were called after a local
	//   RdbCache::getRecord() which already set *m_recPtr/*m_recSize,
	//   so there is nothing more to do in that case
	else if ( slot ) {
		// . in case we haven't reset, free any buffer we've stolen
		//   before we overwrite it
		if ( m_cbuf ) mfree ( m_cbuf , m_cbufSize , "Msg17" );
		// . remember the slot's read buffer so reset() can free it
		// . m_readBufSize is not the allocation size, so use
		//   m_readBufMaxSize
		m_cbuf     = slot->m_readBuf;
		m_cbufSize = slot->m_readBufMaxSize;
		// the record itself starts after the cached time delta
		if ( m_recPtr  ) *m_recPtr  = cbuf;
		if ( m_recSize ) *m_recSize = cbufSize;
		// we stole the buffer, so the slot must not free it
		slot->m_readBuf = NULL;
	}
	// we got it
	m_found = true;
	// return true on success
	return true;
}
// . handle a 0x17 request sent by Msg17::getFromCache() or
//   Msg17::storeInCache() on another host
// . request format: key_t key, 1-byte cache id, 1-byte flag, collnum_t,
//   followed by the record to store if the flag is 1
// . flag 0 is a read request: the reply is empty if the record is not in
//   the cache, otherwise it is a 4-byte cached time delta followed by the
//   record as it is stored in the cache
// . flag 1 is a store request: the reply is always empty
// . always sends a reply or an error reply on the slot
void handleRequest17 ( UdpSlot *slot , int32_t niceness ) {
	// get the request
	char      *request     = slot->m_readBuf;
	int32_t    requestSize = slot->m_readBufSize;
	UdpServer *us          = &g_udpServer;
	// . we read the key, the cache id, the flag and the collnum below
	// . so the request must at least hold all of those, not just the key
	int32_t minSize = (int32_t)(sizeof(key_t) + 2 + sizeof(collnum_t));
	if ( requestSize < minSize ) {
		log("query: Request size for cache (%"INT32") "
		    "is too small for some reason.", requestSize);
		us->sendErrorReply ( slot , EBADREQUESTSIZE );
		return;
	}
	char *p    = request;
	char *pend = request + requestSize;
	// get the key
	key_t k = *(key_t *)p; p += sizeof(key_t);
	// id
	char cacheId = *p++;
	// then 1-byte flag (0 means read request, 1 means store request)
	char flag = *p++;
	// . then the collnum
	// . it was written with gbmemcpy() by the sender, read it back the
	//   same way
	collnum_t collnum;
	gbmemcpy ( &collnum , p , sizeof(collnum_t) ); p += sizeof(collnum_t);
	// . the cache id came off the network and indexes our cache arrays
	// . so make sure it is in range
	// . EBADREQUESTSIZE is the only malformed-request error code used in
	//   this file, so re-use it here
	int32_t id = (int32_t)cacheId;
	if ( id < 0 || id >= MAX_GENERIC_CACHES ) {
		log("query: Got cache request with bad cacheId=%"INT32".", id);
		us->sendErrorReply ( slot , EBADREQUESTSIZE );
		return;
	}
	RdbCache *c = &g_genericCache[id];
	// if flag is 1 then it is a request to store the rest of the request
	if ( flag == 1 ) {
		if ( ! c->addRecord ( collnum  ,
				      k        ,
				      p        ,
				      pend - p ) )
			log("query: Had error storing cache cacheId=%i: "
			    "%s.", cacheId, mstrerror(g_errno));
		// send an empty reply
		us->sendReply_ass ( NULL , 0 , NULL , 0 , slot );
		return;
	}
	// . only the first entries of g_genericCacheMaxAge are initialized
	// . without a max age we can not read, so say it was not found
	int32_t *maxAge = g_genericCacheMaxAge[id];
	if ( ! maxAge ) {
		log("query: No max cache age set for cacheId=%"INT32".", id);
		us->sendReply_ass ( NULL , 0 , NULL , 0 , slot );
		return;
	}
	char    *rec;
	int32_t  recSize;
	time_t   cachedTime;
	// send back nothing if not in cache
	if ( ! c->getRecord ( collnum  ,
			      k        ,
			      &rec     ,
			      &recSize ,
			      false    , // do copy?
			      *maxAge  ,
			      true     , // keep stats
			      &cachedTime)) {
		us->sendReply_ass ( NULL , 0 , NULL , 0 , slot );
		return;
	}
	// alloc a buf to hold reply and 4 bytes for cachedTime
	int32_t bufSize = 4 + recSize;
	char *buf = (char *)mmalloc ( bufSize , "Msg17");
	// if that failed, reply as if it was not in the cache
	if ( ! buf ) {
		us->sendReply_ass ( NULL , 0 , NULL , 0 , slot );
		return;
	}
	// make the cached time into a delta because all hosts in the
	// cluster may not be synced
	time_t cachedTimeDelta = getTime() - cachedTime;
	char *x = buf;
	*(int32_t *)x = cachedTimeDelta; x += 4;
	gbmemcpy ( x , rec , recSize );
	// . send back the delta followed by the cached record
	// . we pass "buf" as the alloc so the udp server owns it from here on
	us->sendReply_ass ( buf     ,
			    bufSize ,
			    buf     , // alloc
			    bufSize , // allocSize
			    slot    ,
			    2       ); // timeout in 2 secs
}
// . how many store requests are currently out on the network
// . incremented in storeInCache(), decremented in gotReplyWrapper17b()
static int32_t s_numInProgress = 0;
// . store the record "recPtr"/"recSize" under "key" in generic cache
//   #cacheId on the host responsible for that key
// . the record is compressed first if the cache is a compressed one
// . if we are the responsible host it is added to our own cache right away
// . otherwise it is sent to that host, in which case no callback is called,
//   the reply is just counted by gotReplyWrapper17b()
// . returns false if blocked, true otherwise
// . MAY set g_errno on error
bool Msg17::storeInCache ( char      cacheId ,
			   key_t     key     ,
			   char     *recPtr  ,
			   int32_t   recSize ,
			   collnum_t collnum ,
			   int32_t   niceness ,
			   int32_t   timeout  ) {
	// only allow 200 launched in progress stores at a time to
	// save UdpSlots
	if ( s_numInProgress >= 200 ) {
		log("query: Unable to launch Msg17 request, already have 200 "
		    "in progress. May affect performance.");
		return true;
	}
	m_key     = key;
	m_cacheId = cacheId;
	// get the host responsible for holding this
	Host *host = getResponsibleHost ( key , &g_hostdb ) ;
	// if all hosts are dead, can't do it
	if ( ! host ) return true;
	// . make a buffer to hold the request
	// . request is: key, cache id, flag, collnum, then the cache record
	char  buf [ 200000 ]; // MSG17_BUF_SIZE;
	char *p    = buf;
	char *pend = buf + sizeof(buf);
	*(key_t *)p = key ; p += sizeof(key_t);
	// id
	*p++ = m_cacheId;
	// use "1" for a store request
	*p++ = 1;
	gbmemcpy ( p ,&collnum ,sizeof(collnum_t)); p += sizeof(collnum_t);
	QUICKPOLL(niceness);
	// now start the rec that will go into the cache
	char *cacheRec = p;
	if ( g_genericCacheCompress[(int)m_cacheId] ) {
		// the uncompressed size precedes the compressed data
		*(int32_t *)p = recSize; p += 4;
		// sanity check, deliberately core on a negative size
		if ( recSize < 0 ) { char *xx = NULL; *xx = 0; }
		// how much room is left over for the compressed data
		int32_t avail = pend - p;
		// save it for the log msg, gbcompress() overwrites "avail"
		int32_t saved = avail;
		// compress the record right behind the request header
		int err = gbcompress ( (unsigned char *)p       ,
				       (uint32_t      *)&avail  ,
				       (unsigned char *)recPtr  ,
				       (uint32_t       )recSize );
		// advance p by how many bytes we stored into "p"
		p += avail;
		// check for error
		if ( err != Z_OK ) {
			g_errno = ECOMPRESSFAILED;
			log("query: Compression of cache cacheId=%i "
			    "failed err=%"INT32" avail=%"INT32" collnum=%"INT32" "
			    "recSize=%"INT32".",
			    cacheId , (int32_t)err ,
			    saved , (int32_t)collnum , recSize );
			return true;
		}
	}
	else {
		// . bail if not enough room!
		// . also bail on a negative size, it would slip through the
		//   room check and be handed to gbmemcpy()
		if ( recSize < 0 || recSize > pend - p ) return true;
		// otherwise, store it
		gbmemcpy ( p, recPtr, recSize );
		// advance p by how many bytes we stored into "p"
		p += recSize;
	}
	// size of whole request, header and the (compressed) record
	int32_t requestSize  = p - buf;
	// size of the part of the request that goes into the cache
	int32_t cacheRecSize = p - cacheRec ;
	// if we are that host, store it ourselves right now
	if ( host->m_hostId == g_hostdb.m_hostId ) {
		RdbCache *c = &g_genericCache[(int)m_cacheId];
		if ( ! c->addRecord ( collnum      ,
				      key          ,
				      cacheRec     ,
				      cacheRecSize ) )
			log("query: Failed to add compressed search results "
			    "page to cache cacheId=%i: %s.",
			    cacheId, mstrerror(g_errno));
		return true;
	}
	// . copy the request off the stack so it survives until it is sent
	// . the udp server frees it when done, see gotReplyWrapper17b()
	char *request = (char *) mdup ( buf , requestSize , "Msg17" );
	if ( ! request ) {
		// the byte count goes with "%i", the cache id with the INT32
		log("query: Failed to allocate %i bytes to hold cache "
		    "cacheId=%"INT32" for transmission to caching host.",
		    requestSize, (int32_t)cacheId);
		return true;
	}
	QUICKPOLL(niceness);
	// . send it as a request to the appropriate machine
	// . this returns false and sets g_errno on error
	// . returns true if it blocks
	if ( ! g_udpServer.sendRequest ( request            , // request
					 requestSize        , // request size
					 0x17               , // msgType 0x17
					 host->m_ip         ,
					 host->m_port       , // low priority
					 host->m_hostId     ,
					 NULL               ,
					 this               , // state data
					 gotReplyWrapper17b ,
					 timeout            ,// timeout in secs
					 -1                 ,
					 -1                 ,
					 NULL               ,
					 0                  ,
					 niceness           )){// cback nice
		log("query: Had error sending request to cache cacheid=%i: "
		    " %s.",cacheId,mstrerror(g_errno));
		// it was never sent, so it is still ours to free
		mfree ( request , requestSize , "Msg17" );
		return true;
	}
	// count as in progress
	s_numInProgress++;
	// we blocked
	return false;
}
// . called by the udp server when the reply to a store request sent by
//   Msg17::storeInCache() comes in
// . nobody is waiting on a store, so all we do is free up one of the
//   in-progress slots
// . we do not clear slot->m_sendBufAlloc here, so the request buffer is left
//   for the udp server to free
void gotReplyWrapper17b ( void *state , UdpSlot *slot ) {
	// one less store request in progress
	s_numInProgress -= 1;
}
// . get the host that is responsible for caching the record for "key"
// . normally that is host #(key.n0 % numHosts)
// . if that host is dead we pick the (key.n0 % numAlive)'th live host of
//   "hostdb" instead
// . returns NULL if "hostdb" has no hosts or no live host could be found
// . Dns.cpp uses key.n1 but we keep using key.n0 here so we can re-use old
//   saved caches
Host *Msg17::getResponsibleHost ( key_t key , Hostdb * hostdb ) {
	// can't pick a host if there are none, and avoid a mod by zero
	int32_t numHosts = hostdb->getNumHosts();
	if ( numHosts <= 0 ) return NULL;
	// get the hostId that should handle this
	int32_t hostId = key.n0 % numHosts;
	// return it if it is alive
	if ( ! hostdb->isDead ( hostId ) ) return hostdb->getHost ( hostId );
	// how many hosts are up?
	int32_t numAlive = hostdb->getNumHostsAlive();
	// if all dead return NULL
	if ( numAlive <= 0 ) return NULL;
	// pick one of the live hosts instead
	int32_t hostNum = key.n0 % numAlive;
	// . find the hostNum'th live host
	// . scan the hosts of the hostdb we were given, not of g_hostdb,
	//   since that is whose m_hosts array we index
	int32_t count = 0;
	for ( int32_t i = 0 ; i < hostdb->m_numHosts ; i++ ) {
		// get the ith host
		Host *host = &hostdb->m_hosts[i];
		// skip him if he is dead
		if ( hostdb->isDead ( host ) ) continue;
		// count it if alive, continue if not our number
		if ( count++ != hostNum ) continue;
		// we got a match, we cannot use hostNum as the hostId
		// because the host with that hostId might be dead
		return host;
	}
	// . fewer live hosts than getNumHostsAlive() claimed
	// . Msg17 does not need to set g_errno for this
	return NULL;
}