if coll is deleted or reset in the middle of a dump

or merge, then stop the dump/merge with an ENOCOLLREC
error. avoid calling "base->" functions since base
could be NULL if the collection was deleted.
This commit is contained in:
Matt Wells 2013-12-25 17:12:09 -08:00
parent f9d7b9dbc7
commit 048b715962
7 changed files with 101 additions and 40 deletions

14
Rdb.cpp
View File

@ -1466,7 +1466,7 @@ bool Rdb::dumpCollLoop ( ) {
// . RdbMap should dump itself out CLOSE!
// . it returns false if blocked, true otherwise & sets g_errno on err
// . but we only return false on error here
if ( ! m_dump.set ( base->m_coll ,
if ( ! m_dump.set ( base->m_collnum ,
base->m_files[m_fn] ,
id2 , // to set tfndb recs for titledb
m_isTitledb,// tdb2? this == g_titledb.getRdb() ,
@ -2837,6 +2837,18 @@ RdbBase *getRdbBase ( uint8_t rdbId , char *coll ) {
return rdb->getBase(collnum);
}
// get the RdbBase class for an rdbId and collection name
RdbBase *getRdbBase ( uint8_t rdbId , collnum_t collnum ) {
Rdb *rdb = getRdbFromId ( rdbId );
if ( ! rdb ) {
log("db: Collection #%li does not exist.",(long)collnum);
return NULL;
}
if ( rdb->m_isCollectionLess ) collnum = (collnum_t) 0;
return rdb->getBase(collnum);
}
// get group responsible for holding record with this key
/*
RdbCache *getCache ( uint8_t rdbId ) {

1
Rdb.h
View File

@ -62,6 +62,7 @@ extern long g_numUrgentMerges;
// get the RdbBase class for an rdbId and collection name
class RdbBase *getRdbBase ( uint8_t rdbId , char *coll );
class RdbBase *getRdbBase ( uint8_t rdbId , collnum_t collnum );
// maps an rdbId to an Rdb
class Rdb *getRdbFromId ( uint8_t rdbId ) ;
// the reverse of the above

View File

@ -1929,7 +1929,7 @@ void RdbBase::gotTokenForMerge ( ) {
// . start the merge
// . returns false if blocked, true otherwise & sets g_errno
if ( ! m->merge ( rdbId ,
m_coll ,
m_collnum ,
m_files[mergeFileNum] ,
m_maps [mergeFileNum] ,
id2 ,

View File

@ -21,7 +21,8 @@ void doneReadingForVerifyWrapper ( void *state ) ;
// . return false if blocked, true otherwise
// . sets g_errno on error
bool RdbDump::set ( char *coll ,
bool RdbDump::set ( //char *coll ,
collnum_t collnum ,
BigFile *file ,
long id2 , // in Rdb::m_files[] array
bool isTitledb ,
@ -49,8 +50,14 @@ bool RdbDump::set ( char *coll ,
char *xx = NULL; *xx = 0;
}
//if ( ! coll &&
if ( ! coll && rdb->m_isCollectionLess )
strcpy(m_coll,rdb->m_dbname);
//if ( ! coll && rdb->m_isCollectionLess )
// strcpy(m_coll,rdb->m_dbname);
m_collnum = collnum;
// use 0 for collectionless
if ( rdb && rdb->m_isCollectionLess ) m_collnum = 0;
/*
if ( ! coll && g_catdb.getRdb() == rdb )
strcpy(m_coll, "catdb");
@ -59,8 +66,8 @@ bool RdbDump::set ( char *coll ,
else if ( ! coll && g_accessdb.getRdb() == rdb )
strcpy(m_coll, "accessdb");
*/
else
strcpy ( m_coll , coll );
//else
// strcpy ( m_coll , coll );
m_file = file;
m_id2 = id2;
m_isTitledb = isTitledb;
@ -187,6 +194,9 @@ void RdbDump::reset ( ) {
}
void RdbDump::doneDumping ( ) {
long saved = g_errno;
m_isDumping = false;
// print stats
log(LOG_INFO,
@ -204,6 +214,10 @@ void RdbDump::doneDumping ( ) {
if ( m_list ) m_list->freeList();
// reset verify buffer
reset();
// did collection get deleted/reset from under us?
if ( saved == ENOCOLLREC ) return;
// save the map to disk
m_map->writeMap();
#ifdef _SANITYCHECK_
@ -280,13 +294,16 @@ bool RdbDump::dumpTree ( bool recall ) {
// this list will hold the list of nodes/recs from m_tree
m_list = &m_ourList;
// convert coll to collnum
collnum_t collnum = g_collectiondb.getCollnum ( m_coll );
if ( collnum < 0 ) {
//if ( g_catdb->getRdb() == m_rdb )
if ( ! m_rdb->m_isCollectionLess ) return true;
g_errno = 0;
collnum = 0;
}
//collnum_t collnum = g_collectiondb.getCollnum ( m_coll );
// a collnum of -1 is for collectionless rdbs
//if ( collnum < 0 ) {
// //if ( g_catdb->getRdb() == m_rdb )
// if ( ! m_rdb->m_isCollectionLess ) {
// char *xx=NULL;*xx=0; //return true;
// }
// g_errno = 0;
// collnum = 0;
//}
// getMemOccupiedForList2() can take some time, so breathe
long niceness = 1;
loop:
@ -313,7 +330,7 @@ bool RdbDump::dumpTree ( bool recall ) {
//log("RdbDump:: getting list");
m_t1 = gettimeofdayInMilliseconds();
if(m_tree)
status = m_tree->getList ( collnum ,
status = m_tree->getList ( m_collnum ,
m_nextKey ,
maxEndKey ,
m_maxBufSize , // max recSizes
@ -323,7 +340,7 @@ bool RdbDump::dumpTree ( bool recall ) {
m_useHalfKeys ,
niceness );
else if(m_buckets)
status = m_buckets->getList ( collnum,
status = m_buckets->getList ( m_collnum,
m_nextKey ,
maxEndKey ,
m_maxBufSize , // max recSizes
@ -833,19 +850,19 @@ bool RdbDump::doneReadingForVerify ( ) {
//log("RdbDump:: deleting list");
long long t1 = gettimeofdayInMilliseconds();
// convert to number, this is -1 if no longer exists
collnum_t collnum = g_collectiondb.getCollnum ( m_coll );
if ( collnum < 0 && m_rdb->m_isCollectionLess ) {
collnum = 0;
g_errno = 0;
}
//collnum_t collnum = g_collectiondb.getCollnum ( m_coll );
//if ( collnum < 0 && m_rdb->m_isCollectionLess ) {
// collnum = 0;
// g_errno = 0;
//}
//m_tree->deleteOrderedList ( m_list , false /*do balancing?*/ );
// tree delete is slow due to checking for leaks, not balancing
bool s;
if(m_tree) {
s = m_tree->deleteList(collnum,m_list,true /*do balancing?*/);
s = m_tree->deleteList(m_collnum,m_list,true/*do balancing?*/);
}
else if(m_buckets) {
s = m_buckets->deleteList(collnum, m_list);
s = m_buckets->deleteList(m_collnum, m_list);
}
// problem?
if ( ! s && ! m_tried ) {
@ -995,6 +1012,11 @@ void doneWritingWrapper ( void *state ) {
}
void RdbDump::continueDumping() {
// if someone reset/deleted the collection we were dumping...
CollectionRec *cr = g_collectiondb.getRec ( m_collnum );
if ( ! cr ) g_errno = ENOCOLLREC;
// bitch about errors
if (g_errno)log("db: Dump to %s had error writing: %s.",
m_file->getFilename(),mstrerror(g_errno));
@ -1006,7 +1028,10 @@ void RdbDump::continueDumping() {
}
// . continue dumping the tree
// . return if this blocks
if ( ! dumpTree ( false ) ) return;
// . if the collrec was deleted or reset then g_errno will be
// ENOCOLLREC and we want to skip call to dumpTree(
if ( g_errno != ENOCOLLREC && ! dumpTree ( false ) )
return;
// close it up
doneDumping ( );
// call the callback

View File

@ -30,7 +30,8 @@ class RdbDump {
// . set up for a dump of rdb records to a file
// . returns false and sets errno on error
bool set ( char *coll ,
bool set ( //char *coll ,
collnum_t collnum ,
BigFile *file ,
long id2 , // in Rdb::m_files[] array
bool isTitledb , // are we dumping TitleRecs?
@ -179,7 +180,8 @@ class RdbDump {
// for setting m_rdb->m_needsSave after deleting list from tree
class Rdb *m_rdb;
char m_coll [ MAX_COLL_LEN + 1 ];
//char m_coll [ MAX_COLL_LEN + 1 ];
collnum_t m_collnum;
bool m_tried;

View File

@ -29,7 +29,8 @@ void RdbMerge::reset () { m_isMerging = false; m_isSuspended = false; }
// there's too much contention from spider lookups on disk for the merge
// to finish in a decent amount of time and we end up getting too many files!
bool RdbMerge::merge ( char rdbId ,
char *coll , //RdbBase *base ,
//char *coll , //RdbBase *base ,
collnum_t collnum,
BigFile *target ,
RdbMap *targetMap ,
long id2 , // target's secondary id
@ -45,16 +46,20 @@ bool RdbMerge::merge ( char rdbId ,
m_rdbId = rdbId;
Rdb *rdb = getRdbFromId ( rdbId );
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
RdbBase *base; if (!(base=getRdbBase(m_rdbId,coll))) return true;
RdbBase *base; if (!(base=getRdbBase(m_rdbId,collnum))) return true;
// don't breech the max
//if ( numFiles > m_maxFilesToMerge ) numFiles = m_maxFilesToMerge;
// reset this map! it's m_crcs needs to be reset
//targetMap->reset();
// remember some parms
if ( ! coll && rdb->m_isCollectionLess )
strcpy ( m_coll , rdb->m_dbname );
else
strcpy ( m_coll , coll );
//if ( ! coll && rdb->m_isCollectionLess )
// strcpy ( m_coll , rdb->m_dbname );
//else
// strcpy ( m_coll , coll );
m_collnum = collnum;
if ( rdb->m_isCollectionLess ) m_collnum = 0;
m_target = target;
m_targetMap = targetMap;
m_id2 = id2;
@ -179,12 +184,12 @@ bool RdbMerge::gotLock ( ) {
else KEYMIN(prevLastKey,m_ks);
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_coll))) return true;
RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_collnum))) return true;
// . set up a a file to dump the records into
// . returns false and sets g_errno on error
// . this will open m_target as O_RDWR | O_NONBLOCK | O_ASYNC ...
m_dump.set ( m_coll ,
m_dump.set ( m_collnum ,
m_target ,
m_id2 ,
//m_startFileNum - 1 , // merge fileNum in Rdb::m_files[]
@ -297,7 +302,7 @@ bool RdbMerge::getNextList ( ) {
// no chop threads
m_numThreads = 0;
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_coll))) return true;
RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_collnum))) return true;
// . if a contributor has just surpassed a "part" in his BigFile
// then we can delete that part from the BigFile and the map
for ( long i = m_startFileNum ; i < m_startFileNum + m_numFiles; i++ ){
@ -358,12 +363,16 @@ bool RdbMerge::getAnotherList ( ) {
// clear it up in case it was already set
g_errno = 0;
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_coll))) return true;
RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_collnum))) return true;
// if merging titledb files, we must adjust m_endKey so we do
// not have to read a huge 200MB+ tfndb list
//key_t newEndKey = m_endKey;
char newEndKey[MAX_KEY_BYTES];
KEYSET(newEndKey,m_endKey,m_ks);
CollectionRec *cr = g_collectiondb.getRec ( m_collnum );
char *coll = cr->m_coll;
/*
if ( m_rdbId == RDB_TITLEDB ) { // && m_rdbId == RDB_TFNDB ) {
//long long docId1 = g_titledb.getDocIdFromKey ( m_startKey );
@ -430,7 +439,7 @@ bool RdbMerge::getAnotherList ( ) {
long bufSize = 100000; // g_conf.m_mergeBufSize , // minRecSizes
// get it
return m_msg5.getList ( m_rdbId ,
m_coll ,
coll ,
&m_list ,
m_startKey ,
newEndKey , // usually is maxed!
@ -494,7 +503,10 @@ void dumpListWrapper ( void *state ) {
log(LOG_DEBUG,"db: Dump of list completed: %s.",mstrerror(g_errno));
// get a ptr to ourselves
RdbMerge *THIS = (RdbMerge *)state;
loop:
// collection reset or deleted while RdbDump.cpp was writing out?
if ( g_errno == ENOCOLLREC ) { THIS->doneMerging(); return; }
// return if this blocked
if ( ! THIS->getNextList() ) return;
// if g_errno is out of memory then msg3 wasn't able to get the lists
@ -576,6 +588,8 @@ bool RdbMerge::dumpList ( ) {
}
void RdbMerge::doneMerging ( ) {
// save this
long saved = g_errno;
// let RdbDump free its m_verifyBuf buffer if it existed
m_dump.reset();
// debug msg
@ -599,8 +613,13 @@ void RdbMerge::doneMerging ( ) {
// will call attemptMerge() on all the other dbs
m_isMerging = false;
m_isSuspended = false;
// if collection rec was deleted while merging files for it
// then the rdbbase should be NULL i guess.
if ( saved == ENOCOLLREC ) return;
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_coll))) return;
RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_collnum))) return;
// pass g_errno on to incorporate merge so merged file can be unlinked
base->incorporateMerge ( );
// nuke the lock so others can merge

View File

@ -58,7 +58,8 @@ class RdbMerge {
// . calls rdb->incorporateMerge() when done with merge or had error
// . "maxBufSize" is size of list to get then write (read/write buf)
bool merge ( char rdbId ,
char *coll ,
//char *coll ,
collnum_t collnum ,
BigFile *target ,
RdbMap *targetMap ,
long id2 ,
@ -156,7 +157,8 @@ class RdbMerge {
// for getting the RdbBase class doing the merge
uint8_t m_rdbId;
char m_coll [ MAX_COLL_LEN + 1 ];
//char m_coll [ MAX_COLL_LEN + 1 ];
collnum_t m_collnum;
char m_ks;
};