// open-source-search-engine/Msg5.cpp
//
// Matt Wells 8772e7fffe: overhauled the main loop (BIGLOOP) in Loop.cpp.
// sigtimedwait() wasn't cutting it: it was queueing up too many
// DUPLICATE signals and overflowing the rt signal queue. now gb has its
// own real-time signal queueing logic that just sets the bit of the FDs
// that need attention. i think threaded reads/writes are better now too,
// but the performance graph is broken so i need to fix that first. the
// threads page looks good though. hopefully this overhaul is a massive
// and stable performance improvement.
// 2014-08-27 14:07:13 -07:00

#include "gb-include.h"
#include "Msg5.h"
#include "RdbBase.h"
#include "Rdb.h"
//#include "Indexdb.h"
#include "Stats.h"
//#include "RdbCache.h"
#include "Threads.h"
#include "Msg0.h"
#include "PingServer.h"
//#include "Indexdb.h" // g_indexdb.getTruncationLimit()
//#include "CollectionRec.h"
//#define _SANITYCHECK_
//#define _TESTNEWALGO_ 1
static void gotListWrapper ( void *state ) ;
long g_numCorrupt = 0;
Msg5::Msg5() {
m_waitingForList = false;
//m_waitingForMerge = false;
m_numListPtrs = 0;
m_mergeLists = true;
reset();
}
Msg5::~Msg5() {
reset();
}
// frees m_treeList
void Msg5::reset() {
if ( m_waitingForList ) { // || m_waitingForMerge ) {
log("disk: Trying to reset a class waiting for a reply.");
// might be doing an urgent exit (mainShutdown(1)) or
// g_process.shutdown(), so do not core here
//char *xx = NULL; *xx = 0;
}
m_treeList.freeList();
//m_tfndbList.freeList();
m_msg3.reset();
m_prevCount = 0;
//m_prevKey.setMin();
KEYMIN(m_prevKey,MAX_KEY_BYTES); // not m_ks; m_ks may be invalid here
// free lists if m_mergeLists was false
for ( long i = 0 ; ! m_mergeLists && i < m_numListPtrs ; i++ )
m_listPtrs[i]->freeList();
m_numListPtrs = 0;
// and the tree list
m_treeList.freeList();
}
/*
//key_t makeCacheKey ( key_t startKey ,
//void makeCacheKey ( key_t startKey ,
// key_t endKey ,
void makeCacheKey ( char *startKey ,
char *endKey ,
bool includeTree ,
long minRecSizes ,
long startFileNum ,
//long numFiles ) {
long numFiles ,
char *cacheKeyPtr ,
char ks ) { // keySize
//key_t cacheKey;
//cacheKey = startKey ;
//cacheKey = hash96 ( endKey , cacheKey );
//cacheKey = hash96 ( (long)includeTree , cacheKey );
//cacheKey = hash96 ( (long)minRecSizes , cacheKey );
//cacheKey = hash96 ( (long)startFileNum , cacheKey );
//cacheKey = hash96 ( (long)numFiles , cacheKey );
if ( ks == 12 ) {
key_t cacheKey = *(key_t *)startKey;
cacheKey = hash96 ( *(key_t *)endKey , cacheKey );
cacheKey = hash96 ( (long)includeTree , cacheKey );
cacheKey = hash96 ( (long)minRecSizes , cacheKey );
cacheKey = hash96 ( (long)startFileNum , cacheKey );
cacheKey = hash96 ( (long)numFiles , cacheKey );
*(key_t *)cacheKeyPtr = cacheKey;
}
else {
key128_t cacheKey = *(key128_t *)startKey;
cacheKey = hash128 ( *(key128_t *)endKey, cacheKey );
cacheKey = hash128 ( (long)includeTree , cacheKey );
cacheKey = hash128 ( (long)minRecSizes , cacheKey );
cacheKey = hash128 ( (long)startFileNum , cacheKey );
cacheKey = hash128 ( (long)numFiles , cacheKey );
*(key128_t *)cacheKeyPtr = cacheKey;
}
//return cacheKey;
}
*/
#include "Tfndb.h"
//#include "Checksumdb.h"
//HashTableX g_waitingTable;
// . return false if blocked, true otherwise
// . set g_errno on error
// . fills "list" with the requested list
// . we want at least "minRecSizes" bytes of records, but not much more
// . we want all our records to have keys in the [startKey,endKey] range
// . final merged list should try to have a size of at least "minRecSizes"
// . it may fall short if not enough records were in [startKey,endKey] range
// . endKey of list will be set so that all records from startKey to that
// endKey are in the list
// . a minRecSizes of 0x7fffffff means virtual infinity, but it also has
// another special meaning. it tells msg5 to tell RdbTree's getList() to
// pre-allocate the list size by counting the recs ahead of time.
// . a minimal caller sketch follows this function's definition below
bool Msg5::getList ( char rdbId ,
collnum_t collnum ,
RdbList *list ,
//key_t startKey ,
//key_t endKey ,
void *startKey ,
void *endKey ,
long minRecSizes , // requested scan size(-1 none)
bool includeTree ,
bool addToCache ,
long maxCacheAge , // in secs for cache lookup
long startFileNum , // first file to scan
long numFiles , // rel. to startFileNum,-1 all
void *state , // for callback
void (* callback ) ( void *state ,
RdbList *list ,
Msg5 *msg5 ) ,
long niceness ,
bool doErrorCorrection ,
//key_t *cacheKeyPtr , // NULL if none
char *cacheKeyPtr , // NULL if none
long retryNum ,
long maxRetries ,
bool compensateForMerge ,
long long syncPoint ,
class Msg5 *msg5b ,
bool isRealMerge ,
bool allowPageCache ,
// if this is false we only check the page cache
bool hitDisk ,
bool mergeLists ) {
// make sure we are not being re-used prematurely
if ( m_waitingForList ) {
log("disk: Trying to reset a class waiting for a reply.");
char *xx = NULL; *xx = 0;
}
if ( collnum < 0 ) {
log("msg5: called with bad collnum=%li",(long)collnum);
g_errno = ENOCOLLREC;
return true;
}
// sanity check. we no longer have record caches!
// now we do again for posdb gbdocid:xxx| restricted queries
//if ( addToCache || maxCacheAge ) {char *xx=NULL;*xx=0; }
// assume no error
g_errno = 0;
// sanity
if ( ! list && mergeLists ) { char *xx=NULL;*xx=0; }
// warning
if ( collnum < 0 ) log(LOG_LOGIC,"net: bad collection. msg5.");
// MUST have this
//if ( rdbId == RDB_TITLEDB && ! msg5b ) {
// log(LOG_LOGIC,"net: No msg5b supplied. 1.");
// char *xx = NULL; *xx = 0;
//}
// . reset the provided list
// . this will not free any mem it may have alloc'd but it will set
// m_listSize to 0 so list->isEmpty() will return true
if ( list ) list->reset();
// key size set
m_ks = getKeySizeFromRdbId(rdbId);
// . complain if endKey < startKey
// . no because IndexReadInfo does this to prevent us from reading
// a list
//if ( startKey > endKey ) return true;
if ( KEYCMP((char *)startKey,(char *)endKey,m_ks)>0 ) return true;
// log("Msg5::readList: startKey > endKey warning");
// we no longer allow negative minRecSizes
if ( minRecSizes < 0 ) {
log(LOG_LOGIC,"net: msg5: MinRecSizes < 0, using 1.");
minRecSizes = 1;
char *xx = NULL; *xx = 0;
}
// ensure startKey last bit clear, endKey last bit set
//if ( (startKey.n0 & 0x01) == 0x01 )
if ( !KEYNEG((char *)startKey) )
log(LOG_REMIND,"net: msg5: StartKey lastbit set.");
// fix endkey
//if ( (endKey.n0 & 0x01) == 0x00 ) {
if ( KEYNEG((char *)endKey) ) {
log(LOG_REMIND,"net: msg5: EndKey lastbit clear. Fixing.");
//endKey.n0 |= 0x01;
*((char *)endKey) |= 0x01;
}
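// a minimal sketch of the key-sign convention assumed by the checks
// above (an inference from these checks, not a quote from the key
// macros): keys compare little-endian, byte 0 is least significant,
// and a clear low bit marks a negative (delete) key:
//
//   static bool isNegKey ( char *k ) { return ! ( k[0] & 0x01 ); }
//
// so a read wants startKey in negative form (low bit clear) and endKey
// in positive form (low bit set) so the range brackets both the delete
// and the add form of every key it covers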
QUICKPOLL(niceness);
// debug msg
//log("doing msg5 niceness=%li",niceness);
//if ( niceness == 1 )
// log("hey!");
// timing debug
//m_startTime = gettimeofdayInMilliseconds();
// remember stuff
m_rdbId = rdbId;
m_collnum = collnum;
// why was this here? it was messing up the statsdb ("graph") link
// in the admin panel.
//CollectionRec *ttt = g_collectiondb.getRec ( m_collnum );
//if ( ! ttt ) {
// g_errno = ENOCOLLREC;
// return true;
//}
m_list = list;
//m_startKey = startKey;
//m_endKey = endKey;
KEYSET(m_startKey,(char *)startKey,m_ks);
KEYSET(m_endKey,(char *)endKey,m_ks);
m_minRecSizes = minRecSizes;
m_includeTree = includeTree;
m_addToCache = addToCache;
m_maxCacheAge = maxCacheAge;
m_startFileNum = startFileNum;
m_numFiles = numFiles;
m_state = state;
m_callback = callback;
m_calledCallback= 0;
m_niceness = niceness;
m_maxRetries = maxRetries;
m_oldListSize = 0;
m_dupsRemoved = 0;
m_compensateForMerge = compensateForMerge;
//m_syncPoint = syncPoint;
m_msg5b = msg5b;
m_isRealMerge = isRealMerge;
m_allowPageCache = allowPageCache;
m_hitDisk = hitDisk;
m_mergeLists = mergeLists;
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_collnum))) return true;
// point to cache
//RdbCache *cache = base->m_rdb->getCache();
// . these 2 vars are used for error correction
// . doRemoteLookup is -2 if it's up to us to decide
m_doErrorCorrection = doErrorCorrection;
// these get changed both by cache and gotList()
m_newMinRecSizes = minRecSizes;
m_round = 0;
m_readAbsolutelyNothing = false;
//m_fileStartKey = startKey;
KEYSET(m_fileStartKey,m_startKey,m_ks);
QUICKPOLL(m_niceness);
// get trunc limit by collection now, not just in g_conf
m_indexdbTruncationLimit = 0x7fffffff;
#ifdef _SANITYCHECK_
log("msg5: sk=%s", KEYSTR(m_startKey,m_ks));
log("msg5: ek=%s", KEYSTR(m_endKey,m_ks));
#endif
/*
if ( rdbId == RDB_INDEXDB ) {
CollectionRec *cr = g_collectiondb.getRec ( m_coll );
if ( ! cr ) { log("disk: Msg5 no coll rec."); return true; }
//m_indexdbTruncationLimit = cr->m_indexdbTruncationLimit;
// debug
//log("trunc limit = %li",m_indexdbTruncationLimit);
// Parms.cpp should never let this happen...
if ( m_indexdbTruncationLimit < MIN_TRUNC ) {
log("disk: trunc limit = %li",
m_indexdbTruncationLimit);
char *xx = NULL; *xx = 0;
}
}
*/
// debug msg and stuff
//m_startKey.n1 = 1616550649;
//m_startKey.n0 = (unsigned long long)10489958987685363408LL;
//m_includeTree = true;
//m_minRecSizes = 10080000;
//m_startFileNum = 0;
//m_numFiles = -1;
// hack it down
if ( numFiles > base->getNumFiles() )
numFiles = base->getNumFiles();
/*
// if we're storing or reading from cache.. make the cache key now
if ( m_maxCacheAge != 0 || m_addToCache ) {
//if ( cacheKeyPtr ) m_cacheKey = *cacheKeyPtr;
if ( cacheKeyPtr ) KEYSET(m_cacheKey,cacheKeyPtr,m_ks);
//else m_cacheKey = makeCacheKey ( m_startKey ,
else makeCacheKey ( m_startKey ,
m_endKey ,
m_includeTree ,
m_minRecSizes ,
m_startFileNum ,
//m_numFiles );
m_numFiles ,
m_cacheKey ,
m_ks );
}
*/
//log("ck.n1=%lu ck.n0=%llu",m_cacheKey.n1,m_cacheKey.n0);
//exit(-1);
// . make sure we set base above so Msg0.cpp:268 doesn't freak out
// . if startKey is > endKey list is empty
//if ( m_startKey > m_endKey ) return true;
if ( KEYCMP(m_startKey,m_endKey,m_ks)>0 ) return true;
// same if minRecSizes is 0
if ( m_minRecSizes == 0 ) return true;
/*
// bring back cache for posdb. but only if posdb is merging the lists
// like when it does gbdocid:xxx| restriction queries in xmldoc.cpp's
// seo pipe.
if ( m_maxCacheAge &&
m_rdbId == RDB_POSDB &&
//m_mergeLists && // this is obsolete me thinks
getListFromTermListCache ( coll,
m_startKey ,
m_endKey ,
m_maxCacheAge,
list ) )
// list should now be set from the termlistcache!
return true;
// check to see if another request for the exact same termlist
// is in progress. then wait for that if so.
// so use logic like makeCacheKey() was using.
long conti = 0;
m_waitingKey=hash64_cont(m_startKey,m_ks,0LL,&conti);
m_waitingKey=hash64_cont(m_endKey,m_ks,m_waitingKey,&conti);
m_waitingKey=hash64_cont((char *)&m_minRecSizes,4,m_waitingKey,&conti);
m_waitingKey=hash64_cont((char*)&m_startFileNum,4,m_waitingKey,&conti);
m_waitingKey=hash64_cont((char *)&m_numFiles,4,m_waitingKey,&conti);
m_waitingKey=hash64_cont((char *)&m_includeTree,1,m_waitingKey,&conti);
m_waitingKey^= ((unsigned long long)rdbId) << (64-8);
// init it?
static bool s_waitInit = false;
if ( ! s_waitInit ) {
s_waitInit = true;
if ( ! g_waitingTable.set(8,4,2048,NULL,0,
true,m_niceness,"m5wtbl"))
log("msg5: failed to init waiting table");
// ignore error
g_errno = 0;
}
// wait for the reply to come in
bool inTable = g_waitingTable.isInTable(&m_waitingKey);
// always add it
void *THIS = this;
bool added = g_waitingTable.addKey(&m_waitingKey,&THIS);
// wait for in-progress reply?
if ( inTable ) {
// log debug msg
if ( added && rdbId == RDB_POSDB && m_addToCache ) {
long long cks;
cks = getTermListCacheKey(m_startKey,m_endKey);
log("msg5: waiting for rdbid=%li wkey=%llx "
"startkey=%s ckey=%llx",
(long)rdbId,m_waitingKey,KEYSTR(m_startKey,m_ks),
cks);
}
// we blocked and are in the waiting table
if ( added )
return false;
// otherwise, the addkey failed
log("msg5: failed to add waitingkey: %s",mstrerror(g_errno));
g_errno = 0;
}
// . otherwise log as well
// . you should NEVER see a dup waiting key or cks because
// it should all be cached!!!!
if ( rdbId == RDB_POSDB && m_addToCache ) {
long long cks = getTermListCacheKey(m_startKey,m_endKey);
log("msg5: hitting disk for rdbid=%li wkey=%llx "
"startkey=%s ckey=%llx",
(long)rdbId,m_waitingKey,KEYSTR(m_startKey,m_ks),cks);
}
*/
/*
// use g_termListCache for single docid lookups
long long singleDocId = 0LL;
if ( rdbId == RDB_POSDB ) {
long long d1 = g_posdb.getDocId(startKey);
long long d2 = g_posdb.getDocId(endKey);
if ( d1+1 == d2 ) singleDocId = d1;
}
// if in the termlist cache, send it back right away
char *trec;
long trecSize;
if ( singleDocId &&
getRecFromTermListCache(coll,
m_startKey,
m_endKey,
20*60, // 20 mins. maxCacheAge,
&trec,
&trecSize) ) {
// if in cache send it back!
m_list->set ( trec , // list data
trecSize , // list data size
trec , // alloc
trecSize , // alloc size
m_startKey ,
m_endKey ,
0 , // fixeddatasize
true , // own data?
true , // use half keys?
m_ks );
// we got it without blocking
return true;
}
// if not found in cache it might be a query we do not match like
// 'track-track'
if ( singleDocId ) {
// just copy ptrs from this list into m_list
m_list->set ( NULL , // list data
0 , // list data size
NULL , // alloc
0 , // alloc size
m_startKey ,
m_endKey ,
base->getFixedDataSize() ,
true , // own data?
base->useHalfKeys() ,
m_ks );
return true;
}
*/
// . check in cache first
// . we now cache everything
/*
if ( m_maxCacheAge != 0 ) {
log(LOG_DEBUG,"net: Checking cache for list. "
//"startKey.n1=%lu %llu. cacheKey.n1=%lu %llu.",
//m_startKey.n1,m_startKey.n0,
//m_cacheKey.n1,m_cacheKey.n0);
"startKey.n1=%llx %llx. cacheKey.n1=%llx %llx.",
KEY1(m_startKey,m_ks),KEY0(m_startKey),
KEY1(m_cacheKey,m_ks),KEY0(m_cacheKey));
// is this list in the cache?
if ( cache->getList ( base->m_collnum ,
m_cacheKey ,
m_startKey ,
m_list ,
true , // do copy?
m_maxCacheAge ,
true )) {// incCounts?
// debug msg
log(LOG_DEBUG,"net: msg5: Got cache hit for %s.",
base->m_dbname );
// sanity check
//bool ok = m_list->checkList_r ( false , true );
//if ( ! ok ) log("GETLIST had problem");
// break out
//if ( ! ok ) { char *xx = NULL; *xx = 0; }
return true;
}
}
*/
// timing debug
//log("Msg5:getting list startKey.n1=%lu",m_startKey.n1);
// start the read loop - hopefully it will only loop once
if ( readList ( ) ) return true;
// tell Spider.cpp not to nuke us until we get back!!!
m_waitingForList = true;
// we blocked!!! must call m_callback
return false;
}
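// a minimal caller sketch (hypothetical names, not from this codebase)
// illustrating the contract documented above getList(): a true return
// means "list" is ready (or g_errno is set) and the callback will NOT
// fire; a false return means we blocked and the callback fires later.
/*
static void gotMyList ( void *state , RdbList *list , Msg5 *msg5 ) {
	// "list" now holds the merged recs in [startKey,endKey]
}
static bool readSomePosdb ( Msg5 *msg5 , collnum_t collnum ,
			    RdbList *list , char *sk , char *ek ,
			    void *state ) {
	return msg5->getList ( RDB_POSDB , // rdbId
			       collnum   ,
			       list      ,
			       sk        , // startKey
			       ek        , // endKey
			       64*1024   , // minRecSizes
			       true      , // includeTree
			       false     , // addToCache
			       0         , // maxCacheAge
			       0         , // startFileNum
			       -1        , // numFiles, -1 = all
			       state     ,
			       gotMyList ,
			       0         , // niceness
			       true      , // doErrorCorrection
			       NULL      , // cacheKeyPtr
			       0         , // retryNum
			       -1        , // maxRetries, -1 = default
			       false     , // compensateForMerge
			       -1LL      , // syncPoint
			       NULL      , // msg5b
			       false     , // isRealMerge
			       true      , // allowPageCache
			       true      , // hitDisk
			       true      );// mergeLists
}
*/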
// . returns false if blocked, true otherwise
// . sets g_errno on error
// . reads from cache, tree and files
// . calls gotList() to do the merge if we need to
// . loops until m_minRecSizes is satisfied OR m_endKey is reached
bool Msg5::readList ( ) {
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_collnum))) return true;
readMore:
// . reset our tree list
// . sets fixedDataSize here in case m_includeTree is false because
// we don't want merge to have incompatible lists
m_treeList.reset();
m_treeList.setFixedDataSize ( base->getFixedDataSize() );
m_treeList.m_ks = m_ks;
// reset Msg3 in case gotList() is called without calling
// Msg3::readList() first
m_msg3.reset();
// assume lists have no errors in them
m_hadCorruption = false;
// . restrict tree's endkey by calling msg3 now...
// . this saves us from spending 1000ms to read 100k of negative
// spiderdb recs from the tree only to have most of them for naught
// . this call will ONLY set m_msg3.m_endKey
// . but only do this if dealing with spiderdb really
// . also now for tfndb, since we scan that in RdbDump.cpp to dedup
// the spiderdb list we are dumping to disk. it is really for any
// time when the endKey is unbounded, so check that now
char *treeEndKey = m_endKey;
bool compute = true;
if ( ! m_includeTree ) compute = false;
// if endKey is "unbounded" then bound it...
char max[MAX_KEY_BYTES]; KEYMAX(max,m_ks);
if ( KEYCMP(m_endKey,max,m_ks) != 0 ) compute = false;
// BUT don't bother if a small list, probably faster just to get it
if ( m_newMinRecSizes < 1024 ) compute = false;
// try to make merge read threads higher priority than
// regular spider read threads
long niceness = m_niceness;
if ( niceness > 0 ) niceness = 2;
if ( m_isRealMerge ) niceness = 1;
if ( compute ) {
m_msg3.readList ( m_rdbId ,
m_collnum ,
m_fileStartKey , // modified by gotList()
m_endKey ,
m_newMinRecSizes , // modified by gotList()
m_startFileNum ,
m_numFiles ,
this ,
gotListWrapper ,
niceness ,
0 , // retry num
m_maxRetries , // -1=def
m_compensateForMerge ,
-1,//m_syncPoint ,
true , // just get endKey?
m_allowPageCache );
if ( g_errno ) {
log("db: Msg5: getting endKey: %s",mstrerrno(g_errno));
return true;
}
treeEndKey = m_msg3.m_constrainKey;
}
QUICKPOLL((m_niceness));
// . get the list from our tree
// . set g_errno and return true on error
// . it is crucial that we get tree list before spawning a thread
// because Msg22 will assume that if the TitleRec is in the tree
// now we'll get it, because we need to have the latest version
// of a particular document and this guarantees it. Otherwise, if
// the doc is not in the tree then tfndb must tell its file number.
// I just don't want to think it's in the tree then have it get
// dumped out right before we read it, then we end up getting the
// older version rather than the new one in the tree which tfndb
// does not know about until it is dumped out. so we could lose
// recs between the in-memory and on-disk phases this way.
// . however, if we are getting a titlerec, we first read the tfndb
// list from the tree then disk. if the merge replaces the tfndb rec
// we want with another while we are reading the tfndb list from
// disk, then the tfndb rec we got from the tree was overwritten!
// so then we'd read from the wrong title file number (tfn) and
// not find the rec because the merge just removed it. so keeping
// the tfndb recs truly in sync with the titledb recs requires
// some dancing. the simplest way is to just scan all titleRecs
// in the event of a disagreement... so turn on m_scanAllIfNotFound,
// which essentially disregards tfndb and searches all the titledb
// files for the titleRec.
if ( m_includeTree ) {
// get the mem tree for this rdb
RdbTree *tree = base->m_rdb->getTree();
// how many recs are deletes in this list?
long numNegativeRecs = 0;
long numPositiveRecs = 0;
// set start time
long long start ;
if ( m_newMinRecSizes > 64 )
start = gettimeofdayInMilliseconds();
// save for later
m_treeMinRecSizes = m_newMinRecSizes;
// . returns false on error and sets g_errno
// . endKey of m_treeList may be less than m_endKey
char *structName = "tree";
if(base->m_rdb->useTree()) {
if ( ! tree->getList ( base->m_collnum ,
m_fileStartKey ,
treeEndKey ,
m_newMinRecSizes ,
&m_treeList ,
&numPositiveRecs , // # pos
&numNegativeRecs , // # neg
base->useHalfKeys() ) )
return true;
// debug msg
}
else {
RdbBuckets *buckets = &base->m_rdb->m_buckets;
if ( ! buckets->getList ( base->m_collnum ,
m_fileStartKey ,
treeEndKey ,
m_newMinRecSizes ,
&m_treeList ,
&numPositiveRecs ,
&numNegativeRecs ,
base->useHalfKeys() )) {
return true;
}
structName = "buckets";
}
long long now ;
if ( m_newMinRecSizes > 64 ) {
now = gettimeofdayInMilliseconds();
long long took = now - start ;
if ( took > 9 )
logf(LOG_INFO,"net: Got list from %s "
"in %llu ms. size=%li db=%s "
"niceness=%li.",
structName, took,m_treeList.getListSize(),
base->m_dbname,m_niceness);
}
// if our recSize is fixed we can boost m_minRecSizes to
// compensate for these deletes when we call m_msg3.readList()
long rs = base->getRecSize() ;
// . use an avg. rec size for variable-length records
// . just use tree to estimate avg. rec size
if ( rs == -1) {
if(base->m_rdb->useTree()) {
// how much space do all recs take up in the tree?
long totalSize = tree->getMemOccupiedForList();
// how many recs in the tree
long numRecs = tree->getNumUsedNodes();
// get avg record size
if ( numRecs > 0 ) rs = totalSize / numRecs;
// add 10% for deviations
rs = (rs * 110) / 100;
// what is the minimal record size?
long minrs = sizeof(key_t) + 4;
// ensure a minimal record size
if ( rs < minrs ) rs = minrs;
}
else {
RdbBuckets *buckets = &base->m_rdb->m_buckets;
// avg rec size is bytes per key, not keys per byte
long numKeys = buckets->getNumKeys();
if ( numKeys > 0 )
rs = buckets->getMemOccupied() / numKeys;
long minrs = buckets->getRecSize() + 4;
// ensure a minimal record size
if ( rs < minrs ) rs = minrs;
}
}
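// worked example (illustrative numbers): 1,000,000 bytes of tree
// mem over 40,000 recs gives rs = 25; +10% makes 27; minrs =
// sizeof(key_t)+4 = 16, so rs stays 27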
// . TODO: get avg recSize in this rdb (avgRecSize*numNeg..)
// . don't do this if we're not merging because it makes
// it harder to compute the # of bytes to read used to
// pre-allocate a reply buf for Msg0 when !m_doMerge
// . we set endKey for spiderdb when reading from tree above
// based on the current minRecSizes so do not mess with it
// in that case.
if ( m_rdbId != RDB_SPIDERDB ) {
//m_newMinRecSizes += rs * numNegativeRecs;
long nn = m_newMinRecSizes + rs * numNegativeRecs;
if ( rs > 0 && nn < m_newMinRecSizes ) nn = 0x7fffffff;
m_newMinRecSizes = nn;
}
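// e.g. rs = 27 and 500 negative tree recs boosts the read by
// 13,500 bytes; if the add wraps past 0x7fffffff, nn drops below
// m_newMinRecSizes and we clamp to "infinity"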
// . if m_endKey = m_startKey + 1 and our list has a rec
// then no need to check the disk, it was in the tree
// . it could be a negative or positive record
// . tree can contain both negative/positive recs for the key
// so we should do the final merge in gotList()
// . that can happen because we don't do an annihilation
// because the positive key may be being dumped out to disk
// but it really wasn't and we get stuck with it
//key_t kk = m_startKey ;
//kk += (unsigned long)1;
//if ( m_endKey == kk && ! m_treeList.isEmpty() ) {
char kk[MAX_KEY_BYTES];
KEYSET(kk,m_startKey,m_ks);
KEYADD(kk,1,m_ks);
// no no no.... gotList() might be returning false because
// it's doing a threaded call to merge_r to take out
// the negative recs i guess...
if ( KEYCMP(m_endKey,kk,m_ks)==0 && ! m_treeList.isEmpty() ) {
return gotList(); } // return true; }
}
// if we don't use the tree then at least set the key bounds cuz we
// pick the min endKey between diskList and treeList below
else m_treeList.set ( m_fileStartKey , m_endKey );
// . if we're reading indexlists from 2 or more sources then some
// will probably be compressed from 12 byte keys to 6 byte keys
// . it is typically only about 1% when files are small,
// and smaller than that when a file is large
// . but just to be safe reading an extra 2% won't hurt too much
if ( base->useHalfKeys() ) {
long numSources = m_numFiles;
if ( numSources == -1 )
numSources = base->getNumFiles();
// if tree is empty, don't count it
if ( m_includeTree && ! m_treeList.isEmpty() ) numSources++;
// . if we don't do a merge then we return the list directly
// (see condition where m_numListPtrs == 1 below)
// from Msg3 (or tree) and we must hit minRecSizes as
// close as possible for Msg3's call to constrain() so
// we don't overflow the UdpSlot's TMPBUFSIZE buffer
// . if we just arbitrarily boost m_newMinRecSizes then
// the single list we get back from Msg3 will not have
// been constrained with m_minRecSizes, but constrained
// with m_newMinRecSizes (x2%) and be too big for our UdpSlot
if ( numSources >= 2 ) {
long long newmin = (long long)m_newMinRecSizes ;
newmin = (newmin * 50LL) / 49LL ;
// watch out for wrap around
if ( (long)newmin < m_newMinRecSizes )
m_newMinRecSizes = 0x7fffffff;
else m_newMinRecSizes = (long)newmin;
}
}
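// e.g. with 2+ sources and m_newMinRecSizes = 1,000,000 the boost
// yields 1,000,000 * 50 / 49 = 1,020,408, i.e. a ~2% over-read to
// cover 12-byte keys that compressed to 6 bytes on disk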
// limit to 20MB so we don't go OOM!
if ( m_newMinRecSizes > 2 * m_minRecSizes &&
m_newMinRecSizes > 20000000 )
m_newMinRecSizes = 20000000;
QUICKPOLL((m_niceness));
// debug msg
//log("msg5 calling msg3 for %li bytes (msg5=%lu)",
// m_newMinRecSizes,(long)this);
// . it's pointless to fetch data from disk past treeList's endKey
// . it only differs from m_endKey if its listSize is at least
// newMinRecSizes
//key_t diskEndKey = m_treeList.getEndKey();
char *diskEndKey = m_treeList.getEndKey();
// sanity check
if ( m_treeList.m_ks != m_ks ) { char *xx = NULL; *xx = 0; }
// we are waiting for the list
//m_waitingForList = true;
// clear just in case
g_errno = 0;
// . now get from disk
// . use the cache-modified constraints to reduce reading time
// . return false if it blocked
// . if compensateForMerge is true then m_startFileNum/m_numFiles
// will be appropriately mapped around the merge
if ( ! m_msg3.readList ( m_rdbId ,
m_collnum ,
m_fileStartKey , // modified by gotList()
diskEndKey ,
m_newMinRecSizes , // modified by gotList()
m_startFileNum ,
m_numFiles ,
this ,
gotListWrapper ,
niceness ,
0 , // retry num
m_maxRetries , // max retries (-1=def)
m_compensateForMerge ,
-1,//m_syncPoint ,
false ,
m_allowPageCache ,
m_hitDisk ))
return false;
// . this returns false if blocked, true otherwise
// . sets g_errno on error
// . updates m_newMinRecSizes
// . updates m_fileStartKey to the endKey of m_list + 1
if ( ! gotList () ) return false;
// bail on error from gotList() or Msg3::readList()
if ( g_errno ) return true;
// we may need to re-call getList
if ( needsRecall() ) goto readMore;
// we did not block
return true;
}
/*
// we were in the g_waitingTable waiting for a list to be read that was already
// in progress when our request came in.
void Msg5::copyAndSendBackList ( RdbList *listSrc ) {
// this returns false and sets g_errno on error
if ( ! m_list->copyList(listSrc) )
log("msg5: copylist failed: %s",mstrerror(g_errno));
// set it now
m_calledCallback = 1;
// when completely done call the callback
m_callback ( m_state , m_list , this );
}
*/
bool Msg5::needsRecall ( ) {
bool logIt;
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
RdbBase *base = getRdbBase ( m_rdbId , m_collnum );
// if collection was deleted from under us, base will be NULL
if ( ! base && ! g_errno ) {
log("msg5: base lost for rdbid=%li collnum %li",
(long)m_rdbId,(long)m_collnum);
g_errno = ENOCOLLREC;
return false;
}
// sanity check
if ( ! base && ! g_errno ) { char *xx=NULL;*xx=0; }
// . return true if we're done reading
// . sometimes we'll need to read more because Msg3 will shorten the
// endKey to better meet m_minRecSizes but because of
// positive/negative record annihilation on variable-length
// records it won't read enough
if ( g_errno ) goto done;
if ( m_newMinRecSizes <= 0 ) goto done;
if ( ! m_mergeLists ) goto done;
// limit to just doledb for now in case it results in data loss
if(m_readAbsolutelyNothing&&
(m_rdbId==RDB_DOLEDB||m_rdbId==RDB_SPIDERDB ) )
goto done;
//if ( m_list->getEndKey() >= m_endKey ) goto done;
if ( KEYCMP(m_list->getEndKey(),m_endKey,m_ks)>=0 ) goto done;
// if this is NOT the first round and the sum of all our list sizes
// did not increase, then we hit the end...
//
// i think this is sometimes stopping us short. we need to verify
// that each list (from tree and files) is exhausted... which
// the statement above does... !!! it was causing us to miss urls
// in doledb and think a collection was done spidering. i think
// maybe because the startkey of each list would change since we
// merge them and accumulate into m_list. a better way would be
// to make sure doledb or any rdb dumps and tight merges when
// we start having a lot of key annihilations.
/*
if ( m_round >= 1 && m_totalSize == m_lastTotalSize ) {
log("msg5: increasing minrecsizes did nothing. assuming done. "
"db=%s (newsize=%li origsize=%li total "
"got %li totalListSizes=%li sk=%s) "
"cn=%li this=0x%lx round=%li.",
base->m_dbname ,
m_newMinRecSizes,
m_minRecSizes,
m_list->m_listSize,
m_totalSize,
KEYSTR(m_startKey,m_ks),
(long)m_collnum,(long)this, m_round );
goto done;
}
*/
// ok, make sure if we go another round at least one list gains!
m_lastTotalSize = m_totalSize;
/*
// sanity check
if ( m_indexdbTruncationLimit < MIN_TRUNC ) {
log("disk: trunc limit2 = %li", m_indexdbTruncationLimit);
char *xx = NULL; *xx = 0;
}
// if we are limited by truncation then we are done
if ( base->useHalfKeys() &&
base->m_rdb != g_tfndb.getRdb() &&
//m_prevCount >= g_indexdb.getTruncationLimit() &&
m_prevCount >= m_indexdbTruncationLimit &&
g_indexdb.getTermId(*(key_t *)m_startKey) ==
g_indexdb.getTermId(*(key_t *)m_endKey) )
goto done;
*/
// debug msg
//if ( g_conf.m_timingDebugEnabled )
// this is kinda important. we have to know if we are abusing
// the disk... we should really keep stats on this...
logIt = true;
// seems to be very common for doledb, so don't log unless extreme
//if ( m_rdbId == RDB_DOLEDB && m_round < 15 ) logIt = false;
if ( m_round > 100 && (m_round % 1000) != 0 ) logIt = false;
// seems very common when doing rebalancing then merging to have
// to do at least one round of re-reading, so note that
if ( m_round == 0 ) logIt = false;
// so common for doledb because of key annihilations
if ( m_rdbId == RDB_DOLEDB && m_round < 10 ) logIt = false;
if ( logIt )
logf(LOG_DEBUG,"db: Reading %li again from %s (need %li total "
"got %li totalListSizes=%li sk=%s) "
"cn=%li this=0x%lx round=%li.",
m_newMinRecSizes , base->m_dbname , m_minRecSizes,
m_list->m_listSize,
m_totalSize,
KEYSTR(m_startKey,m_ks),
(long)m_collnum,(long)this, m_round );
m_round++;
// record how many screw ups we had so we know if it hurts performance
base->m_rdb->didReSeek ( );
// try to read more from disk
return true;
done:
// . reset merged list ptr
// . merge_r() never resets the list ptr, m_listPtr
if ( m_list ) m_list->resetListPtr();
/*
// bring back termlist caching for posdb for gbdocid:xxxx| queries
// in the seo pipeline in xmldoc.cpp because we need to do like
// 1M of those things ultra fast!
if ( m_addToCache && ! g_errno && m_rdbId == RDB_POSDB )
addToTermListCache(m_coll,m_startKey,m_endKey,m_list);
// send back replies to others that are waiting
callbackLoop:
long slot = g_waitingTable.getSlot(&m_waitingKey);
// valid?
if ( slot >= 0 ) {
// get it
Msg5 *THIS = *(Msg5 **)g_waitingTable.getValueFromSlot(slot);
// sanity. must have same start keys!
if ( KEYCMP(THIS->m_startKey,m_startKey,m_ks) != 0 ) {
char *xx=NULL;*xx=0; }
// do not call this for the original request
if ( THIS != this ) THIS->copyAndSendBackList ( m_list );
// delete it
g_waitingTable.deleteSlot(slot);
// another one
goto callbackLoop;
}
*/
/*
// add finalized list to cache if we should
if ( m_addToCache && ! g_errno ) {
// point to cache
RdbCache *cache = base->m_rdb->getCache();
// sanity check
//bool ok = m_list->checkList_r ( false , true );
// break out
//if ( ! ok ) { char *xx = NULL; *xx = 0; }
//if ( ! ok ) log("ADDLIST had problem");
// add it if its ok
//if ( ok ) m_cache->addList ( m_cacheKey, m_list ) ;
cache->addList ( base->m_collnum , m_cacheKey, m_list ) ;
// ignore errors
g_errno = 0;
}
*/
// return false cuz we don't need a recall
return false;
}
void gotListWrapper ( void *state ) {
Msg5 *THIS = (Msg5 *) state;
// . this sets g_errno on error
// . this will merge cache/tree and disk lists into m_list
// . it will update m_newMinRecSizes
// . it will also update m_fileStartKey to the endKey of m_list + 1
// . returns false if it blocks
if ( ! THIS->gotList ( ) ) return;
// . throw it back into the loop if necessary
// . only returns true if COMPLETELY done
if ( THIS->needsRecall() && ! THIS->readList() ) return;
// sanity check
if ( THIS->m_calledCallback ) { char *xx=NULL;*xx=0; }
// set it now
THIS->m_calledCallback = 1;
// we are no longer waiting for the list
THIS->m_waitingForList = false;
// when completely done call the callback
THIS->m_callback ( THIS->m_state , THIS->m_list , THIS );
}
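// control-flow summary (a descriptive note, added for orientation):
//   getList() -> readList() -> Msg3::readList() [tree+disk, may block]
//   -> gotListWrapper() -> gotList()/gotList2() -> merge, threaded via
//   MERGE_THREAD or inline -> doneMerging() -> needsRecall() ? loop
//   back into readList() : invoke m_callback with the merged m_list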
static void threadDoneWrapper ( void *state , ThreadEntry *t ) ;
static void *mergeListsWrapper_r ( void *state , ThreadEntry *t ) ;
//static void gotListWrapper2 ( void *state , RdbList *list , Msg5 *msg5);
#define TFNDBMINRECSIZES (256*1024)
// . this is the NEW gotList() !!! mdw
// . returns false if blocked, true otherwise
// . sets g_errno on error
bool Msg5::gotList ( ) {
// we are no longer waiting for the list
//m_waitingForList = false;
// debug msg
//log("msg5 got lists from msg3 (msg5=%lu)",(long)this);
// return if g_errno is set
if ( g_errno && g_errno != ECORRUPTDATA ) return true;
// if reading from titledb, read corresponding tfndb list so
// we can remove overwritten titleRecs
if ( m_rdbId != RDB_TITLEDB ) return gotList2();
// if not merging files on disk, skip this stuff
if ( ! m_isRealMerge ) return gotList2();
// this is now obsolete!
return gotList2();
}
/*
// The Tfndb Tfn Bug Fix. We've had this bug for over a year. Now we
// need to load the corresponding tfndb list with every titledb list
// so we can remove titleRecs that are from files whose tfn does not
// match the ones in the tfndblist for that docid.
// This will remove titlerecs that would only get removed in a tight
// merge. But even more importantly, fixes the problem that when
// merging to older titledb files, the tfndb rec of a titlerec in
// that merge gets re-added to tfndb, and override the newer one in
// tfndb that corresponds to a newer titledb file that contains the
// document.
// We load the tfndb list after the titledb list because if a titlerec
// got added to the tree just after we loaded the titledb list, then
//
// MUST have this
if ( ! m_msg5b ) {
log(LOG_LOGIC,"net: No msg5b supplied.");
char *xx = NULL; *xx = 0;
}
m_time1 = gettimeofdayInMilliseconds();
long long docId1 =g_titledb.getDocIdFromKey((key_t *)m_fileStartKey);
long long docId2 =g_titledb.getDocIdFromKey((key_t *)m_msg3.m_endKey);
key_t startKey = g_tfndb.makeMinKey ( docId1 ) ;
key_t endKey = g_tfndb.makeMaxKey ( docId2 ) ;
QUICKPOLL((m_niceness));
//endKey.setMax();
// the tfndb list is often already in the page cache, so this is fast
if ( ! m_msg5b->getList ( RDB_TFNDB ,
m_coll ,
&m_tfndbList,
startKey ,
endKey ,
TFNDBMINRECSIZES , // minRecSizes
true , // includeTree
false , // addToCache
0 , // maxCacheAge
0 , // startFileNum
-1 , // numFiles
this , // state
gotListWrapper2, // callback
m_niceness ,
false , // do error correction
NULL , // cacheKeyPtr
0 , // retryNum
m_maxRetries, // was 5 -- maxRetries
true , // compensateForMerge
-1LL , // syncpoint
NULL , // msg5b
false , // isRealMerge
m_allowPageCache ))
return false;
return gotList2();
}
void gotListWrapper2 ( void *state , RdbList *list , Msg5 *msg5 ) {
Msg5 *THIS = (Msg5 *)state;
if ( ! THIS->gotList2() ) return;
// sanity check
if ( THIS->m_calledCallback ) { char *xx=NULL;*xx=0; }
// set it now
THIS->m_calledCallback = 2;
// call the original callback
THIS->m_callback ( THIS->m_state , THIS->m_list , THIS );
}
*/
// . this is the NEW gotList() !!! mdw
// . returns false if blocked, true otherwise
// . sets g_errno on error
bool Msg5::gotList2 ( ) {
// reset this
m_startTime = 0LL;
// return if g_errno is set
if ( g_errno && g_errno != ECORRUPTDATA ) return true;
// put all the lists in an array of list ptrs
long n = 0;
// all the disk lists
for ( long i = 0 ; n < MAX_RDB_FILES && i<m_msg3.getNumLists(); i++ ) {
// . skip list if empty
// . was this causing problems?
if ( ! m_isRealMerge ) {
RdbList *list = m_msg3.getList(i);
if ( list->isEmpty() ) continue;
}
// . remember the tfn, the secondary id
// . the tfn is used by tfndb to map a docid to a titledb
// file. each tfndb record has the tfn. each titledb file
// has a tfn (secondary id, aka id2)
//if ( m_rdbId == RDB_TITLEDB ) m_tfns [n] = m_msg3.getTfn(i);
m_listPtrs [ n++ ] = m_msg3.getList(i);
}
QUICKPOLL(m_niceness);
// sanity check.
if ( m_msg3.getNumLists() > MAX_RDB_FILES )
log(LOG_LOGIC,"db: Msg3 had more than %li lists.",
(long)MAX_RDB_FILES);
// . get smallest endKey from all the lists
// . all lists from Msg3 should have the same endKey, but
// m_treeList::m_endKey may differ
// . m_treeList::m_endKey should ALWAYS be >= that of the files
// . constrain m_treeList to the startKey/endKey of the files
//m_minEndKey = m_endKey;
KEYSET(m_minEndKey,m_endKey,m_ks);
for ( long i = 0 ; i < n ; i++ ) {
//if ( m_listPtrs[i]->getEndKey() < m_minEndKey )
// m_minEndKey = m_listPtrs[i]->getEndKey();
// sanity check
//if ( KEYNEG(m_listPtrs[i]->getEndKey()) ) {
// char *xx=NULL;*xx=0; }
if ( KEYCMP(m_listPtrs[i]->getEndKey(),m_minEndKey,m_ks)<0 ) {
KEYSET(m_minEndKey,m_listPtrs[i]->getEndKey(),m_ks);
// crap, if list is all negative keys, then the
// end key seems negative too! however in this
// case RdbScan::m_endKey seems positive so
// maybe we got a negative endkey in constrain?
//if (! (m_minEndKey[0] & 0x01) )
// log("msg5: list had bad endkey");
}
}
// sanity check
//if ( KEYNEG( m_minEndKey) ) {char *xx=NULL;*xx=0; }
/*
// if we got a tfndblist, constrain the title rec lists to its
// transformed endkey. we only read in up to 500k of tfndb list so if
// merging two really small titledb files we could potentially be
// reading in a much bigger tfndb list.
if ( m_rdbId == RDB_TITLEDB && m_isRealMerge && ! g_errno ) {
long long time2 = gettimeofdayInMilliseconds();
long long diff = time2 - m_time1 ;
log(LOG_DEBUG,"db: Read tfndblist in %lli ms "
"(size=%li).",diff,m_tfndbList.m_listSize);
// cut it down to m_msg3.m_endKey because that's what we used
// to constrain this tfndb list read
//if ( m_msg3.m_constrainKey < m_minEndKey ) {
// log(LOG_DEBUG,"db: Constraining to tfndb Msg3 "
// "m_endKey.");
// m_minEndKey = m_msg3.m_constrainKey ;
//}
// only mess with m_minEndKey if our list was NOT limited
// by it. if we were not limited by it, our endKey should
// really be virtual inifinite. because the most our endKey
// will ever be is g_tfndb.makeMaxKey ( docIdMAX ) as can
// be seen above.
if ( m_tfndbList.m_listSize >= TFNDBMINRECSIZES ) {
// constrain titledb lists to tfndb's endkey if
// it's smaller
//key_t ekey = m_tfndbList.getEndKey();
char *ekey = m_tfndbList.getEndKey();
long long docid = g_tfndb.getDocId ( (key_t *)ekey );
if ( docid > 0 ) docid = docid - 1;
//key_t nkey = g_titledb.makeLastKey ( docid );
char nkey[MAX_KEY_BYTES];
key_t trk = g_titledb.makeLastKey ( docid );
KEYSET ( nkey , (char *)&trk , m_ks );
// sanity check
//if ( g_titledb.getKeySize() != m_ks ) {
// char *xx = NULL; *xx = 0; }
// only do constrain if docid is not 0
//if ( docid > 0 && nkey < m_minEndKey ) {
if ( docid > 0 && KEYCMP(nkey,m_minEndKey,m_ks)<0 ) {
log(LOG_DEBUG,"db: Tfndb had min key: "
//"0x%llx",nkey.n0);
"0x%llx",KEY0(nkey));
//m_minEndKey = nkey;
KEYSET(m_minEndKey,nkey,m_ks);
}
}
}
*/
QUICKPOLL(m_niceness);
// . is treeList included?
// . constrain treelist for the merge
// . if used, m_listPtrs [ m_numListPtrs - 1 ] MUST equal &m_treeList
// since newer lists are listed last so their records override older
if ( m_includeTree && ! m_treeList.isEmpty() ) {
// only constrain if we are NOT the sole list, because the
// constrain routine seems to set our endKey to virtual
// infinity, and that makes SpiderCache think that spiderdb
// is exhausted when it is only in the tree. so i added the
// if ( n > 0 ) condition here.
if ( n > 0 ) {
char k[MAX_KEY_BYTES];
m_treeList.getCurrentKey(k);
m_treeList.constrain ( m_startKey ,
m_minEndKey ,
-1 , // min rec sizes
0 , // hint offset
//m_treeList.getCurrentKey() ,
k,
"tree" ,
m_niceness );
}
//if ( m_rdbId == RDB_TITLEDB ) m_tfns [n] = 255;
m_listPtrs [ n++ ] = &m_treeList;
}
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_collnum))) return true;
// if not enough lists, use a dummy list to trigger merge so tfndb
// filter happens and we have a chance to weed out old titleRecs
if ( m_rdbId == RDB_TITLEDB && m_numFiles != 1 && n == 1 &&
m_isRealMerge ) {
//log(LOG_LOGIC,"db: Adding dummy list.");
//m_tfns [n] = 255;
m_dummy.set ( NULL , // list data
0 , // list data size
NULL , // alloc
0 , // alloc size
m_startKey ,
m_minEndKey ,
base->getFixedDataSize() ,
true , // own data?
base->useHalfKeys() ,
m_ks );
m_listPtrs [ n++ ] = &m_dummy;
}
// bitch
if ( n >= MAX_RDB_FILES )
log(LOG_LOGIC,"net: msg5: Too many lists (%li | %li).",
m_msg3.getNumLists() , n);
// store # of lists here for use by the call to merge_r()
m_numListPtrs = n;
// count the sizes
m_totalSize = 0;
for ( long i = 0 ; i < m_numListPtrs ; i++ )
m_totalSize += m_listPtrs[i]->getListSize();
QUICKPOLL(m_niceness);
// . but don't breach minRecSizes
// . this totalSize is just to see if we should spawn a thread, really
//if ( totalSize > m_minRecSizes ) m_totalSize = m_minRecSizes;
#ifdef _SANITYCHECK_
// who uses this now?
//log("Msg5:: who is merging?????");
// timing debug
// m_startKey.n1,
// gettimeofdayInMilliseconds()-m_startTime ,
// m_diskList.getListSize());
// ensure both lists are legit
// there may be negative keys in the tree
// diskList may now also have negative recs since Msg3 no longer
// removes them for fears of delayed positive keys not finding their
// negative key because it was merged out by RdbMerge
for ( long i = 0 ; i < m_numListPtrs ; i++ )
m_listPtrs[i]->checkList_r ( false , true );
#endif
// if not doing merge, we're all done!
if ( ! m_mergeLists )
return doneMerging();
// . if no lists we're done
// . if we were a recall, then list may not be empty
if ( m_numListPtrs == 0 && m_list->isEmpty() ) {
// just copy ptrs from this list into m_list
m_list->set ( NULL , // list data
0 , // list data size
NULL , // alloc
0 , // alloc size
m_startKey ,
m_endKey ,
base->getFixedDataSize() ,
true , // own data?
base->useHalfKeys() ,
m_ks );
// . add m_list to our cache if we should
// . this returns false if blocked, true otherwise
// . sets g_errno on error
// . only blocks if calls msg0 to patch a corrupted list
// . it will handle calling callback if that happens
return doneMerging();
}
if ( m_numListPtrs == 0 ) m_readAbsolutelyNothing = true;
// if msg3 had corruption in a list which was detected in constrain_r()
if ( g_errno == ECORRUPTDATA ) {
// if we only had one list, we were not doing a merge
// so return g_errno to the requester so it retries from the
// twin
if ( m_numListPtrs == 1 ) return true;
// assume nothing is wrong
g_errno = 0;
// if m_doErrorCorrection is true, repairLists_r() should fix
}
QUICKPOLL((m_niceness));
// . should we remove negative recs from final merged list?
// . if we're reading from root and tmp merge file of root
// . should we keep this out of the thread in case a file created?
long fn = 0;
if ( base->m_numFiles > 0 ) fn = base->m_fileIds[m_startFileNum];
if ( fn == 0 || fn == 1 ) m_removeNegRecs = true;
else m_removeNegRecs = false;
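// (fileIds 0 and 1 presumably denote the root file and its
// merge-in-progress twin; nothing older can exist there for a
// negative key to annihilate, so negatives can be safely dropped)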
// . if we only have one list, just use it
// . Msg3 should have called constrain() on it so its m_list,
// m_listEnd and m_listSize all fit m_startKey/m_endKey/m_minRecSizes
// to a tee
// . if it's a tree list it already fits to a tee
// . same with cache list?? better be...
// . if we're only reading one list it should always be empty right?
// . i was getting negative keys in my RDB_DOLEDB list which caused
// Spider.cpp to core, so i add the "! m_removeNegRecs" constraint
// here...
// . TODO: add some code to just filter out the negative recs
// super quick just for this purpose
// . crap, rather than do that just deal with the negative recs
// in the caller code... in this case Spider.cpp::gotDoledbList2()
if ( m_numListPtrs == 1 && m_list->isEmpty() &&//&&!m_removeNegRecs
// just do this logic for doledb now, it was causing us to
// return search results whose keys were negative indexdb keys.
// or later we can just write some code to remove the neg
// recs from the single list!
( m_rdbId == RDB_LINKDB || m_rdbId == RDB_DOLEDB ||
// this speeds up our queryloop querylog parsing in
// seo.cpp quite a bit
(m_rdbId == RDB_POSDB && m_numFiles==1) ) ) {
// log any problems
if ( ! m_listPtrs[0]->m_ownData ) {
log(LOG_LOGIC,"db: Msg5: list does not own data.");
goto skip;
}
// . bitch if not empty
// . NO! might be our second time around if we had key
// annihilations between file #0 and the tree, and now
// we only have 1 non-empty list ptr, either from the tree
// or from the file
//if ( ! m_list->isEmpty() )
// log("Msg5::gotList: why is it not empty? size=%li",
// m_list->getListSize() );
// just copy ptrs from this list into m_list
m_list->set ( m_listPtrs[0]->getList () ,
m_listPtrs[0]->getListSize () ,
m_listPtrs[0]->getAlloc () ,
m_listPtrs[0]->getAllocSize () ,
m_listPtrs[0]->getStartKey () ,
m_listPtrs[0]->getEndKey () ,
m_listPtrs[0]->getFixedDataSize () ,
true , // own data?
m_listPtrs[0]->useHalfKeys () ,
m_ks );
// ensure we don't free it when we loop on freeLists() below
m_listPtrs[0]->setOwnData ( false );
// gotta set this too!
if ( m_listPtrs[0]->m_lastKeyIsValid )
m_list->setLastKey ( m_listPtrs[0]->m_lastKey );
// . remove titleRecs that shouldn't be there
// . if the tfn of the file we read the titlerec from does not
// match the one in m_tfndbList, then remove it
// . but if we're not merging lists, why remove it?
//if ( m_rdbId == RDB_TITLEDB && m_msg3.m_numFileNums > 1 )
// stripTitleRecs ( m_list , m_tfns[0] , m_tfndbList );
// . add m_list to our cache if we should
// . this returns false if blocked, true otherwise
// . sets g_errno on error
// . only blocks if calls msg0 to patch a corrupted list
// . it will handle calling callback if that happens
return doneMerging();
}
skip:
// time the preparation and merge
m_startTime = gettimeofdayInMilliseconds();
// . merge the lists
// . the startKey of the finalList is m_startKey, the first time
// . but after that, we're adding diskLists, so use m_fileStartKey
// . we're called multiple times for the same look-up in case of
// delete records in a variable rec-length db cause some recs in our
// disk lookups to be wiped out, thereby falling below minRecSizes
// . this will set g_errno and return false on error (ENOMEM,...)
// . older list goes first so newer list can override
// . remove all negative-keyed recs since Msg5 is a high level msg call
// . prepare for the merge, grows the buffer
// . this returns false and sets g_errno on error
// . should not affect the current list in m_list, only build on top
if ( ! m_list->prepareForMerge ( m_listPtrs ,
m_numListPtrs ,
m_minRecSizes ) ) {
log("net: Had error preparing to merge lists from %s: %s",
base->m_dbname,mstrerror(g_errno));
return true;
}
QUICKPOLL((m_niceness));
#ifdef _TESTNEWALGO_
// for testing useBigRootList
if ( ! m_list2.prepareForMerge ( m_listPtrs ,
m_numListPtrs ,
m_minRecSizes ) ) {
log("net: Had error preparing to merge lists from %s: %s",
base->m_dbname,mstrerror(g_errno));
return true;
}
#endif
// . if size < 32k, don't bother with a thread; should be < ~1 ms
// . it seems to be about 1ms per 10k for tfndb recs
// . it seems to core dump if we spawn a thread with totalSizes too low
// . why???
if ( m_totalSize < 32*1024 ) goto skipThread;
// if we are an interruptible niceness 1, do not use a thread,
// we can be interrupted by the alarm callback and serve niceness
// 0 requests, that is probably better! although the resolution is
// only like 10ms on those alarms... BUT if you use a smaller
// mergeBufSize of like 100k, that might make it responsive enough!
// allow it to do a thread again so we can take advantage of
// multiple cores, or hyperthreads i guess because i am seeing
// some missed quickpoll log msgs, i suppose because we did not
// insert QUICKPOLL() statements in the RdbList::merge_r() code
//if ( m_niceness >= 1 ) goto skipThread;
// super duper hack!
//if ( m_rdbId == RDB_REVDB ) goto skipThread;
// i'm not sure why we core in Msg5's call to RdbList::merge_r().
// the list appears to be corrupt...
//if ( m_rdbId == RDB_FACEBOOKDB ) goto skipThread;
// skip it for now
//goto skipThread;
//m_waitingForMerge = true;
// . if size is big, make a thread
// . let's always make niceness 0 since it wasn't being very
// aggressive before
if ( g_threads.call ( MERGE_THREAD , // threadType
m_niceness , // m_niceness ,
this , // state data for callback
threadDoneWrapper ,
mergeListsWrapper_r ) )
return false;
//m_waitingForMerge = false;
// thread creation failed
if ( g_conf.m_useThreads && ! g_threads.m_disabled )
log(LOG_INFO,
"net: Failed to create thread to merge lists. Doing "
"blocking merge. Hurts performance.");
// clear g_errno because it really isn't a problem, we just block
g_errno = 0;
// come here to skip the thread
skipThread:
// repair any corruption
repairLists_r();
// do it
mergeLists_r ();
// . add m_list to our cache if we should
// . this returns false if blocked, true otherwise
// . sets g_errno on error
// . only blocks if calls msg0 to patch a corrupted list
// . it will handle calling callback if that happens
return doneMerging();
}
// thread will run this first
void *mergeListsWrapper_r ( void *state , ThreadEntry *t ) {
// we're in a thread now!
Msg5 *THIS = (Msg5 *)state;
// debug msg
//log("Msg5::mergeListsWrapper: begining threaded merge!");
// repair any corruption
THIS->repairLists_r();
// do the merge
THIS->mergeLists_r();
// now cleanUp wrapper will call it
//pthread_exit ( NULL );
// bogus return
return NULL;
}
// . now we're done merging
// . when the thread is done we get control back here, in the main process
void threadDoneWrapper ( void *state , ThreadEntry *t ) {
// we MAY be in a thread now
Msg5 *THIS = (Msg5 *)state;
// debug msg
//log("msg3 back from merge thread (msg5=%lu)",THIS->m_state);
// . add m_list to our cache if we should
// . this returns false if blocked, true otherwise
// . sets g_errno on error
// . only blocks if calls msg0 to patch a corrupted list
// . it will handle calling callback if that happens
if ( ! THIS->doneMerging() ) return;
// . throw it back into the loop if necessary
// . only returns true if COMPLETELY done
if ( THIS->needsRecall() && ! THIS->readList() ) return;
// sanity check
if ( THIS->m_calledCallback ) { char *xx=NULL;*xx=0; }
// we are no longer waiting for the list
THIS->m_waitingForList = false;
// set it now
THIS->m_calledCallback = 3;
// when completely done call the callback
THIS->m_callback ( THIS->m_state , THIS->m_list , THIS );
}
// check lists in the thread
void Msg5::repairLists_r ( ) {
// assume none
m_hadCorruption = false;
// return if no need to
if ( ! m_doErrorCorrection ) return;
// or if msg3 already checked them and they were ok
if ( m_msg3.m_listsChecked ) return;
// if msg3 said they were corrupt... this happens when the map
// is generated over a bad data file and ends up writing the same key
// on more than 500MB worth of data. so when we try to read a list
// that has the startkey/endkey covering that key, the read size
// is too big to ever happen...
if ( m_msg3.m_hadCorruption ) m_hadCorruption = true;
// time it
//m_checkTime = gettimeofdayInMilliseconds();
for ( long i = 0 ; i < m_numListPtrs ; i++ ) {
// . did it breech our minRecSizes?
// . only check for indexdb, our keys are all size 12
// . is this a mercenary problem?
// . cored on 'twelfth night cake'
// . no... this happens after merging the lists. if we had
// a bunch of negative recs we over read anticipating some
// recs will be deleted, so it isn't really necessary to
// bitch about this here..
if ( g_conf.m_logDebugDb &&
m_rdbId == RDB_POSDB &&
m_listPtrs[i]->m_listSize > m_minRecSizes + 12 )
// just log it for now, maybe force core later
log(LOG_DEBUG,"db: Index list size is %li but "
"minRecSizes is %li.",
m_listPtrs[i]->m_listSize ,
m_minRecSizes );
// this took like 50ms (-O3) on lenny on a 4meg list
bool status = m_listPtrs[i]->checkList_r(false,
// sleep on corruption if doing a sanity check (core dumps)
#ifdef _SANITYCHECK_
true
#else
false
#endif
);
// if no errors, check the next list
if ( status ) continue;
// . show the culprit file
// . logging the key ranges gives us an idea of how long
// it will take to patch the bad data
long nn = m_msg3.m_numFileNums;
RdbBase *base = getRdbBase ( m_rdbId , m_collnum );
if ( i < nn && base ) {
long fn = m_msg3.m_fileNums[i];
BigFile *bf = base->getFile ( fn );
log("db: Corrupt filename is %s in collnum %li."
,bf->getFilename()
,(long)m_collnum);
//key_t sk = m_listPtrs[i]->getStartKey();
//key_t ek = m_listPtrs[i]->getEndKey ();
//log("db: "
// "startKey.n1=%lx n0=%llx "
// "endKey.n1=%lx n0=%llx",
// sk.n1,sk.n0,ek.n1,ek.n0);
char *sk = m_listPtrs[i]->getStartKey();
char *ek = m_listPtrs[i]->getEndKey ();
log("db: "
"startKey=%s "
"endKey=%s ",
KEYSTR(sk,m_ks),KEYSTR(ek,m_ks));
}
// . remove the bad eggs from the list
// . TODO: support non-fixed data sizes
//if ( m_listPtrs[i]->getFixedDataSize() >= 0 )
m_listPtrs[i]->removeBadData_r();
//else
//m_listPtrs[i]->reset();
// otherwise we have a patchable error
m_hadCorruption = true;
// don't add a list with errors to cache, please
m_addToCache = false;
}
}
void Msg5::mergeLists_r ( ) {
// . don't do any merge if this is true
// . if our fetch of remote list fails, then we'll be called
// again with this set to false
if ( m_hadCorruption ) return;
// start the timer
//long long startTime = gettimeofdayInMilliseconds();
// . if the key of the last key of the previous list we read from
// is not below startKey, reset the truncation count to avoid errors
// . if caller does the same read over and over again then
// we would do a truncation in error eventually
// . use m_fileStartKey, not just m_startKey, since we may be doing
// a follow-up read
//if ( m_prevKey >= m_fileStartKey ) m_prevCount = 0;
if ( KEYCMP(m_prevKey,m_fileStartKey,m_ks)>=0 ) m_prevCount = 0;
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_collnum))) {
log("No collection found."); return; }
/*
if ( m_rdbId == RDB_POSDB ) {
m_list->posdbMerge_r ( m_listPtrs ,
m_numListPtrs ,
m_startKey ,
m_minEndKey ,
m_minRecSizes ,
m_removeNegRecs ,
m_prevKey ,
&m_prevCount ,
&m_dupsRemoved ,
base->m_rdb->m_rdbId ,
&m_filtered ,
m_isRealMerge , // do group mask?
m_isRealMerge , // is real merge?
m_niceness );
}
*/
long niceness = m_niceness;
if ( niceness > 0 ) niceness = 2;
if ( m_isRealMerge ) niceness = 1;
// . all lists must be constrained because indexMerge_r does not check
// merged keys to see if they're in [startKey,endKey]
// . RIGHT NOW indexdb is the only one that uses half keys!!
// . indexMerge_r() will return false and set g_errno on error
// . this is messing up linkdb!!
/*
if ( base->useHalfKeys() &&
m_rdbId != RDB_POSDB &&
m_rdbId != RDB2_POSDB2 &&
1 == 3 ) {
// disable for now!
char *xx=NULL;*xx=0;
// always assume to use it
bool useBigRootList = true;
// must include the first file always
if ( m_startFileNum != 0 ) useBigRootList = false;
// must be 12 bytes per key
if ( m_ks != 12 ) useBigRootList = false;
// just indexdb for now, tfndb has special merge rules
if ( m_rdbId != RDB_POSDB && m_rdbId != RDB2_POSDB2 )
useBigRootList = false;
// do not do if merging files on disk
if ( m_isRealMerge ) useBigRootList = false;
// turn off for now
//useBigRootList = false;
// do the merge
m_list->indexMerge_r ( m_listPtrs ,
m_numListPtrs ,
m_startKey ,
m_minEndKey ,
m_minRecSizes ,
m_removeNegRecs ,
m_prevKey ,
&m_prevCount ,
//g_indexdb.getTruncationLimit() ,
m_indexdbTruncationLimit ,
&m_dupsRemoved ,
//base->m_rdb == g_tfndb.getRdb() ,
base->m_rdb->m_rdbId ,
&m_filtered ,
m_isRealMerge , // do group mask?
m_isRealMerge , // is real merge?
useBigRootList ,// useBigRootList?
niceness );
#ifdef _TESTNEWALGO_
// for testing useBigRootList
if ( useBigRootList ) {
logf(LOG_DEBUG,"db: TRYING bit root list algo.");
m_list2.indexMerge_r ( m_listPtrs ,
m_numListPtrs ,
m_startKey ,
m_minEndKey ,
m_minRecSizes ,
m_removeNegRecs ,
m_prevKey ,
&m_prevCount ,
m_indexdbTruncationLimit ,
&m_dupsRemoved ,
base->m_rdb->m_rdbId ,
&m_filtered ,
m_isRealMerge , //dogrpmask?
m_isRealMerge ,
false );//bigRootList
// sanity check
long size1 = m_list->m_listSize;
long size2 = m_list2.m_listSize;
char *list1 = (char *)m_list->m_list;
char *list2 = (char *)m_list2.m_list;
if ( size1 != size2 ||
memcmp ( list1 , list2 , size1 ) != 0 ) {
log("db: Got bad list.");
m_list->printList();
m_list2.printList();
//char *xx = NULL; *xx = 0;
}
}
#endif
}
*/
//g_conf.m_useThreads = false;
// . old Msg3 notes:
// . otherwise, merge the lists together
// . this may call growList() via RdbList::addRecord/Key() but it
// shouldn't since we called RdbList::prepareForMerge() above
// . we aren't allowed to do allocating in a thread!
// . TODO: only merge the recs not cached, [m_fileStartKey, endKey]
// . merge() might shrink m_endKey in diskList if m_minRecSizes
// constrained us OR it might decrement it by 1 if it's a negative key
// .........................
// . this MUST start at m_list->m_listPtr cuz this may not be the
// 1st time we had to dive in to disk, due to negative rec
// annihilation
// . old finalList.merge_r() Msg5 notes:
// . use startKey of tree
// . NOTE: tree may contain some un-annihilated key pairs because
// one of them was PROBABLY in the dump queue and we decided in
// Rdb::addRecord() NOT to do the annihilation, therefore it's good
// to do the merge to do the annihilation
//else
m_list->merge_r ( m_listPtrs ,
m_numListPtrs ,
m_startKey ,
m_minEndKey ,
m_minRecSizes ,
m_removeNegRecs ,
getIdFromRdb ( base->m_rdb ) ,
&m_filtered ,
NULL,//m_tfns , // used for titledb
NULL,//&m_tfndbList , // used for titledb
m_isRealMerge ,
niceness );
// maintain this info for truncation purposes
if ( m_list->isLastKeyValid() )
//m_prevKey = m_list->getLastKey();
KEYSET(m_prevKey,m_list->getLastKey(),m_ks);
else {
// . lastKey should be set and valid if list is not empty
// . we need it for de-duping dup tfndb recs that fall on our
// read boundaries
if ( m_list->m_listSize > 0 )
log(LOG_LOGIC,"db: Msg5. Last key invalid.");
m_prevCount = 0;
}
}
// . this returns false if blocked, true otherwise
// . sets g_errno on error
// . only blocks if calls msg0 to patch a corrupted list
// . it will handle calling callback if that happens
// . this is called when all files are done reading in m_msg3
// . sets g_errno on error
// . problem: say minRecSizes is 12000 (1000 12-byte keys)
// . there are 10000 keys in the [startKey,endKey] range
// . we read 1st 1000 recs from the tree and store in m_treeList
// . we read 1st 1000 recs from disk
// . all recs in tree are negative and annihilate the 1000 recs from disk
// . we are left with an empty list
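// . in that case the code below bumps m_newMinRecSizes and advances
//   m_fileStartKey so the caller can dive back into disk and try again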
bool Msg5::doneMerging ( ) {
//m_waitingForMerge = false;
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_collnum))) return true;
// . if there was a merge error, bitch about it
// . Thread class should propagate g_errno when it was set in a thread
if ( g_errno ) {
log("net: Had error merging lists from %s: %s.",
base->m_dbname,mstrerror(g_errno));
return true;
}
// . was a list corrupted?
// . if so, we did not even begin the merge yet
// . try to get the list from a remote brother
// . if that fails we have already removed the bad data, so begin
// our first merge
if ( m_hadCorruption ) {
// log it here, cuz logging in thread doesn't work too well
log("net: Encountered a corrupt list in rdb=%s collnum=%li",
base->m_dbname,(long)m_collnum);
// remove error condition, we removed the bad data in thread
m_hadCorruption = false;
// if(g_numCorrupt++ >= g_conf.m_maxCorruptLists &&
// g_conf.m_maxCorruptLists > 0) {
g_numCorrupt++;
if(g_conf.m_maxCorruptLists > 0 &&
(g_numCorrupt % g_conf.m_maxCorruptLists) == 0) {
char msgbuf[1024];
Host *h = g_hostdb.getHost ( 0 );
snprintf(msgbuf, 1024,
"%li corrupt lists. "
"cluster=%s "
"host=%li",
g_numCorrupt,
iptoa(h->m_ip),
g_hostdb.m_hostId);
g_pingServer.sendEmail(NULL, msgbuf);
}
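		// (the modulus above means we send one email per
		// m_maxCorruptLists corruptions instead of one per corruption)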
// try to get the list from remote host
if ( ! getRemoteList() ) return false;
		// sanity: getRemoteList() only returns true on error, so
		// g_errno better be set
if ( ! g_errno ) {
log("net: got remote list without blocking");
char *xx=NULL;*xx=0;
}
// if it set g_errno, it could not get a remote list
		// so try to make do with what we have
if ( g_errno ) {
// log a msg, we actually already removed it in thread
log("net: Removed corrupted data.");
// clear error
g_errno = 0;
// . merge the modified lists again
// . this is not in a thread
// . it should not block
mergeLists_r();
}
}
if ( m_isRealMerge )
log(LOG_DEBUG,"db: merged list is %li bytes long.",
m_list->m_listSize);
// log it
long long now ;
// only time it if we actually did a merge, check m_startTime
if ( m_startTime ) now = gettimeofdayInMilliseconds();
else now = 0;
long long took = now - m_startTime ;
if ( g_conf.m_logTimingNet ) {
if ( took > 5 )
log(LOG_INFO,
"net: Took %llu ms to do merge. %li lists merged "
"into one list of %li bytes.",
took , m_numListPtrs , m_list->getListSize() );
//log("Msg5:: of that %llu ms was in checkList_r()s",
// m_checkTime );
}
// . add the stat
// . use turquoise for time to merge the disk lists
// . we should use another color rather than turquoise
// . these clog up the graph, so only log if took more than 1 ms
// . only time it if we actually did a merge, check m_startTime
if ( took > 1 && m_startTime )
g_stats.addStat_r ( m_minRecSizes ,
m_startTime ,
now ,
//"rdb_list_merge",
0x0000ffff );
// . scan merged list for problems
// . this caught an incorrectly set m_list->m_lastKey before
#ifdef _SANITYCHECK_
m_list->checkList_r ( false , true , m_rdbId );
#endif
// . all done if we did not merge the lists
	// . posdb does not need them merged when calling intersect9_r()
if ( ! m_mergeLists )
return true;
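	// (when m_mergeLists is false the caller presumably consumes the
	// raw per-source lists through m_listPtrs/m_numListPtrs itself,
	// so do not free them here)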
	// . TODO: call freeList() on each m_list[i] here rather than in
	//   the destructor
// . free all lists we used
// . some of these may be from Msg3, some from cache, some from tree
for ( long i = 0 ; i < m_numListPtrs ; i++ ) {
m_listPtrs[i]->freeList();
m_listPtrs[i] = NULL;
}
// and the tree list
m_treeList.freeList();
// . update our m_newMinRecSizes
// . NOTE: this now ignores the negative records in the tree
long long newListSize = m_list->getListSize();
// scale proportionally based on how many got removed during the merge
long long percent = 100LL;
long long net = newListSize - m_oldListSize;
// add 5% for inconsistencies
if ( net > 0 ) percent =(((long long)m_newMinRecSizes*100LL)/net)+5LL;
else percent = 200;
if ( percent <= 0 ) percent = 1;
// set old list size in case we get called again
m_oldListSize = newListSize;
//long delta = m_minRecSizes - (long)newListSize;
// how many recs do we have left to read?
m_newMinRecSizes = m_minRecSizes - (long)newListSize;
// return now if we met our minRecSizes quota
if ( m_newMinRecSizes <= 0 ) return true;
// if we gained something this round then try to read the remainder
//if ( net > 0 ) m_newMinRecSizes = delta;
// otherwise, scale proportionately
long nn = ((long long)m_newMinRecSizes * percent ) / 100LL;
if ( percent > 100 ) {
if ( nn > m_newMinRecSizes ) m_newMinRecSizes = nn;
else m_newMinRecSizes = 0x7fffffff;
}
else m_newMinRecSizes = nn;
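	// illustrative numbers (not from a real run): if last round we
	// asked for 100k bytes but the merged list only grew by net=20k
	// because negative recs annihilated the rest, then percent =
	// (100k*100/20k)+5 = 505; so if we still need 50k more bytes we
	// ask the next read for 50k*505/100 = ~252k, betting that roughly
	// the same fraction gets annihilated again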
	// . for every round we get called, increase by 10 percent
	// . try to fix all those negative recs in the rebalance re-run
	// . multiply in floating point BEFORE casting back to long;
	//   casting (1.0 + m_round * .10) itself truncates 1.1 down to 1
	m_newMinRecSizes = (long)(m_newMinRecSizes * (1.0+(m_round*.10)));
// wrap around?
if ( m_newMinRecSizes < 0 || m_newMinRecSizes > 1000000000 )
m_newMinRecSizes = 1000000000;
QUICKPOLL(m_niceness);
	// . don't exceed original min rec sizes by more than 5x i guess
// . watch out for wrap
//long max = 5 * m_minRecSizes ;
//if ( max < m_minRecSizes ) max = 0x7fffffff;
//if ( m_newMinRecSizes > max && max > m_minRecSizes )
// m_newMinRecSizes = max;
	// keep this above a certain point because if we did not meet our
	// quota by now, negative records are probably messing with us
if ( m_rdbId != RDB_DOLEDB &&
m_newMinRecSizes < 128000 ) m_newMinRecSizes = 128000;
// . update startKey in case we need to read more
// . we'll need to read more if endKey < m_endKey && m_newMinRecSizes
// is positive
// . we read more from files AND from tree
//m_fileStartKey = m_list->getEndKey() ;
//m_fileStartKey += (unsigned long)1;
KEYSET(m_fileStartKey,m_list->getEndKey(),m_ks);
KEYADD(m_fileStartKey,1,m_ks);
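	// e.g. if the merged list ended on key K, the next disk read
	// starts at K+1 so we never re-read and re-merge K itself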
return true;
}
void gotRemoteListWrapper( void *state );//, RdbList *list ) ;
long g_isDumpingRdbFromMain = 0;
// . if we discover one of the lists we read from a file is corrupt we go here
// . uses Msg5 to try to get list remotely
bool Msg5::getRemoteList ( ) {
// skip this part if doing a cmd line 'gb dump p main 0 -1 1' cmd or
// similar to dump out a local rdb.
if ( g_isDumpingRdbFromMain ) {
g_errno = 1;
return true;
}
// get our twin host, or a redundant host in our group
//Host *group = g_hostdb.getGroup ( g_hostdb.m_groupId );
Host *group = g_hostdb.getMyShard();
long n = g_hostdb.getNumHostsPerShard();
// . if we only have 1 host per group, data is unpatchable
// . we should not have been called if this is the case!!
if ( n == 1 ) {
g_errno = EBADENGINEER;
//log("Msg5::gotRemoteList: no twins. data unpatchable.");
return true;
}
// tfndb is not shareable, since it has tfns
if ( m_rdbId == RDB_TFNDB ) {
g_errno = EBADENGINEER;
log("net: Cannot patch tfndb data from twin because it is "
"not interchangable. Tfndb must be regenerated.");
return true;
}
if ( m_rdbId == RDB_STATSDB ) {
g_errno = EBADENGINEER;
log("net: Cannot patch statsdb data from twin because it is "
"not interchangable.");
return true;
}
	// tell them about it
log("net: Getting remote list from twin instead.");
// make a new Msg0 for getting remote list
try { m_msg0 = new ( Msg0 ); }
// g_errno should be set if this is NULL
catch ( ... ) {
g_errno = ENOMEM;
log("net: Could not allocate memory to get from twin.");
return true;
}
mnew ( m_msg0 , sizeof(Msg0) , "Msg5" );
// select our twin
long i;
for ( i = 0 ; i < n ; i++ )
if ( group[i].m_hostId != g_hostdb.m_hostId ) break;
Host *h = &group[i];
// get our groupnum. the column #
long forceParitySplit = h->m_shardNum;//group;
// translate base to an id, for the sake of m_msg0
//char rdbId = getIdFromRdb ( base->m_rdb );
// . this returns false if blocked, true otherwise
// . this sets g_errno on error
// . get list from ALL files, not just m_startFileNum/m_numFiles
// since our files may not be the same
// . if doRemotely parm is not supplied replying hostId is unspecified
// . make minRecSizes as big as possible because it gets from ALL
// files and from tree!
	// . just make it 256k for now lest msg0 bitch about it being too
	//   big if rdbId == RDB_INDEXDB and we passed the truncation limit
	// . wait forever for this host to reply... well, at least a day;
	//   that way, if he's dead, we'll wait for him to come back up to
	//   save our data
if ( ! m_msg0->getList ( h->m_hostId ,
h->m_ip ,
h->m_port ,
0 , // max cached age
false , // add to cache?
m_rdbId , // rdbId
m_collnum ,
m_list ,
m_startKey ,
m_endKey ,
m_minRecSizes , // was 256k minRecSizes
this ,
gotRemoteListWrapper ,
m_niceness ,
false , // do error correction?
true , // include tree?
true , // do merge? (obsolete)
-1 , // first hostid
0 , // startFileNum
-1 , // numFiles (-1=all)
60*60*24 , // timeout in secs
-1 , // syncPoint
-1 , // preferLocalReads
NULL , // msg5
NULL , // msg5b
m_isRealMerge , // merging files?
//#ifdef SPLIT_INDEXDB
m_allowPageCache , // allow page cache?
false , // force local Indexdb
false , // doIndexdbSplit
// "forceParitySplit" is a group #
// (the groupId is a mask)
forceParitySplit ))
//#else
// m_allowPageCache ))// allow page cache?
//#endif
return false;
// this is strange
log("msg5: call to msg0 did not block");
// . if we did not block then call this directly
// . return false if it blocks
return gotRemoteList ( ) ;
}
void gotRemoteListWrapper( void *state ) { // , RdbList *list ) {
Msg5 *THIS = (Msg5 *)state;
// return if this blocks
if ( ! THIS->gotRemoteList() ) return;
// sanity check
if ( THIS->m_calledCallback ) { char *xx=NULL;*xx=0; }
// we are no longer waiting for the list
THIS->m_waitingForList = false;
// set it now
THIS->m_calledCallback = 4;
// if it doesn't block call the callback, g_errno may be set
THIS->m_callback ( THIS->m_state , THIS->m_list , THIS );
}
// returns false if it blocks
bool Msg5::gotRemoteList ( ) {
// free the Msg0
mdelete ( m_msg0 , sizeof(Msg0) , "Msg5" );
delete ( m_msg0 );
// return true now if everything ok
if ( ! g_errno ) {
// . i modified checkList to set m_lastKey if it is not set
// . we need it for the big merge for getting next key in
// RdbDump.cpp
// . if it too is invalid, we are fucked
if ( ! m_list->checkList_r ( false , false ) ) {
log("net: Received bad list from twin.");
g_errno = ECORRUPTDATA;
goto badList;
}
// . success messages
// . logging the key ranges gives us an idea of how long
// it will take to patch the bad data
//key_t sk = m_list->getStartKey();
//key_t ek = m_list->getEndKey ();
//log("net: Received good list from twin. Requested %li bytes "
// "and got %li. "
// "startKey.n1=%lx n0=%llx "
// "endKey.n1=%lx n0=%llx",
// m_minRecSizes , m_list->getListSize() ,
// sk.n1,sk.n0,ek.n1,ek.n0);
char *sk = m_list->getStartKey();
char *ek = m_list->getEndKey ();
log("net: Received good list from twin. Requested %li bytes "
"and got %li. "
"startKey=%s endKey=%s",
m_minRecSizes , m_list->getListSize() ,
KEYSTR(sk,m_ks),KEYSTR(ek,m_ks));
// . HACK: fix it so end key is right
// . TODO: fix this in Msg0::gotReply()
// . if it is empty, then there must be nothing else left
// since the endKey was maxed in call to Msg0::getList()
QUICKPOLL(m_niceness);
if ( ! m_list->isEmpty() )
m_list->setEndKey ( m_list->getLastKey() );
//key_t k ;
//k = m_list->getStartKey();
char *k = m_list->getStartKey();
log(LOG_DEBUG,
//"net: Received list skey.n1=%08lx skey.n0=%016llx." ,
// k.n1 , k.n0 );
"net: Received list skey=%s." ,
KEYSTR(k,m_ks) );
k = m_list->getEndKey();
log(LOG_DEBUG,
//"net: Received list ekey.n1=%08lx ekey.n0=%016llx." ,
// k.n1 , k.n0 );
"net: Received list ekey=%s",
KEYSTR(k,m_ks) );
if ( ! m_list->isEmpty() ) {
k = m_list->getLastKey();
//log(LOG_DEBUG,"net: Received list Lkey.n1=%08lx "
// "Lkey.n0=%016llx" , k.n1 , k.n0 );
log(LOG_DEBUG,"net: Received list Lkey=%s",
KEYSTR(k,m_ks) );
}
//log("Msg5::gotRemoteList: received list is good.");
return true;
}
badList:
// it points to a corrupted list from twin, so reset
m_list->reset();
// because we passed m_list to Msg0, it called m_list->reset()
// which set our m_mergeMinListSize to -1, so we have to call
// the prepareForMerge() again
if ( ! m_list->prepareForMerge ( m_listPtrs ,
m_numListPtrs ,
m_minRecSizes ) ) {
log("net: Had error preparing for merge: %s.",
mstrerror(g_errno));
return true;
}
	// . if g_errno indicates a timeout we couldn't get a patch list
// . turn off error correction and try again
log("net: Had error getting remote list: %s.", mstrerror(g_errno) );
log("net: Merging repaired lists.");
// clear g_errno so RdbMerge doesn't freak out
g_errno = 0;
// . we have the lists ready to merge
// . hadCorruption should be false at this point
mergeLists_r();
// process the result
return doneMerging();
}