open-source-search-engine/Msg8b.cpp
2013-12-10 15:28:04 -08:00

722 lines
21 KiB
C++

#include "gb-include.h"
#include "Msg8b.h"
#include "Collectiondb.h"
//#include "CollectionRec.h"
// file-local callbacks and the handler for msg type 0x8b requests
static void gotListWrapper ( void *state );//, RdbList *list ) ;
static void handleRequest8b ( UdpSlot *slot, long niceness );
static void gotMulticastReplyWrapper8b( void *state, void *state2 );
static void gotCatRecWrapper ( void *state );//, CatRec *catrec );
//static void doneSending_ass ( void *state, UdpSlot *slot );
// JAB: warning abatement
//static void gotMsg8bsReplyWrapper ( void *state, CatRec *catrec );
//static void gotMsg22sReplyWrapper ( void *state );
//static void gotMsgcsReplyWrapper ( void *state, long ip );
// . queue of in-progress local catdb list reads (see checkQueueForList)
//   so concurrent lookups of the same key share one Msg0 read
// . a slot whose m_domainHash is 0xffffffff is free
// . initialized lazily by checkQueueForList() on first use
static Msg8bListQueue g_msg8bQueue[MSG8BQUEUE_SIZE];
static bool g_isMsg8bQueueInitialized = false;
// . hooks handleRequest8b() up to g_udpServer for msg type 0x8b
// . returns false if the udp server refused the registration
bool Msg8b::registerHandler ( ) {
	const bool registered =
		g_udpServer.registerHandler ( 0x8b, handleRequest8b );
	return registered;
}
// . get the CatRec for this url
// . returns false if blocked, true otherwise; when it blocks "callback"
//   is called with "state" once the CatRec is ready
// . sets g_errno on error
// . "coll" and "collLen" are not used: catdb lookups ignore collections
// . "url" is normalized with g_catdb.normalizeUrl() into m_normalizedUrl
//   and every lookup below uses the normalized url
// . if this host is not in the shard owning the url's catdb key range
//   the request is forwarded there as a 0x8b multicast and the reply is
//   parsed by gotReply()
// . otherwise the catdb list is read locally with Msg0; concurrent local
//   lookups with the same startKey.n1 share one read via g_msg8bQueue
// . if "useCanonicalName" is false the local lookup is skipped and
//   gotList() runs on an empty list
// . NOTE(review): older comments here described tagdb default site recs
//   and an "updateFlag" that bypasses the cache; neither is visible in
//   this code
bool Msg8b::getCatRec ( Url *url ,
char *coll ,
long collLen ,
bool useCanonicalName ,
long niceness ,
CatRec *cr ,
void *state ,
void (* callback)(void *state ) ) {
// clear g_errno
g_errno = 0;
// warning
//if ( ! coll ) log(LOG_LOGIC,"net: NULL collection. msg8b.");
// store the calling parameters in this class for retrieval by callback
m_state = state;
m_callback = callback;
m_url = url;
//m_coll = coll;
//m_collLen = collLen;
m_cr = cr;
m_niceness = niceness;
// decided on the caller's url, before normalization
bool isIp = m_url->isIp();
//m_triedIp = isIp;
// now find the min/max keys so we can call ../rdb/Msg0.h to get a list
key_t startKey;
key_t endKey ;
// normalize the url
g_catdb.normalizeUrl(url, &m_normalizedUrl);
m_url = &m_normalizedUrl;
// make catdb only in the main collection
//m_coll = g_conf.m_dirColl;
//m_collLen = gbstrlen(m_coll);
// catdb uses a dummy collection now, should not be looked at
//m_coll = "catdb";
//m_collLen = 5;
//m_collnum = g_collectiondb.getCollnum ( m_coll , m_collLen );
// get the catdb key range covering the normalized url
g_catdb.getKeyRange ( isIp, m_url, &startKey, &endKey );
// get the groupid
//m_groupId = startKey.n1 & g_hostdb.m_groupMask;
// the shard that stores this key range
m_shardNum = getShardNum ( RDB_CATDB , &startKey );
// reset the xml's in case they were already set
m_cr->reset();
//
// forward to the owning shard if it is not ours
//
if ( getMyShardNum() != m_shardNum ) {//g_hostdb.m_groupId!=m_groupId){
// request layout: ip(4), niceness(1), useCanonicalName(1),
// url, '\0'(1)  -- parsed by handleRequest8b()
long requestSize = m_url->getUrlLen() + 4 + 3;
// make the request
char *p = m_request;
*(long *)p = m_url->getIp() ; p+=4;
//*p = RDB_CATDB ; p++;
*p = (char)niceness ; p++;
*p = (char)useCanonicalName; p++;
// coll
//memcpy(p, m_coll, m_collLen);
//p += m_collLen;
//*p = '\0';
//p++;
// url
memcpy(p, m_url->getUrl(), m_url->getUrlLen());
p += m_url->getUrlLen();
*p = '\0';
p++;
// size and check
m_requestSize = p - m_request;
if ( m_requestSize != requestSize ) {
log ( "Msg8b: request size %li != %li, bad engineer.",
m_requestSize, requestSize );
char *xx = NULL; *xx = 0;
}
QUICKPOLL(m_niceness);
// . send the group request
// . a false return means it failed without blocking (g_errno set),
//   so we did not block; otherwise gotMulticastReplyWrapper8b runs
if ( ! m_mcast.send ( m_request,
m_requestSize,
0x8b,
false, // multicast own request?
m_shardNum,//m_groupId,
false, // send to whole group?
startKey.n1, // key
this, // state data
NULL, // state data
gotMulticastReplyWrapper8b,
3600*24*365, // timeout, one year
m_niceness,
false, // realtime
-1, // firstHostId
NULL, // reply buf
0, // reply size
true, // free reply buf?
true, // disk load balance?
0, // max cache age
0, // cache key
RDB_CATDB,
-1 ) ) // read size
return true;
return false;
}
//
// local lookup
//
// min rec sizes
//long minRecSizes = 256*1024;
// blogspot.com was not showing up! make this 1MB -- MDW
// get min rec sizes from the original collection
//CollectionRec *cr = g_collectiondb.getRec ( m_coll ,
// m_collLen );
// read size comes from the global config
long minRecSizes = g_conf.m_catdbMinRecSizes;
// reset the list completely
//m_list.reset();
// if url's canonical hostname is an ip try lookup by ip domain only
// this is checked above
//if ( m_url->isIp() ) return getCatRecByIp();
m_localList.reset();
m_list = &m_localList;
// . if we should NOT lookup based on the canonical name, skip the read
//   and run gotList() on the empty local list
// . no ip lookup is done here any more
if ( ! useCanonicalName ) {
//m_localList.reset();
//m_list = &m_localList;
m_queueMaster = false;
m_queueSlave = false;
m_queueSlot = -1;
return gotList();
}
// if url has no ip do a warning
//if (!url->hasIp()) log(0,"Msg8b::getCatRec: warning: url has no ip");
QUICKPOLL(m_niceness);
// . check the queue for the desired list
// . a true return means we attached as a slave; the master will finish
//   us from processSlaves(), so we blocked
if ( !checkQueueForList ( startKey.n1 ) ) {
// . summon the powerful Msg0(extracts lists from remote rdb's)
// . read the catdb recs into m_list, which checkQueueForList()
//   pointed at either the queue slot's list or m_localList
if ( ! m_msg0.getList (
-1 , // hostId
0 , // host ip
0 , // host port ,
0 , // max cached age in seconds (60)
false , // add net recv'd list to cache?
RDB_CATDB, // specifies the rdb, 1 = tagdb
"",//NULL,//m_coll ,
//&m_list ,
m_list ,
startKey ,
endKey ,
minRecSizes, // minRecSizes(TODO: make bigger?)
this , // state
gotListWrapper , // callback
m_niceness , // niceness
true , // doErrorCorrection
true , // includeTree
true , // doMerge
-1 , // firstHostId
0 , // startFileNum
-1 , // numFiles,
3600*24*365 , // timeout, one year
-1 , // syncPoint
1 ) ) // prefer local reads
return false;
// the read did not block; do what gotListWrapper() would do
// first allow slaves to process with the list
if ( m_queueMaster )
processSlaves();
// . this should set m_cr no matter what
// . sets g_errno on failure
if (!gotList())
return false;
// get indirect catids
getIndirectCatids();
// done, clean up master slot
if ( m_queueMaster )
cleanSlot();
return true;
}
// attached to queue, wait for the master
return false;
}
/*
bool Msg8b::getCatRecByIp ( ) {
// now find the min/max keys so we can call ../rdb/Msg0.h to get a list
key_t startKey;
key_t endKey ;
// so we don't try again forever
m_triedIp = true;
// now try the lookup by ip domain
g_catdb.getKeyRange (true,m_url,&startKey,&endKey);
// check the queue for the desired list
if ( !checkQueueForList ( startKey.n1 ) ) {
// . summon the powerful Msg0(extracts lists from remote rdb's)
// . store the candidate NORMAL tagdb recs in the list in
// rec itself so we don't have to copy from the list
if ( ! m_msg0.getList ( -1 , // hostId
0 , // host ip
0 , // host port ,
0 , // max cached age in seconds (60)
false , // add net recv'd list to cache?
RDB_CATDB , // specifies the rdb, 1 = tagdb
m_coll ,
//&m_list ,
m_list ,
startKey ,
endKey ,
1024*64 , // minRecSizes (TODO: make bigger?)
this , // state
gotListWrapper , // callback
m_niceness ) ) // niceness
return false;
// first allow slaves to process with the list
if ( m_queueMaster )
processSlaves();
// . this should set m_xml and m_xmlLen appropriately
// . sets g_errno on failure
//return gotList();
if (!gotList())
return false;
// get indirect catids
getIndirectCatids();
// done, clean up master slot
if ( m_queueMaster )
cleanSlot();
return true;
}
// attached to queue, wait for the master
return false;
}
*/
void gotListWrapper ( void *state ) { //, RdbList *list ) {
Msg8b *THIS = (Msg8b *) state;
// first allow slaves to process with the list
if ( THIS->m_queueMaster )
THIS->processSlaves();
// return if this blocks
if ( ! THIS->gotList() ) return;
// get indirect catids
THIS->getIndirectCatids();
// done, clean up master slot
if ( THIS->m_queueMaster )
THIS->cleanSlot();
// otherwise give control back to the caller's callback -- we're done
THIS->m_callback ( THIS->m_state );//, THIS->m_cr );
}
void gotMulticastReplyWrapper8b ( void *state , void *state2 ) {
Msg8b *THIS = (Msg8b*)state;
THIS->gotReply ( );
THIS->m_callback ( THIS->m_state );//, THIS->m_cr );
}
// . parses the reply to a 0x8b request forwarded by getCatRec()
// . reply layout, written by gotCatRecWrapper():
//   dataSize(4), data(dataSize), gotByIp(1), hadRec(1),
//   numIndCatids(4), indCatids(4 * numIndCatids)
// . on success sets m_cr (data, hadRec, indirect catids)
// . on failure leaves g_errno set (EBADREPLY for an empty reply)
void Msg8b::gotReply ( ) {
	// check for error
	if ( g_errno ) {
		log ( "Msg8b: Reply had error: %s", mstrerror(g_errno));
		return;
	}
	long long startTime = gettimeofdayInMilliseconds();
	// get the reply
	long replySize;
	long replyMaxSize;
	bool freeit;
	char *reply = m_mcast.getBestReply ( &replySize,
					     &replyMaxSize,
					     &freeit );
	if ( ! reply || replySize <= 0 ) {
		// an empty reply is unusable. g_errno should already be set,
		// but make sure the caller sees an error.
		g_errno = EBADREPLY;
		// a zero-length buffer we were handed must still be freed
		if ( reply && freeit )
			mfree ( reply , replyMaxSize , "Msg8b" );
	}
	else {
		// only relabel a buffer that actually exists
		relabel( reply, replyMaxSize, "Msg8b-GBR" );
		// deserialize
		char *p = reply;
		long dataSize = *(long*)p; p += 4;
		char *data = p; p += dataSize;
		bool gotByIp = *p; p++;
		bool hadRec = *p; p++;
		long numIndCatids = *(long*)p; p+=4;
		long *indCatids = (long*)p; p += numIndCatids*4;
		// sanity check: we must have consumed exactly the reply
		if (p - reply != replySize) {
			log("Msg8b: Deserialized reply size %li != %li",
			    (long)(p - reply), replySize );
			char *xx = NULL; *xx = 0;
		}
		QUICKPOLL(m_niceness);
		// get site file num and catids from reply
		m_cr->set ( m_url ,
			    data ,
			    dataSize ,
			    gotByIp ); // gotByIp?
		m_cr->m_hadRec = hadRec;
		// set the indirect catids
		m_cr->setIndirectCatids(indCatids, numIndCatids);
		// free the reply only if the multicast handed us ownership
		if ( freeit )
			mfree ( reply , replyMaxSize , "Msg8b" );
	}
	long long now = gettimeofdayInMilliseconds();
	long long msg8bTook = now - startTime;
	if(msg8bTook > 10)
		log(LOG_INFO, "admin: gotreply for msg8b took %lli",
		    msg8bTook);
}
// . per-request state for a 0x8b request served by this host
// . allocated in handleRequest8b() and freed in gotCatRecWrapper()
class State08b {
public:
// the local lookup doing the work for the remote requester
Msg8b m_msg8b;
// filled in by m_msg8b, then serialized into the reply
CatRec m_catrec;
// where the reply (or error reply) is sent
UdpSlot *m_slot;
UdpServer *m_us;
// niceness taken from the request
long m_niceness;
//char m_rdbId;
// the requested url, with the ip taken from the request
Url m_url;
};
// . handler for 0x8b requests forwarded by Msg8b::getCatRec() on
//   another host
// . request layout: ip(4), niceness(1), useCanonicalName(1),
//   url (NUL-terminated)
// . runs a local getCatRec(); gotCatRecWrapper() then sends the reply
//   with sendReply_ass() or sendErrorReply()
void handleRequest8b ( UdpSlot *slot, long netnice ) {
	// replies always go out on the main udp server
	UdpServer *us = &g_udpServer;
	// get the request
	char *request = slot->m_readBuf;
	long requestSize = slot->m_readBufSize;
	// parse the request
	char *p = request;
	long ip = *(long *)p ; p+=4;
	long niceness = (long)*p ; p++;
	bool useCanonicalName = *p; p++;
	// url
	char *url = p;
	long urlLen = gbstrlen(url);
	p += urlLen + 1;
	// sanity check: the url must end exactly at the end of the request
	if (p - request != requestSize) {
		// cast the ptrdiff_t so it matches %li on every platform
		log("build: Msg8b: Read Request Size %li != %li, bad engineer.",
		    (long)(p - request), requestSize);
		char *xx = NULL; *xx = 0;
	}
	// create the state
	State08b *st8b;
	try { st8b = new (State08b); }
	catch ( ... ) {
		g_errno = ENOMEM;
		log("Msg8b: new(%li): %s", (long)sizeof(State08b),
		    mstrerror(g_errno));
		us->sendErrorReply ( slot, g_errno );
		return;
	}
	mnew ( st8b , sizeof(State08b) , "Msg8b" );
	// fill the state
	st8b->m_slot = slot;
	st8b->m_us = us;
	st8b->m_niceness = niceness;
	st8b->m_url.set(url, urlLen,false);
	st8b->m_url.setIp(ip);
	// call the local msg8b to get the cat rec
	if ( ! st8b->m_msg8b.getCatRec ( &st8b->m_url,
					 NULL,//coll,
					 0,//collLen,
					 useCanonicalName,
					 niceness,
					 &st8b->m_catrec,
					 (void*)st8b,
					 gotCatRecWrapper ) )
		return;
	// did not block, reply now
	gotCatRecWrapper ( st8b );
}
// . completion callback for the local getCatRec() started by
//   handleRequest8b()
// . serializes the CatRec, frees the State08b and sends the reply, or
//   sends an error reply if g_errno is set
// . reply layout, parsed by Msg8b::gotReply():
//   dataSize(4), data(dataSize), gotByIp(1), hadRec(1),
//   numIndCatids(4), indCatids(4 * numIndCatids)
void gotCatRecWrapper ( void *state ) { // , CatRec *catrec ) {
	State08b *st8b = (State08b*)state;
	CatRec *catrec = &st8b->m_catrec;
	// grab these now, the state is freed before we reply
	UdpSlot *slot = st8b->m_slot;
	UdpServer *us = st8b->m_us;
	// check for error
	if ( g_errno ) {
		// capture the error before tearing down the state so the
		// error reply carries the lookup's error
		long err = g_errno;
		mdelete ( st8b , sizeof(State08b) , "Msg8b" );
		delete (st8b);
		us->sendErrorReply(slot, err);
		return;
	}
	// dataSize(4) + data + gotByIp(1) + hadRec(1)
	long dataSize = catrec->m_dataSize + 6;
	// numIndCatids(4) + indCatids
	dataSize += 4 + catrec->m_numIndCatids*4;
	// use the slot's tmp buf unless the reply is too big for it
	char *data = slot->m_tmpBuf;
	if (dataSize > TMPBUFSIZE) {
		data = (char*)mmalloc(dataSize, "Msg8breply");
		if (!data) {
			log("build: Msg8b: Can't allocate %li bytes for reply.",
			    dataSize);
			// clean up the state
			mdelete ( st8b , sizeof(State08b) , "Msg8b" );
			delete (st8b);
			g_errno = ENOMEM;
			us->sendErrorReply(slot, g_errno);
			return;
		}
	}
	char *p = data;
	memcpy(p, &catrec->m_dataSize, 4);
	p += 4;
	memcpy(p, catrec->m_data, catrec->m_dataSize);
	p += catrec->m_dataSize;
	memcpy(p, &catrec->m_gotByIp, 1);
	p++;
	memcpy(p, &catrec->m_hadRec, 1);
	p++;
	memcpy(p, &catrec->m_numIndCatids, 4);
	p += 4;
	memcpy(p, catrec->m_indCatids, catrec->m_numIndCatids*4);
	p += catrec->m_numIndCatids*4;
	// sanity check: we wrote exactly what we sized
	if (p - data != dataSize) {
		// cast the ptrdiff_t so it matches %li on every platform
		log("Msg8b: Reply Size %li != %li",
		    (long)(p - data), dataSize);
		char *xx = NULL; *xx = 0;
	}
	// the reply is fully copied out of the state, so free it
	mdelete ( st8b , sizeof(State08b) , "Msg8b" );
	delete (st8b);
	// send the reply
	us->sendReply_ass ( data,
			    dataSize,
			    data,
			    dataSize,
			    slot );
}
// . sets m_cr from the catdb recs in m_list
// . returns false if it blocks, true otherwise (currently never blocks)
// . on a pending g_errno the list is reset and m_cr is left untouched
// . m_list is NOT freed on success: getIndirectCatids() still needs it
// . NOTE(review): m_list may be a shared queue slot list; the error and
//   empty-url paths reset it for everyone attached to that slot
bool Msg8b::gotList ( ) {
	// return on error
	if (g_errno){
		log("build: Had error getting ruleset record: %s.",
		    mstrerror(g_errno));
		m_list->reset();
		return true;
	}
	// with no url there is nothing to match, so build a default CatRec
	// from the url's host
	if ( m_url->getUrlLen() <= 0 ) {
		Url site;
		site.set ( m_url->getHost() , m_url->getHostLen(),false );
		m_cr->set ( &site , CATREC_CURRENT_VERSION );
		QUICKPOLL(m_niceness);
		// free the list
		m_list->reset();
		return true;
	}
	// find the catdb rec for this url in the list, if any
	long recSize;
	char *rec = g_catdb.getRec(m_list,m_url,&recSize,NULL,0);
	// on a match set the CatRec from it (not looked up by ip)
	if ( rec )
		m_cr->set ( m_url, rec , recSize , false );
	QUICKPOLL(m_niceness);
	// catdb has no fallback lookups, so we are done either way
	return true;
}
// . fills m_cr->m_indCatids from the indirect-match catdb recs in m_list
// . each matching rec starts with a one-byte catid count followed by
//   that many 4-byte catids
// . at most MAX_IND_CATIDS catids are kept in total
void Msg8b::getIndirectCatids ( ) {
	// get the indirect matches
	char *matchRecs[MAX_IND_CATIDS];
	long matchRecSizes[MAX_IND_CATIDS];
	long numMatches = g_catdb.getIndirectMatches (
				m_list,
				m_url,
				matchRecs,
				matchRecSizes,
				MAX_IND_CATIDS,
				NULL,//m_coll,
				0);//m_collLen);
	// parse out the catids from the matches
	m_cr->m_numIndCatids = 0;
	for ( long i = 0; i < numMatches; i++ ) {
		// no room left for more catids
		if ( m_cr->m_numIndCatids >= MAX_IND_CATIDS )
			break;
		char *p = matchRecs[i];
		// the count is one byte: read it unsigned so counts above
		// 127 are not sign-extended into a negative number
		long numCatids = (unsigned char)*p;
		p++;
		// copy the catids over
		char *pend = p + numCatids*4;
		while ( m_cr->m_numIndCatids < MAX_IND_CATIDS &&
			p < pend ) {
			// . catids are stored as 4 bytes; memcpy avoids an
			//   unaligned read and reading 8 bytes when long is
			//   64 bits wide
			int catid;
			memcpy ( &catid , p , 4 );
			m_cr->m_indCatids[m_cr->m_numIndCatids] = catid;
			p += 4;
			m_cr->m_numIndCatids++;
		}
	}
}
// . checks the Msg8b queue for the desired list
// . if an open slot already reads this domainHash and has room, attaches
//   this Msg8b to it as a slave, points m_list at the slot's list and
//   returns true (the master finishes us from processSlaves())
// . otherwise claims the first free slot as its master, points m_list at
//   the slot's list and returns false so the caller runs Msg0
// . if no slot is free (or domainHash collides with the free-slot
//   sentinel 0xffffffff) neither master nor slave is set, m_list points
//   at m_localList, and false is returned
bool Msg8b::checkQueueForList ( unsigned long domainHash ) {
	// make sure the queue is initialized
	if ( !g_isMsg8bQueueInitialized ) {
		for ( long i = 0; i < MSG8BQUEUE_SIZE; i++ ) {
			g_msg8bQueue[i].m_list.reset();
			g_msg8bQueue[i].m_numAttached = 0;
			g_msg8bQueue[i].m_domainHash = 0xffffffff;
			g_msg8bQueue[i].m_isOpen = 0;
		}
		g_isMsg8bQueueInitialized = true;
	}
	// . 0xffffffff marks a free slot, so a hash equal to it cannot be
	//   queued: the claimed slot would still look free and a second
	//   master could take it over and read into the same list.
	// . such lookups use the local list, as when the queue is full.
	bool queueable = ( domainHash != 0xffffffff );
	// loop through the queue looking for the domainHash
	long firstOpen = -1;
	Msg8bListQueue *slot;
	for (long i = 0; queueable && i < MSG8BQUEUE_SIZE; i++) {
		slot = &g_msg8bQueue[i];
		// check for open slot
		if ( slot->m_domainHash == 0xffffffff ) {
			if ( firstOpen < 0 )
				firstOpen = i;
			continue;
		}
		// check the slot for existing list
		if ( slot->m_domainHash == domainHash &&
		     slot->m_numAttached < MSG8BQUEUE_MAX_ATTACHED &&
		     slot->m_isOpen == 1 ) {
			// become a slave to this slot
			m_queueMaster = false;
			m_queueSlave = true;
			m_list = &slot->m_list;
			m_queueSlot = i;
			slot->m_attachedMsg8bs[slot->m_numAttached] = this;
			slot->m_numAttached++;
			return true;
		}
	}
	// do not QUICKPOLL before claiming a slot: a nested
	// checkQueueForList() could take firstOpen out from under us
	if ( firstOpen < 0 ) {
		// no usable slot, use the local RdbList
		m_localList.reset();
		m_list = &m_localList;
		m_queueMaster = false;
		m_queueSlave = false;
		m_queueSlot = -1;
	}
	else {
		// become the master of the open slot
		slot = &g_msg8bQueue[firstOpen];
		slot->m_domainHash = domainHash;
		slot->m_isOpen = 1;
		slot->m_masterMsg8b = this;
		m_queueMaster = true;
		m_queueSlave = false;
		m_list = &slot->m_list;
		m_queueSlot = firstOpen;
	}
	// safe now that the slot is claimed
	QUICKPOLL(m_niceness);
	return false;
}
// . called by a queue master once its list is read
// . finishes every slave attached to the master's slot with the shared
//   list (gotList, indirect catids, callback)
// . does not release the slot; the master calls cleanSlot() afterwards
void Msg8b::processSlaves() {
	// only a queue master has slaves
	if ( !m_queueMaster ) return;
	// close the slot first so no Msg8b started from a slave's callback
	// can attach to it while we iterate; m_numAttached stays fixed
	Msg8bListQueue *slot = &g_msg8bQueue[m_queueSlot];
	slot->m_isOpen = 0;
	for ( long i = 0; i < slot->m_numAttached; i++ ) {
		Msg8b *slave = slot->m_attachedMsg8bs[i];
		// if the slave's gotList() blocks it finishes on its own
		if (!slave->gotList())
			continue;
		// slaves need their indirect catids too, exactly as the
		// master gets them in getCatRec()/gotListWrapper()
		slave->getIndirectCatids();
		// done, let the slave's caller continue
		slave->m_callback ( slave->m_state );
	}
}
void Msg8b::cleanSlot() {
if ( !m_queueMaster ) return;
// clean up the master slot
Msg8bListQueue *slot = &g_msg8bQueue[m_queueSlot];
slot->m_list.reset();
slot->m_numAttached = 0;
slot->m_domainHash = 0xffffffff;
slot->m_masterMsg8b = NULL;
slot->m_isOpen = 0;
}