#include "gb-include.h" #include "UdpServer.h" #include "Hostdb.h" #include "Msg0.h" // for getRdb(char rdbId) #include "Msg4.h" #include "Tfndb.h" #include "Clusterdb.h" #include "Spider.h" //#include "Checksumdb.h" #include "Datedb.h" #include "Rdb.h" //#include "Indexdb.h" #include "Profiler.h" #include "Repair.h" #include "Multicast.h" #include "Syncdb.h" ////////////// // // Send out our records to add every X ms here: // // Batching up the add requests saves udp traffic // on large networks (100+ hosts). // // . currently: send out adds once every 500ms // . when this was 5000ms (5s) it would wait like // 5s to spider a url after adding it. // ////////////// //#define MSG4_WAIT 500 // article1.html and article11.html are dups but they are being spidered // within 500ms of another #define MSG4_WAIT 100 // we have up to this many outstanding Multicasts to send add requests to hosts #define MAX_MCASTS 128 Multicast s_mcasts[MAX_MCASTS]; Multicast *s_mcastHead = NULL; Multicast *s_mcastTail = NULL; int32_t s_mcastsOut = 0; int32_t s_mcastsIn = 0; // we have one buffer for each host in the cluster static char *s_hostBufs [MAX_HOSTS]; static int32_t s_hostBufSizes [MAX_HOSTS]; static int32_t s_numHostBufs; // . each host has a 32k add buffer which is sent when full or every 10 seconds // . buffer will be more than 32k if the record to add is larger than 32k #define MAXHOSTBUFSIZE (32*1024) // the linked list of Msg4s waiting in line static Msg4 *s_msg4Head = NULL; static Msg4 *s_msg4Tail = NULL; // . TODO: use this instead of spiderrestore.dat // . call this once for every Msg14 so it can add all at once... // . make Msg14 add the links before anything else since that uses Msg10 // . also, need to update spiderdb rec for the url in Msg14 using Msg4 too! // . need to add support for passing in array of lists for Msg14 static void gotReplyWrapper4 ( void *state , void *state2 ) ; static void storeLineWaiters ( ) ; static void handleRequest4 ( UdpSlot *slot , int32_t niceness ) ; static void sleepCallback4 ( int bogusfd , void *state ) ; static bool sendBuffer ( int32_t hostId , int32_t niceness ) ; static Multicast *getMulticast ( ) ; static void returnMulticast ( Multicast *mcast ) ; //static void processSpecialSignal ( collnum_t collnum , char *p ) ; //static bool storeList2 ( RdbList *list , char rdbId , collnum_t collnum, // bool forceLocal, bool splitList , int32_t niceness ); static bool storeRec ( collnum_t collnum , char rdbId , uint32_t gid , int32_t hostId , char *rec , int32_t recSize , int32_t niceness ) ; // all these parameters should be preset bool registerHandler4 ( ) { // register ourselves with the udp server if ( ! g_udpServer.registerHandler ( 0x04, handleRequest4 ) ) return false; // clear the host bufs s_numHostBufs = g_hostdb.getNumShards(); for ( int32_t i = 0 ; i < s_numHostBufs ; i++ ) s_hostBufs[i] = NULL; // init the linked list of multicasts s_mcastHead = &s_mcasts[0]; s_mcastTail = &s_mcasts[MAX_MCASTS-1]; for ( int32_t i = 0 ; i < MAX_MCASTS - 1 ; i++ ) s_mcasts[i].m_next = &s_mcasts[i+1]; // last guy has nobody after him s_mcastTail->m_next = NULL; // nobody is waiting in line s_msg4Head = NULL; s_msg4Tail = NULL; // spider hang bug //logf(LOG_DEBUG,"msg4: registering handler."); // for now skip it //return true; // . restore state from disk // . false means repair is not active if ( ! loadAddsInProgress ( NULL ) ) { log("init: Could not load addsinprogress.dat. Ignoring."); g_errno = 0; } // . register sleep handler every 5 seconds = 5000 ms // . right now MSG4_WAIT is 500ms... i lowered it from 5s // to speed up spidering so it would harvest outlinks // faster and be able to spider them right away. // . returns false on failure return g_loop.registerSleepCallback(MSG4_WAIT,NULL,sleepCallback4 ); } static void flushLocal ( ) ; // scan all host bufs and try to send on them void sleepCallback4 ( int bogusfd , void *state ) { // wait for clock to be in sync if ( ! isClockInSync() ) return; // flush them buffers flushLocal(); } void flushLocal ( ) { g_errno = 0; // put the line waiters into the buffers in case they are not there //storeLineWaiters(); // now try to send the buffers for ( int32_t i = 0 ; i < s_numHostBufs ; i++ ) sendBuffer ( i , MAX_NICENESS ); g_errno = 0; } //static void (* s_flushCallback) ( void *state ) = NULL ; //static void * s_flushState = NULL; // for holding flush callback data static SafeBuf s_callbackBuf; static int32_t s_numCallbacks = 0; class CBEntry { public: int64_t m_timestamp; void (*m_callback)(void *); void *m_callbackState; }; // . injecting into the "qatest123" coll flushes after each inject // . returns false if blocked and callback will be called bool flushMsg4Buffers ( void *state , void (* callback) (void *) ) { // if all empty, return true now if ( ! hasAddsInQueue () ) return true; // how much per callback? int32_t cbackSize = sizeof(CBEntry); // ensure big enough for first call if ( s_callbackBuf.m_capacity == 0 ) { // length() == 0 ) { // make big if ( ! s_callbackBuf.reserve ( 300 * cbackSize ) ) { // return true with g_errno set on error log("msg4: error allocating space for flush callback"); return true; } // then init s_callbackBuf.zeroOut(); } // scan for empty slot char *buf = s_callbackBuf.getBufStart(); CBEntry *cb = (CBEntry *)buf; CBEntry *cbEnd = (CBEntry *)(buf + s_callbackBuf.getCapacity()); // find empty slot for ( ; cb < cbEnd && cb->m_callback ; cb++ ) ; // no room? if ( cb >= cbEnd ) { log("msg4: no room for flush callback. count=%"INT32"", (int32_t)s_numCallbacks); g_errno = EBUFTOOSMALL; return true; } // add callback to list // time must be the same as used by UdpSlot::m_startTime cb->m_callback = callback; cb->m_callbackState = state; // inc count s_numCallbacks++; //if ( s_flushCallback ) { char *xx=NULL;*xx=0; } // start it up flushLocal(); // scan msg4 slots for maximum start time so we can only // call the flush done callback when all msg4 slots in udpserver // have start times STRICTLY GREATER THAN that, then we will // be guaranteed that everything we added has been replied to! UdpSlot *slot = g_udpServer.getActiveHead(); int64_t max = 0LL; for ( ; slot ; slot = slot->m_next ) { // get its time stamp if ( slot->m_msgType != 0x04 ) continue; // must be initiated by us if ( ! slot->m_callback ) continue; // get it if ( max && slot->m_startTime < max ) continue; // got a new max max = slot->m_startTime; } // set time AFTER the udpslot gets its m_startTime set so // now will be >= each slot's m_startTime. cb->m_timestamp = max; // can we sometimes flush without blocking? maybe... //if ( ! hasAddsInQueue () ) return true; // assign it //s_flushState = state; //s_flushCallback = callback; // we are waiting now return false; } // used by Repair.cpp to make sure we are not adding any more data ("writing") bool hasAddsInQueue ( ) { // if there is an outstanding multicast... if ( s_mcastsOut > s_mcastsIn ) return true; // if we have a msg4 waiting in line... if ( s_msg4Head ) return true; // if we have an host buf that has something in it... for ( int32_t i = 0 ; i < s_numHostBufs ; i++ ) { if ( ! s_hostBufs[i] ) continue; if ( *(int32_t *)s_hostBufs[i] > 4 ) return true; } // otherwise, we have nothing queued up to add return false; } /* // . returns false if blocked, true otherwise // . returns true and sets g_errno on error // . forceLocal was removed because if you want to delete a corrupt key in // spiderdb, it should be removed in a merge or something... bool Msg4::addList ( RdbList *list , char rdbId , char *coll , void *state , void (* callback)(void *state) , int32_t niceness , bool forceLocal , bool splitList ) { // warning if ( ! coll ) { g_errno = EBADENGINEER; log(LOG_LOGIC,"net: NULL collection. msg4.cpp."); return true; } // save it strcpy ( m_coll , coll ); // make it a collnum collnum_t collnum = g_collectiondb.getCollnum ( coll ); // is it non-existent if ( collnum == (collnum_t)-1 ) { g_errno = EBADENGINEER; log(LOG_LOGIC,"build: msg4: bad coll %s",coll); return true; } // then re-call return addList ( list , rdbId , collnum , state , callback , niceness, forceLocal, splitList); } // . returns false if blocked, true otherwise // . returns true and sets g_errno on error bool Msg4::addList ( RdbList *list , char rdbId , collnum_t collnum , void *state , void (* callback)(void *state) , int32_t niceness , bool forceLocal , bool splitList ) { // sanity check //if ( niceness != MAX_NICENESS ) { char *xx=NULL;*xx=0;} // clear it g_errno = 0; // if list has no records in it return true if ( ! list || list->isEmpty() ) return true; // save it m_rdbId = rdbId; m_niceness = niceness; // this is only valid if storeList() ends up returning false, otherwise // the caller may free it m_list = list; m_callback = callback; m_state = state; m_forceLocal = forceLocal; m_splitList = splitList; // MDW: make sure we point to start of the list list->resetListPtr(); // MDW: need to reset the list ptr, otherwise twin might not get the // recs if we are adding to spiderdb list->resetListPtr(); // . store the list in the buffer for each hostid // . have a buffer for each host and each rdb // . and each collnum // . this returns true if stored, false if could not store if ( ! storeList ( list , rdbId , collnum ) ) return false; // launch anyone that needs it //launchBuffers ( ); // otherwise, g_errno should be set return true; } // . split the list up into pieces based on hostname // . call storeSubList() on each piece // . returns false and sets g_errno on failure bool Msg4::storeList ( RdbList *list , char rdbId , collnum_t collnum ) { // sanity check if ( rdbId < 0 ) { char *xx=NULL;*xx=0; } // nobody is after us in the linked list m_next = NULL; // . if others are waiting in line, we must wait in line to in case // there is an order dependency in the records being added // . however, if we are the head of the list we are being called from // handleReply4() and this is an attempt to finish processing this // list. //if ( s_msg4Tail && this != s_msg4Head ) { if ( s_msg4Tail ) { // sanity check -- detect re-use of a blocked msg4! if ( this == s_msg4Head ) { char *xx =NULL; *xx=0; } if ( ! s_msg4Head ) { char *xx =NULL; *xx=0; } // spider hang bug //logf(LOG_DEBUG, // "db: msg4 blocked. adding to tail. msg4=%"INT32"",(int32_t)this); s_msg4Tail->m_next = this; s_msg4Tail = this; return false; } // this returns true if all of the records in the list were // successfully stored in the s_hostBufs[] buffers for sending, // otherwise it returns false and we must call storeList2() again // for this list when a Multicast becomes available. if ( storeList2 ( list , rdbId , collnum , m_forceLocal, m_splitList, m_niceness ) ) return true; // spider hang bug //logf(LOG_DEBUG,"build: msg4 first in line. msg4=%"INT32"",(int32_t)this); // sanity check if ( s_msg4Head || s_msg4Tail ) { char *xx=NULL; *xx=0; } // . wait in line // . when the s_hostBufs[hostId] is able to accomodate our // record this loop will be resumed and the caller's callback // will be called once we are able to successfully queue up // all recs in the list // . we are the only one in line, otherwise, we would have exited // the start of this function s_msg4Head = this; s_msg4Tail = this; // return false so caller blocks. we will call his callback // when we are able to add his list to the hostBufs[] queue // and then he can re-use this Msg4 class for other things. return false; } bool storeList2 ( RdbList *list , char rdbId , collnum_t collnum , bool forceLocal, bool splitList , int32_t niceness ) { // get groupId of each key uint32_t gid; char key[MAX_KEY_BYTES]; // sanity check if ( rdbId < 0 ) { log("repair: Consider erasing repair.dat and " "repair-addsinprogress.dat to restart the repair IF " "you were doing a repair."); char *xx=NULL;*xx=0; } // store each record in the list into the send buffers while ( ! list->isExhausted() ) { // get the key of the current record list->getCurrentKey ( key ); // . if key belongs to same group as firstKey then continue // . titledb now uses last bits of docId to determine groupId // . but uses the top 32 bits of key still // . spiderdb uses last 64 bits to determine groupId // . tfndb now is like titledb(top 32 bits are top 32 of docId) if(forceLocal) gid = g_hostdb.m_groupId; else gid = getGroupId ( rdbId , key , splitList ); char *rec = list->getCurrentRec(); int32_t recSize = list->getCurrentRecSize(); // i fixed UdpServer.cpp to NOT call msg4 handlers when in // a quickpoll, in case we receive a niceness 0 msg4 request QUICKPOLL(niceness); // MAX_NICENESS); // convert the gid to the hostid of the first host in this // group. uses a quick hash table. int32_t hostId = g_hostdb.makeHostIdFast ( gid ); // . add that rec to this groupId, gid, includes the key // . these are NOT allowed to be compressed (half bit set) // and this point // . this returns false and sets g_errno on failure if ( storeRec ( collnum, rdbId, gid, hostId, rec, recSize , niceness )) { // . point to next record // . will point past records if no more left! list->skipCurrentRecord(); // get next rec continue; } // g_errno is not set if the store rec could not send the // buffer because no multicast was available if ( g_errno ) log("build: Msg4 storeRec had error: %s.", mstrerror(g_errno)); // clear this just in case g_errno = 0; return false; } return true; } */ // returns false if blocked bool Msg4::addMetaList ( char *metaList , int32_t metaListSize , char *coll , void *state , void (* callback)(void *state) , int32_t niceness , char rdbId ) { collnum_t collnum = g_collectiondb.getCollnum ( coll ); return addMetaList ( metaList , metaListSize , collnum , state , callback , niceness , rdbId ); } bool Msg4::addMetaList ( SafeBuf *sb , collnum_t collnum , void *state , void (* callback)(void *state) , int32_t niceness , char rdbId , int32_t shardOverride ) { return addMetaList ( sb->getBufStart() , sb->length() , collnum , state , callback , niceness , rdbId , shardOverride ); } bool Msg4::addMetaList ( char *metaList , int32_t metaListSize , collnum_t collnum , void *state , void (* callback)(void *state) , int32_t niceness , char rdbId , // Rebalance.cpp needs to add negative keys to // remove foreign records from where they no // longer belong because of a new hosts.conf file. // This will be -1 if not be overridden. int32_t shardOverride ) { // not in progress m_inUse = false; // empty lists are easy! if ( metaListSize == 0 ) return true; // sanity //if ( collnum < 0 || collnum > 1000 ) { char *xx=NULL;*xx=0; } if ( collnum < 0 ) { char *xx=NULL;*xx=0; } // if first time set this m_currentPtr = metaList; m_metaList = metaList; m_metaListSize = metaListSize; m_collnum = collnum; m_state = state; m_callback = callback; m_rdbId = rdbId; m_niceness = niceness; m_next = NULL; m_shardOverride = shardOverride; retry: // get in line if there's a line if ( s_msg4Head ) { // add ourselves to the line s_msg4Tail->m_next = this; // we are the new tail s_msg4Tail = this; // debug log. seems to happen a lot if not using threads.. if ( g_conf.m_useThreads ) log("msg4: queueing body msg4=0x%"PTRFMT"",(PTRTYPE)this); // mark it m_inUse = true; // all done then, but return false so caller does not free // this msg4 return false; } // then do it if ( addMetaList2 ( ) ) return true; // . sanity check // . we sometimes get called with niceness 0 from possibly // an injection or something and from a quickpoll // inside addMetList2() in which case our addMetaList2() will // fail, assuming s_msg4Head got set, BUT it SHOULD be OK because // being interrupted at the one QUICKPOLL() in addMetaList2() // doesn't seem like it would hurt. // . FURTHEMORE the multicast seems to always be called with // MAX_NICENESS so i'm not sure how niceness 0 will really help // with any of this stuff. //if ( s_msg4Head || s_msg4Tail ) { char *xx=NULL; *xx=0; } if ( s_msg4Head || s_msg4Tail ) { log("msg4: got unexpected head"); // :) goto retry; } // . spider hang bug // . debug log. seems to happen a lot if not using threads.. if ( g_conf.m_useThreads ) logf(LOG_DEBUG,"msg4: queueing head msg4=0x%"PTRFMT"",(PTRTYPE)this); // mark it m_inUse = true; // . wait in line // . when the s_hostBufs[hostId] is able to accomodate our // record this loop will be resumed and the caller's callback // will be called once we are able to successfully queue up // all recs in the list // . we are the only one in line, otherwise, we would have exited // the start of this function s_msg4Head = this; s_msg4Tail = this; // return false so caller blocks. we will call his callback // when we are able to add his list to the hostBufs[] queue // and then he can re-use this Msg4 class for other things. return false; } bool isInMsg4LinkedList ( Msg4 *msg4 ) { Msg4 *m = s_msg4Head; for ( ; m ; m = m->m_next ) if ( m == msg4 ) return true; return false; } bool Msg4::addMetaList2 ( ) { char *p = m_currentPtr; // get the collnum //collnum_t collnum = g_collectiondb.getCollnum ( m_coll ); char *pend = m_metaList + m_metaListSize; //if ( m_collnum < 0 || m_collnum > 1000 ) { char *xx=NULL;*xx=0; } if ( m_collnum < 0 ) { char *xx=NULL;*xx=0; } // store each record in the list into the send buffers for ( ; p < pend ; ) { // first is rdbId char rdbId = m_rdbId; if ( rdbId < 0 ) rdbId = *p++; // get nosplit //bool nosplit = ( rdbId & 0x80 ) ; // mask off rdbId rdbId &= 0x7f; // get the key of the current record char *key = p; // negative key? bool del ; if ( *p & 0x01 ) del = false; else del = true; // tmp debug //if ( del ) { char *xx=NULL;*xx=0;} // get the key size. a table lookup in Rdb.cpp. int32_t ks ; if ( rdbId == RDB_POSDB || rdbId == RDB2_POSDB2) ks = 18; else if ( rdbId == RDB_DATEDB ) ks = 16; else ks = getKeySizeFromRdbId ( rdbId ); // skip key p += ks; // set this //bool split = true; if ( nosplit ) split = false; // . if key belongs to same group as firstKey then continue // . titledb now uses last bits of docId to determine groupId // . but uses the top 32 bits of key still // . spiderdb uses last 64 bits to determine groupId // . tfndb now is like titledb(top 32 bits are top 32 of docId) //uint32_t gid = getGroupId ( rdbId , key , split ); uint32_t shardNum = getShardNum( rdbId , key ); // override it from Rebalance.cpp for redistributing records // after updating hosts.conf? if ( m_shardOverride >= 0 ) shardNum = m_shardOverride; // get the record, is -1 if variable. a table lookup. int32_t dataSize; if ( rdbId==RDB_POSDB || rdbId==RDB2_POSDB2) dataSize = 0; else if ( rdbId == RDB_DATEDB ) dataSize = 0; else dataSize = getDataSizeFromRdbId ( rdbId ); // . negative keys have no data // . this unfortunately is not true according to RdbList.cpp if ( del ) dataSize = 0; // if variable read that in if ( dataSize == -1 ) { // -1 means to read it in dataSize = *(int32_t *)p; // sanity check if ( dataSize < 0 ) { char *xx=NULL;*xx=0; } // sanity check //if ( rdbId == RDB_DOLEDB && // (*key & 0x01) == 0x01 && // positive key // dataSize <= 0 ) { // char *xx=NULL;*xx=0; } // skip dataSize p += 4; } // skip over the data, if any p += dataSize; // breach us? if ( p > pend ) { char *xx=NULL;*xx=0; } // i fixed UdpServer.cpp to NOT call msg4 handlers when in // a quickpoll, in case we receive a niceness 0 msg4 request QUICKPOLL(m_niceness); // MAX_NICENESS); // convert the gid to the hostid of the first host in this // group. uses a quick hash table. //int32_t hostId = g_hostdb.makeHostIdFast ( gid ); Host *hosts = g_hostdb.getShard ( shardNum ); int32_t hostId = hosts[0].m_hostId; // . add that rec to this groupId, gid, includes the key // . these are NOT allowed to be compressed (half bit set) // and this point // . this returns false and sets g_errno on failure if ( storeRec ( m_collnum, rdbId, shardNum,//gid, hostId, key, // start of rec, p - key , // recSize m_niceness )) { // . point to next record // . will point past records if no more left! m_currentPtr = p; // += recSize; // debug log // int off = (int)(m_currentPtr-m_metaList); // log("msg4: cpoff=%i",off); // if ( off == 5271931 ) // log("msg4: hey"); // debug // get next rec continue; } // g_errno is not set if the store rec could not send the // buffer because no multicast was available if ( g_errno ) log("build: Msg4 storeRec had error: %s.", mstrerror(g_errno)); // clear this just in case g_errno = 0; // if g_errno was not set, this just means we do not have // room for the data yet, and try again later return false; } // . send out all bufs // . before we were caching to reduce packet traffic, but // since we don't use the network for sending termlists let's // try going back to making it even more real-time //if ( ! isClockInSync() ) return true; // flush them buffers //flushLocal(); // in case this was being used to hold the data, free it m_tmpBuf.purge(); return true; } // . modify each Msg4 request as follows // . collnum(2bytes)|rdbId(1bytes)|listSize&rawlistData|... // . store these requests in the buffer just like that bool storeRec ( collnum_t collnum , char rdbId , uint32_t shardNum, //gid int32_t hostId , char *rec , int32_t recSize , int32_t niceness ) { // loop back up here if you have to flush the buffer retry: // sanity check //if ( recSize==16 && rdbId==RDB_SPIDERDB && *(int32_t *)(rec+12)!=0 ) { // char *xx=NULL; *xx=0; } // . how many bytes do we need to store the request? // . USED(4 bytes)/collnum/rdbId(1)/recSize(4bytes)/recData // . "USED" is only used for mallocing new slots really int32_t needForRec = sizeof(collnum_t) + 1 + 4 + recSize; int32_t needForBuf = 4 + needForRec; // 8 bytes for the zid needForBuf += 8; // how many bytes of the buffer are occupied or "in use"? char *buf = s_hostBufs[hostId]; // if NULL, try to allocate one if ( ! buf || s_hostBufSizes[hostId] < needForBuf ) { // how big to make it int32_t size = MAXHOSTBUFSIZE; // must accomodate rec at all costs if ( size < needForBuf ) size = needForBuf; // make them all the same size buf = (char *)mmalloc ( size , "Msg4a" ); // if still no luck, we cannot send this msg if ( ! buf ) return false; if(s_hostBufs[hostId]) { //if the old buf was too small, resize gbmemcpy( buf, s_hostBufs[hostId], *(int32_t*)(s_hostBufs[hostId])); mfree ( s_hostBufs[hostId], s_hostBufSizes[hostId] , "Msg4b" ); } // if we are making a brand new buf, init the used // size to "4" bytes else // itself(4) PLUS the zid (8 bytes) *(int32_t *)buf = 4 + 8; // add it s_hostBufs [hostId] = buf; s_hostBufSizes[hostId] = size; } // . first int32_t is how much of "buf" is used // . includes everything even itself int32_t used = *(int32_t *)buf; // sanity chec. "used" must include the 4 bytes of itself if ( used < 12 ) { char *xx = NULL; *xx = 0; } // how much total buf space do we have, used or unused? int32_t maxSize = s_hostBufSizes[hostId]; // how many bytes are available in "buf"? int32_t avail = maxSize - used; // if we can not fit list into buffer... if ( avail < needForRec ) { // . send what is already in the buffer and clear it // . will set s_hostBufs[hostId] to NULL // . this will return false if no available Multicasts to // send the buffer, in which case we must tell the caller // to block and wait for us to call his callback, only then // will he be able to proceed. we will call his callback // as soon as we can copy... use this->m_msg1 to add the // list that was passed in... if ( ! sendBuffer ( hostId , niceness ) ) return false; // now the buffer should be empty, try again goto retry; } // point to where to store the list char *start = buf + used; char *p = start; // store the record and all the info for it *(collnum_t *)p = collnum; p += sizeof(collnum_t); *(char *)p = rdbId ; p += 1; *(int32_t *)p = recSize; p += 4; gbmemcpy ( p , rec , recSize ); p += recSize; // update buffer used *(int32_t *)buf = used + (p - start); // all done, did not "block" return true; } // . returns false if we were UNable to get a multicast to launch the buffer, // true otherwise // . returns false and sets g_errno on error bool sendBuffer ( int32_t hostId , int32_t niceness ) { //logf(LOG_DEBUG,"build: sending buf"); // how many bytes of the buffer are occupied or "in use"? char *buf = s_hostBufs [hostId]; int32_t allocSize = s_hostBufSizes[hostId]; // skip if empty if ( ! buf ) return true; // . get size used in buf // . includes everything, including itself! int32_t used = *(int32_t *)buf; // if empty, bail if ( used <= 12 ) return true; // grab a vehicle for sending the buffer Multicast *mcast = getMulticast(); // if we could not get one, wait in line for one to become available if ( ! mcast ) { //logf(LOG_DEBUG,"build: no mcast available"); return false; } // NO! storeRec() will alloc it! /* // make it point to another char *newBuf = (char *)mmalloc ( MAXHOSTBUFSIZE , "Msg4Buf" ); // assign it to the new Buf s_hostBufs [ hostId ] = newBuf; // reset used if ( newBuf ) { *(int32_t *)newBuf = 4; s_hostBufSizes[hostId] = MAXHOSTBUFSIZE; } else s_hostBufSizes[hostId] = 0; //if we were oom reset size */ // get groupId //uint32_t groupId = g_hostdb.getGroupIdFromHostId ( hostId ); Host *h = g_hostdb.getHost(hostId); uint32_t shardNum = h->m_shardNum; // get group # //int32_t groupNum = g_hostdb.getGroupNum ( groupId ); // sanity check. our clock must be in sync with host #0's or with // a host from his group, group #0 if ( ! isClockInSync() ) { log("msg4: msg4: warning sending out adds but clock not in " "sync with host #0"); //char *xx=NULL ; *xx=0; } } // try to keep all zids unique, regardless of their group static uint64_t s_lastZid = 0; // select a "zid", a sync id uint64_t zid = gettimeofdayInMilliseconds(); // keep it strictly increasing if ( zid <= s_lastZid ) zid = s_lastZid + 1; // update it s_lastZid = zid; // shift up 1 so Syncdb::makeKey() is easier zid <<= 1; // set some things up char *p = buf + 4; // . sneak it into the top of the buffer // . TODO: fix the code above for this new header *(uint64_t *)p = zid; p += 8; // syncdb debug if ( g_conf.m_logDebugSpider ) logf(LOG_DEBUG,"syncdb: sending msg4 request zid=%"UINT64"",zid); // this is the request char *request = buf; int32_t requestSize = used; // . launch the request // . we now have this multicast timeout if a host goes dead on it // and it fails to send its payload // . in that case we should restart from the top and we will add // the dead host ids to the top, and multicast will avoid sending // to hostids that are dead now key_t k; k.setMin(); if ( mcast->send ( request , // sets mcast->m_msg to this requestSize, // sets mcast->m_msgLen to this 0x04 , // msgType for add rdb record false , // does multicast own msg? shardNum,//groupId , // group to send to (groupKey) true , // send to whole group? 0 , // key is useless for us (void *)(PTRTYPE)allocSize , // state data (void *)mcast , // state data gotReplyWrapper4 , // this was 60 seconds, but if we saved the // addsinprogress at the wrong time we might miss // it when its between having timed out and // having been resent by us! 999999999 , // timeout in secs MAX_NICENESS, // niceness false , // realtime -1 , // first host to try NULL , // replyBuf = NULL , 0 , // replyBufMaxSize = 0 , true , // freeReplyBuf = true , false , // doDiskLoadBalancing = false , -1 , // no max cache age limit k , // cache key RDB_NONE , // bogus rdbId -1 , // unknown minRecSizes read size true )) { // sendToSelf? // . let storeRec() do all the allocating... // . only let the buffer go once multicast succeeds s_hostBufs [ hostId ] = NULL; // success return true; } // g_errno should be set log("net: Had error when sending request to add data to rdb shard " "#%"UINT32": %s.", shardNum,mstrerror(g_errno)); returnMulticast ( mcast ); return false; } Multicast *getMulticast ( ) { // get head Multicast *avail = s_mcastHead; // return NULL if none available if ( ! avail ) return NULL; // if all are out then forget it! if ( s_mcastsOut - s_mcastsIn >= MAX_MCASTS ) return NULL; // remove from head of linked list s_mcastHead = avail->m_next; // if we were the tail, none now if ( s_mcastTail == avail ) s_mcastTail = NULL; // count it s_mcastsOut++; // sanity if ( avail->m_inUse ) { char *xx=NULL;*xx=0; } // return that return avail; } void returnMulticast ( Multicast *mcast ) { // return this multicast mcast->reset(); // we are at the tail, nobody is after us mcast->m_next = NULL; // if no tail we are both head and tail if ( ! s_mcastTail ) s_mcastHead = mcast; // put after the tail else s_mcastTail->m_next = mcast; // and we are the new tail s_mcastTail = mcast; // count it s_mcastsIn++; } // just free the request void gotReplyWrapper4 ( void *state , void *state2 ) { //logf(LOG_DEBUG,"build: got msg4 reply"); int32_t allocSize = (int32_t)(PTRTYPE)state; Multicast *mcast = (Multicast *)state2; // get the request we sent char *request = mcast->m_msg; //int32_t requestSize = mcast->m_msgSize; // get the buffer alloc size //int32_t allocSize = requestSize; //if ( allocSize < MAXHOSTBUFSIZE ) allocSize = MAXHOSTBUFSIZE; if ( request ) mfree ( request , allocSize , "Msg4" ); // make sure no one else can free it! mcast->m_msg = NULL; // get the udpslot that is replying here UdpSlot *replyingSlot = mcast->m_slot; if ( ! replyingSlot ) { char *xx=NULL;*xx=0; } returnMulticast ( mcast ); storeLineWaiters ( ); // try to launch more msg4 requests in waiting // // now if all buffers are empty, let any flush request know that // // bail if no callbacks to call if ( s_numCallbacks == 0 ) return; //log("msg4: got msg4 reply. replyslot starttime=%"INT64" slot=0x%"XINT32"", // replyingSlot->m_startTime,(int32_t)replyingSlot); // get the oldest msg4 slot starttime UdpSlot *slot = g_udpServer.getActiveHead(); int64_t min = 0LL; for ( ; slot ; slot = slot->m_next ) { // get its time stamp if ( slot->m_msgType != 0x04 ) continue; // must be initiated by us if ( ! slot->m_callback ) continue; // if it is this replying slot or already had the callback // called, then ignore it... if ( slot->m_calledCallback ) continue; // ignore incoming slot! that could be the slot we were // waiting for to complete so its starttime will always // be less than our callback's m_timestamp //if ( slot == replyingSlot ) continue; // log it //log("msg4: slot starttime = %"INT64" ",slot->m_startTime); // get it if ( min && slot->m_startTime >= min ) continue; // got a new min min = slot->m_startTime; } // log it //log("msg4: slots min = %"INT64" ",min); // scan for slots whose callbacks we can call now char *buf = s_callbackBuf.getBufStart(); CBEntry *cb = (CBEntry *)buf; CBEntry *cbEnd = (CBEntry *)(buf + s_callbackBuf.getCapacity()); // find empty slot for ( ; cb < cbEnd ; cb++ ) { // skip if empty if ( ! cb->m_callback ) continue; // debug //log("msg4: cb timestamp = %"INT64"",cb->m_timestamp); // wait until callback's stored time is <= all msg4 // slot's start times, then we can guarantee that all the // msg4s required for this callback have replied. // min will be zero if no msg4s in there, so call callback. if ( min && cb->m_timestamp >= min ) continue; // otherwise, call the callback! cb->m_callback ( cb->m_callbackState ); // take out of queue now by setting callback ptr to 0 cb->m_callback = NULL; // discount s_numCallbacks--; } // of course, skip this part if nobody called a flush //if ( ! s_flushCallback ) return; // if not completely empty, wait! if ( hasAddsInQueue () ) { // flush away some more just in case flushLocal(); // and wait return; } // seems good to go! //s_flushCallback ( s_flushState ); // nuke it //s_flushCallback = NULL; } void storeLineWaiters ( ) { // try to store all the msg4's lists that are waiting in line loop: Msg4 *msg4 = s_msg4Head; // now were we waiting on a multicast to return in order to send // another request? return if not. if ( ! msg4 ) return; // grab the first Msg4 in line. ret fls if blocked adding more of list. if ( ! msg4->addMetaList2 ( ) ) return; // hey, we were able to store that Msg4's list, remove him s_msg4Head = msg4->m_next; // empty? make tail NULL too then if ( ! s_msg4Head ) s_msg4Tail = NULL; // . if his callback was NULL, then was loaded in loadAddsInProgress() // . we no longer do that so callback should never be null now if ( ! msg4->m_callback ) { char *xx=NULL;*xx=0; } // log this now i guess. seems to happen a lot if not using threads if ( g_conf.m_useThreads ) logf(LOG_DEBUG,"msg4: calling callback for msg4=0x%"PTRFMT"", (PTRTYPE)msg4); // release it msg4->m_inUse = false; // call his callback msg4->m_callback ( msg4->m_state ); // ensure not re-added - no, msg4 might be freed now! //msg4->m_next = NULL; // try the next Msg4 in line goto loop; } #include "Process.h" // . destroys the slot if false is returned // . this is registered in Msg4::set() to handle add rdb record msgs // . seems like we should always send back a reply so we don't leave the // requester's slot hanging, unless he can kill it after transmit success??? // . TODO: need we send a reply back on success???? // . NOTE: Must always call g_udpServer::sendReply or sendErrorReply() so // read/send bufs can be freed void handleRequest4 ( UdpSlot *slot , int32_t netnice ) { // easy var UdpServer *us = &g_udpServer; // if we just came up we need to make sure our hosts.conf is in // sync with everyone else before accepting this! it might have // been the case that the sender thinks our hosts.conf is the same // since last time we were up, so it is up to us to check this if ( g_pingServer.m_hostsConfInDisagreement ) { g_errno = EBADHOSTSCONF; us->sendErrorReply ( slot , g_errno ); return; } // need to be in sync first if ( ! g_pingServer.m_hostsConfInAgreement ) { // . if we do not know the sender's hosts.conf crc, wait 4 it // . this is 0 if not received yet if ( ! slot->m_host->m_pingInfo.m_hostsConfCRC ) { g_errno = EWAITINGTOSYNCHOSTSCONF; us->sendErrorReply ( slot , g_errno ); return; } // compare our hosts.conf to sender's otherwise if ( slot->m_host->m_pingInfo.m_hostsConfCRC != g_hostdb.getCRC() ) { g_errno = EBADHOSTSCONF; us->sendErrorReply ( slot , g_errno ); return; } } //logf(LOG_DEBUG,"build: handling msg4 request"); // extract what we read char *readBuf = slot->m_readBuf; int32_t readBufSize = slot->m_readBufSize; // must at least have an rdbId if ( readBufSize < 7 ) { g_errno = EREQUESTTOOSHORT; us->sendErrorReply ( slot , g_errno ); return; } //char *p = readBuf; //char *pend = readBuf + readBufSize; // get total buf used int32_t used = *(int32_t *)readBuf; //p += 4; // sanity check if ( used != readBufSize ) { // if we send back a g_errno then multicast retries forever // so just absorb it! log("msg4: got corrupted request from hostid %"INT32" " "used=%"INT32" != %"INT32"=readBufSize msg4", slot->m_host->m_hostId, used, readBufSize); us->sendReply_ass ( NULL , 0 , NULL , 0 , slot ) ; //us->sendErrorReply(slot,ECORRUPTDATA);return;} return; } bool skipSyncdb = false; // skip syncdb if we are just one host! if ( g_hostdb.m_numHosts == 1 ) skipSyncdb = true; // if we did not sync our parms up yet with host 0, wait... if ( g_hostdb.m_hostId != 0 && ! g_parms.m_inSyncWithHost0 ) { // limit logging to once per second static int32_t s_lastTime = 0; int32_t now = getTimeLocal(); if ( now - s_lastTime >= 1 ) { s_lastTime = now; log("msg4: waiting to sync with " "host #0 before accepting data"); } // tell send to try again int16_tly g_errno = ETRYAGAIN; us->sendErrorReply(slot,g_errno); return; } // OK, just to get the ball rolling let's delay using/debugging // syncdb until after launch in order to move up the launch date. // we are going to be running solid states so there should be a lot // fewer hardware issues... skipSyncdb = true; if ( skipSyncdb ) { // this returns false with g_errno set on error if ( ! addMetaList ( readBuf , slot ) ) { us->sendErrorReply(slot,g_errno); return; } // good to go us->sendReply_ass ( NULL , 0 , NULL , 0 , slot ) ; return; } // . add to syncdb tree // . a key_t is now before the "used" // . this returns false and sets g_errno if we could not add it to // syncdb OR if there were some msg4 requests we should have got // before this one! // . in the first case it will set g_errno to ETRYAGAIN probably, // but if out of order it will just set g_errno to EOUTOFSYNC i guess if ( ! g_syncdb.gotMetaListRequest ( slot ) ) { us->sendErrorReply(slot,g_errno);return; } // . chalk it up // . it may have multiple different rdb items in the list now! //rdb->sentReplyAdd ( 0 ); // . send an empty (non-error) reply as verification // . slot should be auto-nuked on transmission/timeout of reply // . udpServer should free the readBuf us->sendReply_ass ( NULL , 0 , NULL , 0 , slot ) ; } // . Syncdb.cpp will call this after it has received checkoff keys from // all the alive hosts for this zid/sid // . returns false and sets g_errno on error, returns true otherwise bool addMetaList ( char *p , UdpSlot *slot ) { if ( g_conf.m_logDebugSpider ) logf(LOG_DEBUG,"syncdb: calling addMetalist zid=%"UINT64"", *(int64_t *)(p+4)); // get total buf used int32_t used = *(int32_t *)p; // the end char *pend = p + used; // skip the used amount p += 4; // skip zid p += 8; Rdb *rdb = NULL; char lastRdbId = -1; // . this request consists of multiple recs, so add each one // . collnum(2bytes)/rdbId(1byte)/recSize(4bytes)/recData/... loop: // extract collnum, rdbId, recSize collnum_t collnum = *(collnum_t *)p; p += sizeof(collnum_t); char rdbId = *(char *)p; p += 1; int32_t recSize = *(int32_t *)p; p += 4; // int16_tcut //UdpServer *us = &g_udpServer; // . get the rdb to which it belongs, use Msg0::getRdb() // . do not call this for every rec if we do not have to if ( rdbId != lastRdbId ) { rdb = getRdbFromId ( (char) rdbId ); // skip RDBFAKEDB //if ( rdbId == RDB_FAKEDB ) { // // do special handler process // processSpecialSignal ( collnum , p ); // // skip the fakedb record // p += recSize; // // drop it for now!! // if ( p < pend ) goto loop; // // all done // return true; //} // an uninitialized secondary rdb? it will have a keysize // of 0 if its never been intialized from the repair page. // don't core any more, we probably restarted this shard // and it needs to wait for host #0 to syncs its // g_conf.m_repairingEnabled to '1' so it can start its // Repair.cpp repairWrapper() loop and init the secondary // rdbs so "rdb" here won't be NULL any more. if ( rdb && rdb->m_ks <= 0 ) { time_t currentTime = getTime(); static time_t s_lastTime = 0; if ( currentTime > s_lastTime + 10 ) { s_lastTime = currentTime; log("msg4: oops. got an rdbId key for a " "secondary " "rdb and not in repair mode. waiting to " "be in repair mode."); g_errno = ETRYAGAIN; return false; //char *xx=NULL;*xx=0; } } if ( ! rdb ) { if ( slot ) log("msg4: rdbId of %"INT32" unrecognized " "from hostip=%s. " "dropping WHOLE request", (int32_t)rdbId, iptoa(slot->m_ip)); else log("msg4: rdbId of %"INT32" unrecognized. " "dropping WHOLE request", (int32_t)rdbId); g_errno = ETRYAGAIN; return false; // drop it for now!! //if ( p < pend ) goto loop; // all done //return true; char *xx=NULL;*xx=0; // silently drop it, the WHOLE thing, it seems // corrupted!!! return true; //g_errno = EBADENGINEER; //return false; } //if ( ! rdb ) return false; } // . if already in addList and we are quickpoll interruptint, try again // . happens if our niceness gets converted to 0 if ( rdb->m_inAddList ) { g_errno = ETRYAGAIN; return false; } // sanity check if ( p + recSize > pend ) { g_errno = ECORRUPTDATA; return false; } // reset g_errno g_errno = 0; // . make a list from this data // . skip over the first 4 bytes which is the rdbId // . TODO: embed the rdbId in the msgtype or something... RdbList list; // sanity check if ( rdb->getKeySize() == 0 ) { log("seems like a stray /e/repair-addsinprogress.dat file " "rdbId=%"INT32". waiting to be in repair mode." ,(int32_t)rdbId); //not in repair mode. dropping.",(int32_t)rdbId); g_errno = ETRYAGAIN; return false; char *xx=NULL;*xx=0; // drop it for now!! p += recSize; if ( p < pend ) goto loop; // all done return true; } // set the list list.set ( p , recSize , p , recSize , rdb->getFixedDataSize() , false , // ownData? rdb->useHalfKeys() , rdb->getKeySize () ); // advance over the rec data to point to next entry p += recSize; // keep track of stats rdb->readRequestAdd ( recSize ); // this returns false and sets g_errno on error bool status =rdb->addList(collnum, &list, MAX_NICENESS ); // bad coll #? ignore it. common when deleting and resetting // collections using crawlbot. but there are other recs in this // list from different collections, so do not abandon the whole // meta list!! otherwise we lose data!! if ( g_errno == ENOCOLLREC && !status ) { g_errno = 0; status = true; } // do the next record here if there is one if ( status && p < pend ) goto loop; // no memory means to try again if ( g_errno == ENOMEM ) g_errno = ETRYAGAIN; // doing a full rebuid will add collections if ( g_errno == ENOCOLLREC && g_repairMode > 0 ) //g_repair.m_fullRebuild ) g_errno = ETRYAGAIN; // ignore enocollrec errors since collection can be reset while // spiders are on now. //if ( g_errno == ENOCOLLREC ) // g_errno = 0; // are we done if ( g_errno ) return false; // success return true; } // // serialization code // // . when we core, save this stuff so we can re-add when we come back up // . have a sleep wrapper that tries to flush the buffers every 10 seconds // or so. // . returns false on error, true on success // . does not do any mallocs in case we are OOM and need to save // . BUG: might be trying to send an old bucket, so scan udp slots too? or // keep unsent buckets in the list? bool saveAddsInProgress ( char *prefix ) { if ( g_conf.m_readOnlyMode ) return true; // this does not work so skip it for now //return true; // open the file char filename[1024]; // if saving while in repair mode, that means all of our adds must // must associated with the repair. if we send out these add requests // when we restart and not in repair mode then we try to add to an // rdb2 which has not been initialized and it does not work. if ( ! prefix ) prefix = ""; sprintf ( filename , "%s%saddsinprogress.saving", g_hostdb.m_dir , prefix ); int32_t fd = open ( filename, O_RDWR | O_CREAT | O_TRUNC , S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH ); if ( fd < 0 ) { log ("build: Failed to open %s for writing: %s", filename,strerror(errno)); return false; } log(LOG_INFO,"build: Saving %s",filename); // the # of host bufs write ( fd , (char *)&s_numHostBufs , 4 ); // serialize each hostbuf for ( int32_t i = 0 ; i < s_numHostBufs ; i++ ) { // get the size int32_t used = 0; // if not null, how many bytes are used in it? if ( s_hostBufs[i] ) used = *(int32_t *)s_hostBufs[i]; // size of the buf write ( fd , (char *)&used , 4 ); // skip if none if ( ! used ) continue; // if only 4 bytes used, that is basically empty, the first // 4 bytes is how much of the total buffer is used, including // those 4 bytes. if ( used == 4 ) continue; // the buf itself write ( fd , s_hostBufs[i] , used ); } // scan in progress msg4 requests too! UdpSlot *slot = g_udpServer.m_head2; for ( ; slot ; slot = slot->m_next2 ) { // skip if not msg4 if ( slot->m_msgType != 0x04 ) continue; // skip if we did not initiate it if ( ! slot->m_callback ) continue; // skip if got reply if ( slot->m_readBuf ) continue; // write hostid sent to write ( fd , &slot->m_hostId , 4 ); // write that write ( fd , &slot->m_sendBufSize , 4 ); // then the buf data itself write ( fd , &slot->m_sendBuf , slot->m_sendBufSize ); } // MDW: if msg4 was stored in the linked list then caller // never got his callback called, so the spider will redo // this url later... // . serialize each Msg4 that is waiting in line // . need to preserve their list ptrs so to avoid re-adds? /* Msg4 *msg4 = s_msg4Head; while ( msg4 ) { msg4->save ( fd ); // next msg4 msg4 = msg4->m_next; } */ // all done close ( fd ); // if all was successful, rename the file char newFilename[1024]; // if saving while in repair mode, that means all of our adds must // must associated with the repair. if we send out these add requests // when we restart and not in repair mode then we try to add to an // rdb2 which has not been initialized and it does not work. sprintf ( newFilename , "%s%saddsinprogress.dat", g_hostdb.m_dir , prefix ); ::rename ( filename , newFilename ); return true; } /* bool Msg4::save ( int fd ) { int16_t collLen = gbstrlen(m_coll); // collLen write ( fd , &collLen , 2 ); // coll, as a string in case coll is deleted and another added write ( fd , coll , collLen + 1 ); // offset int32_t offset = m_currentPtr - m_metaList; // might as well avoid re-adds write ( fd , (char *)&offset , 4 ); // list size (4 bytes) write ( fd , (char *)&m_metaListSize , 4 ); // list data write ( fd , m_metaList , m_metaListSize ); return true; } */ // . returns false on an unrecoverable error, true otherwise // . sets g_errno on error bool loadAddsInProgress ( char *prefix ) { if ( g_conf.m_readOnlyMode ) return true; // open the file char filename[1024]; // . a load when in repair mode means something special // . see Repair.cpp's call to loadAddState() // . if we saved the add state while in repair mode when we exited // then we need to restore just that if ( ! prefix ) prefix = ""; sprintf ( filename, "%s%saddsinprogress.dat", g_hostdb.m_dir , prefix ); // if file does not exist, return true, not really an error struct stat stats; stats.st_size = 0; int status = stat ( filename , &stats ); if ( status != 0 && errno == ENOENT ) return true; // get the fileSize into "pend" int32_t p = 0; int32_t pend = stats.st_size; int32_t fd = open ( filename, O_RDONLY ); if ( fd < 0 ) { log ("build: Failed to open %s for reading: %s", filename,strerror(errno)); g_errno = errno; return false; } log(LOG_INFO,"build: Loading %"INT32" bytes from %s",pend,filename); // . deserialize each hostbuf // . the # of host bufs int32_t numHostBufs; read ( fd , (char *)&numHostBufs , 4 ); p += 4; if ( numHostBufs != s_numHostBufs ) { g_errno = EBADENGINEER; return log("build: addsinprogress.dat has wrong number of " "host bufs."); } // deserialize each hostbuf for ( int32_t i = 0 ; i < s_numHostBufs ; i++ ) { // break if nothing left to read if ( p >= pend ) break; // USED size of the buf int32_t used; read ( fd , (char *)&used , 4 ); p += 4; // if used is 0, a NULL buffer, try to read the next one if ( used == 0 || used == 4 ) { s_hostBufs [i] = NULL; s_hostBufSizes[i] = 0; continue; } // malloc the min buf size int32_t allocSize = MAXHOSTBUFSIZE; if ( allocSize < used ) allocSize = used; // alloc the buf space, returns NULL and sets g_errno on error char *buf = (char *)mmalloc ( allocSize , "Msg4" ); if ( ! buf ) return log("build: Could not alloc %"INT32" bytes for " "reading %s",allocSize,filename); // the buf itself int32_t nb = read ( fd , buf , used ); // sanity if ( nb != used ) { // reset the buffer usage //*(int32_t *)(p-4) = 4; *(int32_t *)buf = 4; // return false return log("build: error reading addsinprogress.dat: " "%s", mstrerror(errno)); } // skip over it p += used; // sanity check if ( *(int32_t *)buf != used ) { log("build: file %s is bad.",filename); char *xx = NULL; *xx = 0; } // set the array s_hostBufs [i] = buf; s_hostBufSizes [i] = allocSize; } // scan in progress msg4 requests too that we stored in this file too for ( ; ; ) { // break if nothing left to read if ( p >= pend ) break; // hostid sent to int32_t hostId; read ( fd , (char *)&hostId , 4 ); p += 4; // get host Host *h = g_hostdb.getHost(hostId); // must be there if ( ! h ) { close (fd); return log("build: bad msg4 hostid %"INT32"",hostId); } // host many bytes int32_t numBytes; read ( fd , (char *)&numBytes , 4 ); p += 4; // allocate buffer char *buf = (char *)mmalloc ( numBytes , "msg4loadbuf"); if ( ! buf ) { close ( fd ); return log("build: could not alloc msg4 buf"); } // the buffer int32_t nb = read ( fd , buf , numBytes ); if ( nb != numBytes ) { close ( fd ); return log("build: bad msg4 buf read"); } p += numBytes; // send it! if ( ! g_udpServer.sendRequest ( buf , numBytes , 0x04 , // msgType h->m_ip , h->m_port , h->m_hostId , NULL , NULL , // state data NULL , // callback 999999999)){// seconds timeout close ( fd ); // report it return log("build: could not resend reload buf: %s", mstrerror(g_errno)); } } // MDW: if msg4 was stored in the linked list then caller // never got his callback called, so the spider will redo // this url later... // . deserialize each Msg4 that is waiting in line // . need to preserve their list ptrs so to avoid re-adds? // . format: // rdbid(1 byte) // collLen(2 byte) // coll(\0 terminated string) // listOff(4 bytes) // listSize(4 bytes) // list(listSize bytes) /* while ( p < pend ) { // make a new msg4 to hold this Msg4 *msg4 ; try { msg4 = new (Msg4); } catch ( ... ) { // return false and set g_errno on error g_errno = ENOMEM; return log("build: Msg4 new failed. Could not read in " "addsinprogress.dat."); } // register with Mem.cpp's table mnew ( msg4 , sizeof(Msg4) , "Msg4c"); char rdbId ; int16_t collLen ; char coll[MAX_COLL_LEN+1]; int32_t listOff ; int32_t listSize ; bool err = false; // read in rdbid, collLen, coll, listSize, listOffset, listData if ( read ( fd, &rdbId , 1 ) != 1 ) err = true; if ( read ( fd, &collLen , 2 ) != 2 ) err = true; if ( read ( fd, coll , collLen+1 ) != collLen+1) err =true; if ( read ( fd, &listOff , 4 ) != 4 ) err = true; if ( read ( fd, &listSize, 4 ) != 4 ) err = true; // advance read head p += 1 + 2 + collLen + 1 + 4 + 4; // make a buf for the list char *listBuf = (char *)mmalloc ( listSize , "Msg4d" ); if ( ! listBuf ) return log("build: Failed to load addsinprogress.dat."); if ( read ( fd , listBuf , listSize ) != listSize ) err = true; p += listSize; // handle read errors, return false with g_errno set if ( err ) { mfree ( listBuf , listSize , "Msg4d" ); mdelete ( msg4 , sizeof(Msg4), "Msg4" ); delete ( msg4 ); g_errno = ECORRUPTDATA; return log("build: addsinprogress.dat: %s", mstrerror(g_errno)); } // no callback, so it will be deleted when fully added msg4->m_callback = NULL; msg4->m_state = NULL; // use our own internal list msg4->m_list = &msg4->m_myList; Rdb *rdb = getRdbFromId ( rdbId ); // if had a bad rdbId, try reading the next queue if ( ! rdb ) { log("build: had bogus rdbId of %"INT32".",(int32_t)rdbId); mfree ( listBuf , listSize , "Msg4d" ); mdelete ( msg4 , sizeof(Msg4), "Msg4" ); delete ( msg4 ); return log("build: addsinprogress.dat: %s", mstrerror(g_errno)); } // otherwise, set the list to wait in line, our linked list msg4->m_list->set ( listBuf , listSize , listBuf , listSize , rdb->getFixedDataSize() , true , // ownData? rdb->useHalfKeys() , rdb->getKeySize () ); // force set the current rec ptr msg4->m_list->m_listPtr = listBuf + listOff; // init for linked list msg4->m_next = NULL; // store in linked list if ( ! s_msg4Tail ) { // hey, we are the first in the linked list s_msg4Head = msg4; s_msg4Tail = msg4; continue; } // otherwise, we are not the first s_msg4Tail->m_next = msg4; s_msg4Tail = msg4; } */ // all done close ( fd ); return true; } // // right now the FAKEDB record is a signal to remove the spider lock // from the lock table because we are done spidering it. // /* void processSpecialSignal ( collnum_t collnum , char *p ) { key_t *fake = (key_t *)p; // use a uh48 of 0 to signify an unlock operation //g_titledb.getUrlHash48 ( (key_t *)key ) == 0LL ) { // must be 96 bits //if ( m_ks != 12 ) { char *xx=NULL;*xx=0; } // get docid that was locked //int64_t d = g_titledb.getDocId ( (key_t *)key); int64_t d = fake->n0; // . make it the first probable, that is the lock key // . we do that so if we are locking a new url that // is not yet indexed, its probable docid may collide // and be incremented, so we do not know what its // actual docid will end up being... int64_t lockKey = g_titledb.getFirstProbableDocId(d); // log debug msg if ( g_conf.m_logDebugSpider) // log debug logf(LOG_DEBUG,"msg4: got FAKE titledb " "key for lockkey=%"UINT64" - removing spider lock", lockKey); // int16_tcut HashTableX *ht = &g_spiderLoop.m_lockTable; UrlLock *lock = (UrlLock *)ht->getValue ( &lockKey ); time_t nowGlobal = getTimeGlobal(); if ( g_conf.m_logDebugSpiderFlow ) logf(LOG_DEBUG,"spflow: scheduled lock removal in 5 secs for " "docid=lockkey=%"UINT64"", lockKey); // test it //if ( m_nowGlobal == 0 && lock ) // m_nowGlobal = getTimeGlobal(); // we do it this way rather than remove it ourselves // because a lock request for this guy // might be currently outstanding, and it will end up // being granted the lock even though we have by now removed // it from doledb, because it read doledb before we removed // it! so wait 5 seconds for the doledb negative key to // be absorbed to prevent a url we just spidered from being // re-spidered right away because of this sync issue. if ( lock ) lock->m_expires = nowGlobal + 5; // bitch if not in there if (!lock&&g_conf.m_logDebugSpider)//ht->isInTable(&lockKey)) logf(LOG_DEBUG,"spider: rdb: lockkey %"UINT64" " "was not in lock table",lockKey); // now unlock on that //g_spiderLoop.m_lockTable.removeKey(&lockKey); // do not actually add this fake key to titledb! //return true; } */