#include "gb-include.h" #include "Msg5.h" #include "RdbBase.h" #include "Rdb.h" //#include "Indexdb.h" #include "Stats.h" //#include "RdbCache.h" #include "Threads.h" #include "Msg0.h" #include "PingServer.h" //#include "Indexdb.h" // g_indexdb.getTruncationLimit() #include "CollectionRec.h" //#define _SANITYCHECK_ //#define _TESTNEWALGO_ 1 static void gotListWrapper ( void *state ) ; long g_numCorrupt = 0; Msg5::Msg5() { m_waitingForList = false; m_numListPtrs = 0; m_mergeLists = true; reset(); } Msg5::~Msg5() { reset(); } // frees m_treeList void Msg5::reset() { if ( m_waitingForList ) { log("disk: Trying to reset a class waiting for a reply."); // might being doing an urgent exit (mainShutdown(1)) or // g_process.shutdown(), so do not core here //char *xx = NULL; *xx = 0; } m_treeList.freeList(); //m_tfndbList.freeList(); m_msg3.reset(); m_prevCount = 0; //m_prevKey.setMin(); KEYMIN(m_prevKey,MAX_KEY_BYTES);// m_ks); m_ks is invalid m_waitingForList = false; // free lists if m_mergeLists was false for ( long i = 0 ; ! m_mergeLists && i < m_numListPtrs ; i++ ) m_listPtrs[i]->freeList(); m_numListPtrs = 0; // and the tree list m_treeList.freeList(); } /* //key_t makeCacheKey ( key_t startKey , //void makeCacheKey ( key_t startKey , // key_t endKey , void makeCacheKey ( char *startKey , char *endKey , bool includeTree , long minRecSizes , long startFileNum , //long numFiles ) { long numFiles , char *cacheKeyPtr , char ks ) { // keySize //key_t cacheKey; //cacheKey = startKey ; //cacheKey = hash96 ( endKey , cacheKey ); //cacheKey = hash96 ( (long)includeTree , cacheKey ); //cacheKey = hash96 ( (long)minRecSizes , cacheKey ); //cacheKey = hash96 ( (long)startFileNum , cacheKey ); //cacheKey = hash96 ( (long)numFiles , cacheKey ); if ( ks == 12 ) { key_t cacheKey = *(key_t *)startKey; cacheKey = hash96 ( *(key_t *)endKey , cacheKey ); cacheKey = hash96 ( (long)includeTree , cacheKey ); cacheKey = hash96 ( (long)minRecSizes , cacheKey ); cacheKey = hash96 ( (long)startFileNum , cacheKey ); cacheKey = hash96 ( (long)numFiles , cacheKey ); *(key_t *)cacheKeyPtr = cacheKey; } else { key128_t cacheKey = *(key128_t *)startKey; cacheKey = hash128 ( *(key128_t *)endKey, cacheKey ); cacheKey = hash128 ( (long)includeTree , cacheKey ); cacheKey = hash128 ( (long)minRecSizes , cacheKey ); cacheKey = hash128 ( (long)startFileNum , cacheKey ); cacheKey = hash128 ( (long)numFiles , cacheKey ); *(key128_t *)cacheKeyPtr = cacheKey; } //return cacheKey; } */ #include "Tfndb.h" //#include "Checksumdb.h" //HashTableX g_waitingTable; // . return false if blocked, true otherwise // . set g_errno on error // . fills "list" with the requested list // . we want at least "minRecSizes" bytes of records, but not much more // . we want all our records to have keys in the [startKey,endKey] range // . final merged list should try to have a size of at least "minRecSizes" // . if may fall short if not enough records were in [startKey,endKey] range // . endKey of list will be set so that all records from startKey to that // endKey are in the list // . a minRecSizes of 0x7fffffff means virtual inifinty, but it also has // another special meaning. it tells msg5 to tell RdbTree's getList() to // pre-allocate the list size by counting the recs ahead of time. bool Msg5::getList ( char rdbId , char *coll , RdbList *list , //key_t startKey , //key_t endKey , void *startKey , void *endKey , long minRecSizes , // requested scan size(-1 none) bool includeTree , bool addToCache , long maxCacheAge , // in secs for cache lookup long startFileNum , // first file to scan long numFiles , // rel. to startFileNum,-1 all void *state , // for callback void (* callback ) ( void *state , RdbList *list , Msg5 *msg5 ) , long niceness , bool doErrorCorrection , //key_t *cacheKeyPtr , // NULL if none char *cacheKeyPtr , // NULL if none long retryNum , long maxRetries , bool compensateForMerge , long long syncPoint , class Msg5 *msg5b , bool isRealMerge , bool allowPageCache , // if this is false we only check the page cache bool hitDisk , bool mergeLists ) { // make sure we are not being re-used prematurely if ( m_waitingForList ) { log("disk: Trying to reset a class waiting for a reply."); char *xx = NULL; *xx = 0; } // sanity check. we no longer have record caches! // now we do again for posdb gbdocid:xxx| restricted queries //if ( addToCache || maxCacheAge ) {char *xx=NULL;*xx=0; } // assume no error g_errno = 0; // sanity if ( ! list && mergeLists ) { char *xx=NULL;*xx=0; } // warning if ( ! coll ) log(LOG_LOGIC,"net: NULL collection. msg5."); // MUST have this if ( rdbId == RDB_TITLEDB && ! msg5b ) { log(LOG_LOGIC,"net: No msg5b supplied. 1."); char *xx = NULL; *xx = 0; } // . reset the provided list // . this will not free any mem it may have alloc'd but it will set // m_listSize to 0 so list->isEmpty() will return true if ( list ) list->reset(); // key size set m_ks = getKeySizeFromRdbId(rdbId); // . complain if endKey < startKey // . no because IndexReadInfo does this to prevent us from reading // a list //if ( startKey > endKey ) return true; if ( KEYCMP((char *)startKey,(char *)endKey,m_ks)>0 ) return true; // log("Msg5::readList: startKey > endKey warning"); // we no longer allow negative minRecSizes if ( minRecSizes < 0 ) { log(LOG_LOGIC,"net: msg5: MinRecSizes < 0, using 1."); minRecSizes = 1; char *xx = NULL; *xx = 0; } // ensure startKey last bit clear, endKey last bit set //if ( (startKey.n0 & 0x01) == 0x01 ) if ( !KEYNEG((char *)startKey) ) log(LOG_REMIND,"net: msg5: StartKey lastbit set."); // fix endkey //if ( (endKey.n0 & 0x01) == 0x00 ) { if ( KEYNEG((char *)endKey) ) { log(LOG_REMIND,"net: msg5: EndKey lastbit clear. Fixing."); //endKey.n0 |= 0x01; *((char *)endKey) |= 0x01; } QUICKPOLL(niceness); // debug msg //log("doing msg5 niceness=%li",niceness); //if ( niceness == 1 ) // log("hey!"); // timing debug //m_startTime = gettimeofdayInMilliseconds(); // remember stuff m_rdbId = rdbId; m_coll = coll; m_list = list; //m_startKey = startKey; //m_endKey = endKey; KEYSET(m_startKey,(char *)startKey,m_ks); KEYSET(m_endKey,(char *)endKey,m_ks); m_minRecSizes = minRecSizes; m_includeTree = includeTree; m_addToCache = addToCache; m_maxCacheAge = maxCacheAge; m_startFileNum = startFileNum; m_numFiles = numFiles; m_state = state; m_callback = callback; m_calledCallback= 0; m_niceness = niceness; m_maxRetries = maxRetries; m_oldListSize = 0; m_dupsRemoved = 0; m_compensateForMerge = compensateForMerge; //m_syncPoint = syncPoint; m_msg5b = msg5b; m_isRealMerge = isRealMerge; m_allowPageCache = allowPageCache; m_hitDisk = hitDisk; m_mergeLists = mergeLists; // get base, returns NULL and sets g_errno to ENOCOLLREC on error RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_coll))) return true; // point to cache //RdbCache *cache = base->m_rdb->getCache(); // . these 2 vars are used for error correction // . doRemoteLookup is -2 if it's up to us to decide m_doErrorCorrection = doErrorCorrection; // these get changed both by cache and gotList() m_newMinRecSizes = minRecSizes; m_round = 0; m_readAbsolutelyNothing = false; //m_fileStartKey = startKey; KEYSET(m_fileStartKey,m_startKey,m_ks); QUICKPOLL(m_niceness); // get trunc limit by collection now, not just in g_conf m_indexdbTruncationLimit = 0x7fffffff; #ifdef _SANITYCHECK_ log("msg5: sk=%s", KEYSTR(m_startKey,m_ks)); log("msg5: ek=%s", KEYSTR(m_endKey,m_ks)); #endif /* if ( rdbId == RDB_INDEXDB ) { CollectionRec *cr = g_collectiondb.getRec ( m_coll ); if ( ! cr ) { log("disk: Msg5 no coll rec."); return true; } //m_indexdbTruncationLimit = cr->m_indexdbTruncationLimit; // debug //log("trunc limit = %li",m_indexdbTruncationLimit); // Parms.cpp should never let this happen... if ( m_indexdbTruncationLimit < MIN_TRUNC ) { log("disk: trunc limit = %li", m_indexdbTruncationLimit); char *xx = NULL; *xx = 0; } } */ // debug msg and stuff //m_startKey.n1 = 1616550649; //m_startKey.n0 = (unsigned long long)10489958987685363408LL; //m_includeTree = true; //m_minRecSizes = 10080000; //m_startFileNum = 0; //m_numFiles = -1; // hack it down if ( numFiles > base->getNumFiles() ) numFiles = base->getNumFiles(); /* // if we're storing or reading from cache.. make the cache key now if ( m_maxCacheAge != 0 || m_addToCache ) { //if ( cacheKeyPtr ) m_cacheKey = *cacheKeyPtr; if ( cacheKeyPtr ) KEYSET(m_cacheKey,cacheKeyPtr,m_ks); //else m_cacheKey = makeCacheKey ( m_startKey , else makeCacheKey ( m_startKey , m_endKey , m_includeTree , m_minRecSizes , m_startFileNum , //m_numFiles ); m_numFiles , m_cacheKey , m_ks ); } */ //log("ck.n1=%lu ck.n0=%llu",m_cacheKey.n1,m_cacheKey.n0); //exit(-1); // . make sure we set base above so Msg0.cpp:268 doesn't freak out // . if startKey is > endKey list is empty //if ( m_startKey > m_endKey ) return true; if ( KEYCMP(m_startKey,m_endKey,m_ks)>0 ) return true; // same if minRecSizes is 0 if ( m_minRecSizes == 0 ) return true; /* // bring back cache for posdb. but only if posdb is merging the lists // like when it does gbdocid:xxx| restriction queries in xmldoc.cpp's // seo pipe. if ( m_maxCacheAge && m_rdbId == RDB_POSDB && //m_mergeLists && // this is obsolete me thinks getListFromTermListCache ( coll, m_startKey , m_endKey , m_maxCacheAge, list ) ) // list should now be set from the termlistcache! return true; // check to see if another request for the exact same termlist // is in progress. then wait for that if so. // so use logic like makeCacheKey() was using. long conti = 0; m_waitingKey=hash64_cont(m_startKey,m_ks,0LL,&conti); m_waitingKey=hash64_cont(m_endKey,m_ks,m_waitingKey,&conti); m_waitingKey=hash64_cont((char *)&m_minRecSizes,4,m_waitingKey,&conti); m_waitingKey=hash64_cont((char*)&m_startFileNum,4,m_waitingKey,&conti); m_waitingKey=hash64_cont((char *)&m_numFiles,4,m_waitingKey,&conti); m_waitingKey=hash64_cont((char *)&m_includeTree,1,m_waitingKey,&conti); m_waitingKey^= ((unsigned long long)rdbId) << (64-8); // init it? static bool s_waitInit = false; if ( ! s_waitInit ) { s_waitInit = true; if ( ! g_waitingTable.set(8,4,2048,NULL,0, true,m_niceness,"m5wtbl")) log("msg5: failed to init waiting table"); // ignore error g_errno = 0; } // wait for the reply to come in bool inTable = g_waitingTable.isInTable(&m_waitingKey); // always add it void *THIS = this; bool added = g_waitingTable.addKey(&m_waitingKey,&THIS); // wait for in-progress reply? if ( inTable ) { // log debug msg if ( added && rdbId == RDB_POSDB && m_addToCache ) { long long cks; cks = getTermListCacheKey(m_startKey,m_endKey); log("msg5: waiting for rdbid=%li wkey=%llx " "startkey=%s ckey=%llx", (long)rdbId,m_waitingKey,KEYSTR(m_startKey,m_ks), cks); } // we blocked and are in the waiting table if ( added ) return false; // otherwise, the addkey failed log("msg5: failed to add waitingkey: %s",mstrerror(g_errno)); g_errno = 0; } // . otherwise log as well // . you should NEVER see a dup waiting key or cks because // it should all be cached!!!! if ( rdbId == RDB_POSDB && m_addToCache ) { long long cks = getTermListCacheKey(m_startKey,m_endKey); log("msg5: hitting disk for rdbid=%li wkey=%llx " "startkey=%s ckey=%llx", (long)rdbId,m_waitingKey,KEYSTR(m_startKey,m_ks),cks); } */ /* // use g_termListCache for single docid lookups long long singleDocId = 0LL; if ( rdbId == RDB_POSDB ) { long long d1 = g_posdb.getDocId(startKey); long long d2 = g_posdb.getDocId(endKey); if ( d1+1 == d2 ) singleDocId = d1; } // if in the termlist cache, send it back right away char *trec; long trecSize; if ( singleDocId && getRecFromTermListCache(coll, m_startKey, m_endKey, 20*60, // 20 mins. maxCacheAge, &trec, &trecSize) ) { // if in cache send it back! m_list->set ( trec , // list data trecSize , // list data size trec , // alloc trecSize , // alloc size m_startKey , m_endKey , 0 , // fixeddatasize true , // own data? true , // use half keys? m_ks ); // we got it without blocking return true; } // if not found in cache it might be a query we do not match like // 'track-track' if ( singleDocId ) { // just copy ptrs from this list into m_list m_list->set ( NULL , // list data 0 , // list data size NULL , // alloc 0 , // alloc size m_startKey , m_endKey , base->getFixedDataSize() , true , // own data? base->useHalfKeys() , m_ks ); return true; } */ // . check in cache first // . we now cache everything /* if ( m_maxCacheAge != 0 ) { log(LOG_DEBUG,"net: Checking cache for list. " //"startKey.n1=%lu %llu. cacheKey.n1=%lu %llu.", //m_startKey.n1,m_startKey.n0, //m_cacheKey.n1,m_cacheKey.n0); "startKey.n1=%llx %llx. cacheKey.n1=%llx %llx.", KEY1(m_startKey,m_ks),KEY0(m_startKey), KEY1(m_cacheKey,m_ks),KEY0(m_cacheKey)); // is this list in the cache? if ( cache->getList ( base->m_collnum , m_cacheKey , m_startKey , m_list , true , // do copy? m_maxCacheAge , true )) {// incCounts? // debug msg log(LOG_DEBUG,"net: msg5: Got cache hit for %s.", base->m_dbname ); // sanity check //bool ok = m_list->checkList_r ( false , true ); //if ( ! ok ) log("GETLIST had problem"); // break out //if ( ! ok ) { char *xx = NULL; *xx = 0; } return true; } } */ // timing debug //log("Msg5:getting list startKey.n1=%lu",m_startKey.n1); // start the read loop - hopefully, will only loop once return readList ( ); } // . returns false if blocked, true otherwise // . sets g_errno on error // . reads from cache, tree and files // . calls gotList() to do the merge if we need to // . loops until m_minRecSizes is satisfied OR m_endKey is reached bool Msg5::readList ( ) { // get base, returns NULL and sets g_errno to ENOCOLLREC on error RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_coll))) return true; readMore: // . reset our tree list // . sets fixedDataSize here in case m_includeTree is false because // we don't want merge to have incompatible lists m_treeList.reset(); m_treeList.setFixedDataSize ( base->getFixedDataSize() ); m_treeList.m_ks = m_ks; // reset Msg3 in case gotList() is called without calling // Msg3::readList() first m_msg3.reset(); // assume lists have no errors in them m_hadCorruption = false; // . restrict tree's endkey by calling msg3 now... // . this saves us from spending 1000ms to read 100k of negative // spiderdb recs from the tree only to have most of the for naught // . this call will ONLY set m_msg3.m_endKey // . but only do this if dealing with spiderdb really // . also now for tfndb, since we scan that in RdbDump.cpp to dedup // the spiderdb list we are dumping to disk. it is really for any // time when the endKey is unbounded, so check that now char *treeEndKey = m_endKey; bool compute = true; if ( ! m_includeTree ) compute = false; // if endKey is "unbounded" then bound it... char max[MAX_KEY_BYTES]; KEYMAX(max,m_ks); if ( KEYCMP(m_endKey,max,m_ks) != 0 ) compute = false; // BUT don't bother if a small list, probably faster just to get it if ( m_newMinRecSizes < 1024 ) compute = false; // try to make merge read threads higher priority than // regular spider read threads long niceness = m_niceness; if ( niceness > 0 ) niceness = 2; if ( m_isRealMerge ) niceness = 1; if ( compute ) { m_msg3.readList ( m_rdbId , m_coll , m_fileStartKey , // modified by gotList() m_endKey , m_newMinRecSizes , // modified by gotList() m_startFileNum , m_numFiles , this , gotListWrapper , niceness , 0 , // retry num m_maxRetries , // -1=def m_compensateForMerge , -1,//m_syncPoint , true , // just get endKey? m_allowPageCache ); if ( g_errno ) { log("db: Msg5: getting endKey: %s",mstrerrno(g_errno)); return true; } treeEndKey = m_msg3.m_constrainKey; } QUICKPOLL((m_niceness)); // . get the list from our tree // . set g_errno and return true on error // . it is crucial that we get tree list before spawning a thread // because Msg22 will assume that if the TitleRec is in the tree // now we'll get it, because we need to have the latest version // of a particular document and this guarantees it. Otherwise, if // the doc is not in the tree then tfndb must tell its file number. // I just don't want to think its in the tree then have it get // dumped out right before we read it, then we end up getting the // older version rather than the new one in the tree which tfndb // does not know about until it is dumped out. so we could lose // recs between the in-memory and on-disk phases this way. // . however, if we are getting a titlerec, we first read the tfndb // list from the tree then disk. if the merge replaces the tfndb rec // we want with another while we are reading the tfndb list from // disk, then the tfndb rec we got from the tree was overwritten! // so then we'd read from the wrong title file number (tfn) and // not find the rec because the merge just removed it. so keeping // the tfndb recs truly in sync with the titledb recs requires // some dancing. the simplest way is to just scan all titleRecs // in the event of a disagreement... so turn on m_scanAllIfNotFound, // which essentially disregards tfndb and searches all the titledb // files for the titleRec. if ( m_includeTree ) { // get the mem tree for this rdb RdbTree *tree = base->m_rdb->getTree(); // how many recs are deletes in this list? long numNegativeRecs = 0; long numPositiveRecs = 0; // set start time long long start ; if ( m_newMinRecSizes > 64 ) start = gettimeofdayInMilliseconds(); // save for later m_treeMinRecSizes = m_newMinRecSizes; // . returns false on error and sets g_errno // . endKey of m_treeList may be less than m_endKey char *structName = "tree"; if(base->m_rdb->useTree()) { if ( ! tree->getList ( base->m_collnum , m_fileStartKey , treeEndKey , m_newMinRecSizes , &m_treeList , &numPositiveRecs , // # pos &numNegativeRecs , // # neg base->useHalfKeys() ) ) return true; // debug msg } else { RdbBuckets *buckets = &base->m_rdb->m_buckets; if ( ! buckets->getList ( base->m_collnum , m_fileStartKey , treeEndKey , m_newMinRecSizes , &m_treeList , &numPositiveRecs , &numNegativeRecs , base->useHalfKeys() )) { return true; } structName = "buckets"; } long long now ; if ( m_newMinRecSizes > 64 ) { now = gettimeofdayInMilliseconds(); long long took = now - start ; if ( took > 9 ) logf(LOG_INFO,"net: Got list from %s " "in %llu ms. size=%li db=%s " "niceness=%li.", structName, took,m_treeList.getListSize(), base->m_dbname,m_niceness); } // if our recSize is fixed we can boost m_minRecSizes to // compensate for these deletes when we call m_msg3.readList() long rs = base->getRecSize() ; // . use an avg. rec size for variable-length records // . just use tree to estimate avg. rec size if ( rs == -1) { if(base->m_rdb->useTree()) { // how much space do all recs take up in the tree? long totalSize = tree->getMemOccupiedForList(); // how many recs in the tree long numRecs = tree->getNumUsedNodes(); // get avg record size if ( numRecs > 0 ) rs = totalSize / numRecs; // add 10% for deviations rs = (rs * 110) / 100; // what is the minimal record size? long minrs = sizeof(key_t) + 4; // ensure a minimal record size if ( rs < minrs ) rs = minrs; } else { RdbBuckets *buckets = &base->m_rdb->m_buckets; rs = buckets->getNumKeys() / buckets->getMemOccupied(); long minrs = buckets->getRecSize() + 4; // ensure a minimal record size if ( rs < minrs ) rs = minrs; } } // . TODO: get avg recSize in this rdb (avgRecSize*numNeg..) // . don't do this if we're not merging because it makes // it harder to compute the # of bytes to read used to // pre-allocate a reply buf for Msg0 when !m_doMerge // . we set endKey for spiderdb when reading from tree above // based on the current minRecSizes so do not mess with it // in that case. if ( m_rdbId != RDB_SPIDERDB ) { //m_newMinRecSizes += rs * numNegativeRecs; long nn = m_newMinRecSizes + rs * numNegativeRecs; if ( rs > 0 && nn < m_newMinRecSizes ) nn = 0x7fffffff; m_newMinRecSizes = nn; } // . if m_endKey = m_startKey + 1 and our list has a rec // then no need to check the disk, it was in the tree // . it could be a negative or positive record // . tree can contain both negative/positive recs for the key // so we should do the final merge in gotList() // . that can happen because we don't do an annihilation // because the positive key may be being dumped out to disk // but it really wasn't and we get stuck with it //key_t kk = m_startKey ; //kk += (unsigned long)1; //if ( m_endKey == kk && ! m_treeList.isEmpty() ) { char kk[MAX_KEY_BYTES]; KEYSET(kk,m_startKey,m_ks); KEYADD(kk,1,m_ks); // no no no.... gotList() might be returning false because // it's doing a threaded call to merge_r to take out // the negative recs i guess... if ( KEYCMP(m_endKey,kk,m_ks)==0 && ! m_treeList.isEmpty() ) { return gotList(); } // return true; } } // if we don't use the tree then at least set the key bounds cuz we // pick the min endKey between diskList and treeList below else m_treeList.set ( m_fileStartKey , m_endKey ); // . if we're reading indexlists from 2 or more sources then some // will probably be compressed from 12 byte keys to 6 byte keys // . it is typically only about 1% when files are small, // and smaller than that when a file is large // . but just to be save reading an extra 2% won't hurt too much if ( base->useHalfKeys() ) { long numSources = m_numFiles; if ( numSources == -1 ) numSources = base->getNumFiles(); // if tree is empty, don't count it if ( m_includeTree && ! m_treeList.isEmpty() ) numSources++; // . if we don't do a merge then we return the list directly // (see condition where m_numListPtrs == 1 below) // from Msg3 (or tree) and we must hit minRecSizes as // close as possible for Msg3's call to constrain() so // we don't overflow the UdpSlot's TMPBUFSIZE buffer // . if we just arbitrarily boost m_newMinRecSizes then // the single list we get back from Msg3 will not have // been constrained with m_minRecSizes, but constrained // with m_newMinRecSizes (x2%) and be too big for our UdpSlot if ( numSources >= 2 ) { long long newmin = (long long)m_newMinRecSizes ; newmin = (newmin * 50LL) / 49LL ; // watch out for wrap around if ( (long)newmin < m_newMinRecSizes ) m_newMinRecSizes = 0x7fffffff; else m_newMinRecSizes = (long)newmin; } } QUICKPOLL((m_niceness)); // debug msg //log("msg5 calling msg3 for %li bytes (msg5=%lu)", // m_newMinRecSizes,(long)this); // . it's pointless to fetch data from disk passed treeList's endKey // . he only differs from m_endKey if his listSize is at least // newMinRecSizes //key_t diskEndKey = m_treeList.getEndKey(); char *diskEndKey = m_treeList.getEndKey(); // sanity check if ( m_treeList.m_ks != m_ks ) { char *xx = NULL; *xx = 0; } // we are waiting for the list m_waitingForList = true; // clear just in case g_errno = 0; // . now get from disk // . use the cache-modified constraints to reduce reading time // . return false if it blocked // . if compensateForMerge is true then m_startFileNum/m_numFiles // will be appropriately mapped around the merge if ( ! m_msg3.readList ( m_rdbId , m_coll , m_fileStartKey , // modified by gotList() diskEndKey , m_newMinRecSizes , // modified by gotList() m_startFileNum , m_numFiles , this , gotListWrapper , niceness , 0 , // retry num m_maxRetries , // max retries (-1=def) m_compensateForMerge , -1,//m_syncPoint , false , m_allowPageCache , m_hitDisk )) return false; // . this returns false if blocked, true otherwise // . sets g_errno on error // . updates m_newMinRecSizes // . updates m_fileStartKey to the endKey of m_list + 1 if ( ! gotList () ) return false; // bail on error from gotList() or Msg3::readList() if ( g_errno ) return true; // we may need to re-call getList if ( needsRecall() ) goto readMore; // we did not block return true; } /* // we were in the g_waitingTable waiting for a list to be read that was already // in progress when our request came in. void Msg5::copyAndSendBackList ( RdbList *listSrc ) { // this returns false and sets g_errno on error if ( ! m_list->copyList(listSrc) ) log("msg5: copylist failed: %s",mstrerror(g_errno)); // set it now m_calledCallback = 1; // when completely done call the callback m_callback ( m_state , m_list , this ); } */ bool Msg5::needsRecall ( ) { bool logIt; // get base, returns NULL and sets g_errno to ENOCOLLREC on error RdbBase *base = getRdbBase ( m_rdbId , m_coll ); // sanity check if ( ! base && ! g_errno ) { char *xx=NULL;*xx=0; } // . return true if we're done reading // . sometimes we'll need to read more because Msg3 will shorten the // endKey to better meat m_minRecSizes but because of // positive/negative record annihilation on variable-length // records it won't read enough if ( g_errno ) goto done; if ( m_newMinRecSizes <= 0 ) goto done; if ( ! m_mergeLists ) goto done; // limit to just doledb for now in case it results in data loss if(m_readAbsolutelyNothing&& (m_rdbId==RDB_DOLEDB||m_rdbId==RDB_SPIDERDB ) ) goto done; //if ( m_list->getEndKey() >= m_endKey ) goto done; if ( KEYCMP(m_list->getEndKey(),m_endKey,m_ks)>=0 ) goto done; /* // sanity check if ( m_indexdbTruncationLimit < MIN_TRUNC ) { log("disk: trunc limit2 = %li", m_indexdbTruncationLimit); char *xx = NULL; *xx = 0; } // if we are limited by truncation then we are done if ( base->useHalfKeys() && base->m_rdb != g_tfndb.getRdb() && //m_prevCount >= g_indexdb.getTruncationLimit() && m_prevCount >= m_indexdbTruncationLimit && g_indexdb.getTermId(*(key_t *)m_startKey) == g_indexdb.getTermId(*(key_t *)m_endKey) ) goto done; */ // debug msg //if ( g_conf.m_timingDebugEnabled ) // this is kinda important. we have to know if we are abusing // the disk... we should really keep stats on this... logIt = true; // seems to be very common for doledb, so don't log unless extreme //if ( m_rdbId == RDB_DOLEDB && m_round < 15 ) logIt = false; if ( logIt ) logf(LOG_DEBUG,"db: Reading %li again from %s (need %li total " "got %li) this=0x%lx round=%li.", m_newMinRecSizes , base->m_dbname , m_minRecSizes, m_list->m_listSize, (long)this , m_round++ ); // record how many screw ups we had so we know if it hurts performance base->m_rdb->didReSeek ( ); // try to read more from disk return true; done: // . reset merged list ptr // . merge_r() never rests the list ptr, m_listPtr if ( m_list ) m_list->resetListPtr(); /* // bring back termlist caching for posdb for gbdocid:xxxx| queries // in the seo pipeline in xmldoc.cpp because we need to do like // 1M of those things ultra fast! if ( m_addToCache && ! g_errno && m_rdbId == RDB_POSDB ) addToTermListCache(m_coll,m_startKey,m_endKey,m_list); // send back replies to others that are waiting callbackLoop: long slot = g_waitingTable.getSlot(&m_waitingKey); // valid? if ( slot >= 0 ) { // get it Msg5 *THIS = *(Msg5 **)g_waitingTable.getValueFromSlot(slot); // sanity. must have same start keys! if ( KEYCMP(THIS->m_startKey,m_startKey,m_ks) != 0 ) { char *xx=NULL;*xx=0; } // do not call this for the original request if ( THIS != this ) THIS->copyAndSendBackList ( m_list ); // delete it g_waitingTable.deleteSlot(slot); // another one goto callbackLoop; } */ /* // add finalized list to cache if we should if ( m_addToCache && ! g_errno ) { // point to cache RdbCache *cache = base->m_rdb->getCache(); // sanity check //bool ok = m_list->checkList_r ( false , true ); // break out //if ( ! ok ) { char *xx = NULL; *xx = 0; } //if ( ! ok ) log("ADDLIST had problem"); // add it if its ok //if ( ok ) m_cache->addList ( m_cacheKey, m_list ) ; cache->addList ( base->m_collnum , m_cacheKey, m_list ) ; // ignore errors g_errno = 0; } */ // return false cuz we don't need a recall return false; } void gotListWrapper ( void *state ) { Msg5 *THIS = (Msg5 *) state; // . this sets g_errno on error // . this will merge cache/tree and disk lists into m_list // . it will update m_newMinRecSizes // . it will also update m_fileStartKey to the endKey of m_list + 1 // . returns false if it blocks if ( ! THIS->gotList ( ) ) return; // . throw it back into the loop if necessary // . only returns true if COMPLETELY done if ( THIS->needsRecall() && ! THIS->readList() ) return; // sanity check if ( THIS->m_calledCallback ) { char *xx=NULL;*xx=0; } // set it now THIS->m_calledCallback = 1; // when completely done call the callback THIS->m_callback ( THIS->m_state , THIS->m_list , THIS ); } static void threadDoneWrapper ( void *state , ThreadEntry *t ) ; static void *mergeListsWrapper_r ( void *state , ThreadEntry *t ) ; //static void gotListWrapper2 ( void *state , RdbList *list , Msg5 *msg5); #define TFNDBMINRECSIZES (256*1024) // . this is the NEW gotList() !!! mdw // . returns false if blocked, true otherwise // . sets g_errno on error bool Msg5::gotList ( ) { // we are no longer waiting for the list m_waitingForList = false; // debug msg //log("msg5 got lists from msg3 (msg5=%lu)",(long)this); // return if g_errno is set if ( g_errno && g_errno != ECORRUPTDATA ) return true; // if reading from titledb, read corresponding tfndb list so // we can remove overwritten titleRecs if ( m_rdbId != RDB_TITLEDB ) return gotList2(); // if not merging files on disk, skip this stuff if ( ! m_isRealMerge ) return gotList2(); // this is now obsolete! return gotList2(); } /* // The Tfndb Tfn Bug Fix. We've had this bug for over a year. Now we // need to load the corresponding tfndb list with every titledb list // so we can remove titleRecs that are from files whose tfn does not // match the ones in the tfndblist for that docid. // This will remove titlerecs that would only get removed in a tight // merge. But even more importantly, fixes the problem that when // merging to older titledb files, the tfndb rec of a titlerec in // that merge gets re-added to tfndb, and override the newer one in // tfndb that corresponds to a newer titledb file that contains the // document. // We load the tfndb list after the titledb list because if a titlerec // got added to the tree just after we loaded the titledb list, then // // MUST have this if ( ! m_msg5b ) { log(LOG_LOGIC,"net: No msg5b supplied."); char *xx = NULL; *xx = 0; } m_time1 = gettimeofdayInMilliseconds(); long long docId1 =g_titledb.getDocIdFromKey((key_t *)m_fileStartKey); long long docId2 =g_titledb.getDocIdFromKey((key_t *)m_msg3.m_endKey); key_t startKey = g_tfndb.makeMinKey ( docId1 ) ; key_t endKey = g_tfndb.makeMaxKey ( docId2 ) ; QUICKPOLL((m_niceness)); //endKey.setMax(); // the tfndb list is often already in the page cache, so this is fast if ( ! m_msg5b->getList ( RDB_TFNDB , m_coll , &m_tfndbList, startKey , endKey , TFNDBMINRECSIZES , // minRecSizes true , // includeTree false , // addToCache 0 , // maxCacheAge 0 , // startFileNum -1 , // numFiles this , // state gotListWrapper2, // callback m_niceness , false , // do error correction NULL , // cacheKeyPtr 0 , // retryNum m_maxRetries, // was 5 -- maxRetries true , // compensateForMerge -1LL , // syncpoint NULL , // msg5b false , // isRealMerge m_allowPageCache )) return false; return gotList2(); } void gotListWrapper2 ( void *state , RdbList *list , Msg5 *msg5 ) { Msg5 *THIS = (Msg5 *)state; if ( ! THIS->gotList2() ) return; // sanity check if ( THIS->m_calledCallback ) { char *xx=NULL;*xx=0; } // set it now THIS->m_calledCallback = 2; // call the original callback THIS->m_callback ( THIS->m_state , THIS->m_list , THIS ); } */ // . this is the NEW gotList() !!! mdw // . returns false if blocked, true otherwise // . sets g_errno on error bool Msg5::gotList2 ( ) { // reset this m_startTime = 0LL; // return if g_errno is set if ( g_errno && g_errno != ECORRUPTDATA ) return true; // put all the lists in an array of list ptrs long n = 0; // all the disk lists for ( long i = 0 ; n < MAX_RDB_FILES && iisEmpty() ) continue; } // . remember the tfn, the secondary id // . the tfn is used by tfndb to map a docid to a titledb // file. each tfndb record has the tfn. each titledb file // has a tfn (secondary id, aka id2) //if ( m_rdbId == RDB_TITLEDB ) m_tfns [n] = m_msg3.getTfn(i); m_listPtrs [ n++ ] = m_msg3.getList(i); } QUICKPOLL(m_niceness); // sanity check. if ( m_msg3.getNumLists() > MAX_RDB_FILES ) log(LOG_LOGIC,"db: Msg3 had more than %li lists.", (long)MAX_RDB_FILES); // . get smallest endKey from all the lists // . all lists from Msg3 should have the same endKey, but // m_treeList::m_endKey may differ // . m_treeList::m_endKey should ALWAYS be >= that of the files // . constrain m_treeList to the startKey/endKey of the files //m_minEndKey = m_endKey; KEYSET(m_minEndKey,m_endKey,m_ks); for ( long i = 0 ; i < n ; i++ ) { //if ( m_listPtrs[i]->getEndKey() < m_minEndKey ) // m_minEndKey = m_listPtrs[i]->getEndKey(); // sanity check //if ( KEYNEG(m_listPtrs[i]->getEndKey()) ) { // char *xx=NULL;*xx=0; } if ( KEYCMP(m_listPtrs[i]->getEndKey(),m_minEndKey,m_ks)<0 ) KEYSET(m_minEndKey,m_listPtrs[i]->getEndKey(),m_ks); } // sanity check //if ( KEYNEG( m_minEndKey) ) {char *xx=NULL;*xx=0; } /* // if we got a tfndblist, constrain the title rec lists to its // transformed endkey. we only read in up to 500k of tfndb list so if // merging two really small titledb files we could potentially be // reading in a much bigger tfndb list. if ( m_rdbId == RDB_TITLEDB && m_isRealMerge && ! g_errno ) { long long time2 = gettimeofdayInMilliseconds(); long long diff = time2 - m_time1 ; log(LOG_DEBUG,"db: Read tfndblist in %lli ms " "(size=%li).",diff,m_tfndbList.m_listSize); // cut it down to m_msg3.m_endKey because that's what we used // to constrain this tfndb list read //if ( m_msg3.m_constrainKey < m_minEndKey ) { // log(LOG_DEBUG,"db: Constraining to tfndb Msg3 " // "m_endKey."); // m_minEndKey = m_msg3.m_constrainKey ; //} // only mess with m_minEndKey if our list was NOT limited // by it. if we were not limited by it, our endKey should // really be virtual inifinite. because the most our endKey // will ever be is g_tfndb.makeMaxKey ( docIdMAX ) as can // be seen above. if ( m_tfndbList.m_listSize >= TFNDBMINRECSIZES ) { // constrain titledb lists to tfndb's endkey if // it's smaller //key_t ekey = m_tfndbList.getEndKey(); char *ekey = m_tfndbList.getEndKey(); long long docid = g_tfndb.getDocId ( (key_t *)ekey ); if ( docid > 0 ) docid = docid - 1; //key_t nkey = g_titledb.makeLastKey ( docid ); char nkey[MAX_KEY_BYTES]; key_t trk = g_titledb.makeLastKey ( docid ); KEYSET ( nkey , (char *)&trk , m_ks ); // sanity check //if ( g_titledb.getKeySize() != m_ks ) { // char *xx = NULL; *xx = 0; } // only do constrain if docid is not 0 //if ( docid > 0 && nkey < m_minEndKey ) { if ( docid > 0 && KEYCMP(nkey,m_minEndKey,m_ks)<0 ) { log(LOG_DEBUG,"db: Tfndb had min key: " //"0x%llx",nkey.n0); "0x%llx",KEY0(nkey)); //m_minEndKey = nkey; KEYSET(m_minEndKey,nkey,m_ks); } } } */ QUICKPOLL(m_niceness); // . is treeList included? // . constrain treelist for the merge // . if used, m_listPtrs [ m_numListPtrs - 1 ] MUST equal &m_treeList // since newer lists are listed last so their records override older if ( m_includeTree && ! m_treeList.isEmpty() ) { // only constrain if we are NOT the sole list because the // constrain routine sets our endKey to virtual infinity it // seems like and that makes SpiderCache think that spiderdb // is exhausted when it is only in the tree. so i added the // if ( n > 0 ) condition here. if ( n > 0 ) { char k[MAX_KEY_BYTES]; m_treeList.getCurrentKey(k); m_treeList.constrain ( m_startKey , m_minEndKey , -1 , // min rec sizes 0 , // hint offset //m_treeList.getCurrentKey() , k, "tree" , m_niceness ); } //if ( m_rdbId == RDB_TITLEDB ) m_tfns [n] = 255; m_listPtrs [ n++ ] = &m_treeList; } // get base, returns NULL and sets g_errno to ENOCOLLREC on error RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_coll))) return true; // if not enough lists, use a dummy list to trigger merge so tfndb // filter happens and we have a chance to weed out old titleRecs if ( m_rdbId == RDB_TITLEDB && m_numFiles != 1 && n == 1 && m_isRealMerge ) { log(LOG_LOGIC,"db: Adding dummy list."); //m_tfns [n] = 255; m_dummy.set ( NULL , // list data 0 , // list data size NULL , // alloc 0 , // alloc size m_startKey , m_minEndKey , base->getFixedDataSize() , true , // own data? base->useHalfKeys() , m_ks ); m_listPtrs [ n++ ] = &m_dummy; } // bitch if ( n >= MAX_RDB_FILES ) log(LOG_LOGIC,"net: msg5: Too many lists (%li | %li).", m_msg3.getNumLists() , n); // store # of lists here for use by the call to merge_r() m_numListPtrs = n; // count the sizes m_totalSize = 0; for ( long i = 0 ; i < m_numListPtrs ; i++ ) m_totalSize += m_listPtrs[i]->getListSize(); QUICKPOLL(m_niceness); // . but don't breach minRecSizes // . this totalSize is just to see if we should spawn a thread, really //if ( totalSize > m_minRecSizes ) m_totalSize = m_minRecSizes; #ifdef _SANITYCHECK_ // who uses this now? //log("Msg5:: who is merging?????"); // timing debug // m_startKey.n1, // gettimeofdayInMilliseconds()-m_startTime , // m_diskList.getListSize()); // ensure both lists are legit // there may be negative keys in the tree // diskList may now also have negative recs since Msg3 no longer // removes them for fears of delayed positive keys not finding their // negative key because it was merged out by RdbMerge for ( long i = 0 ; i < m_numListPtrs ; i++ ) m_listPtrs[i]->checkList_r ( false , true ); #endif // if not doing merge, we're all done! if ( ! m_mergeLists ) return doneMerging(); // . if no lists we're done // . if we were a recall, then list may not be empty if ( m_numListPtrs == 0 && m_list->isEmpty() ) { // just copy ptrs from this list into m_list m_list->set ( NULL , // list data 0 , // list data size NULL , // alloc 0 , // alloc size m_startKey , m_endKey , base->getFixedDataSize() , true , // own data? base->useHalfKeys() , m_ks ); // . add m_list to our cache if we should // . this returns false if blocked, true otherwise // . sets g_errno on error // . only blocks if calls msg0 to patch a corrupted list // . it will handle calling callback if that happens return doneMerging(); } if ( m_numListPtrs == 0 ) m_readAbsolutelyNothing = true; // if msg3 had corruption in a list which was detected in contrain_r() if ( g_errno == ECORRUPTDATA ) { // if we only had one list, we were not doing a merge // so return g_errno to the requested so he tries from the // twin if ( m_numListPtrs == 1 ) return true; // assume nothing is wrong g_errno = 0; // if m_doErrorCorrection is true, repairLists_r() should fix } QUICKPOLL((m_niceness)); // . should we remove negative recs from final merged list? // . if we're reading from root and tmp merge file of root // . should we keep this out of the thread in case a file created? long fn = 0; if ( base->m_numFiles > 0 ) fn = base->m_fileIds[m_startFileNum]; if ( fn == 0 || fn == 1 ) m_removeNegRecs = true; else m_removeNegRecs = false; // . if we only have one list, just use it // . Msg3 should have called constrain() on it so it's m_list so // m_listEnd and m_listSize all fit m_startKey/m_endKey/m_minRecSizes // to a tee // . if it's a tree list it already fits to a tee // . same with cache list?? better be... // . if we're only reading one list it should always be empty right? // . i was getting negative keys in my RDB_DOLEDB list which caused // Spider.cpp to core, so i add the "! m_removeNegRecs" constraint // here... // . TODO: add some code to just filter out the negative recs // super quick just for this purpose // . crap, rather than do that just deal with the negative recs // in the caller code... in this case Spider.cpp::gotDoledbList2() if ( m_numListPtrs == 1 && m_list->isEmpty() &&//&&!m_removeNegRecs // just do this logic for doledb now, it was causing us to // return search results whose keys were negative indexdb keys. // or later we can just write some code to remove the neg // recs from the single list! ( m_rdbId == RDB_LINKDB || m_rdbId == RDB_DOLEDB || // this speeds up our queryloop querylog parsing in // seo.cpp quite a bit (m_rdbId == RDB_POSDB && m_numFiles==1) ) ) { // log any problems if ( ! m_listPtrs[0]->m_ownData ) { log(LOG_LOGIC,"db: Msg5: list does not own data."); goto skip; } // . bitch if not empty // . NO! might be our second time around if we had key // annihilations between file #0 and the tree, and now // we only have 1 non-empty list ptr, either from the tree // or from the file //if ( ! m_list->isEmpty() ) // log("Msg5::gotList: why is it not empty? size=%li", // m_list->getListSize() ); // just copy ptrs from this list into m_list m_list->set ( m_listPtrs[0]->getList () , m_listPtrs[0]->getListSize () , m_listPtrs[0]->getAlloc () , m_listPtrs[0]->getAllocSize () , m_listPtrs[0]->getStartKey () , m_listPtrs[0]->getEndKey () , m_listPtrs[0]->getFixedDataSize () , true , // own data? m_listPtrs[0]->useHalfKeys () , m_ks ); // ensure we don't free it when we loop on freeLists() below m_listPtrs[0]->setOwnData ( false ); // gotta set this too! if ( m_listPtrs[0]->m_lastKeyIsValid ) m_list->setLastKey ( m_listPtrs[0]->m_lastKey ); // . remove titleRecs that shouldn't be there // . if the tfn of the file we read the titlerec from does not // match the one in m_tfndbList, then remove it // . but if we're not merging lists, why remove it? //if ( m_rdbId == RDB_TITLEDB && m_msg3.m_numFileNums > 1 ) // stripTitleRecs ( m_list , m_tfns[0] , m_tfndbList ); // . add m_list to our cache if we should // . this returns false if blocked, true otherwise // . sets g_errno on error // . only blocks if calls msg0 to patch a corrupted list // . it will handle calling callback if that happens return doneMerging(); } skip: // time the perparation and merge m_startTime = gettimeofdayInMilliseconds(); // . merge the lists // . the startKey of the finalList is m_startKey, the first time // . but after that, we're adding diskLists, so us m_fileStartKey // . we're called multiple times for the same look-up in case of // delete records in a variable rec-length db cause some recs in our // disk lookups to be wiped out, thereby falling below minRecSizes // . this will set g_errno and return false on error (ENOMEM,...) // . older list goes first so newer list can override // . remove all negative-keyed recs since Msg5 is a high level msg call // . prepare for the merge, grows the buffer // . this returns false and sets g_errno on error // . should not affect the current list in m_list, only build on top if ( ! m_list->prepareForMerge ( m_listPtrs , m_numListPtrs , m_minRecSizes ) ) { log("net: Had error preparing to merge lists from %s: %s", base->m_dbname,mstrerror(g_errno)); return true; } QUICKPOLL((m_niceness)); #ifdef _TESTNEWALGO_ // for testing useBigRootList if ( ! m_list2.prepareForMerge ( m_listPtrs , m_numListPtrs , m_minRecSizes ) ) { log("net: Had error preparing to merge lists from %s: %s", base->m_dbname,mstrerror(g_errno)); return true; } #endif // . if size < 32k of don't bother with thread, should be < ~1 ms // . it seems to be about 1ms per 10k for tfndb recs // . it seems to core dump if we spawn a thread with totalSizes too low // . why??? if ( m_totalSize < 32*1024 ) goto skipThread; // if we are an interruptible niceness 1, do not use a thread, // we can be interrupted by the alarm callback and serve niceness // 0 requests, that is probably better! although the resolution is // on like 10ms on those alarms... BUT if you use a smaller // mergeBufSize of like 100k, that might make it responsive enough! // allow it to do a thread again so we can take advantage of // multiple cores, or hyperthreads i guess because i am seeing // some missed quickpoll log msgs, i suppose because we did not // insert QUICKPOLL() statements in the RdbList::merge_r() code //if ( m_niceness >= 1 ) goto skipThread; // supder duper hack! //if ( m_rdbId == RDB_REVDB ) goto skipThread; // i'm not sure why we core in Msg5's call to RdbList::merge_r(). // the list appears to be corrupt... //if ( m_rdbId == RDB_FACEBOOKDB ) goto skipThread; // skip it for now //goto skipThread; // . if size is big, make a thread // . let's always make niceness 0 since it wasn't being very // aggressive before if ( g_threads.call ( MERGE_THREAD , // threadType m_niceness , // m_niceness , this , // state data for callback threadDoneWrapper , mergeListsWrapper_r ) ) return false; // thread creation failed if ( ! g_threads.areThreadsDisabled() ) log(LOG_INFO, "net: Failed to create thread to merge lists. Doing " "blocking merge. Hurts performance."); // clear g_errno because it really isn't a problem, we just block g_errno = 0; // come here to skip the thread skipThread: // repair any corruption repairLists_r(); // do it mergeLists_r (); // . add m_list to our cache if we should // . this returns false if blocked, true otherwise // . sets g_errno on error // . only blocks if calls msg0 to patch a corrupted list // . it will handle calling callback if that happens return doneMerging(); } // thread will run this first void *mergeListsWrapper_r ( void *state , ThreadEntry *t ) { // we're in a thread now! Msg5 *THIS = (Msg5 *)state; // debug msg //log("Msg5::mergeListsWrapper: begining threaded merge!"); // repair any corruption THIS->repairLists_r(); // do the merge THIS->mergeLists_r(); // now cleanUp wrapper will call it //pthread_exit ( NULL ); // bogus return return NULL; } // . now we're done merging // . when the thread is done we get control back here, in the main process void threadDoneWrapper ( void *state , ThreadEntry *t ) { // we MAY be in a thread now Msg5 *THIS = (Msg5 *)state; // debug msg //log("msg3 back from merge thread (msg5=%lu)",THIS->m_state); // . add m_list to our cache if we should // . this returns false if blocked, true otherwise // . sets g_errno on error // . only blocks if calls msg0 to patch a corrupted list // . it will handle calling callback if that happens if ( ! THIS->doneMerging() ) return; // . throw it back into the loop if necessary // . only returns true if COMPLETELY done if ( THIS->needsRecall() && ! THIS->readList() ) return; // sanity check if ( THIS->m_calledCallback ) { char *xx=NULL;*xx=0; } // set it now THIS->m_calledCallback = 3; // when completely done call the callback THIS->m_callback ( THIS->m_state , THIS->m_list , THIS ); } // check lists in the thread void Msg5::repairLists_r ( ) { // assume none m_hadCorruption = false; // return if no need to if ( ! m_doErrorCorrection ) return; // or if msg3 already check them and they were ok if ( m_msg3.m_listsChecked ) return; // if msg3 said they were corrupt... this happens when the map // is generated over a bad data file and ends up writing the same key // on more than 500MB worth of data. so when we try to read a list // that has the startkey/endkey covering that key, the read size // is too big to ever happen... if ( m_msg3.m_hadCorruption ) m_hadCorruption = true; // time it //m_checkTime = gettimeofdayInMilliseconds(); for ( long i = 0 ; i < m_numListPtrs ; i++ ) { // . did it breech our minRecSizes? // . only check for indexdb, our keys are all size 12 // . is this a mercenary problem? // . cored on 'twelfth night cake' // . no... this happens after merging the lists. if we had // a bunch of negative recs we over read anticipating some // recs will be deleted, so it isn't really necessary to // bitch about this here.. if ( g_conf.m_logDebugDb && m_rdbId == RDB_POSDB && m_listPtrs[i]->m_listSize > m_minRecSizes + 12 ) // just log it for now, maybe force core later log(LOG_DEBUG,"db: Index list size is %li but " "minRecSizes is %li.", m_listPtrs[i]->m_listSize , m_minRecSizes ); // this took like 50ms (-O3) on lenny on a 4meg list bool status = m_listPtrs[i]->checkList_r(false, // sleep on corruption if doing a sanity check (core dumps) #ifdef _SANITYCHECK_ true #else false #endif ); // if no errors, check the next list if ( status ) continue; // . show the culprit file // . logging the key ranges gives us an idea of how long // it will take to patch the bad data long nn = m_msg3.m_numFileNums; RdbBase *base = getRdbBase ( m_rdbId , m_coll ); if ( i < nn && base ) { long fn = m_msg3.m_fileNums[i]; BigFile *bf = base->getFile ( fn ); log("db: Corrupt filename is %s.",bf->getFilename()); //key_t sk = m_listPtrs[i]->getStartKey(); //key_t ek = m_listPtrs[i]->getEndKey (); //log("db: " // "startKey.n1=%lx n0=%llx " // "endKey.n1=%lx n0=%llx", // sk.n1,sk.n0,ek.n1,ek.n0); char *sk = m_listPtrs[i]->getStartKey(); char *ek = m_listPtrs[i]->getEndKey (); log("db: " "startKey=%s " "endKey=%s ", KEYSTR(sk,m_ks),KEYSTR(ek,m_ks)); } // . remove the bad eggs from the list // . TODO: support non-fixed data sizes if ( m_listPtrs[i]->getFixedDataSize() >= 0 ) m_listPtrs[i]->removeBadData_r(); else m_listPtrs[i]->reset(); // otherwise we have a patchable error m_hadCorruption = true; // don't add a list with errors to cache, please m_addToCache = false; } } void Msg5::mergeLists_r ( ) { // . don't do any merge if this is true // . if our fetch of remote list fails, then we'll be called // again with this set to false if ( m_hadCorruption ) return; // start the timer //long long startTime = gettimeofdayInMilliseconds(); // . if the key of the last key of the previous list we read from // is not below startKey, reset the truncation count to avoid errors // . if caller does the same read over and over again then // we would do a truncation in error eventually // . use m_fileStartKey, not just m_startKey, since we may be doing // a follow-up read //if ( m_prevKey >= m_fileStartKey ) m_prevCount = 0; if ( KEYCMP(m_prevKey,m_fileStartKey,m_ks)>=0 ) m_prevCount = 0; // get base, returns NULL and sets g_errno to ENOCOLLREC on error RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_coll))) { log("No collection found."); return; } /* if ( m_rdbId == RDB_POSDB ) { m_list->posdbMerge_r ( m_listPtrs , m_numListPtrs , m_startKey , m_minEndKey , m_minRecSizes , m_removeNegRecs , m_prevKey , &m_prevCount , &m_dupsRemoved , base->m_rdb->m_rdbId , &m_filtered , m_isRealMerge , // do group mask? m_isRealMerge , // is real merge? m_niceness ); } */ long niceness = m_niceness; if ( niceness > 0 ) niceness = 2; if ( m_isRealMerge ) niceness = 1; // . all lists must be constrained because indexMerge_r does not check // merged keys to see if they're in [startKey,endKey] // . RIGHT NOW indexdb is the only one that uses half keys!! // . indexMerge_r() will return false and set g_errno on error // . this is messing up linkdb!! /* if ( base->useHalfKeys() && m_rdbId != RDB_POSDB && m_rdbId != RDB2_POSDB2 && 1 == 3 ) { // disable for now! char *xx=NULL;*xx=0; // always assume to use it bool useBigRootList = true; // must include the first file always if ( m_startFileNum != 0 ) useBigRootList = false; // must be 12 bytes per key if ( m_ks != 12 ) useBigRootList = false; // just indexdb for now, tfndb has special merge rules if ( m_rdbId != RDB_POSDB && m_rdbId != RDB2_POSDB2 ) useBigRootList = false; // do not do if merging files on disk if ( m_isRealMerge ) useBigRootList = false; // turn off for now //useBigRootList = false; // do the merge m_list->indexMerge_r ( m_listPtrs , m_numListPtrs , m_startKey , m_minEndKey , m_minRecSizes , m_removeNegRecs , m_prevKey , &m_prevCount , //g_indexdb.getTruncationLimit() , m_indexdbTruncationLimit , &m_dupsRemoved , //base->m_rdb == g_tfndb.getRdb() , base->m_rdb->m_rdbId , &m_filtered , m_isRealMerge , // do group mask? m_isRealMerge , // is real merge? useBigRootList ,// useBigRootList? niceness ); #ifdef _TESTNEWALGO_ // for testing useBigRootList if ( useBigRootList ) { logf(LOG_DEBUG,"db: TRYING bit root list algo."); m_list2.indexMerge_r ( m_listPtrs , m_numListPtrs , m_startKey , m_minEndKey , m_minRecSizes , m_removeNegRecs , m_prevKey , &m_prevCount , m_indexdbTruncationLimit , &m_dupsRemoved , base->m_rdb->m_rdbId , &m_filtered , m_isRealMerge , //dogrpmask? m_isRealMerge , false );//bigRootList // sanity check long size1 = m_list->m_listSize; long size2 = m_list2.m_listSize; char *list1 = (char *)m_list->m_list; char *list2 = (char *)m_list2.m_list; if ( size1 != size2 || memcmp ( list1 , list2 , size1 ) != 0 ) { log("db: Got bad list."); m_list->printList(); m_list2.printList(); //char *xx = NULL; *xx = 0; } } #endif } */ //g_conf.m_useThreads = false; // . old Msg3 notes: // . otherwise, merge the lists together // . this may call growList() via RdbList::addRecord/Key() but it // shouldn't since we called RdbList::prepareForMerge() above // . we aren't allowed to do allocating in a thread! // . TODO: only merge the recs not cached, [m_fileStartKey, endKey] // . merge() might shrink m_endKey in diskList if m_minRecSizes // contrained us OR it might decrement it by 1 if it's a negative key // ......................... // . this MUST start at m_list->m_listPtr cuz this may not be the // 1st time we had to dive in to disk, due to negative rec // annihilation // . old finalList.merge_r() Msg5 notes: // . use startKey of tree // . NOTE: tree may contains some un-annihilated key pairs because // one of them was PROBABLY in the dump queue and we decided in // Rdb::addRecord() NOT to do the annihilation, therefore it's good // to do the merge to do the annihilation //else m_list->merge_r ( m_listPtrs , m_numListPtrs , m_startKey , m_minEndKey , m_minRecSizes , m_removeNegRecs , getIdFromRdb ( base->m_rdb ) , &m_filtered , NULL,//m_tfns , // used for titledb NULL,//&m_tfndbList , // used for titledb m_isRealMerge , niceness ); // maintain this info for truncation purposes if ( m_list->isLastKeyValid() ) //m_prevKey = m_list->getLastKey(); KEYSET(m_prevKey,m_list->getLastKey(),m_ks); else { // . lastKey should be set and valid if list is not empty // . we need it for de-duping dup tfndb recs that fall on our // read boundaries if ( m_list->m_listSize > 0 ) log(LOG_LOGIC,"db: Msg5. Last key invalid."); m_prevCount = 0; } } // . this returns false if blocked, true otherwise // . sets g_errno on error // . only blocks if calls msg0 to patch a corrupted list // . it will handle calling callback if that happens // . this is called when all files are done reading in m_msg3 // . sets g_errno on error // . problem: say maxRecSizes is 1200 (1000 keys) // . there are 10000 keys in the [startKey,endKey] range // . we read 1st 1000 recs from the tree and store in m_treeList // . we read 1st 1000 recs from disk // . all recs in tree are negative and annihilate the 1000 recs from disk // . we are left with an empty list bool Msg5::doneMerging ( ) { // get base, returns NULL and sets g_errno to ENOCOLLREC on error RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_coll))) return true; // . if there was a merge error, bitch about it // . Thread class should propagate g_errno when it was set in a thread if ( g_errno ) { log("net: Had error merging lists from %s: %s.", base->m_dbname,mstrerror(g_errno)); return true; } // . was a list corrupted? // . if so, we did not even begin the merge yet // . try to get the list from a remote brother // . if that fails we have already removed the bad data, so begin // our first merge if ( m_hadCorruption ) { // log it here, cuz logging in thread doesn't work too well log("net: Encountered a corrupt list in rdb=%s", base->m_dbname); // remove error condition, we removed the bad data in thread m_hadCorruption = false; // if(g_numCorrupt++ >= g_conf.m_maxCorruptLists && // g_conf.m_maxCorruptLists > 0) { g_numCorrupt++; if(g_conf.m_maxCorruptLists > 0 && (g_numCorrupt % g_conf.m_maxCorruptLists) == 0) { char msgbuf[1024]; Host *h = g_hostdb.getHost ( 0 ); snprintf(msgbuf, 1024, "%li corrupt lists. " "cluster=%s " "host=%li", g_numCorrupt, iptoa(h->m_ip), g_hostdb.m_hostId); g_pingServer.sendEmail(NULL, msgbuf); } // try to get the list from remote host if ( ! getRemoteList() ) return false; // note that if ( ! g_errno ) { log("net: got remote list without blocking"); char *xx=NULL;*xx=0; } // if it set g_errno, it could not get a remote list // so try to make due with what we have if ( g_errno ) { // log a msg, we actually already removed it in thread log("net: Removed corrupted data."); // clear error g_errno = 0; // . merge the modified lists again // . this is not in a thread // . it should not block mergeLists_r(); } } if ( m_isRealMerge ) log(LOG_DEBUG,"db: merged list is %li bytes long.", m_list->m_listSize); // log it long long now ; // only time it if we actually did a merge, check m_startTime if ( m_startTime ) now = gettimeofdayInMilliseconds(); else now = 0; long long took = now - m_startTime ; if ( g_conf.m_logTimingNet ) { if ( took > 5 ) log(LOG_INFO, "net: Took %llu ms to do merge. %li lists merged " "into one list of %li bytes.", took , m_numListPtrs , m_list->getListSize() ); //log("Msg5:: of that %llu ms was in checkList_r()s", // m_checkTime ); } // . add the stat // . use turquoise for time to merge the disk lists // . we should use another color rather than turquoise // . these clog up the graph, so only log if took more than 1 ms // . only time it if we actually did a merge, check m_startTime if ( took > 1 && m_startTime ) g_stats.addStat_r ( m_minRecSizes , m_startTime , now , //"rdb_list_merge", 0x0000ffff ); // . scan merged list for problems // . this caught an incorrectly set m_list->m_lastKey before #ifdef _SANITYCHECK_ m_list->checkList_r ( false , true , m_rdbId ); #endif // . all done if we did not merge the lists // . posdb doesn't need that so much for calling intersect9_r() if ( ! m_mergeLists ) return true; // . TODO: call freeList() on each m_list[i] here rather than destructr // . free all lists we used // . some of these may be from Msg3, some from cache, some from tree for ( long i = 0 ; i < m_numListPtrs ; i++ ) { m_listPtrs[i]->freeList(); m_listPtrs[i] = NULL; } // and the tree list m_treeList.freeList(); // . update our m_newMinRecSizes // . NOTE: this now ignores the negative records in the tree long long newListSize = m_list->getListSize(); // scale proportionally based on how many got removed during the merge long long percent = 100LL; long long net = newListSize - m_oldListSize; // add 5% for inconsistencies if ( net > 0 ) percent =(((long long)m_newMinRecSizes*100LL)/net)+5LL; else percent = 200; if ( percent <= 0 ) percent = 1; // set old list size in case we get called again m_oldListSize = newListSize; // how many recs do we have left to read? m_newMinRecSizes = m_minRecSizes - (long)newListSize; // return now if we met our minRecSizes quota if ( m_newMinRecSizes <= 0 ) return true; // otherwise, scale proportionately long nn = ((long long)m_newMinRecSizes * percent ) / 100LL; if ( percent > 100 ) { if ( nn > m_newMinRecSizes ) m_newMinRecSizes = nn; else m_newMinRecSizes = 0x7fffffff; } else m_newMinRecSizes = nn; QUICKPOLL(m_niceness); // . don't exceed original min rec sizes by 5 i guess // . watch out for wrap long max = 5 * m_minRecSizes ; if ( max < m_minRecSizes ) max = 0x7fffffff; if ( m_newMinRecSizes > max && max > m_minRecSizes ) m_newMinRecSizes = max; // keep this above a certain point because if we didn't make it now // we got negative records messing with us if ( m_rdbId != RDB_DOLEDB && m_newMinRecSizes < 128000 ) m_newMinRecSizes = 128000; // . update startKey in case we need to read more // . we'll need to read more if endKey < m_endKey && m_newMinRecSizes // is positive // . we read more from files AND from tree //m_fileStartKey = m_list->getEndKey() ; //m_fileStartKey += (unsigned long)1; KEYSET(m_fileStartKey,m_list->getEndKey(),m_ks); KEYADD(m_fileStartKey,1,m_ks); return true; } void gotRemoteListWrapper( void *state );//, RdbList *list ) ; long g_isDumpingRdbFromMain = 0; // . if we discover one of the lists we read from a file is corrupt we go here // . uses Msg5 to try to get list remotely bool Msg5::getRemoteList ( ) { // skip this part if doing a cmd line 'gb dump p main 0 -1 1' cmd or // similar to dump out a local rdb. if ( g_isDumpingRdbFromMain ) { g_errno = 1; return true; } // . this returns false if blocked, true otherwise // . this sets g_errno on error // . get list from ALL files, not just m_startFileNum/m_numFiles // since our files may not be the same // . if doRemotely parm is not supplied replying hostId is unspecified // get our twin host, or a redundant host in our group Host *group = g_hostdb.getGroup ( g_hostdb.m_groupId ); long n = g_hostdb.getNumHostsPerGroup(); // . if we only have 1 host per group, data is unpatchable // . we should not have been called if this is the case!! if ( n == 1 ) { g_errno = EBADENGINEER; //log("Msg5::gotRemoteList: no twins. data unpatchable."); return true; } // tfndb is not shareable, since it has tfns if ( m_rdbId == RDB_TFNDB ) { g_errno = EBADENGINEER; log("net: Cannot patch tfndb data from twin because it is " "not interchangable. Tfndb must be regenerated."); return true; } if ( m_rdbId == RDB_STATSDB ) { g_errno = EBADENGINEER; log("net: Cannot patch statsdb data from twin because it is " "not interchangable."); return true; } // tell them about log("net: Getting remote list from twin instead."); // make a new Msg0 for getting remote list try { m_msg0 = new ( Msg0 ); } // g_errno should be set if this is NULL catch ( ... ) { g_errno = ENOMEM; log("net: Could not allocate memory to get from twin."); return true; } mnew ( m_msg0 , sizeof(Msg0) , "Msg5" ); // select our twin long i; for ( i = 0 ; i < n ; i++ ) if ( group[i].m_hostId != g_hostdb.m_hostId ) break; Host *h = &group[i]; // get our groupnum. the column # long forceParitySplit = h->m_group; // translate base to an id, for the sake of m_msg0 //char rdbId = getIdFromRdb ( base->m_rdb ); // . this returns false if blocked, true otherwise // . this sets g_errno on error // . get list from ALL files, not just m_startFileNum/m_numFiles // since our files may not be the same // . if doRemotely parm is not supplied replying hostId is unspecified // . make minRecSizes as big as possible because it gets from ALL // files and from tree! // . just make it 256k for now lest, msg0 bitch about it being too big // if rdbId == RDB_INDEXDB passed the truncation limit // . wait forever for this host to reply... well, at least a day that // way if he's dead we'll wait for him to come back up to save our // data if ( ! m_msg0->getList ( h->m_hostId , h->m_ip , h->m_port , 0 , // max cached age false , // add to cache? m_rdbId , // rdbId m_coll , m_list , m_startKey , m_endKey , m_minRecSizes , // was 256k minRecSizes this , gotRemoteListWrapper , m_niceness , false , // do error correction? true , // include tree? true , // do merge? (obsolete) -1 , // first hostid 0 , // startFileNum -1 , // numFiles (-1=all) 60*60*24 , // timeout in secs -1 , // syncPoint -1 , // preferLocalReads NULL , // msg5 NULL , // msg5b m_isRealMerge , // merging files? //#ifdef SPLIT_INDEXDB m_allowPageCache , // allow page cache? false , // force local Indexdb false , // doIndexdbSplit // "forceParitySplit" is a group # // (the groupId is a mask) forceParitySplit )) //#else // m_allowPageCache ))// allow page cache? //#endif return false; // this is strange log("msg5: call to msg0 did not block"); // . if we did not block then call this directly // . return false if it blocks return gotRemoteList ( ) ; } void gotRemoteListWrapper( void *state ) { // , RdbList *list ) { Msg5 *THIS = (Msg5 *)state; // return if this blocks if ( ! THIS->gotRemoteList() ) return; // sanity check if ( THIS->m_calledCallback ) { char *xx=NULL;*xx=0; } // set it now THIS->m_calledCallback = 4; // if it doesn't block call the callback, g_errno may be set THIS->m_callback ( THIS->m_state , THIS->m_list , THIS ); } // returns false if it blocks bool Msg5::gotRemoteList ( ) { // free the Msg0 mdelete ( m_msg0 , sizeof(Msg0) , "Msg5" ); delete ( m_msg0 ); // return true now if everything ok if ( ! g_errno ) { // . i modified checkList to set m_lastKey if it is not set // . we need it for the big merge for getting next key in // RdbDump.cpp // . if it too is invalid, we are fucked if ( ! m_list->checkList_r ( false , false ) ) { log("net: Received bad list from twin."); g_errno = ECORRUPTDATA; goto badList; } // . success messages // . logging the key ranges gives us an idea of how long // it will take to patch the bad data //key_t sk = m_list->getStartKey(); //key_t ek = m_list->getEndKey (); //log("net: Received good list from twin. Requested %li bytes " // "and got %li. " // "startKey.n1=%lx n0=%llx " // "endKey.n1=%lx n0=%llx", // m_minRecSizes , m_list->getListSize() , // sk.n1,sk.n0,ek.n1,ek.n0); char *sk = m_list->getStartKey(); char *ek = m_list->getEndKey (); log("net: Received good list from twin. Requested %li bytes " "and got %li. " "startKey=%s endKey=%s", m_minRecSizes , m_list->getListSize() , KEYSTR(sk,m_ks),KEYSTR(ek,m_ks)); // . HACK: fix it so end key is right // . TODO: fix this in Msg0::gotReply() // . if it is empty, then there must be nothing else left // since the endKey was maxed in call to Msg0::getList() QUICKPOLL(m_niceness); if ( ! m_list->isEmpty() ) m_list->setEndKey ( m_list->getLastKey() ); //key_t k ; //k = m_list->getStartKey(); char *k = m_list->getStartKey(); log(LOG_DEBUG, //"net: Received list skey.n1=%08lx skey.n0=%016llx." , // k.n1 , k.n0 ); "net: Received list skey=%s." , KEYSTR(k,m_ks) ); k = m_list->getEndKey(); log(LOG_DEBUG, //"net: Received list ekey.n1=%08lx ekey.n0=%016llx." , // k.n1 , k.n0 ); "net: Received list ekey=%s", KEYSTR(k,m_ks) ); if ( ! m_list->isEmpty() ) { k = m_list->getLastKey(); //log(LOG_DEBUG,"net: Received list Lkey.n1=%08lx " // "Lkey.n0=%016llx" , k.n1 , k.n0 ); log(LOG_DEBUG,"net: Received list Lkey=%s", KEYSTR(k,m_ks) ); } //log("Msg5::gotRemoteList: received list is good."); return true; } badList: // it points to a corrupted list from twin, so reset m_list->reset(); // because we passed m_list to Msg0, it called m_list->reset() // which set our m_mergeMinListSize to -1, so we have to call // the prepareForMerge() again if ( ! m_list->prepareForMerge ( m_listPtrs , m_numListPtrs , m_minRecSizes ) ) { log("net: Had error preparing for merge: %s.", mstrerror(g_errno)); return true; } // . if g_errno is timed out we couldn't get a patch list // . turn off error correction and try again log("net: Had error getting remote list: %s.", mstrerror(g_errno) ); log("net: Merging repaired lists."); // clear g_errno so RdbMerge doesn't freak out g_errno = 0; // . we have the lists ready to merge // . hadCorruption should be false at this point mergeLists_r(); // process the result return doneMerging(); }