#include "gb-include.h"
#include "Msg5.h"
#include "RdbBase.h"
#include "Rdb.h"
//#include "Indexdb.h"
#include "Stats.h"
//#include "RdbCache.h"
#include "Threads.h"
#include "Msg0.h"
#include "PingServer.h"
//#include "Indexdb.h" // g_indexdb.getTruncationLimit()
//#include "CollectionRec.h"
//#define _SANITYCHECK_
//#define _TESTNEWALGO_ 1
static void gotListWrapper ( void *state ) ;
long g_numCorrupt = 0;
Msg5::Msg5() {
m_waitingForList = false;
//m_waitingForMerge = false;
m_numListPtrs = 0;
m_mergeLists = true;
reset();
}
Msg5::~Msg5() {
reset();
}
// frees m_treeList
void Msg5::reset() {
if ( m_waitingForList ) { // || m_waitingForMerge ) {
log("disk: Trying to reset a class waiting for a reply.");
// might be doing an urgent exit (mainShutdown(1)) or
// g_process.shutdown(), so do not core here
//char *xx = NULL; *xx = 0;
}
m_treeList.freeList();
//m_tfndbList.freeList();
m_msg3.reset();
m_prevCount = 0;
//m_prevKey.setMin();
KEYMIN(m_prevKey,MAX_KEY_BYTES); // use MAX_KEY_BYTES; m_ks is not valid yet
// free lists if m_mergeLists was false
for ( long i = 0 ; ! m_mergeLists && i < m_numListPtrs ; i++ )
m_listPtrs[i]->freeList();
m_numListPtrs = 0;
// and the tree list
m_treeList.freeList();
}
/*
//key_t makeCacheKey ( key_t startKey ,
//void makeCacheKey ( key_t startKey ,
// key_t endKey ,
void makeCacheKey ( char *startKey ,
char *endKey ,
bool includeTree ,
long minRecSizes ,
long startFileNum ,
//long numFiles ) {
long numFiles ,
char *cacheKeyPtr ,
char ks ) { // keySize
//key_t cacheKey;
//cacheKey = startKey ;
//cacheKey = hash96 ( endKey , cacheKey );
//cacheKey = hash96 ( (long)includeTree , cacheKey );
//cacheKey = hash96 ( (long)minRecSizes , cacheKey );
//cacheKey = hash96 ( (long)startFileNum , cacheKey );
//cacheKey = hash96 ( (long)numFiles , cacheKey );
if ( ks == 12 ) {
key_t cacheKey = *(key_t *)startKey;
cacheKey = hash96 ( *(key_t *)endKey , cacheKey );
cacheKey = hash96 ( (long)includeTree , cacheKey );
cacheKey = hash96 ( (long)minRecSizes , cacheKey );
cacheKey = hash96 ( (long)startFileNum , cacheKey );
cacheKey = hash96 ( (long)numFiles , cacheKey );
*(key_t *)cacheKeyPtr = cacheKey;
}
else {
key128_t cacheKey = *(key128_t *)startKey;
cacheKey = hash128 ( *(key128_t *)endKey, cacheKey );
cacheKey = hash128 ( (long)includeTree , cacheKey );
cacheKey = hash128 ( (long)minRecSizes , cacheKey );
cacheKey = hash128 ( (long)startFileNum , cacheKey );
cacheKey = hash128 ( (long)numFiles , cacheKey );
*(key128_t *)cacheKeyPtr = cacheKey;
}
//return cacheKey;
}
*/
#include "Tfndb.h"
//#include "Checksumdb.h"
//HashTableX g_waitingTable;
// . return false if blocked, true otherwise
// . set g_errno on error
// . fills "list" with the requested list
// . we want at least "minRecSizes" bytes of records, but not much more
// . we want all our records to have keys in the [startKey,endKey] range
// . final merged list should try to have a size of at least "minRecSizes"
// . it may fall short if not enough records were in [startKey,endKey] range
// . endKey of list will be set so that all records from startKey to that
// endKey are in the list
// . a minRecSizes of 0x7fffffff means virtual infinity, but it also has
// another special meaning. it tells msg5 to tell RdbTree's getList() to
// pre-allocate the list size by counting the recs ahead of time.
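// . a minimal call sketch (illustrative only: "myState" and
//   "gotListCallback" are hypothetical caller names, and the trailing
//   argument values mirror the ones used by the commented-out
//   m_msg5b->getList() call later in this file):
//
//   Msg5 msg5;
//   RdbList list;
//   if ( ! msg5.getList ( RDB_POSDB , collnum , &list ,
//                         startKey , endKey ,
//                         512000 , // minRecSizes
//                         true   , // includeTree
//                         false  , // addToCache
//                         0      , // maxCacheAge
//                         0      , // startFileNum
//                         -1     , // numFiles, -1 = all
//                         myState , gotListCallback ,
//                         0      , // niceness
//                         true   , // doErrorCorrection
//                         NULL , 0 , -1 , true , -1LL , NULL ,
//                         false , true , true , true ) )
//           // blocked; gotListCallback gets called when done
//           return false;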
bool Msg5::getList ( char rdbId ,
collnum_t collnum ,
RdbList *list ,
//key_t startKey ,
//key_t endKey ,
void *startKey ,
void *endKey ,
long minRecSizes , // requested scan size(-1 none)
bool includeTree ,
bool addToCache ,
long maxCacheAge , // in secs for cache lookup
long startFileNum , // first file to scan
long numFiles , // rel. to startFileNum,-1 all
void *state , // for callback
void (* callback ) ( void *state ,
RdbList *list ,
Msg5 *msg5 ) ,
long niceness ,
bool doErrorCorrection ,
//key_t *cacheKeyPtr , // NULL if none
char *cacheKeyPtr , // NULL if none
long retryNum ,
long maxRetries ,
bool compensateForMerge ,
long long syncPoint ,
class Msg5 *msg5b ,
bool isRealMerge ,
bool allowPageCache ,
// if this is false we only check the page cache
bool hitDisk ,
bool mergeLists ) {
// make sure we are not being re-used prematurely
if ( m_waitingForList ) {
log("disk: Trying to reset a class waiting for a reply.");
char *xx = NULL; *xx = 0;
}
if ( collnum < 0 ) {
log("msg5: called with bad collnum=%li",(long)collnum);
g_errno = ENOCOLLREC;
return true;
}
// sanity check. we no longer have record caches!
// now we do again for posdb gbdocid:xxx| restricted queries
//if ( addToCache || maxCacheAge ) {char *xx=NULL;*xx=0; }
// assume no error
g_errno = 0;
// sanity
if ( ! list && mergeLists ) { char *xx=NULL;*xx=0; }
// warning
if ( collnum < 0 ) log(LOG_LOGIC,"net: bad collection. msg5.");
// MUST have this
//if ( rdbId == RDB_TITLEDB && ! msg5b ) {
// log(LOG_LOGIC,"net: No msg5b supplied. 1.");
// char *xx = NULL; *xx = 0;
//}
// . reset the provided list
// . this will not free any mem it may have alloc'd but it will set
// m_listSize to 0 so list->isEmpty() will return true
if ( list ) list->reset();
// key size set
m_ks = getKeySizeFromRdbId(rdbId);
// . complain if endKey < startKey
// . no because IndexReadInfo does this to prevent us from reading
// a list
//if ( startKey > endKey ) return true;
if ( KEYCMP((char *)startKey,(char *)endKey,m_ks)>0 ) return true;
// log("Msg5::readList: startKey > endKey warning");
// we no longer allow negative minRecSizes
if ( minRecSizes < 0 ) {
log(LOG_LOGIC,"net: msg5: MinRecSizes < 0, using 1.");
minRecSizes = 1;
char *xx = NULL; *xx = 0;
}
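// . reminder on the key sign convention: the low bit of a key is its
//   delete flag. a clear low bit marks a negative (delete) record,
//   a set low bit a positive record; that is what KEYNEG() tests.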
// ensure startKey last bit clear, endKey last bit set
//if ( (startKey.n0 & 0x01) == 0x01 )
if ( !KEYNEG((char *)startKey) )
log(LOG_REMIND,"net: msg5: StartKey lastbit set.");
// fix endkey
//if ( (endKey.n0 & 0x01) == 0x00 ) {
if ( KEYNEG((char *)endKey) ) {
log(LOG_REMIND,"net: msg5: EndKey lastbit clear. Fixing.");
//endKey.n0 |= 0x01;
*((char *)endKey) |= 0x01;
}
QUICKPOLL(niceness);
// debug msg
//log("doing msg5 niceness=%li",niceness);
//if ( niceness == 1 )
// log("hey!");
// timing debug
//m_startTime = gettimeofdayInMilliseconds();
// remember stuff
m_rdbId = rdbId;
m_collnum = collnum;
// why was this here? it was messing up the statsdb ("graph") link
// in the admin panel.
//CollectionRec *ttt = g_collectiondb.getRec ( m_collnum );
//if ( ! ttt ) {
// g_errno = ENOCOLLREC;
// return true;
//}
m_list = list;
//m_startKey = startKey;
//m_endKey = endKey;
KEYSET(m_startKey,(char *)startKey,m_ks);
KEYSET(m_endKey,(char *)endKey,m_ks);
m_minRecSizes = minRecSizes;
m_includeTree = includeTree;
m_addToCache = addToCache;
m_maxCacheAge = maxCacheAge;
m_startFileNum = startFileNum;
m_numFiles = numFiles;
m_state = state;
m_callback = callback;
m_calledCallback= 0;
m_niceness = niceness;
m_maxRetries = maxRetries;
m_oldListSize = 0;
m_dupsRemoved = 0;
m_compensateForMerge = compensateForMerge;
//m_syncPoint = syncPoint;
m_msg5b = msg5b;
m_isRealMerge = isRealMerge;
m_allowPageCache = allowPageCache;
m_hitDisk = hitDisk;
m_mergeLists = mergeLists;
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_collnum))) return true;
// point to cache
//RdbCache *cache = base->m_rdb->getCache();
// . these 2 vars are used for error correction
// . doRemoteLookup is -2 if it's up to us to decide
m_doErrorCorrection = doErrorCorrection;
// these get changed both by cache and gotList()
m_newMinRecSizes = minRecSizes;
m_round = 0;
m_readAbsolutelyNothing = false;
//m_fileStartKey = startKey;
KEYSET(m_fileStartKey,m_startKey,m_ks);
QUICKPOLL(m_niceness);
// get trunc limit by collection now, not just in g_conf
m_indexdbTruncationLimit = 0x7fffffff;
#ifdef _SANITYCHECK_
log("msg5: sk=%s", KEYSTR(m_startKey,m_ks));
log("msg5: ek=%s", KEYSTR(m_endKey,m_ks));
#endif
/*
if ( rdbId == RDB_INDEXDB ) {
CollectionRec *cr = g_collectiondb.getRec ( m_coll );
if ( ! cr ) { log("disk: Msg5 no coll rec."); return true; }
//m_indexdbTruncationLimit = cr->m_indexdbTruncationLimit;
// debug
//log("trunc limit = %li",m_indexdbTruncationLimit);
// Parms.cpp should never let this happen...
if ( m_indexdbTruncationLimit < MIN_TRUNC ) {
log("disk: trunc limit = %li",
m_indexdbTruncationLimit);
char *xx = NULL; *xx = 0;
}
}
*/
// debug msg and stuff
//m_startKey.n1 = 1616550649;
//m_startKey.n0 = (uint64_t)10489958987685363408LL;
//m_includeTree = true;
//m_minRecSizes = 10080000;
//m_startFileNum = 0;
//m_numFiles = -1;
// hack it down; clamp the stored member so the cap actually applies
if ( m_numFiles > base->getNumFiles() )
m_numFiles = base->getNumFiles();
/*
// if we're storing or reading from cache.. make the cache key now
if ( m_maxCacheAge != 0 || m_addToCache ) {
//if ( cacheKeyPtr ) m_cacheKey = *cacheKeyPtr;
if ( cacheKeyPtr ) KEYSET(m_cacheKey,cacheKeyPtr,m_ks);
//else m_cacheKey = makeCacheKey ( m_startKey ,
else makeCacheKey ( m_startKey ,
m_endKey ,
m_includeTree ,
m_minRecSizes ,
m_startFileNum ,
//m_numFiles );
m_numFiles ,
m_cacheKey ,
m_ks );
}
*/
//log("ck.n1=%lu ck.n0=%llu",m_cacheKey.n1,m_cacheKey.n0);
//exit(-1);
// . make sure we set base above so Msg0.cpp:268 doesn't freak out
// . if startKey is > endKey list is empty
//if ( m_startKey > m_endKey ) return true;
if ( KEYCMP(m_startKey,m_endKey,m_ks)>0 ) return true;
// same if minRecSizes is 0
if ( m_minRecSizes == 0 ) return true;
/*
// bring back cache for posdb. but only if posdb is merging the lists
// like when it does gbdocid:xxx| restriction queries in xmldoc.cpp's
// seo pipe.
if ( m_maxCacheAge &&
m_rdbId == RDB_POSDB &&
//m_mergeLists && // this is obsolete me thinks
getListFromTermListCache ( coll,
m_startKey ,
m_endKey ,
m_maxCacheAge,
list ) )
// list should now be set from the termlistcache!
return true;
// check to see if another request for the exact same termlist
// is in progress. then wait for that if so.
// so use logic like makeCacheKey() was using.
long conti = 0;
m_waitingKey=hash64_cont(m_startKey,m_ks,0LL,&conti);
m_waitingKey=hash64_cont(m_endKey,m_ks,m_waitingKey,&conti);
m_waitingKey=hash64_cont((char *)&m_minRecSizes,4,m_waitingKey,&conti);
m_waitingKey=hash64_cont((char*)&m_startFileNum,4,m_waitingKey,&conti);
m_waitingKey=hash64_cont((char *)&m_numFiles,4,m_waitingKey,&conti);
m_waitingKey=hash64_cont((char *)&m_includeTree,1,m_waitingKey,&conti);
m_waitingKey^= ((uint64_t)rdbId) << (64-8);
// init it?
static bool s_waitInit = false;
if ( ! s_waitInit ) {
s_waitInit = true;
if ( ! g_waitingTable.set(8,4,2048,NULL,0,
true,m_niceness,"m5wtbl"))
log("msg5: failed to init waiting table");
// ignore error
g_errno = 0;
}
// wait for the reply to come in
bool inTable = g_waitingTable.isInTable(&m_waitingKey);
// always add it
void *THIS = this;
bool added = g_waitingTable.addKey(&m_waitingKey,&THIS);
// wait for in-progress reply?
if ( inTable ) {
// log debug msg
if ( added && rdbId == RDB_POSDB && m_addToCache ) {
long long cks;
cks = getTermListCacheKey(m_startKey,m_endKey);
log("msg5: waiting for rdbid=%li wkey=%llx "
"startkey=%s ckey=%llx",
(long)rdbId,m_waitingKey,KEYSTR(m_startKey,m_ks),
cks);
}
// we blocked and are in the waiting table
if ( added )
return false;
// otherwise, the addkey failed
log("msg5: failed to add waitingkey: %s",mstrerror(g_errno));
g_errno = 0;
}
// . otherwise log as well
// . you should NEVER see a dup waiting key or cks because
// it should all be cached!!!!
if ( rdbId == RDB_POSDB && m_addToCache ) {
long long cks = getTermListCacheKey(m_startKey,m_endKey);
log("msg5: hitting disk for rdbid=%li wkey=%llx "
"startkey=%s ckey=%llx",
(long)rdbId,m_waitingKey,KEYSTR(m_startKey,m_ks),cks);
}
*/
/*
// use g_termListCache for single docid lookups
long long singleDocId = 0LL;
if ( rdbId == RDB_POSDB ) {
long long d1 = g_posdb.getDocId(startKey);
long long d2 = g_posdb.getDocId(endKey);
if ( d1+1 == d2 ) singleDocId = d1;
}
// if in the termlist cache, send it back right away
char *trec;
long trecSize;
if ( singleDocId &&
getRecFromTermListCache(coll,
m_startKey,
m_endKey,
20*60, // 20 mins. maxCacheAge,
&trec,
&trecSize) ) {
// if in cache send it back!
m_list->set ( trec , // list data
trecSize , // list data size
trec , // alloc
trecSize , // alloc size
m_startKey ,
m_endKey ,
0 , // fixeddatasize
true , // own data?
true , // use half keys?
m_ks );
// we got it without blocking
return true;
}
// if not found in cache it might be a query we do not match like
// 'track-track'
if ( singleDocId ) {
// just copy ptrs from this list into m_list
m_list->set ( NULL , // list data
0 , // list data size
NULL , // alloc
0 , // alloc size
m_startKey ,
m_endKey ,
base->getFixedDataSize() ,
true , // own data?
base->useHalfKeys() ,
m_ks );
return true;
}
*/
// . check in cache first
// . we now cache everything
/*
if ( m_maxCacheAge != 0 ) {
log(LOG_DEBUG,"net: Checking cache for list. "
//"startKey.n1=%lu %llu. cacheKey.n1=%lu %llu.",
//m_startKey.n1,m_startKey.n0,
//m_cacheKey.n1,m_cacheKey.n0);
"startKey.n1=%llx %llx. cacheKey.n1=%llx %llx.",
KEY1(m_startKey,m_ks),KEY0(m_startKey),
KEY1(m_cacheKey,m_ks),KEY0(m_cacheKey));
// is this list in the cache?
if ( cache->getList ( base->m_collnum ,
m_cacheKey ,
m_startKey ,
m_list ,
true , // do copy?
m_maxCacheAge ,
true )) {// incCounts?
// debug msg
log(LOG_DEBUG,"net: msg5: Got cache hit for %s.",
base->m_dbname );
// sanity check
//bool ok = m_list->checkList_r ( false , true );
//if ( ! ok ) log("GETLIST had problem");
// break out
//if ( ! ok ) { char *xx = NULL; *xx = 0; }
return true;
}
}
*/
// timing debug
//log("Msg5:getting list startKey.n1=%lu",m_startKey.n1);
// start the read loop - hopefully, will only loop once
if ( readList ( ) ) return true;
// tell Spider.cpp not to nuke us until we get back!!!
m_waitingForList = true;
// we blocked!!! must call m_callback
return false;
}
// . returns false if blocked, true otherwise
// . sets g_errno on error
// . reads from cache, tree and files
// . calls gotList() to do the merge if we need to
// . loops until m_minRecSizes is satisfied OR m_endKey is reached
bool Msg5::readList ( ) {
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_collnum))) return true;
readMore:
// . reset our tree list
// . sets fixedDataSize here in case m_includeTree is false because
// we don't want merge to have incompatible lists
m_treeList.reset();
m_treeList.setFixedDataSize ( base->getFixedDataSize() );
m_treeList.m_ks = m_ks;
// reset Msg3 in case gotList() is called without calling
// Msg3::readList() first
m_msg3.reset();
// assume lists have no errors in them
m_hadCorruption = false;
// . restrict tree's endkey by calling msg3 now...
// . this saves us from spending 1000ms to read 100k of negative
// spiderdb recs from the tree only to have most of them be for naught
// . this call will ONLY set m_msg3.m_endKey
// . but only do this if dealing with spiderdb really
// . also now for tfndb, since we scan that in RdbDump.cpp to dedup
// the spiderdb list we are dumping to disk. it is really for any
// time when the endKey is unbounded, so check that now
char *treeEndKey = m_endKey;
bool compute = true;
if ( ! m_includeTree ) compute = false;
// if endKey is "unbounded" then bound it...
char max[MAX_KEY_BYTES]; KEYMAX(max,m_ks);
if ( KEYCMP(m_endKey,max,m_ks) != 0 ) compute = false;
// BUT don't bother if a small list, probably faster just to get it
if ( m_newMinRecSizes < 1024 ) compute = false;
// try to make merge read threads higher priority than
// regular spider read threads
long niceness = m_niceness;
if ( niceness > 0 ) niceness = 2;
if ( m_isRealMerge ) niceness = 1;
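// (lower niceness runs sooner, so a real merge read at niceness 1
// gets serviced ahead of the niceness 2 used for ordinary
// background reads)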
if ( compute ) {
m_msg3.readList ( m_rdbId ,
m_collnum ,
m_fileStartKey , // modified by gotList()
m_endKey ,
m_newMinRecSizes , // modified by gotList()
m_startFileNum ,
m_numFiles ,
this ,
gotListWrapper ,
niceness ,
0 , // retry num
m_maxRetries , // -1=def
m_compensateForMerge ,
-1,//m_syncPoint ,
true , // just get endKey?
m_allowPageCache );
if ( g_errno ) {
log("db: Msg5: getting endKey: %s",mstrerrno(g_errno));
return true;
}
treeEndKey = m_msg3.m_constrainKey;
}
QUICKPOLL((m_niceness));
// . get the list from our tree
// . set g_errno and return true on error
// . it is crucial that we get tree list before spawning a thread
// because Msg22 will assume that if the TitleRec is in the tree
// now we'll get it, because we need to have the latest version
// of a particular document and this guarantees it. Otherwise, if
// the doc is not in the tree then tfndb must tell its file number.
// I just don't want to think it's in the tree then have it get
// dumped out right before we read it, then we end up getting the
// older version rather than the new one in the tree which tfndb
// does not know about until it is dumped out. so we could lose
// recs between the in-memory and on-disk phases this way.
// . however, if we are getting a titlerec, we first read the tfndb
// list from the tree then disk. if the merge replaces the tfndb rec
// we want with another while we are reading the tfndb list from
// disk, then the tfndb rec we got from the tree was overwritten!
// so then we'd read from the wrong title file number (tfn) and
// not find the rec because the merge just removed it. so keeping
// the tfndb recs truly in sync with the titledb recs requires
// some dancing. the simplest way is to just scan all titleRecs
// in the event of a disagreement... so turn on m_scanAllIfNotFound,
// which essentially disregards tfndb and searches all the titledb
// files for the titleRec.
if ( m_includeTree ) {
// get the mem tree for this rdb
RdbTree *tree = base->m_rdb->getTree();
// how many recs are deletes in this list?
long numNegativeRecs = 0;
long numPositiveRecs = 0;
// set start time
long long start ;
if ( m_newMinRecSizes > 64 )
start = gettimeofdayInMilliseconds();
// save for later
m_treeMinRecSizes = m_newMinRecSizes;
// . returns false on error and sets g_errno
// . endKey of m_treeList may be less than m_endKey
char *structName = "tree";
if(base->m_rdb->useTree()) {
if ( ! tree->getList ( base->m_collnum ,
m_fileStartKey ,
treeEndKey ,
m_newMinRecSizes ,
&m_treeList ,
&numPositiveRecs , // # pos
&numNegativeRecs , // # neg
base->useHalfKeys() ) )
return true;
// debug msg
}
else {
RdbBuckets *buckets = &base->m_rdb->m_buckets;
if ( ! buckets->getList ( base->m_collnum ,
m_fileStartKey ,
treeEndKey ,
m_newMinRecSizes ,
&m_treeList ,
&numPositiveRecs ,
&numNegativeRecs ,
base->useHalfKeys() )) {
return true;
}
structName = "buckets";
}
long long now ;
if ( m_newMinRecSizes > 64 ) {
now = gettimeofdayInMilliseconds();
long long took = now - start ;
if ( took > 9 )
logf(LOG_INFO,"net: Got list from %s "
"in %llu ms. size=%li db=%s "
"niceness=%li.",
structName, took,m_treeList.getListSize(),
base->m_dbname,m_niceness);
}
// if our recSize is fixed we can boost m_minRecSizes to
// compensate for these deletes when we call m_msg3.readList()
long rs = base->getRecSize() ;
// . use an avg. rec size for variable-length records
// . just use tree to estimate avg. rec size
if ( rs == -1) {
if(base->m_rdb->useTree()) {
// how much space do all recs take up in the tree?
long totalSize = tree->getMemOccupiedForList();
// how many recs in the tree
long numRecs = tree->getNumUsedNodes();
// get avg record size
if ( numRecs > 0 ) rs = totalSize / numRecs;
// add 10% for deviations
rs = (rs * 110) / 100;
// what is the minimal record size?
long minrs = sizeof(key_t) + 4;
// ensure a minimal record size
if ( rs < minrs ) rs = minrs;
}
else {
RdbBuckets *buckets = &base->m_rdb->m_buckets;
// avg bytes per rec is mem over keys, not keys over mem
long numKeys = buckets->getNumKeys();
if ( numKeys > 0 )
rs = buckets->getMemOccupied() / numKeys;
long minrs = buckets->getRecSize() + 4;
// ensure a minimal record size
if ( rs < minrs ) rs = minrs;
}
}
// . TODO: get avg recSize in this rdb (avgRecSize*numNeg..)
// . don't do this if we're not merging because it makes
// it harder to compute the # of bytes to read used to
// pre-allocate a reply buf for Msg0 when !m_doMerge
// . we set endKey for spiderdb when reading from tree above
// based on the current minRecSizes so do not mess with it
// in that case.
if ( m_rdbId != RDB_SPIDERDB ) {
//m_newMinRecSizes += rs * numNegativeRecs;
long nn = m_newMinRecSizes + rs * numNegativeRecs;
if ( rs > 0 && nn < m_newMinRecSizes ) nn = 0x7fffffff;
m_newMinRecSizes = nn;
}
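// (example: with an avg rec size of rs=18 bytes and 100 negative
// recs in the tree we ask disk for 1800 extra bytes, so that
// annihilation still leaves about minRecSizes of merged data)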
// . if m_endKey = m_startKey + 1 and our list has a rec
// then no need to check the disk, it was in the tree
// . it could be a negative or positive record
// . tree can contain both negative/positive recs for the key
// so we should do the final merge in gotList()
// . that can happen because we don't do an annihilation
// because the positive key may be being dumped out to disk
// but it really wasn't and we get stuck with it
//key_t kk = m_startKey ;
//kk += (unsigned long)1;
//if ( m_endKey == kk && ! m_treeList.isEmpty() ) {
char kk[MAX_KEY_BYTES];
KEYSET(kk,m_startKey,m_ks);
KEYADD(kk,1,m_ks);
// no no no.... gotList() might be returning false because
// it's doing a threaded call to merge_r to take out
// the negative recs i guess...
if ( KEYCMP(m_endKey,kk,m_ks)==0 && ! m_treeList.isEmpty() ) {
return gotList(); } // return true; }
}
// if we don't use the tree then at least set the key bounds cuz we
// pick the min endKey between diskList and treeList below
else m_treeList.set ( m_fileStartKey , m_endKey );
// . if we're reading indexlists from 2 or more sources then some
// will probably be compressed from 12 byte keys to 6 byte keys
// . it is typically only about 1% when files are small,
// and smaller than that when a file is large
// . but just to be safe, reading an extra 2% won't hurt too much
if ( base->useHalfKeys() ) {
long numSources = m_numFiles;
if ( numSources == -1 )
numSources = base->getNumFiles();
// if tree is empty, don't count it
if ( m_includeTree && ! m_treeList.isEmpty() ) numSources++;
// . if we don't do a merge then we return the list directly
// (see condition where m_numListPtrs == 1 below)
// from Msg3 (or tree) and we must hit minRecSizes as
// close as possible for Msg3's call to constrain() so
// we don't overflow the UdpSlot's TMPBUFSIZE buffer
// . if we just arbitrarily boost m_newMinRecSizes then
// the single list we get back from Msg3 will not have
// been constrained with m_minRecSizes, but constrained
// with m_newMinRecSizes (x2%) and be too big for our UdpSlot
if ( numSources >= 2 ) {
long long newmin = (long long)m_newMinRecSizes ;
newmin = (newmin * 50LL) / 49LL ;
// watch out for wrap around
if ( (long)newmin < m_newMinRecSizes )
m_newMinRecSizes = 0x7fffffff;
else m_newMinRecSizes = (long)newmin;
}
}
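// (the 50/49 factor is a ~2% boost: e.g. a newMinRecSizes of
// 490000 becomes 500000, covering keys that compressed from
// 12 bytes down to 6)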
// limit to 20MB so we don't go OOM!
if ( m_newMinRecSizes > 2 * m_minRecSizes &&
m_newMinRecSizes > 20000000 )
m_newMinRecSizes = 20000000;
QUICKPOLL((m_niceness));
// debug msg
//log("msg5 calling msg3 for %li bytes (msg5=%lu)",
// m_newMinRecSizes,(long)this);
// . it's pointless to fetch data from disk past treeList's endKey
// . it only differs from m_endKey if its listSize is at least
// newMinRecSizes
//key_t diskEndKey = m_treeList.getEndKey();
char *diskEndKey = m_treeList.getEndKey();
// sanity check
if ( m_treeList.m_ks != m_ks ) { char *xx = NULL; *xx = 0; }
// we are waiting for the list
//m_waitingForList = true;
// clear just in case
g_errno = 0;
// . now get from disk
// . use the cache-modified constraints to reduce reading time
// . return false if it blocked
// . if compensateForMerge is true then m_startFileNum/m_numFiles
// will be appropriately mapped around the merge
if ( ! m_msg3.readList ( m_rdbId ,
m_collnum ,
m_fileStartKey , // modified by gotList()
diskEndKey ,
m_newMinRecSizes , // modified by gotList()
m_startFileNum ,
m_numFiles ,
this ,
gotListWrapper ,
niceness ,
0 , // retry num
m_maxRetries , // max retries (-1=def)
m_compensateForMerge ,
-1,//m_syncPoint ,
false ,
m_allowPageCache ,
m_hitDisk ))
return false;
// . this returns false if blocked, true otherwise
// . sets g_errno on error
// . updates m_newMinRecSizes
// . updates m_fileStartKey to the endKey of m_list + 1
if ( ! gotList () ) return false;
// bail on error from gotList() or Msg3::readList()
if ( g_errno ) return true;
// we may need to re-call getList
if ( needsRecall() ) goto readMore;
// we did not block
return true;
}
/*
// we were in the g_waitingTable waiting for a list to be read that was already
// in progress when our request came in.
void Msg5::copyAndSendBackList ( RdbList *listSrc ) {
// this returns false and sets g_errno on error
if ( ! m_list->copyList(listSrc) )
log("msg5: copylist failed: %s",mstrerror(g_errno));
// set it now
m_calledCallback = 1;
// when completely done call the callback
m_callback ( m_state , m_list , this );
}
*/
bool Msg5::needsRecall ( ) {
bool logIt;
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
RdbBase *base = getRdbBase ( m_rdbId , m_collnum );
// if collection was deleted from under us, base will be NULL
if ( ! base && ! g_errno ) {
log("msg5: base lost for rdbid=%li collnum %li",
(long)m_rdbId,(long)m_collnum);
g_errno = ENOCOLLREC;
return false;
}
// sanity check
if ( ! base && ! g_errno ) { char *xx=NULL;*xx=0; }
// . return true if we're done reading
// . sometimes we'll need to read more because Msg3 will shorten the
// endKey to better meet m_minRecSizes but because of
// positive/negative record annihilation on variable-length
// records it won't read enough
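// (e.g. we ask for 100k, Msg3 trims its endKey so that ~100k sits
// on disk, but negative keys annihilate part of it in the merge,
// so the merged list falls short and we must re-read from
// m_fileStartKey with a bigger m_newMinRecSizes)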
if ( g_errno ) goto done;
if ( m_newMinRecSizes <= 0 ) goto done;
if ( ! m_mergeLists ) goto done;
// limit to just doledb for now in case it results in data loss
if(m_readAbsolutelyNothing&&
(m_rdbId==RDB_DOLEDB||m_rdbId==RDB_SPIDERDB ) )
goto done;
// seems to be ok, let's open it up to fix this bug where we try
// to read too many bytes from a small titledb and it loops forever
if ( m_readAbsolutelyNothing ) {
log("rdb: read absolutely nothing more for dbname=%s on cn=%li",
base->m_dbname,(long)m_collnum);
goto done;
}
//if ( m_list->getEndKey() >= m_endKey ) goto done;
if ( KEYCMP(m_list->getEndKey(),m_endKey,m_ks)>=0 ) goto done;
// if this is NOT the first round and the sum of all our list sizes
// did not increase, then we hit the end...
//
// i think this is sometimes stopping us short. we need to verify
// that each list (from tree and files) is exhausted... which
// the statement above does... !!! it was causing us to miss urls
// in doledb and think a collection was done spidering. i think
// maybe because the startkey of each list would change since we
// merge them and accumulate into m_list. a better way would be
// to make sure doledb or any rdb dumps and tight merges when
// we start having a lot of key annihilations.
/*
if ( m_round >= 1 && m_totalSize == m_lastTotalSize ) {
log("msg5: increasing minrecsizes did nothing. assuming done. "
"db=%s (newsize=%li origsize=%li total "
"got %li totalListSizes=%li sk=%s) "
"cn=%li this=0x%lx round=%li.",
base->m_dbname ,
m_newMinRecSizes,
m_minRecSizes,
m_list->m_listSize,
m_totalSize,
KEYSTR(m_startKey,m_ks),
(long)m_collnum,(long)this, m_round );
goto done;
}
*/
// ok, make sure if we go another round at least one list gains!
m_lastTotalSize = m_totalSize;
/*
// sanity check
if ( m_indexdbTruncationLimit < MIN_TRUNC ) {
log("disk: trunc limit2 = %li", m_indexdbTruncationLimit);
char *xx = NULL; *xx = 0;
}
// if we are limited by truncation then we are done
if ( base->useHalfKeys() &&
base->m_rdb != g_tfndb.getRdb() &&
//m_prevCount >= g_indexdb.getTruncationLimit() &&
m_prevCount >= m_indexdbTruncationLimit &&
g_indexdb.getTermId(*(key_t *)m_startKey) ==
g_indexdb.getTermId(*(key_t *)m_endKey) )
goto done;
*/
// debug msg
//if ( g_conf.m_timingDebugEnabled )
// this is kinda important. we have to know if we are abusing
// the disk... we should really keep stats on this...
logIt = true;
// seems to be very common for doledb, so don't log unless extreme
//if ( m_rdbId == RDB_DOLEDB && m_round < 15 ) logIt = false;
if ( m_round > 100 && (m_round % 1000) != 0 ) logIt = false;
// seems very common when doing rebalancing then merging to have
// to do at least one round of re-reading, so note that
if ( m_round == 0 ) logIt = false;
// so common for doledb because of key annihilations
if ( m_rdbId == RDB_DOLEDB && m_round < 10 ) logIt = false;
if ( logIt )
logf(LOG_DEBUG,"db: Reading %li again from %s (need %li total "
"got %li totalListSizes=%li sk=%s) "
"cn=%li this=0x%lx round=%li.",
m_newMinRecSizes , base->m_dbname , m_minRecSizes,
m_list->m_listSize,
m_totalSize,
KEYSTR(m_startKey,m_ks),
(long)m_collnum,(long)this, m_round );
m_round++;
// record how many screw ups we had so we know if it hurts performance
base->m_rdb->didReSeek ( );
// try to read more from disk
return true;
done:
// . reset merged list ptr
// . merge_r() never resets the list ptr, m_listPtr
if ( m_list ) m_list->resetListPtr();
/*
// bring back termlist caching for posdb for gbdocid:xxxx| queries
// in the seo pipeline in xmldoc.cpp because we need to do like
// 1M of those things ultra fast!
if ( m_addToCache && ! g_errno && m_rdbId == RDB_POSDB )
addToTermListCache(m_coll,m_startKey,m_endKey,m_list);
// send back replies to others that are waiting
callbackLoop:
long slot = g_waitingTable.getSlot(&m_waitingKey);
// valid?
if ( slot >= 0 ) {
// get it
Msg5 *THIS = *(Msg5 **)g_waitingTable.getValueFromSlot(slot);
// sanity. must have same start keys!
if ( KEYCMP(THIS->m_startKey,m_startKey,m_ks) != 0 ) {
char *xx=NULL;*xx=0; }
// do not call this for the original request
if ( THIS != this ) THIS->copyAndSendBackList ( m_list );
// delete it
g_waitingTable.deleteSlot(slot);
// another one
goto callbackLoop;
}
*/
/*
// add finalized list to cache if we should
if ( m_addToCache && ! g_errno ) {
// point to cache
RdbCache *cache = base->m_rdb->getCache();
// sanity check
//bool ok = m_list->checkList_r ( false , true );
// break out
//if ( ! ok ) { char *xx = NULL; *xx = 0; }
//if ( ! ok ) log("ADDLIST had problem");
// add it if its ok
//if ( ok ) m_cache->addList ( m_cacheKey, m_list ) ;
cache->addList ( base->m_collnum , m_cacheKey, m_list ) ;
// ignore errors
g_errno = 0;
}
*/
// return false cuz we don't need a recall
return false;
}
void gotListWrapper ( void *state ) {
Msg5 *THIS = (Msg5 *) state;
// . this sets g_errno on error
// . this will merge cache/tree and disk lists into m_list
// . it will update m_newMinRecSizes
// . it will also update m_fileStartKey to the endKey of m_list + 1
// . returns false if it blocks
if ( ! THIS->gotList ( ) ) return;
// . throw it back into the loop if necessary
// . only returns true if COMPLETELY done
if ( THIS->needsRecall() && ! THIS->readList() ) return;
// sanity check
if ( THIS->m_calledCallback ) { char *xx=NULL;*xx=0; }
// set it now
THIS->m_calledCallback = 1;
// we are no longer waiting for the list
THIS->m_waitingForList = false;
// when completely done call the callback
THIS->m_callback ( THIS->m_state , THIS->m_list , THIS );
}
static void threadDoneWrapper ( void *state , ThreadEntry *t ) ;
static void *mergeListsWrapper_r ( void *state , ThreadEntry *t ) ;
//static void gotListWrapper2 ( void *state , RdbList *list , Msg5 *msg5);
#define TFNDBMINRECSIZES (256*1024)
// . this is the NEW gotList() !!! mdw
// . returns false if blocked, true otherwise
// . sets g_errno on error
bool Msg5::gotList ( ) {
// we are no longer waiting for the list
//m_waitingForList = false;
// debug msg
//log("msg5 got lists from msg3 (msg5=%lu)",(long)this);
// return if g_errno is set
if ( g_errno && g_errno != ECORRUPTDATA ) return true;
// if reading from titledb, read corresponding tfndb list so
// we can remove overwritten titleRecs
if ( m_rdbId != RDB_TITLEDB ) return gotList2();
// if not merging files on disk, skip this stuff
if ( ! m_isRealMerge ) return gotList2();
// this is now obsolete!
return gotList2();
}
/*
// The Tfndb Tfn Bug Fix. We've had this bug for over a year. Now we
// need to load the corresponding tfndb list with every titledb list
// so we can remove titleRecs that are from files whose tfn does not
// match the ones in the tfndblist for that docid.
// This will remove titlerecs that would only get removed in a tight
// merge. But even more importantly, fixes the problem that when
// merging to older titledb files, the tfndb rec of a titlerec in
// that merge gets re-added to tfndb, and override the newer one in
// tfndb that corresponds to a newer titledb file that contains the
// document.
// We load the tfndb list after the titledb list because if a titlerec
// got added to the tree just after we loaded the titledb list, then
//
// MUST have this
if ( ! m_msg5b ) {
log(LOG_LOGIC,"net: No msg5b supplied.");
char *xx = NULL; *xx = 0;
}
m_time1 = gettimeofdayInMilliseconds();
long long docId1 =g_titledb.getDocIdFromKey((key_t *)m_fileStartKey);
long long docId2 =g_titledb.getDocIdFromKey((key_t *)m_msg3.m_endKey);
key_t startKey = g_tfndb.makeMinKey ( docId1 ) ;
key_t endKey = g_tfndb.makeMaxKey ( docId2 ) ;
QUICKPOLL((m_niceness));
//endKey.setMax();
// the tfndb list is often already in the page cache, so this is fast
if ( ! m_msg5b->getList ( RDB_TFNDB ,
m_coll ,
&m_tfndbList,
startKey ,
endKey ,
TFNDBMINRECSIZES , // minRecSizes
true , // includeTree
false , // addToCache
0 , // maxCacheAge
0 , // startFileNum
-1 , // numFiles
this , // state
gotListWrapper2, // callback
m_niceness ,
false , // do error correction
NULL , // cacheKeyPtr
0 , // retryNum
m_maxRetries, // was 5 -- maxRetries
true , // compensateForMerge
-1LL , // syncpoint
NULL , // msg5b
false , // isRealMerge
m_allowPageCache ))
return false;
return gotList2();
}
void gotListWrapper2 ( void *state , RdbList *list , Msg5 *msg5 ) {
Msg5 *THIS = (Msg5 *)state;
if ( ! THIS->gotList2() ) return;
// sanity check
if ( THIS->m_calledCallback ) { char *xx=NULL;*xx=0; }
// set it now
THIS->m_calledCallback = 2;
// call the original callback
THIS->m_callback ( THIS->m_state , THIS->m_list , THIS );
}
*/
// . this is the NEW gotList() !!! mdw
// . returns false if blocked, true otherwise
// . sets g_errno on error
bool Msg5::gotList2 ( ) {
// reset this
m_startTime = 0LL;
// return if g_errno is set
if ( g_errno && g_errno != ECORRUPTDATA ) return true;
// put all the lists in an array of list ptrs
long n = 0;
// all the disk lists
for ( long i = 0 ; n < MAX_RDB_FILES && i<m_msg3.getNumLists(); i++ ) {
// . skip list if empty
// . was this causing problems?
if ( ! m_isRealMerge ) {
RdbList *list = m_msg3.getList(i);
if ( list->isEmpty() ) continue;
}
// . remember the tfn, the secondary id
// . the tfn is used by tfndb to map a docid to a titledb
// file. each tfndb record has the tfn. each titledb file
// has a tfn (secondary id, aka id2)
//if ( m_rdbId == RDB_TITLEDB ) m_tfns [n] = m_msg3.getTfn(i);
m_listPtrs [ n++ ] = m_msg3.getList(i);
}
QUICKPOLL(m_niceness);
// sanity check.
if ( m_msg3.getNumLists() > MAX_RDB_FILES )
log(LOG_LOGIC,"db: Msg3 had more than %li lists.",
(long)MAX_RDB_FILES);
// . get smallest endKey from all the lists
// . all lists from Msg3 should have the same endKey, but
// m_treeList::m_endKey may differ
// . m_treeList::m_endKey should ALWAYS be >= that of the files
// . constrain m_treeList to the startKey/endKey of the files
//m_minEndKey = m_endKey;
KEYSET(m_minEndKey,m_endKey,m_ks);
for ( long i = 0 ; i < n ; i++ ) {
//if ( m_listPtrs[i]->getEndKey() < m_minEndKey )
// m_minEndKey = m_listPtrs[i]->getEndKey();
// sanity check
//if ( KEYNEG(m_listPtrs[i]->getEndKey()) ) {
// char *xx=NULL;*xx=0; }
if ( KEYCMP(m_listPtrs[i]->getEndKey(),m_minEndKey,m_ks)<0 ) {
KEYSET(m_minEndKey,m_listPtrs[i]->getEndKey(),m_ks);
// crap, if list is all negative keys, then the
// end key seems negative too! however in this
// case RdbScan::m_endKey seems positive so
// maybe we got a negative endkey in constrain?
//if (! (m_minEndKey[0] & 0x01) )
// log("msg5: list had bad endkey");
}
}
// sanity check
//if ( KEYNEG( m_minEndKey) ) {char *xx=NULL;*xx=0; }
/*
// if we got a tfndblist, constrain the title rec lists to its
// transformed endkey. we only read in up to 500k of tfndb list so if
// merging two really small titledb files we could potentially be
// reading in a much bigger tfndb list.
if ( m_rdbId == RDB_TITLEDB && m_isRealMerge && ! g_errno ) {
long long time2 = gettimeofdayInMilliseconds();
long long diff = time2 - m_time1 ;
log(LOG_DEBUG,"db: Read tfndblist in %lli ms "
"(size=%li).",diff,m_tfndbList.m_listSize);
// cut it down to m_msg3.m_endKey because that's what we used
// to constrain this tfndb list read
//if ( m_msg3.m_constrainKey < m_minEndKey ) {
// log(LOG_DEBUG,"db: Constraining to tfndb Msg3 "
// "m_endKey.");
// m_minEndKey = m_msg3.m_constrainKey ;
//}
// only mess with m_minEndKey if our list was NOT limited
// by it. if we were not limited by it, our endKey should
// really be virtual inifinite. because the most our endKey
// will ever be is g_tfndb.makeMaxKey ( docIdMAX ) as can
// be seen above.
if ( m_tfndbList.m_listSize >= TFNDBMINRECSIZES ) {
// constrain titledb lists to tfndb's endkey if
// it's smaller
//key_t ekey = m_tfndbList.getEndKey();
char *ekey = m_tfndbList.getEndKey();
long long docid = g_tfndb.getDocId ( (key_t *)ekey );
if ( docid > 0 ) docid = docid - 1;
//key_t nkey = g_titledb.makeLastKey ( docid );
char nkey[MAX_KEY_BYTES];
key_t trk = g_titledb.makeLastKey ( docid );
KEYSET ( nkey , (char *)&trk , m_ks );
// sanity check
//if ( g_titledb.getKeySize() != m_ks ) {
// char *xx = NULL; *xx = 0; }
// only do constrain if docid is not 0
//if ( docid > 0 && nkey < m_minEndKey ) {
if ( docid > 0 && KEYCMP(nkey,m_minEndKey,m_ks)<0 ) {
log(LOG_DEBUG,"db: Tfndb had min key: "
//"0x%llx",nkey.n0);
"0x%llx",KEY0(nkey));
//m_minEndKey = nkey;
KEYSET(m_minEndKey,nkey,m_ks);
}
}
}
*/
QUICKPOLL(m_niceness);
// . is treeList included?
// . constrain treelist for the merge
// . if used, m_listPtrs [ m_numListPtrs - 1 ] MUST equal &m_treeList
// since newer lists are listed last so their records override older
if ( m_includeTree && ! m_treeList.isEmpty() ) {
// only constrain if we are NOT the sole list because the
// constrain routine sets our endKey to virtual infinity it
// seems like and that makes SpiderCache think that spiderdb
// is exhausted when it is only in the tree. so i added the
// if ( n > 0 ) condition here.
if ( n > 0 ) {
char k[MAX_KEY_BYTES];
m_treeList.getCurrentKey(k);
m_treeList.constrain ( m_startKey ,
m_minEndKey ,
-1 , // min rec sizes
0 , // hint offset
//m_treeList.getCurrentKey() ,
k,
"tree" ,
m_niceness );
}
//if ( m_rdbId == RDB_TITLEDB ) m_tfns [n] = 255;
m_listPtrs [ n++ ] = &m_treeList;
}
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_collnum))) return true;
// if not enough lists, use a dummy list to trigger merge so tfndb
// filter happens and we have a chance to weed out old titleRecs
if ( m_rdbId == RDB_TITLEDB && m_numFiles != 1 && n == 1 &&
m_isRealMerge ) {
//log(LOG_LOGIC,"db: Adding dummy list.");
//m_tfns [n] = 255;
m_dummy.set ( NULL , // list data
0 , // list data size
NULL , // alloc
0 , // alloc size
m_startKey ,
m_minEndKey ,
base->getFixedDataSize() ,
true , // own data?
base->useHalfKeys() ,
m_ks );
m_listPtrs [ n++ ] = &m_dummy;
}
// bitch
if ( n >= MAX_RDB_FILES )
log(LOG_LOGIC,"net: msg5: Too many lists (%li | %li).",
m_msg3.getNumLists() , n);
// store # of lists here for use by the call to merge_r()
m_numListPtrs = n;
// count the sizes
m_totalSize = 0;
for ( long i = 0 ; i < m_numListPtrs ; i++ )
m_totalSize += m_listPtrs[i]->getListSize();
QUICKPOLL(m_niceness);
// . but don't breach minRecSizes
// . this totalSize is just to see if we should spawn a thread, really
//if ( totalSize > m_minRecSizes ) m_totalSize = m_minRecSizes;
#ifdef _SANITYCHECK_
// who uses this now?
//log("Msg5:: who is merging?????");
// timing debug
// m_startKey.n1,
// gettimeofdayInMilliseconds()-m_startTime ,
// m_diskList.getListSize());
// ensure both lists are legit
// there may be negative keys in the tree
// diskList may now also have negative recs since Msg3 no longer
// removes them for fears of delayed positive keys not finding their
// negative key because it was merged out by RdbMerge
for ( long i = 0 ; i < m_numListPtrs ; i++ )
m_listPtrs[i]->checkList_r ( false , true );
#endif
// if not doing merge, we're all done!
if ( ! m_mergeLists )
return doneMerging();
// . if no lists we're done
// . if we were a recall, then list may not be empty
if ( m_numListPtrs == 0 && m_list->isEmpty() ) {
// just copy ptrs from this list into m_list
m_list->set ( NULL , // list data
0 , // list data size
NULL , // alloc
0 , // alloc size
m_startKey ,
m_endKey ,
base->getFixedDataSize() ,
true , // own data?
base->useHalfKeys() ,
m_ks );
// . add m_list to our cache if we should
// . this returns false if blocked, true otherwise
// . sets g_errno on error
// . only blocks if calls msg0 to patch a corrupted list
// . it will handle calling callback if that happens
return doneMerging();
}
if ( m_numListPtrs == 0 ) m_readAbsolutelyNothing = true;
// if all lists from msg3 were 0... tree still might have something
if ( m_totalSize == 0 && m_treeList.isEmpty() )
m_readAbsolutelyNothing = true;
// if msg3 had corruption in a list which was detected in constrain_r()
if ( g_errno == ECORRUPTDATA ) {
// if we only had one list, we were not doing a merge
// so return g_errno to the requester so he tries from the
// twin
if ( m_numListPtrs == 1 ) return true;
// assume nothing is wrong
g_errno = 0;
// if m_doErrorCorrection is true, repairLists_r() should fix
}
QUICKPOLL((m_niceness));
// . should we remove negative recs from final merged list?
// . if we're reading from root and tmp merge file of root
// . should we keep this out of the thread in case a file is created?
long fn = 0;
if ( base->m_numFiles > 0 ) fn = base->m_fileIds[m_startFileNum];
if ( fn == 0 || fn == 1 ) m_removeNegRecs = true;
else m_removeNegRecs = false;
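// (fileIds 0 and 1 are the root file and its tmp merge file, so a
// scan starting there covers each key's full history and a
// surviving negative rec has nothing older left to annihilate)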
// . if we only have one list, just use it
// . Msg3 should have called constrain() on it so it's m_list so
// m_listEnd and m_listSize all fit m_startKey/m_endKey/m_minRecSizes
// to a tee
// . if it's a tree list it already fits to a tee
// . same with cache list?? better be...
// . if we're only reading one list it should always be empty right?
// . i was getting negative keys in my RDB_DOLEDB list which caused
// Spider.cpp to core, so i add the "! m_removeNegRecs" constraint
// here...
// . TODO: add some code to just filter out the negative recs
// super quick just for this purpose
// . crap, rather than do that just deal with the negative recs
// in the caller code... in this case Spider.cpp::gotDoledbList2()
if ( m_numListPtrs == 1 && m_list->isEmpty() &&//&&!m_removeNegRecs
// just do this logic for doledb now, it was causing us to
// return search results whose keys were negative indexdb keys.
// or later we can just write some code to remove the neg
// recs from the single list!
( m_rdbId == RDB_LINKDB || m_rdbId == RDB_DOLEDB ||
// this speeds up our queryloop querylog parsing in
// seo.cpp quite a bit
(m_rdbId == RDB_POSDB && m_numFiles==1) ) ) {
// log any problems
if ( ! m_listPtrs[0]->m_ownData ) {
log(LOG_LOGIC,"db: Msg5: list does not own data.");
goto skip;
}
// . bitch if not empty
// . NO! might be our second time around if we had key
// annihilations between file #0 and the tree, and now
// we only have 1 non-empty list ptr, either from the tree
// or from the file
//if ( ! m_list->isEmpty() )
// log("Msg5::gotList: why is it not empty? size=%li",
// m_list->getListSize() );
// just copy ptrs from this list into m_list
m_list->set ( m_listPtrs[0]->getList () ,
m_listPtrs[0]->getListSize () ,
m_listPtrs[0]->getAlloc () ,
m_listPtrs[0]->getAllocSize () ,
m_listPtrs[0]->getStartKey () ,
m_listPtrs[0]->getEndKey () ,
m_listPtrs[0]->getFixedDataSize () ,
true , // own data?
m_listPtrs[0]->useHalfKeys () ,
m_ks );
// ensure we don't free it when we loop on freeLists() below
m_listPtrs[0]->setOwnData ( false );
// gotta set this too!
if ( m_listPtrs[0]->m_lastKeyIsValid )
m_list->setLastKey ( m_listPtrs[0]->m_lastKey );
// . remove titleRecs that shouldn't be there
// . if the tfn of the file we read the titlerec from does not
// match the one in m_tfndbList, then remove it
// . but if we're not merging lists, why remove it?
//if ( m_rdbId == RDB_TITLEDB && m_msg3.m_numFileNums > 1 )
// stripTitleRecs ( m_list , m_tfns[0] , m_tfndbList );
// . add m_list to our cache if we should
// . this returns false if blocked, true otherwise
// . sets g_errno on error
// . only blocks if calls msg0 to patch a corrupted list
// . it will handle calling callback if that happens
return doneMerging();
}
skip:
// time the preparation and merge
m_startTime = gettimeofdayInMilliseconds();
// . merge the lists
// . the startKey of the finalList is m_startKey, the first time
// . but after that, we're adding diskLists, so use m_fileStartKey
// . we're called multiple times for the same look-up in case of
// delete records in a variable rec-length db cause some recs in our
// disk lookups to be wiped out, thereby falling below minRecSizes
// . this will set g_errno and return false on error (ENOMEM,...)
// . older list goes first so newer list can override
// . remove all negative-keyed recs since Msg5 is a high level msg call
// . prepare for the merge, grows the buffer
// . this returns false and sets g_errno on error
// . should not affect the current list in m_list, only build on top
if ( ! m_list->prepareForMerge ( m_listPtrs ,
m_numListPtrs ,
m_minRecSizes ) ) {
log("net: Had error preparing to merge lists from %s: %s",
base->m_dbname,mstrerror(g_errno));
return true;
}
QUICKPOLL((m_niceness));
#ifdef _TESTNEWALGO_
// for testing useBigRootList
if ( ! m_list2.prepareForMerge ( m_listPtrs ,
m_numListPtrs ,
m_minRecSizes ) ) {
log("net: Had error preparing to merge lists from %s: %s",
base->m_dbname,mstrerror(g_errno));
return true;
}
#endif
// . if size < 32k don't bother with a thread; should only be a few ms
// . it seems to be about 1ms per 10k for tfndb recs
// . it seems to core dump if we spawn a thread with totalSizes too low
// . why???
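// (at ~1 ms per 10k that makes 32k roughly a 3 ms merge, below the
// cost of spawning and reaping a thread)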
if ( m_totalSize < 32*1024 ) goto skipThread;
// if we are an interruptible niceness 1, do not use a thread,
// we can be interrupted by the alarm callback and serve niceness
// 0 requests, that is probably better! although the resolution is
// on like 10ms on those alarms... BUT if you use a smaller
// mergeBufSize of like 100k, that might make it responsive enough!
// allow it to do a thread again so we can take advantage of
// multiple cores, or hyperthreads i guess because i am seeing
// some missed quickpoll log msgs, i suppose because we did not
// insert QUICKPOLL() statements in the RdbList::merge_r() code
//if ( m_niceness >= 1 ) goto skipThread;
// super duper hack!
//if ( m_rdbId == RDB_REVDB ) goto skipThread;
// i'm not sure why we core in Msg5's call to RdbList::merge_r().
// the list appears to be corrupt...
//if ( m_rdbId == RDB_FACEBOOKDB ) goto skipThread;
// skip it for now
//goto skipThread;
//m_waitingForMerge = true;
// . if size is big, make a thread
// . let's always make niceness 0 since it wasn't being very
// aggressive before
if ( g_threads.call ( MERGE_THREAD , // threadType
m_niceness , // m_niceness ,
this , // state data for callback
threadDoneWrapper ,
mergeListsWrapper_r ) )
return false;
//m_waitingForMerge = false;
// thread creation failed
if ( g_errno ) // g_conf.m_useThreads && ! g_threads.m_disabled )
log(LOG_INFO,
"net: Failed to create thread to merge lists. Doing "
"blocking merge. (%s)",mstrerror(g_errno));
// clear g_errno because it really isn't a problem, we just block
g_errno = 0;
// come here to skip the thread
skipThread:
// repair any corruption
repairLists_r();
// do it
mergeLists_r ();
// . add m_list to our cache if we should
// . this returns false if blocked, true otherwise
// . sets g_errno on error
// . only blocks if calls msg0 to patch a corrupted list
// . it will handle calling callback if that happens
return doneMerging();
}
// thread will run this first
void *mergeListsWrapper_r ( void *state , ThreadEntry *t ) {
// we're in a thread now!
Msg5 *THIS = (Msg5 *)state;
// debug msg
//log("Msg5::mergeListsWrapper: begining threaded merge!");
// repair any corruption
THIS->repairLists_r();
// do the merge
THIS->mergeLists_r();
// now cleanUp wrapper will call it
//pthread_exit ( NULL );
// bogus return
return NULL;
}
// . now we're done merging
// . when the thread is done we get control back here, in the main process
void threadDoneWrapper ( void *state , ThreadEntry *t ) {
// we MAY be in a thread now
Msg5 *THIS = (Msg5 *)state;
// debug msg
//log("msg3 back from merge thread (msg5=%lu)",THIS->m_state);
// . add m_list to our cache if we should
// . this returns false if blocked, true otherwise
// . sets g_errno on error
// . only blocks if calls msg0 to patch a corrupted list
// . it will handle calling callback if that happens
if ( ! THIS->doneMerging() ) return;
// . throw it back into the loop if necessary
// . only returns true if COMPLETELY done
if ( THIS->needsRecall() && ! THIS->readList() ) return;
// sanity check
if ( THIS->m_calledCallback ) { char *xx=NULL;*xx=0; }
// we are no longer waiting for the list
THIS->m_waitingForList = false;
// set it now
THIS->m_calledCallback = 3;
// when completely done call the callback
THIS->m_callback ( THIS->m_state , THIS->m_list , THIS );
}
// check lists in the thread
void Msg5::repairLists_r ( ) {
// assume none
m_hadCorruption = false;
// return if no need to
if ( ! m_doErrorCorrection ) return;
// or if msg3 already check them and they were ok
if ( m_msg3.m_listsChecked ) return;
// if msg3 said they were corrupt... this happens when the map
// is generated over a bad data file and ends up writing the same key
// on more than 500MB worth of data. so when we try to read a list
// that has the startkey/endkey covering that key, the read size
// is too big to ever happen...
if ( m_msg3.m_hadCorruption ) m_hadCorruption = true;
// time it
//m_checkTime = gettimeofdayInMilliseconds();
for ( long i = 0 ; i < m_numListPtrs ; i++ ) {
// . did it breech our minRecSizes?
// . only check for indexdb, our keys are all size 12
// . is this a mercenary problem?
// . cored on 'twelfth night cake'
// . no... this happens after merging the lists. if we had
// a bunch of negative recs we over read anticipating some
// recs will be deleted, so it isn't really necessary to
// bitch about this here..
if ( g_conf.m_logDebugDb &&
m_rdbId == RDB_POSDB &&
m_listPtrs[i]->m_listSize > m_minRecSizes + 12 )
// just log it for now, maybe force core later
log(LOG_DEBUG,"db: Index list size is %li but "
"minRecSizes is %li.",
m_listPtrs[i]->m_listSize ,
m_minRecSizes );
// this took like 50ms (-O3) on lenny on a 4meg list
bool status = m_listPtrs[i]->checkList_r(false,
// sleep on corruption if doing a sanity check (core dumps)
#ifdef _SANITYCHECK_
true
#else
false
#endif
);
// if no errors, check the next list
if ( status ) continue;
// . show the culprit file
// . logging the key ranges gives us an idea of how long
// it will take to patch the bad data
long nn = m_msg3.m_numFileNums;
// TODO: fix this. can't call Collectiondb::getBase from
// within a thread!
RdbBase *base = getRdbBase ( m_rdbId , m_collnum );
if ( i < nn && base ) {
long fn = m_msg3.m_fileNums[i];
BigFile *bf = base->getFile ( fn );
log("db: Corrupt filename is %s in collnum %li."
,bf->getFilename()
,(long)m_collnum);
//key_t sk = m_listPtrs[i]->getStartKey();
//key_t ek = m_listPtrs[i]->getEndKey ();
//log("db: "
// "startKey.n1=%lx n0=%llx "
// "endKey.n1=%lx n0=%llx",
// sk.n1,sk.n0,ek.n1,ek.n0);
char *sk = m_listPtrs[i]->getStartKey();
char *ek = m_listPtrs[i]->getEndKey ();
log("db: "
"startKey=%s "
"endKey=%s ",
KEYSTR(sk,m_ks),KEYSTR(ek,m_ks));
}
// . remove the bad eggs from the list
// . TODO: support non-fixed data sizes
//if ( m_listPtrs[i]->getFixedDataSize() >= 0 )
m_listPtrs[i]->removeBadData_r();
//else
//m_listPtrs[i]->reset();
// otherwise we have a patchable error
m_hadCorruption = true;
// don't add a list with errors to cache, please
m_addToCache = false;
}
}
void Msg5::mergeLists_r ( ) {
// . don't do any merge if this is true
// . if our fetch of remote list fails, then we'll be called
// again with this set to false
if ( m_hadCorruption ) return;
// start the timer
//long long startTime = gettimeofdayInMilliseconds();
// . if the key of the last key of the previous list we read from
// is not below startKey, reset the truncation count to avoid errors
// . if caller does the same read over and over again then
// we would do a truncation in error eventually
// . use m_fileStartKey, not just m_startKey, since we may be doing
// a follow-up read
//if ( m_prevKey >= m_fileStartKey ) m_prevCount = 0;
if ( KEYCMP(m_prevKey,m_fileStartKey,m_ks)>=0 ) m_prevCount = 0;
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
//RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_collnum))) {
// log("No collection found."); return; }
/*
if ( m_rdbId == RDB_POSDB ) {
m_list->posdbMerge_r ( m_listPtrs ,
m_numListPtrs ,
m_startKey ,
m_minEndKey ,
m_minRecSizes ,
m_removeNegRecs ,
m_prevKey ,
&m_prevCount ,
&m_dupsRemoved ,
base->m_rdb->m_rdbId ,
&m_filtered ,
m_isRealMerge , // do group mask?
m_isRealMerge , // is real merge?
m_niceness );
}
*/
long niceness = m_niceness;
if ( niceness > 0 ) niceness = 2;
if ( m_isRealMerge ) niceness = 1;
// . all lists must be constrained because indexMerge_r does not check
// merged keys to see if they're in [startKey,endKey]
// . RIGHT NOW indexdb is the only one that uses half keys!!
// . indexMerge_r() will return false and set g_errno on error
// . this is messing up linkdb!!
/*
if ( base->useHalfKeys() &&
m_rdbId != RDB_POSDB &&
m_rdbId != RDB2_POSDB2 &&
1 == 3 ) {
// disable for now!
char *xx=NULL;*xx=0;
// always assume to use it
bool useBigRootList = true;
// must include the first file always
if ( m_startFileNum != 0 ) useBigRootList = false;
// must be 12 bytes per key
if ( m_ks != 12 ) useBigRootList = false;
// just indexdb for now, tfndb has special merge rules
if ( m_rdbId != RDB_POSDB && m_rdbId != RDB2_POSDB2 )
useBigRootList = false;
// do not do if merging files on disk
if ( m_isRealMerge ) useBigRootList = false;
// turn off for now
//useBigRootList = false;
// do the merge
m_list->indexMerge_r ( m_listPtrs ,
m_numListPtrs ,
m_startKey ,
m_minEndKey ,
m_minRecSizes ,
m_removeNegRecs ,
m_prevKey ,
&m_prevCount ,
//g_indexdb.getTruncationLimit() ,
m_indexdbTruncationLimit ,
&m_dupsRemoved ,
//base->m_rdb == g_tfndb.getRdb() ,
base->m_rdb->m_rdbId ,
&m_filtered ,
m_isRealMerge , // do group mask?
m_isRealMerge , // is real merge?
useBigRootList ,// useBigRootList?
niceness );
#ifdef _TESTNEWALGO_
// for testing useBigRootList
if ( useBigRootList ) {
logf(LOG_DEBUG,"db: TRYING bit root list algo.");
m_list2.indexMerge_r ( m_listPtrs ,
m_numListPtrs ,
m_startKey ,
m_minEndKey ,
m_minRecSizes ,
m_removeNegRecs ,
m_prevKey ,
&m_prevCount ,
m_indexdbTruncationLimit ,
&m_dupsRemoved ,
base->m_rdb->m_rdbId ,
&m_filtered ,
m_isRealMerge , //dogrpmask?
m_isRealMerge ,
false );//bigRootList
// sanity check
long size1 = m_list->m_listSize;
long size2 = m_list2.m_listSize;
char *list1 = (char *)m_list->m_list;
char *list2 = (char *)m_list2.m_list;
if ( size1 != size2 ||
memcmp ( list1 , list2 , size1 ) != 0 ) {
log("db: Got bad list.");
m_list->printList();
m_list2.printList();
//char *xx = NULL; *xx = 0;
}
}
#endif
}
*/
//g_conf.m_useThreads = false;
// . old Msg3 notes:
// . otherwise, merge the lists together
// . this may call growList() via RdbList::addRecord/Key() but it
// shouldn't since we called RdbList::prepareForMerge() above
// . we aren't allowed to do allocating in a thread!
// . TODO: only merge the recs not cached, [m_fileStartKey, endKey]
// . merge() might shrink m_endKey in diskList if m_minRecSizes
	//   constrained us OR it might decrement it by 1 if it's a negative key
// .........................
// . this MUST start at m_list->m_listPtr cuz this may not be the
// 1st time we had to dive in to disk, due to negative rec
// annihilation
// . old finalList.merge_r() Msg5 notes:
// . use startKey of tree
// . NOTE: tree may contains some un-annihilated key pairs because
// one of them was PROBABLY in the dump queue and we decided in
// Rdb::addRecord() NOT to do the annihilation, therefore it's good
// to do the merge to do the annihilation
//else
m_list->merge_r ( m_listPtrs ,
m_numListPtrs ,
m_startKey ,
m_minEndKey ,
m_minRecSizes ,
m_removeNegRecs ,
//getIdFromRdb ( base->m_rdb ) ,
m_rdbId ,
&m_filtered ,
NULL,//m_tfns , // used for titledb
NULL,//&m_tfndbList , // used for titledb
m_isRealMerge ,
niceness );
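	// (usage sketch, assuming the RdbList API as called above: a caller
	//  merging lists itself follows the same two-step pattern --
	//
	//      RdbList merged;
	//      merged.prepareForMerge ( listPtrs , numListPtrs , minRecSizes );
	//      merged.merge_r ( listPtrs , numListPtrs , startKey , endKey ,
	//                       minRecSizes , ... );
	//
	//  prepareForMerge() grows the destination buffer up front because
	//  merge_r() may run in a thread, where allocating is not allowed.)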
// maintain this info for truncation purposes
if ( m_list->isLastKeyValid() )
//m_prevKey = m_list->getLastKey();
KEYSET(m_prevKey,m_list->getLastKey(),m_ks);
else {
// . lastKey should be set and valid if list is not empty
// . we need it for de-duping dup tfndb recs that fall on our
// read boundaries
if ( m_list->m_listSize > 0 )
log(LOG_LOGIC,"db: Msg5. Last key invalid.");
m_prevCount = 0;
}
}
// . this returns false if blocked, true otherwise
// . sets g_errno on error
// . only blocks if calls msg0 to patch a corrupted list
// . it will handle calling callback if that happens
// . this is called when all files are done reading in m_msg3
// . sets g_errno on error
// . problem: say minRecSizes is 12000 (1000 keys)
// . there are 10000 keys in the [startKey,endKey] range
// . we read 1st 1000 recs from the tree and store in m_treeList
// . we read 1st 1000 recs from disk
// . all recs in tree are negative and annihilate the 1000 recs from disk
// . we are left with an empty list
bool Msg5::doneMerging ( ) {
//m_waitingForMerge = false;
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_collnum))) return true;
// . if there was a merge error, bitch about it
// . Thread class should propagate g_errno when it was set in a thread
if ( g_errno ) {
log("net: Had error merging lists from %s: %s.",
base->m_dbname,mstrerror(g_errno));
return true;
}
// . was a list corrupted?
// . if so, we did not even begin the merge yet
// . try to get the list from a remote brother
// . if that fails we have already removed the bad data, so begin
// our first merge
if ( m_hadCorruption ) {
// log it here, cuz logging in thread doesn't work too well
log("net: Encountered a corrupt list in rdb=%s collnum=%li",
base->m_dbname,(long)m_collnum);
// remove error condition, we removed the bad data in thread
m_hadCorruption = false;
// if(g_numCorrupt++ >= g_conf.m_maxCorruptLists &&
// g_conf.m_maxCorruptLists > 0) {
g_numCorrupt++;
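		// (e.g. with m_maxCorruptLists = 100 the modulo below fires
		//  one alert email per 100 corrupt lists rather than one
		//  per incident)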
if(g_conf.m_maxCorruptLists > 0 &&
(g_numCorrupt % g_conf.m_maxCorruptLists) == 0) {
char msgbuf[1024];
Host *h = g_hostdb.getHost ( 0 );
snprintf(msgbuf, 1024,
"%li corrupt lists. "
"cluster=%s "
"host=%li",
g_numCorrupt,
iptoa(h->m_ip),
g_hostdb.m_hostId);
g_pingServer.sendEmail(NULL, msgbuf);
}
// try to get the list from remote host
if ( ! getRemoteList() ) return false;
		// sanity: if getRemoteList() did not block, g_errno must be set
if ( ! g_errno ) {
log("net: got remote list without blocking");
char *xx=NULL;*xx=0;
}
// if it set g_errno, it could not get a remote list
		// so try to make do with what we have
if ( g_errno ) {
// log a msg, we actually already removed it in thread
log("net: Removed corrupted data.");
// clear error
g_errno = 0;
// . merge the modified lists again
// . this is not in a thread
// . it should not block
mergeLists_r();
}
}
if ( m_isRealMerge )
log(LOG_DEBUG,"db: merged list is %li bytes long.",
m_list->m_listSize);
// log it
long long now ;
// only time it if we actually did a merge, check m_startTime
if ( m_startTime ) now = gettimeofdayInMilliseconds();
else now = 0;
long long took = now - m_startTime ;
if ( g_conf.m_logTimingNet ) {
if ( took > 5 )
log(LOG_INFO,
"net: Took %llu ms to do merge. %li lists merged "
"into one list of %li bytes.",
took , m_numListPtrs , m_list->getListSize() );
//log("Msg5:: of that %llu ms was in checkList_r()s",
// m_checkTime );
}
// . add the stat
// . use turquoise for time to merge the disk lists
// . we should use another color rather than turquoise
// . these clog up the graph, so only log if took more than 1 ms
// . only time it if we actually did a merge, check m_startTime
if ( took > 1 && m_startTime )
g_stats.addStat_r ( m_minRecSizes ,
m_startTime ,
now ,
//"rdb_list_merge",
0x0000ffff );
// . scan merged list for problems
// . this caught an incorrectly set m_list->m_lastKey before
#ifdef _SANITYCHECK_
m_list->checkList_r ( false , true , m_rdbId );
#endif
// . all done if we did not merge the lists
// . posdb doesn't need that so much for calling intersect9_r()
if ( ! m_mergeLists )
return true;
	// . TODO: call freeList() on each m_list[i] here rather than destructor
// . free all lists we used
// . some of these may be from Msg3, some from cache, some from tree
for ( long i = 0 ; i < m_numListPtrs ; i++ ) {
m_listPtrs[i]->freeList();
m_listPtrs[i] = NULL;
}
// and the tree list
m_treeList.freeList();
// . update our m_newMinRecSizes
// . NOTE: this now ignores the negative records in the tree
long long newListSize = m_list->getListSize();
// scale proportionally based on how many got removed during the merge
long long percent = 100LL;
long long net = newListSize - m_oldListSize;
// add 5% for inconsistencies
if ( net > 0 ) percent =(((long long)m_newMinRecSizes*100LL)/net)+5LL;
else percent = 200;
if ( percent <= 0 ) percent = 1;
// set old list size in case we get called again
m_oldListSize = newListSize;
//long delta = m_minRecSizes - (long)newListSize;
// how many recs do we have left to read?
m_newMinRecSizes = m_minRecSizes - (long)newListSize;
// return now if we met our minRecSizes quota
if ( m_newMinRecSizes <= 0 ) return true;
// if we gained something this round then try to read the remainder
//if ( net > 0 ) m_newMinRecSizes = delta;
// otherwise, scale proportionately
long nn = ((long long)m_newMinRecSizes * percent ) / 100LL;
if ( percent > 100 ) {
if ( nn > m_newMinRecSizes ) m_newMinRecSizes = nn;
else m_newMinRecSizes = 0x7fffffff;
}
else m_newMinRecSizes = nn;
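	// worked example: if the previous round requested 100000 bytes
	// (m_newMinRecSizes above) but the merge only netted 20000 bytes
	// after annihilations, then percent = (100000*100)/20000 + 5 = 505;
	// if 50000 bytes are still owed, the next read asks for
	// (50000*505)/100 = 252500 bytes to compensate for the shrinkage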
// . for every round we get call increase by 10 percent
// . try to fix all those negative recs in the rebalance re-run
	// (cast the product, not the multiplier: casting (1.0 + m_round*.10)
	//  to long truncates it to 1 for rounds 0-9 and the boost never
	//  kicks in)
	m_newMinRecSizes = (long)(m_newMinRecSizes * (1.0 + (m_round * .10)));
// wrap around?
if ( m_newMinRecSizes < 0 || m_newMinRecSizes > 1000000000 )
m_newMinRecSizes = 1000000000;
QUICKPOLL(m_niceness);
// . don't exceed original min rec sizes by 5 i guess
// . watch out for wrap
//long max = 5 * m_minRecSizes ;
//if ( max < m_minRecSizes ) max = 0x7fffffff;
//if ( m_newMinRecSizes > max && max > m_minRecSizes )
// m_newMinRecSizes = max;
// keep this above a certain point because if we didn't make it now
// we got negative records messing with us
if ( m_rdbId != RDB_DOLEDB &&
m_newMinRecSizes < 128000 ) m_newMinRecSizes = 128000;
// . update startKey in case we need to read more
// . we'll need to read more if endKey < m_endKey && m_newMinRecSizes
// is positive
// . we read more from files AND from tree
//m_fileStartKey = m_list->getEndKey() ;
//m_fileStartKey += (unsigned long)1;
KEYSET(m_fileStartKey,m_list->getEndKey(),m_ks);
KEYADD(m_fileStartKey,1,m_ks);
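	// (e.g. if the merged list ended at key K, the follow-up read
	//  begins at K+1 so no record is returned twice)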
return true;
}
void gotRemoteListWrapper( void *state );//, RdbList *list ) ;
long g_isDumpingRdbFromMain = 0;
// . if we discover one of the lists we read from a file is corrupt we go here
// . uses Msg5 to try to get list remotely
bool Msg5::getRemoteList ( ) {
// skip this part if doing a cmd line 'gb dump p main 0 -1 1' cmd or
// similar to dump out a local rdb.
if ( g_isDumpingRdbFromMain ) {
g_errno = 1;
return true;
}
// . this returns false if blocked, true otherwise
// . this sets g_errno on error
// . get list from ALL files, not just m_startFileNum/m_numFiles
// since our files may not be the same
// . if doRemotely parm is not supplied replying hostId is unspecified
// get our twin host, or a redundant host in our group
//Host *group = g_hostdb.getGroup ( g_hostdb.m_groupId );
Host *group = g_hostdb.getMyShard();
long n = g_hostdb.getNumHostsPerShard();
// . if we only have 1 host per group, data is unpatchable
// . we should not have been called if this is the case!!
if ( n == 1 ) {
g_errno = EBADENGINEER;
//log("Msg5::gotRemoteList: no twins. data unpatchable.");
return true;
}
// tfndb is not shareable, since it has tfns
if ( m_rdbId == RDB_TFNDB ) {
g_errno = EBADENGINEER;
log("net: Cannot patch tfndb data from twin because it is "
"not interchangable. Tfndb must be regenerated.");
return true;
}
if ( m_rdbId == RDB_STATSDB ) {
g_errno = EBADENGINEER;
log("net: Cannot patch statsdb data from twin because it is "
"not interchangable.");
return true;
}
	// tell them about it
log("net: Getting remote list from twin instead.");
// make a new Msg0 for getting remote list
try { m_msg0 = new ( Msg0 ); }
// g_errno should be set if this is NULL
catch ( ... ) {
g_errno = ENOMEM;
log("net: Could not allocate memory to get from twin.");
return true;
}
mnew ( m_msg0 , sizeof(Msg0) , "Msg5" );
// select our twin
long i;
for ( i = 0 ; i < n ; i++ )
if ( group[i].m_hostId != g_hostdb.m_hostId ) break;
Host *h = &group[i];
// get our groupnum. the column #
long forceParitySplit = h->m_shardNum;//group;
// translate base to an id, for the sake of m_msg0
//char rdbId = getIdFromRdb ( base->m_rdb );
// . this returns false if blocked, true otherwise
// . this sets g_errno on error
// . get list from ALL files, not just m_startFileNum/m_numFiles
// since our files may not be the same
// . if doRemotely parm is not supplied replying hostId is unspecified
// . make minRecSizes as big as possible because it gets from ALL
// files and from tree!
	// . just make it 256k for now lest msg0 complain about it being too
	//   big when rdbId == RDB_INDEXDB has passed the truncation limit
	// . wait forever for this host to reply... well, at least a day;
	//   that way if he's dead we'll wait for him to come back up to
	//   save our data
if ( ! m_msg0->getList ( h->m_hostId ,
h->m_ip ,
h->m_port ,
0 , // max cached age
false , // add to cache?
m_rdbId , // rdbId
m_collnum ,
m_list ,
m_startKey ,
m_endKey ,
m_minRecSizes , // was 256k minRecSizes
this ,
gotRemoteListWrapper ,
m_niceness ,
false , // do error correction?
true , // include tree?
true , // do merge? (obsolete)
-1 , // first hostid
0 , // startFileNum
-1 , // numFiles (-1=all)
60*60*24 , // timeout in secs
-1 , // syncPoint
-1 , // preferLocalReads
NULL , // msg5
NULL , // msg5b
m_isRealMerge , // merging files?
//#ifdef SPLIT_INDEXDB
m_allowPageCache , // allow page cache?
false , // force local Indexdb
false , // doIndexdbSplit
// "forceParitySplit" is a group #
// (the groupId is a mask)
forceParitySplit ))
//#else
// m_allowPageCache ))// allow page cache?
//#endif
return false;
// this is strange
log("msg5: call to msg0 did not block");
// . if we did not block then call this directly
// . return false if it blocks
return gotRemoteList ( ) ;
}
void gotRemoteListWrapper( void *state ) { // , RdbList *list ) {
Msg5 *THIS = (Msg5 *)state;
// return if this blocks
if ( ! THIS->gotRemoteList() ) return;
// sanity check
if ( THIS->m_calledCallback ) { char *xx=NULL;*xx=0; }
// we are no longer waiting for the list
THIS->m_waitingForList = false;
// set it now
THIS->m_calledCallback = 4;
// if it doesn't block call the callback, g_errno may be set
THIS->m_callback ( THIS->m_state , THIS->m_list , THIS );
}
// returns false if it blocks
bool Msg5::gotRemoteList ( ) {
// free the Msg0
mdelete ( m_msg0 , sizeof(Msg0) , "Msg5" );
delete ( m_msg0 );
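	// (mnew()/mdelete() bracket the allocation so the in-house memory
	//  accounting stays balanced; the plain delete above then runs the
	//  destructor and returns the memory)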
// return true now if everything ok
if ( ! g_errno ) {
// . i modified checkList to set m_lastKey if it is not set
// . we need it for the big merge for getting next key in
// RdbDump.cpp
		// . if it too is invalid, we are out of luck
if ( ! m_list->checkList_r ( false , false ) ) {
log("net: Received bad list from twin.");
g_errno = ECORRUPTDATA;
goto badList;
}
// . success messages
// . logging the key ranges gives us an idea of how long
// it will take to patch the bad data
//key_t sk = m_list->getStartKey();
//key_t ek = m_list->getEndKey ();
//log("net: Received good list from twin. Requested %li bytes "
// "and got %li. "
// "startKey.n1=%lx n0=%llx "
// "endKey.n1=%lx n0=%llx",
// m_minRecSizes , m_list->getListSize() ,
// sk.n1,sk.n0,ek.n1,ek.n0);
char *sk = m_list->getStartKey();
char *ek = m_list->getEndKey ();
log("net: Received good list from twin. Requested %li bytes "
"and got %li. "
"startKey=%s endKey=%s",
m_minRecSizes , m_list->getListSize() ,
KEYSTR(sk,m_ks),KEYSTR(ek,m_ks));
// . HACK: fix it so end key is right
// . TODO: fix this in Msg0::gotReply()
// . if it is empty, then there must be nothing else left
// since the endKey was maxed in call to Msg0::getList()
QUICKPOLL(m_niceness);
if ( ! m_list->isEmpty() )
m_list->setEndKey ( m_list->getLastKey() );
//key_t k ;
//k = m_list->getStartKey();
char *k = m_list->getStartKey();
log(LOG_DEBUG,
//"net: Received list skey.n1=%08lx skey.n0=%016llx." ,
// k.n1 , k.n0 );
"net: Received list skey=%s." ,
KEYSTR(k,m_ks) );
k = m_list->getEndKey();
log(LOG_DEBUG,
//"net: Received list ekey.n1=%08lx ekey.n0=%016llx." ,
// k.n1 , k.n0 );
"net: Received list ekey=%s",
KEYSTR(k,m_ks) );
if ( ! m_list->isEmpty() ) {
k = m_list->getLastKey();
//log(LOG_DEBUG,"net: Received list Lkey.n1=%08lx "
// "Lkey.n0=%016llx" , k.n1 , k.n0 );
log(LOG_DEBUG,"net: Received list Lkey=%s",
KEYSTR(k,m_ks) );
}
//log("Msg5::gotRemoteList: received list is good.");
return true;
}
badList:
// it points to a corrupted list from twin, so reset
m_list->reset();
// because we passed m_list to Msg0, it called m_list->reset()
// which set our m_mergeMinListSize to -1, so we have to call
// the prepareForMerge() again
if ( ! m_list->prepareForMerge ( m_listPtrs ,
m_numListPtrs ,
m_minRecSizes ) ) {
log("net: Had error preparing for merge: %s.",
mstrerror(g_errno));
return true;
}
	// . if g_errno indicates a timeout we couldn't get a patch list
// . turn off error correction and try again
log("net: Had error getting remote list: %s.", mstrerror(g_errno) );
log("net: Merging repaired lists.");
// clear g_errno so RdbMerge doesn't freak out
g_errno = 0;
// . we have the lists ready to merge
// . hadCorruption should be false at this point
mergeLists_r();
// process the result
return doneMerging();
}