rebalancer is running now

This commit is contained in:
Matt Wells 2014-01-15 16:09:38 -08:00
parent f8c2329bd2
commit 4a04542829
2 changed files with 38 additions and 16 deletions

View File

@ -240,8 +240,9 @@ bool Rebalance::saveRebalanceFile ( ) {
}
static void gotListWrapper ( void *state , RdbList *list, Msg5 *msg5 ) {
// this never blocks
g_rebalance.gotList();
// . this can block if a msg4 blocks, in which case it returns false
// . when its msg4 callback is called it calls scanLoop() from there
if ( ! g_rebalance.gotList() ) return;
// init another rdb scan pass
g_rebalance.scanLoop();
}
@ -290,8 +291,8 @@ bool Rebalance::scanRdb ( ) {
// all done if list empty
if ( m_list.isEmpty() ) return true;
// process that list
gotList();
// process that list. return false if blocked.
if ( ! gotList() ) return false;
// get another list
goto readAnother;
@ -307,23 +308,32 @@ static void doneAddingMetaWrapper ( void *state ) {
}
// scan that list
void Rebalance::gotList ( ) {
bool Rebalance::gotList ( ) {
if ( m_blocked ) { char *xx=NULL;*xx=0; }
Rdb *rdb = g_process.m_rdbs[m_rdbNum];
char rdbId = rdb->m_rdbId;
long keySize = rdb->m_ks;//getKeySize();
long ks = rdb->m_ks;//getKeySize();
long myShard = g_hostdb.m_myHost->m_shardNum;
m_list.resetListPtr();
m_posMetaList.reset();
m_negMetaList.reset();
char *last = NULL;
for ( ; ! m_list.isExhausted() ; m_list.skipCurrentRec() ) {
// get tht rec
char *rec = m_list.getCurrentRec();
// get shard
long shard = getShardNum ( rdbId , rec );
// save last ptr
last = rec;
// skip it if it belongs with us
if ( shard == myShard ) continue;
// otherwise, it does not!
@ -331,10 +341,10 @@ void Rebalance::gotList ( ) {
// copy the full key into "key" buf because might be compressed
char key[MAX_KEY_BYTES];
m_list.getCurrentKey ( key );
// store rdbid
m_posMetaList.pushChar ( rdbId );
// store rdbid, no! we supply rdbid below to msg4
//m_posMetaList.pushChar ( rdbId );
// first key
m_posMetaList.safeMemcpy ( key , keySize );
m_posMetaList.safeMemcpy ( key , ks );
// then record
long dataSize = rdb->m_fixedDataSize;
if ( rdb->m_fixedDataSize == -1 ) {
@ -349,15 +359,27 @@ void Rebalance::gotList ( ) {
//
// NOW DELETE FROM OUR SHARD!
//
// store rdbid
m_negMetaList.pushChar ( rdbId );
// store rdbid, no! we supply rdbid below to msg4
//m_negMetaList.pushChar ( rdbId );
// make key a delete
key[0] &= 0xfe;
// and store that negative key
m_posMetaList.safeMemcpy ( key , keySize );
m_posMetaList.safeMemcpy ( key , ks );
}
// update nextkey
if ( last ) {
// get the last key we scanned, all "ks" bytes of it.
// because some keys are compressed and we take the
// more significant compressed out bytes from m_list.m_*
// member vars
m_list.getKey ( last , m_nextKey );
// if it is not maxxed out, then incremenet it for the
// next scan round
if ( KEYCMP ( m_nextKey , KEYMAX() , ks ) != 0 )
KEYADD ( m_nextKey , 1 , ks );
}
if ( m_blocked ) { char *xx=NULL;*xx=0; }
if ( ! m_msg4a.addMetaList ( &m_posMetaList ,
m_collnum ,
@ -378,7 +400,7 @@ void Rebalance::gotList ( ) {
myShard ) ) // shard override, not!
m_blocked++;
if ( m_blocked ) return;
if ( m_blocked ) return false;
scanLoop();
return true;
}

View File

@ -20,7 +20,7 @@ class Rebalance {
void rebalanceLoop ( ) ;
void scanLoop ( ) ;
bool scanRdb ( ) ;
void gotList ( ) ;
bool gotList ( ) ;
bool saveRebalanceFile ( ) ;
bool m_inRebalanceLoop;