open-source-search-engine/RdbBuckets.cpp
Matt Wells 9642947136 fix so host #0 will delete then re-add collections
that use the same collnum but have a different name.
fixed some unlabelled safebufs.
fix core when deleting collnum from tree/buckets that
is higher than Collectiondb.m_numRecs.
fix File::m_filename safebufs that were not freed on exit.
2015-08-18 14:09:16 -07:00

2347 lines
59 KiB
C++

#undef _XOPEN_SOURCE
#define _XOPEN_SOURCE 500
#include "RdbBuckets.h"
#include "gb-include.h"
#include "sort.h"
#include "SafeBuf.h"
#include "Threads.h"
#include <unistd.h>
#include <sys/stat.h>
#include "Loop.h"
#include "Rdb.h"
//#define BUCKET_SIZE 64
//#define BUCKET_SIZE 1024
//#define BUCKET_SIZE 2048
//#define BUCKET_SIZE 4096
#define BUCKET_SIZE 8192
//#define BUCKET_SIZE 16384
//#define BUCKET_SIZE 1638400
#define INIT_SIZE 4096
#define SAVE_VERSION 0
inline int KEYCMP12 ( const void *a, const void *b ) {
char* k1 = (char*)a;
char* k2 = (char*)b;
if ( (*(uint64_t *)(k1+4)) <
(*(uint64_t *)(k2+4)) ) return -1;
if ( (*(uint64_t *)(k1+4)) >
(*(uint64_t *)(k2+4)) ) return 1;
uint32_t k1n0 = ((*(uint32_t*)(k1)) & ~0x01UL);
uint32_t k2n0 = ((*(uint32_t*)(k2)) & ~0x01UL);
if ( k1n0 < k2n0 ) return -1;
if ( k1n0 > k2n0 ) return 1;
return 0;
}
inline int KEYCMP16 ( const void *a, const void *b ) {
char* k1 = (char*)a;
char* k2 = (char*)b;
if ( (*(uint64_t *)(k1+8)) <
(*(uint64_t *)(k2+8)) ) return -1;
if ( (*(uint64_t *)(k1+8)) >
(*(uint64_t *)(k2+8)) ) return 1;
uint64_t k1n0 = ((*(uint64_t *)(k1)) & ~0x01ULL);
uint64_t k2n0 = ((*(uint64_t *)(k2)) & ~0x01ULL);
if ( k1n0 < k2n0 ) return -1;
if ( k1n0 > k2n0 ) return 1;
return 0;
}
inline int KEYCMP18 ( const void *a, const void *b ) {
char* k1 = (char*)a;
char* k2 = (char*)b;
if ( (*(uint64_t *)(k1+10)) <
(*(uint64_t *)(k2+10)) ) return -1;
if ( (*(uint64_t *)(k1+10)) >
(*(uint64_t *)(k2+10)) ) return 1;
if ( (*(uint64_t *)(k1+2)) <
(*(uint64_t *)(k2+2)) ) return -1;
if ( (*(uint64_t *)(k1+2)) >
(*(uint64_t *)(k2+2)) ) return 1;
uint16_t k1n0 = ((*(uint16_t *)(k1)) & 0xfffe);
uint16_t k2n0 = ((*(uint16_t *)(k2)) & 0xfffe);
if ( k1n0 < k2n0 ) return -1;
if ( k1n0 > k2n0 ) return 1;
return 0;
}
inline int KEYCMP24 ( const void *a, const void *b ) {
char* k1 = (char*)a;
char* k2 = (char*)b;
if ( (*(uint64_t *)(k1+16)) <
(*(uint64_t *)(k2+16)) ) return -1;
if ( (*(uint64_t *)(k1+16)) >
(*(uint64_t *)(k2+16)) ) return 1;
if ( (*(uint64_t *)(k1+8)) <
(*(uint64_t *)(k2+8)) ) return -1;
if ( (*(uint64_t *)(k1+8)) >
(*(uint64_t *)(k2+8)) ) return 1;
uint64_t k1n0 = ((*(uint64_t *)(k1)) & ~0x01ULL);
uint64_t k2n0 = ((*(uint64_t *)(k2)) & ~0x01ULL);
if ( k1n0 < k2n0 ) return -1;
if ( k1n0 > k2n0 ) return 1;
return 0;
}
inline int KEYCMP6 ( const void *a, const void *b ) {
char* k1 = (char*)a;
char* k2 = (char*)b;
if ( (*(uint32_t *)(k1+2)) <
(*(uint32_t *)(k2+2)) ) return -1;
if ( (*(uint32_t *)(k1+2)) >
(*(uint32_t *)(k2+2)) ) return 1;
if ( (*(uint16_t *)(k1+0)) <
(*(uint16_t *)(k2+0)) ) return -1;
if ( (*(uint16_t *)(k1+0)) >
(*(uint16_t *)(k2+0)) ) return 1;
return 0;
}
// Initialize an empty bucket owned by `parent` whose key storage is
// `newbuf`. Always returns true.
bool RdbBucket::set(RdbBuckets* parent, char* newbuf) {
	m_parent     = parent;
	m_keys       = newbuf;
	m_numKeys    = 0;
	m_lastSorted = 0;
	m_endKey     = NULL;
	return true;
}
void RdbBucket::reBuf(char* newbuf) {
if(!m_keys) {
m_keys = newbuf;
return;
}
gbmemcpy(newbuf, m_keys, m_numKeys * m_parent->getRecSize());
if(m_endKey) m_endKey = newbuf + (m_endKey - m_keys);
m_keys = newbuf;
}
// Destructor: clears the bucket's key bookkeeping (the key buffer itself
// belongs to the parent's master allocation).
RdbBucket::~RdbBucket() {
	this->reset();
}
// Mark the bucket empty. m_keys is intentionally left pointing at its
// buffer so the bucket can be refilled in place.
void RdbBucket::reset() {
	m_endKey     = NULL;
	m_lastSorted = 0;
	m_numKeys    = 0;
}
// Bytes allocated: this object, the master buffer and the data bytes
// tracked in m_dataMemOccupied.
int32_t RdbBuckets::getMemAlloced () {
	return sizeof(RdbBuckets) + m_masterSize + m_dataMemOccupied;
}
// Estimated bytes in use: the records themselves, the data they point at
// (m_dataMemOccupied), this object, the sort buffer and the swap buffer.
int32_t RdbBuckets::getMemOccupied() {
	int32_t keyBytes = m_numKeysApprox * m_recSize;
	int32_t swapBufBytes = BUCKET_SIZE * m_recSize;
	return keyBytes + m_dataMemOccupied +
		sizeof(RdbBuckets) +
		m_sortBufSize +
		swapBufBytes;
}
// Remaining memory budget: m_maxMem minus getMemOccupied() (may be negative).
int32_t RdbBuckets::getMemAvailable() {
	int32_t occupied = getMemOccupied();
	return m_maxMem - occupied;
}
// True once occupied memory exceeds 90% of m_maxMem.
bool RdbBuckets::is90PercentFull() {
	return m_maxMem * .9 < getMemOccupied ();
}
// True only when the bucket table is (nearly) full and has already grown
// to its maximum capacity, so it cannot be enlarged any further.
bool RdbBuckets::needsDump() {
	bool tableFull = m_numBuckets + 1 >= m_maxBuckets;
	bool atCapacity = m_maxBuckets == m_maxBucketsCapacity;
	return tableFull && atCapacity;
}
// Conservative check for whether numRecs more records fit. If we claim
// they fit and they don't, a partial list gets added and the whole list is
// then added again, so we reserve two buckets per BUCKET_SIZE records
// (rounded up) against the remaining bucket capacity.
bool RdbBuckets::hasRoom ( int32_t numRecs ) {
	int32_t needed = 2 * (numRecs / BUCKET_SIZE + 1);
	int32_t spare = m_maxBucketsCapacity - m_numBuckets;
	return spare >= needed;
}
// Bring the bucket back to a fully sorted, de-duplicated state.
// Keys [0, m_lastSorted) are already sorted and unique; keys appended since
// then live in [m_lastSorted, m_numKeys). The tail is merge-sorted (a stable
// sort, so among equal keys the one added last stays last) and merged with
// the sorted prefix into the parent's swap buffer. Keys that compare equal
// under KEYCMPNEGEQ collapse to the newest one. For every record dropped,
// the parent's key count, data-byte count and negative-key count are
// reduced. Finally the swap buffer and m_keys trade places.
// Always returns true (cores on an unsupported key size or missing buffers).
bool RdbBucket::sort() {
	if(m_lastSorted == m_numKeys) return true;
	if(m_numKeys < 2) {
		m_lastSorted = m_numKeys;
		return true;
	}
	uint8_t ks = m_parent->getKeySize();
	int32_t recSize = m_parent->getRecSize();
	int32_t fixedDataSize = m_parent->getFixedDataSize();
	int (*cmpfn) (const void*, const void *) = NULL;
	if ( ks == 18 ) cmpfn = KEYCMP18;
	else if ( ks == 24 ) cmpfn = KEYCMP24;
	else if ( ks == 12 ) cmpfn = KEYCMP12;
	else if ( ks == 16 ) cmpfn = KEYCMP16;
	else if ( ks == 6 ) cmpfn = KEYCMP6;
	else { char *xx=NULL;*xx=0; }
	char* mergeBuf = m_parent->getSwapBuf();
	if(!mergeBuf) { char* xx = NULL; *xx = 0; }
	int32_t numUnsorted = m_numKeys - m_lastSorted;
	char *list1 = m_keys;
	char *list2 = m_keys + (recSize*m_lastSorted);
	char *list1end = list2;
	char *list2end = list2 + (recSize * numUnsorted);
	// force niceness to 0 while sorting the unsorted tail
	int32_t saved = g_niceness;
	g_niceness = 0;
	// . use merge sort because it is stable, and we need to always keep
	//   the identical keys that were added last
	// . pass in a buffer to merge into, otherwise one is malloced, which
	//   can fail, and the fallback is qsort, which is not stable
	if(!m_parent->getSortBuf()) {char *xx = NULL; *xx = 0;}
	gbmergesort (list2, numUnsorted , recSize , cmpfn, 0,
		     m_parent->getSortBuf(), m_parent->getSortBufSize());
	g_niceness = saved;
	char *p = mergeBuf;
	char v;
	char *lastKey = NULL;
	int32_t br = 0; //bytes of data removed along with dropped records
	int32_t dso = ks + sizeof(char*);//datasize offset
	int32_t numNeg = 0; //negative records dropped
	while(1) {
		if(list1 >= list1end) {
			// . just copy into place, deduping as we go
			while(list2 < list2end) {
				if(lastKey && KEYCMPNEGEQ(list2,lastKey,ks) == 0) {
					//this is a dup, lastKey is overwritten
					if(fixedDataSize != 0) {
						if(fixedDataSize == -1)
							br += *(int32_t*)(lastKey+dso);
						else br += fixedDataSize;
					}
					if ( KEYNEG(lastKey) ) numNeg++;
					p = lastKey;
				}
				gbmemcpy(p, list2, recSize);
				lastKey = p;
				p += recSize;
				list2 += recSize;
			}
			break;
		}
		if(list2 >= list2end) {
			// . all that is left is list1, which is already
			//   deduped, so just copy it into place
			gbmemcpy(p, list1, list1end - list1);
			p += list1end - list1;
			break;
		}
		v = KEYCMPNEGEQ(list1, list2, ks);
		if(v < 0) {
			//never overwrite the merged list from list1 because
			//it is always older and it is already deduped
			if(lastKey && KEYCMPNEGEQ(list1, lastKey, ks) == 0) {
				// list1's record is the one dropped here (lastKey
				// is kept), so account for list1's data and sign
				if(fixedDataSize != 0) {
					if(fixedDataSize == -1)
						br += *(int32_t*)(list1+dso);
					else br += fixedDataSize;
				}
				if ( KEYNEG(list1) ) numNeg++;
				list1 += recSize;
				continue;
			}
			gbmemcpy(p, list1, recSize);
			lastKey = p;
			p += recSize;
			list1 += recSize;
		}
		else if(v > 0) {
			//copy it over the one we just copied in
			if(lastKey && KEYCMPNEGEQ(list2, lastKey, ks) == 0) {
				//this is a dup, lastKey is overwritten
				if(fixedDataSize != 0) {
					if(fixedDataSize == -1)
						br += *(int32_t*)(lastKey+dso);
					else br += fixedDataSize;
				}
				if ( KEYNEG(lastKey) ) numNeg++;
				p = lastKey;
			}
			gbmemcpy(p, list2, recSize);
			lastKey = p;
			p += recSize;
			list2 += recSize;
		}
		else {
			// found dup across the lists: list2's (newer) record
			// wins and list1's is dropped, so its data bytes and
			// negative flag must leave the parent's totals too
			// (previously they were never subtracted)
			if(fixedDataSize != 0) {
				if(fixedDataSize == -1)
					br += *(int32_t*)(list1+dso);
				else br += fixedDataSize;
			}
			if ( KEYNEG(list1) ) numNeg++;
			if(lastKey && KEYCMPNEGEQ(list2, lastKey, ks) == 0) {
				if(fixedDataSize != 0) {
					if(fixedDataSize == -1)
						br += *(int32_t*)(lastKey+dso);
					else br += fixedDataSize;
				}
				if ( KEYNEG(lastKey) ) numNeg++;
				p = lastKey;
			}
			gbmemcpy(p, list2, recSize);
			lastKey = p;
			p += recSize;
			list2 += recSize;
			list1 += recSize; //fuggedaboutit!
		}
	}
	//we compacted out the dups, so reflect that here
	int32_t newNumKeys = (p - mergeBuf) / recSize;
	m_parent->updateNumRecs(newNumKeys - m_numKeys , - br, -numNeg);
	m_numKeys = newNumKeys;
	// the old key storage becomes the parent's next swap buffer
	if(m_keys != mergeBuf) m_parent->setSwapBuf(m_keys);
	m_keys = mergeBuf;
	m_lastSorted = m_numKeys;
	m_endKey = m_keys + ((m_numKeys - 1) * recSize);
	return true;
}
// Split this (sorted) bucket into two roughly half-full buckets.
// This bucket keeps the lower floor(n/2) keys (addNode relies on that);
// the remaining upper keys are copied into `newBucket`, which is returned.
// The caller is responsible for inserting newBucket into the parent.
RdbBucket* RdbBucket::split(RdbBucket* newBucket) {
	int32_t recSize = m_parent->getRecSize();
	int32_t keep  = m_numKeys >> 1;
	int32_t moved = m_numKeys - keep;
	char *upperHalf = m_keys + (keep * recSize);
	gbmemcpy(newBucket->m_keys, upperHalf, moved * recSize);
	newBucket->m_numKeys    = moved;
	newBucket->m_lastSorted = moved;
	newBucket->m_endKey     = newBucket->m_keys + ((moved - 1) * recSize);
	m_numKeys    = keep;
	m_lastSorted = keep;
	m_endKey     = m_keys + ((keep - 1) * recSize);
	return newBucket;
}
bool RdbBucket::addKey(char *key , char *data , int32_t dataSize) {
uint8_t ks = m_parent->getKeySize();
int32_t recSize = m_parent->getRecSize();
bool isNeg = KEYNEG(key);
char *newLoc = m_keys + (recSize * m_numKeys);
gbmemcpy(newLoc, key, ks);
if(data) {
*(char**)(newLoc + ks) = data;
if(m_parent->getFixedDataSize() == -1) {
*(int32_t*)(newLoc + ks + sizeof(char*)) = (int32_t)dataSize;
}
}
if(m_endKey == NULL) { //are we the first key?
if(m_numKeys > 0) {char* xx = NULL; *xx = 0;}
m_endKey = newLoc;
m_lastSorted = 1;
}
else {
// . minor optimization: if we are almost sorted, then
// . see if we can't maintain that state.
char v = KEYCMPNEGEQ(key, m_endKey, ks);
//char v = KEYCMP(key, m_endKey, ks);
if(v == 0) {
// . just replace the old key if we were the same,
// . don't inc num keys
gbmemcpy(m_endKey, newLoc, recSize);
if(KEYNEG(m_endKey)) {
if(isNeg) return true;
else m_parent->updateNumRecs(0, 0, -1);
}
else if(isNeg) m_parent->updateNumRecs(0, 0, 1);;
return true;
}
else if(v > 0) {
// . if we were greater than the old key,
// . we can assume we are still sorted, which
// . really helps us for adds which are in order
if(m_lastSorted == m_numKeys) m_lastSorted++;
m_endKey = newLoc;
}
}
m_numKeys++;
m_parent->updateNumRecs(1 , dataSize, isNeg?1:0);
return true;
}
char* RdbBucket::getKeyVal ( char *key , char **data , int32_t* dataSize ) {
sort();
int32_t i = getKeyNumExact(key);
if(i < 0) return NULL;
int32_t recSize = m_parent->getRecSize();
uint8_t ks = m_parent->getKeySize();
char* rec = m_keys + (recSize * i);
if(data) {
*data = rec + ks;
if(dataSize)
*dataSize = *(int32_t*)*data + sizeof(char*);
}
return rec;
}
// Binary-search the (assumed sorted) keys for an exact KEYCMP match.
// Returns the record index, or -1 if the key is not present.
int32_t RdbBucket::getKeyNumExact(char* key) {
	uint8_t ks = m_parent->getKeySize();
	int32_t recSize = m_parent->getRecSize();
	int32_t lo = 0;
	int32_t hi = m_numKeys - 1;
	while(lo <= hi) {
		int32_t mid = lo + ((hi - lo) >> 1);
		char cmp = KEYCMP(key, m_keys + (recSize * mid), ks);
		if(cmp == 0) return mid;
		if(cmp < 0) hi = mid - 1;
		else        lo = mid + 1;
	}
	return -1;
}
bool RdbBucket::selfTest (char* prevKey) {
sort();
char* last = NULL;
char* kk = m_keys;
int32_t recSize = m_parent->getRecSize();
int32_t ks = m_parent->getKeySize();
//ensure our first key is > the last guy's end key
if(prevKey != NULL && m_numKeys > 0) {
if(KEYCMP(prevKey, m_keys,ks) > 0) {
log(LOG_WARN, "db: bucket's first key: %016"XINT64"%08"XINT32" "
"is less than last bucket's end key: "
"%016"XINT64"%08"XINT32"!!!!!",
*(int64_t*)(m_keys+(sizeof(int32_t))),
*(int32_t*)m_keys,
*(int64_t*)(prevKey+(sizeof(int32_t))),
*(int32_t*)prevKey);
//printBucket();
return false;
//char* xx = NULL; *xx = 0;
}
}
for(int32_t i = 0; i < m_numKeys; i++) {
//log(LOG_WARN, "rdbbuckets last key: ""%016"XINT64"%08"XINT32" num keys: %"INT32"",
// *(int64_t*)(kk+(sizeof(int32_t))), *(int32_t*)kk, m_numKeys);
if(i > 0 && KEYCMP(last, kk, ks) > 0) {
log(LOG_WARN, "db: bucket's last key was out "
"of order!!!!!"
"key was: %016"XINT64"%08"XINT32" vs prev: %016"XINT64"%08"XINT32""
" num keys: %"INT32""
" ks=%"INT32" bucketNum=%"INT32"",
*(int64_t*)(kk+(sizeof(int32_t))), *(int32_t*)kk,
*(int64_t*)(last+(sizeof(int32_t))), *(int32_t*)last,
m_numKeys, ks, i);
return false;
//char* xx = NULL; *xx = 0;
}
last = kk;
kk += recSize;
}
return true;
}
void RdbBuckets::printBuckets() {
for(int32_t i = 0; i < m_numBuckets; i++) {
m_buckets[i]->printBucket();
}
}
// Log every key in this bucket (in storage order) along with the key count.
void RdbBucket::printBucket() {
	int32_t recSize = m_parent->getRecSize();
	char *end = m_keys + (m_numKeys * recSize);
	for(char *kk = m_keys; kk < end; kk += recSize) {
		log(LOG_WARN, "rdbbuckets last key: ""%016"XINT64"%08"XINT32" num "
		    "keys: %"INT32"",
		    *(int64_t*)(kk+(sizeof(int32_t))), *(int32_t*)kk, m_numKeys);
	}
}
// Construct an empty, unallocated table. set() must be called before use.
// Besides the flags and pointers the original initialized, every counter
// and size that set()/reset() normally establish is zeroed here too.
// Otherwise accessors such as getNumKeys(), getMemAlloced() and
// getMemOccupied() would read uninitialized members on a table that was
// constructed but never set().
RdbBuckets::RdbBuckets() {
	m_numBuckets = 0;
	m_masterPtr = NULL;
	m_buckets = NULL;
	m_swapBuf = NULL;
	m_sortBuf = NULL;
	m_isWritable = true;
	m_isSaving = false;
	m_dataMemOccupied = 0;
	m_needsSave = false;
	m_repairMode = false;
	m_bucketsSpace = NULL;
	m_masterSize = 0;
	m_maxBuckets = 0;
	m_maxBucketsCapacity = 0;
	m_firstOpenSlot = 0;
	m_numKeysApprox = 0;
	m_numNegKeys = 0;
	m_sortBufSize = 0;
	m_maxMem = 0;
	m_recSize = 0;
	m_ks = 0;
	m_fixedDataSize = 0;
	m_rdbId = 0;
	m_allocName = NULL;
	m_dbname = NULL;
}
// Configure the table for records of `keySize`-byte keys plus optional data
// (fixedDataSize: 0 = none, -1 = variable, >0 = fixed) and allocate the
// initial bucket table within `maxMem` bytes.
// ownData, dataInPtrs and useProtection are accepted but not used here.
// Cores if maxMem cannot hold even one bucket; returns false with
// g_errno = ENOMEM if the initial allocation fails.
bool RdbBuckets::set ( int32_t fixedDataSize , int32_t maxMem,
		       bool ownData ,
		       char *allocName ,
		       char rdbId,
		       bool dataInPtrs ,
		       char *dbname ,
		       char keySize ,
		       bool useProtection ) {
	m_numBuckets    = 0;
	m_ks            = keySize;
	m_rdbId         = rdbId;
	m_allocName     = allocName;
	m_fixedDataSize = fixedDataSize;
	// record = key [+ data pointer [+ 32-bit data size if variable]]
	int32_t recSize = m_ks;
	if(m_fixedDataSize != 0)  recSize += sizeof(char*);
	if(m_fixedDataSize == -1) recSize += sizeof(int32_t);
	m_recSize = recSize;
	m_numKeysApprox = 0;
	m_numNegKeys    = 0;
	m_dbname        = dbname;
	m_swapBuf       = NULL;
	m_sortBuf       = NULL;
	//taken from sort.cpp, this is to prevent mergesort from mallocing
	m_sortBufSize = BUCKET_SIZE * m_recSize + sizeof(char*);
	if(m_buckets) {char *xx = NULL; *xx = 0;}
	m_maxBuckets = 0;
	m_masterSize = 0;
	m_masterPtr  = NULL;
	m_maxMem     = maxMem;
	// how many buckets fit once the fixed overhead (sort buffer, swap
	// buffer and this object) is taken out of the memory budget
	int32_t bucketBytes = BUCKET_SIZE * m_recSize;
	int32_t perBucket = sizeof(RdbBucket*) + sizeof(RdbBucket) + bucketBytes;
	int32_t overhead  = m_sortBufSize + bucketBytes + sizeof(RdbBuckets);
	m_maxBucketsCapacity = (m_maxMem - overhead) / perBucket;
	if(m_maxBucketsCapacity <= 0) {
		log("db: max memory for %s's buckets is way too small to"
		    " accomodate even 1 bucket, reduce bucket size(%"INT32") "
		    "or increase max mem(%"INT32")",
		    m_dbname, (int32_t)BUCKET_SIZE, m_maxMem);
		char *xx = NULL; *xx = 0;
	}
	if(!resizeTable(INIT_SIZE)) {
		g_errno = ENOMEM;
		return false;
	}
	/*
	// disabled self-check kept for reference:
	RdbBuckets b;
	b.set ( 0, // fixedDataSize,
		50000000 , // maxTreeMem,
		false, //own data
		"tbuck", // m_treeName, // allocName
		false, //data in ptrs
		"tbuck",//m_dbname,
		16, // m_ks,
		false);
	collnum_t cn = 1;
	key128_t k;
	k.n1 = 12;
	k.n0 = 11;
	b.addNode ( cn , (char *)&k, NULL, 0 );
	// negate it
	k.n0 = 10;
	b.addNode ( cn , (char *)&k, NULL, 0 );
	// try that
	key128_t k1;
	key128_t k2;
	k1.setMin();
	k2.setMax();
	RdbList list;
	int32_t np,nn;
	b.getList ( cn,(char *)&k1,(char *)&k2,1000,&list,&np,&nn,false);
	if ( np != 0 || nn != 1 ) { char *xx=NULL;*xx=0; }
	// must be empty
	if ( b.getNumKeys() != 0 ) { char *xx=NULL;*xx=0; }
	*/
	return true;
}
// Destructor: release the master buffer via reset().
RdbBuckets::~RdbBuckets( ) {
	this->reset();
}
void RdbBuckets::setNeedsSave(bool s) {
m_needsSave = s;
}
// Empty all buckets, free the master buffer and zero every counter and
// pointer that referred to it.
// m_masterSize is now cleared as well. Previously it kept the freed
// buffer's size, so getMemAlloced() over-reported after a reset (repair()
// had to zero it by hand before calling reset()).
void RdbBuckets::reset() {
	for(int32_t j = 0; j < m_numBuckets; j++) {
		m_buckets[j]->reset();
	}
	if(m_masterPtr) mfree(m_masterPtr, m_masterSize, m_allocName );
	m_masterPtr = NULL;
	m_masterSize = 0;
	m_buckets = NULL;
	m_bucketsSpace = NULL;
	m_numBuckets = 0;
	m_maxBuckets = 0;
	m_dataMemOccupied = 0;
	m_firstOpenSlot = 0;
	m_numKeysApprox = 0;
	m_numNegKeys = 0;
	m_sortBuf = NULL;
	m_swapBuf = NULL;
}
// Empty every bucket and zero the counters while keeping the allocated
// table for reuse; flags that a save is needed.
void RdbBuckets::clear() {
	for(int32_t j = 0; j < m_numBuckets; j++)
		m_buckets[j]->reset();
	m_numBuckets      = 0;
	m_firstOpenSlot   = 0;
	m_numKeysApprox   = 0;
	m_numNegKeys      = 0;
	m_dataMemOccupied = 0;
	m_needsSave       = true;
}
// Hand out an unused RdbBucket slot from m_bucketsSpace, doubling the
// table first when only one free pointer slot remains.
// If slots below m_firstOpenSlot have been freed, the first bucket with no
// keys among the first m_numBuckets slots is reused (or slot m_numBuckets);
// otherwise the next never-used slot is taken.
// Returns NULL if the table needed to grow and could not.
RdbBucket* RdbBuckets::bucketFactory() {
	if(m_numBuckets == m_maxBuckets - 1 && !resizeTable(m_maxBuckets * 2))
		return NULL;
	if(m_firstOpenSlot > m_numBuckets) {
		int32_t slot = 0;
		while(slot < m_numBuckets &&
		      m_bucketsSpace[slot].getNumKeys() != 0)
			slot++;
		return &m_bucketsSpace[slot];
	}
	return &m_bucketsSpace[m_firstOpenSlot++];
}
// Reallocate the single master buffer so the table can hold numNeeded
// buckets. numNeeded is raised to at least INIT_SIZE and clamped to
// m_maxBucketsCapacity.
// The buffer is laid out as:
//   [bucket key storage: numNeeded * BUCKET_SIZE * m_recSize]
//   [swap buffer: BUCKET_SIZE * m_recSize]
//   [sort buffer: m_sortBufSize]
//   [bucket pointer array: numNeeded * sizeof(RdbBucket*)]
//   [RdbBucket objects: numNeeded * sizeof(RdbBucket)]
// Existing buckets are copied into slots [0, m_numBuckets) of the new space
// (compacting them) along with their keys. The remaining slots are set()
// empty, and m_firstOpenSlot becomes m_numBuckets.
// Returns true without doing anything if numNeeded already equals
// m_maxBuckets. Returns false with g_errno = ENOMEM if the table is already
// at capacity or the allocation fails.
bool RdbBuckets::resizeTable(int32_t numNeeded) {
	if(numNeeded == m_maxBuckets) return true;
	if(numNeeded < INIT_SIZE) numNeeded = INIT_SIZE;
	if(numNeeded > m_maxBucketsCapacity) {
		// cannot grow at all: already at (or beyond) capacity
		if(m_maxBucketsCapacity <= m_maxBuckets) {
			log(LOG_INFO,
			    "db: could not resize buckets currently have %"INT32" "
			    "buckets, asked for %"INT32", max number of buckets"
			    " for %"INT32" bytes with keysize %"INT32" is %"INT32"",
			    m_maxBuckets, numNeeded, m_maxMem, (int32_t)m_ks,
			    m_maxBucketsCapacity);
			g_errno = ENOMEM;
			return false;
		}
		// log(LOG_INFO,
		//     "db: scaling down request for buckets. "
		//     "Currently have %"INT32" "
		//     "buckets, asked for %"INT32", max number of buckets"
		//     " for %"INT32" bytes is %"INT32".",
		//     m_maxBuckets, numNeeded, m_maxMem, m_maxBucketsCapacity);
		// grow only as far as the memory budget allows
		numNeeded = m_maxBucketsCapacity;
	}
	int32_t perBucket = sizeof(RdbBucket*) +
		sizeof(RdbBucket)
		+ BUCKET_SIZE * m_recSize;
	int32_t tmpMaxBuckets = numNeeded;
	int32_t newMasterSize = tmpMaxBuckets * perBucket +
		(BUCKET_SIZE * m_recSize) + /*swap buf*/
		m_sortBufSize; /*sort buf*/
	// set() sized m_maxBucketsCapacity so this should never trigger
	if(newMasterSize > m_maxMem) {
		log(LOG_WARN,
		    "db: Buckets oops, trying to malloc more(%"INT32") that max "
		    "mem(%"INT32"), should've caught this earlier.",
		    newMasterSize, m_maxMem);
		char* xx = NULL; *xx = 0;
	}
	char *tmpMasterPtr = (char*)mmalloc(newMasterSize, m_allocName);
	if(!tmpMasterPtr) {
		g_errno = ENOMEM;
		return false;
	}
	// carve the new buffer into its regions (layout documented above)
	char* p = tmpMasterPtr;
	char* bucketMemPtr = p;
	p += (BUCKET_SIZE * m_recSize) * tmpMaxBuckets;
	m_swapBuf = p;
	p += (BUCKET_SIZE * m_recSize);
	m_sortBuf = p;
	p += m_sortBufSize;
	RdbBucket** tmpBucketPtrs = (RdbBucket**)p;
	p += tmpMaxBuckets * sizeof(RdbBucket*);
	RdbBucket* tmpBucketSpace = (RdbBucket*)p;
	p += tmpMaxBuckets * sizeof(RdbBucket);
	// sanity: the regions must exactly fill the allocation
	if(p - tmpMasterPtr != newMasterSize) {char* xx = NULL; *xx = 0;}
	for(int32_t i = 0; i < m_numBuckets; i++) {
		//copy them over one at a time so they
		//will now be contiguous and consistent
		//with the ptrs array.
		tmpBucketPtrs[i] = &tmpBucketSpace[i];
		gbmemcpy(&tmpBucketSpace[i],
			 m_buckets[i],
			 sizeof(RdbBucket));
		// copy the bucket's keys into its new key storage
		tmpBucketSpace[i].reBuf(bucketMemPtr);
		bucketMemPtr += (BUCKET_SIZE * m_recSize);
	}
	//now do the rest
	for(int32_t i = m_numBuckets; i < tmpMaxBuckets; i++) {
		tmpBucketSpace[i].set(this, bucketMemPtr);
		bucketMemPtr += (BUCKET_SIZE * m_recSize);
	}
	// sanity: key storage must end exactly where the swap buffer starts
	if(bucketMemPtr != m_swapBuf) {char* xx = NULL; *xx = 0;}
	// log(LOG_WARN, "new size = %"INT32", old size = %"INT32", newMemUsed = %"INT32" "
	//     "oldMemUsed = %"INT32"",
	//     numNeeded, m_maxBuckets, newMasterSize, m_masterSize);
	// everything now lives in the new buffer; release the old one
	if(m_masterPtr) mfree(m_masterPtr, m_masterSize, m_allocName);
	m_masterPtr = tmpMasterPtr;
	m_masterSize = newMasterSize;
	m_buckets = tmpBucketPtrs;
	m_bucketsSpace = tmpBucketSpace;
	m_maxBuckets = tmpMaxBuckets;
	m_firstOpenSlot = m_numBuckets;
	return true;
}
// Add one record (key + optional data pointer/size) for `collnum`.
// The key goes into the bucket that covers it. A new bucket is created when
// the collection has none suitable, and a full bucket is split first.
// Returns 0 on success and -1 on failure, with g_errno set to EAGAIN while
// the table is not writable or is being saved, or ENOMEM when no bucket
// could be obtained.
// Fixes:
//  . the not-writable path used to "return false", i.e. 0, the success
//    value, so callers testing "< 0" silently lost the record;
//  . the insert-before-i path null-checked m_buckets[i] instead of the
//    bucket just obtained;
//  . "m_buckets[i] = bucketFactory()" could store through the old array
//    when bucketFactory() reallocated m_buckets (unsequenced before C++17),
//    so the bucket is now obtained first and stored afterwards.
int32_t RdbBuckets::addNode (collnum_t collnum,
			     char *key,
			     char *data , int32_t dataSize ) {
	if(!m_isWritable || m_isSaving ) {
		g_errno = EAGAIN;
		return -1;
	}
	m_needsSave = true;
	int32_t i = getBucketNum(key, collnum);
	if(i == m_numBuckets ||
	   m_buckets[i]->getCollnum() != collnum) {
		int32_t bucketsCutoff = (BUCKET_SIZE>>1);
		//when repairing the keys are added in order,
		//so fill them up all of the way before moving
		//on to the next one.
		if(m_repairMode) bucketsCutoff = BUCKET_SIZE;
		if(i != 0 &&
		   m_buckets[i-1]->getCollnum() == collnum &&
		   m_buckets[i-1]->getNumKeys() < bucketsCutoff) {
			// append to the collection's last, not-yet-full bucket
			i--;
		}
		else {
			// bucketFactory() may resize (reallocate) m_buckets, so
			// obtain the bucket before indexing m_buckets
			RdbBucket* newBucket = bucketFactory();
			if(newBucket == NULL) {
				g_errno = ENOMEM;
				return -1;
			}
			newBucket->setCollnum(collnum);
			if(i == m_numBuckets) {
				m_buckets[i] = newBucket;
				m_numBuckets++;
			}
			else { //m_buckets[i]->getCollnum() != collnum
				addBucket(newBucket, i);
			}
		}
	}
	//check if we are full
	if(m_buckets[i]->getNumKeys() == BUCKET_SIZE) {
		//split the bucket
		int64_t t = gettimeofdayInMilliseconds();
		m_buckets[i]->sort();
		RdbBucket* newBucket = bucketFactory();
		if(newBucket == NULL ) {
			g_errno = ENOMEM;
			return -1;
		}
		newBucket->setCollnum(collnum);
		m_buckets[i]->split(newBucket);
		addBucket(newBucket, i+1);
		// the key belongs in the upper half if it is past our end key
		if(bucketCmp(key, collnum, m_buckets[i]) > 0) i++;
		int64_t took = gettimeofdayInMilliseconds() - t;
		if(took > 10) log(LOG_WARN,
				  "db: split bucket in %"INT64" ms for %s",took,
				  m_dbname);
	}
	m_buckets[i]->addKey(key, data, dataSize);
	return 0;
}
// Insert `newBucket` into the bucket pointer array at position i, shifting
// the existing pointers [i, m_numBuckets) up by one. Always returns true.
// The original shifted (m_numBuckets - i) pointers AFTER incrementing the
// count. That moved one uninitialized slot too many and wrote one slot past
// the last live entry. It also sized the move with sizeof(RdbBuckets*)
// instead of sizeof(RdbBucket*).
bool RdbBuckets::addBucket (RdbBucket* newBucket, int32_t i) {
	int32_t numToMove = m_numBuckets - i;
	if(numToMove > 0)
		memmove(&m_buckets[i+1], &m_buckets[i],
			numToMove * sizeof(RdbBucket*));
	m_buckets[i] = newBucket;
	m_numBuckets++;
	return true;
}
// void RdbBuckets::deleteBucket ( int32_t i ) {
// int32_t moveSize = (m_numBuckets - i)*sizeof(RdbBuckets*);
// if(moveSize > 0)
// memmove(&m_buckets[i+1], &m_buckets[i], moveSize);
// m_numBuckets--;
// }
// Fill `list` with the records of collection `collnum` whose keys fall in
// [startKey, endKey], stopping once the list reaches minRecSizes bytes
// (checked before each record, so it may overshoot by one record).
// minRecSizes == 0 returns an empty list; minRecSizes < 0 means no limit.
// numPosRecs / numNegRecs, if non-NULL, are zeroed here and then handed to
// each bucket's getList() (presumably to be incremented there).
// On failure (list does not own its data, growList fails, or a bucket's
// getList fails) returns false or log()'s result (presumably false).
bool RdbBuckets::getList ( collnum_t collnum ,
			   char *startKey, char *endKey, int32_t minRecSizes ,
			   RdbList *list , int32_t *numPosRecs ,
			   int32_t *numNegRecs,
			   bool useHalfKeys ) {
	if ( numNegRecs ) *numNegRecs = 0;
	if ( numPosRecs ) *numPosRecs = 0;
	// set *lastKey in case we have no nodes in the list
	//if ( lastKey ) *lastKey = endKey;
	// . set the start and end keys of this list
	// . set lists's m_ownData member to true
	list->reset();
	// got set m_ks first so the set ( startKey, endKey ) works!
	list->m_ks = m_ks;
	list->set ( startKey , endKey );
	list->setFixedDataSize ( m_fixedDataSize );
	list->setUseHalfKeys ( useHalfKeys );
	// bitch if list does not own his own data
	if ( ! list->getOwnData() ) {
		g_errno = EBADENGINEER;
		return log(LOG_LOGIC,"db: rdbbuckets: getList: List does not "
			   "own data");
	}
	// bail if minRecSizes is 0
	if ( minRecSizes == 0 ) return true;
	if ( minRecSizes < 0 ) minRecSizes = 0x7fffffff;//LONG_MAX;
	// first bucket whose end key is >= startKey
	int32_t startBucket = getBucketNum(startKey, collnum);
	if(startBucket > 0 &&
	   bucketCmp(startKey, collnum, m_buckets[startBucket-1]) < 0)
		startBucket--;
	// if the startKey is past our last bucket, then nothing
	// to return
	if(startBucket == m_numBuckets ||
	   m_buckets[startBucket]->getCollnum() != collnum) return true;
	// last bucket that can hold keys <= endKey for this collection
	int32_t endBucket;
	if(bucketCmp(endKey, collnum, m_buckets[startBucket]) <= 0)
		endBucket = startBucket;
	else endBucket = getBucketNum(endKey, collnum);
	if(endBucket == m_numBuckets ||
	   m_buckets[endBucket]->getCollnum() != collnum) endBucket--;
	//log(LOG_WARN, "db numBuckets %"INT32" start %"INT32" end %"INT32"",
	//m_numBuckets, startBucket, endBucket);
	if(m_buckets[endBucket]->getCollnum() != collnum) {
		char* xx = NULL; *xx = 0;
	}
	int32_t growth = 0;
	// single bucket: search it for both start and end keys
	if(startBucket == endBucket) {
		growth = m_buckets[startBucket]->getNumKeys() * m_recSize;
		if(growth > minRecSizes) growth = minRecSizes + m_recSize;
		if(!list->growList(growth))
			return log("db: Failed to grow list to %"INT32" bytes "
				   "for storing "
				   "records from buckets: %s.",
				   growth,mstrerror(g_errno));
		if(!m_buckets[startBucket]->getList(list,
						    startKey,
						    endKey,
						    minRecSizes,
						    numPosRecs,
						    numNegRecs,
						    useHalfKeys))
			return false;
		return true;
	}
	//reserve some space, it is an upper bound
	for(int32_t i = startBucket; i <= endBucket; i++)
		growth += m_buckets[i]->getNumKeys() * m_recSize;
	if(growth > minRecSizes) growth = minRecSizes + m_recSize;
	if(!list->growList(growth))
		return log("db: Failed to grow list to %"INT32" bytes for storing "
			   "records from buckets: %s.",
			   growth, mstrerror(g_errno));
	// separate into 3 different calls so we don't have
	// to search for the start and end keys within the buckets
	// unnecessarily.
	// first bucket: bounded below by startKey only
	if(!m_buckets[startBucket]->getList(list,
					    startKey,
					    NULL,
					    minRecSizes,
					    numPosRecs,
					    numNegRecs,
					    useHalfKeys))
		return false;
	// middle buckets: taken whole, until minRecSizes is reached
	int32_t i = startBucket + 1;
	for(; i < endBucket && list->getListSize() < minRecSizes; i++) {
		if(!m_buckets[i]->getList(list,
					  NULL,
					  NULL,
					  minRecSizes,
					  numPosRecs,
					  numNegRecs,
					  useHalfKeys))
			return false;
	}
	// last bucket (i == endBucket unless we stopped early): bounded
	// above by endKey only
	if(list->getListSize() < minRecSizes)
		if(!m_buckets[i]->getList(list,
					  NULL,
					  endKey,
					  minRecSizes,
					  numPosRecs,
					  numNegRecs,
					  useHalfKeys))
			return false;
	return true;
}
// Sum, over the buckets of `collnum` that overlap [startKey, endKey], the
// per-bucket getListSizeExact(startKey, endKey). Returns 0 when no bucket of
// this collection lies at or after startKey.
int RdbBuckets::getListSizeExact ( collnum_t collnum ,
				   char *startKey,
				   char *endKey ) {
	int32_t first = getBucketNum(startKey, collnum);
	if(first > 0 &&
	   bucketCmp(startKey, collnum, m_buckets[first-1]) < 0)
		first--;
	if(first == m_numBuckets ||
	   m_buckets[first]->getCollnum() != collnum)
		return 0;
	int32_t last = first;
	if(bucketCmp(endKey, collnum, m_buckets[first]) > 0) {
		last = getBucketNum(endKey, collnum);
		// step back onto this collection's last bucket
		if(last == m_numBuckets ||
		   m_buckets[last]->getCollnum() != collnum)
			last--;
	}
	if(m_buckets[last]->getCollnum() != collnum) {
		char* xx = NULL; *xx = 0; }
	int total = 0;
	for(int32_t b = first; b <= last; b++)
		total += m_buckets[b]->getListSizeExact(startKey,endKey);
	return total;
}
bool RdbBuckets::testAndRepair() {
if(!selfTest(true/*thorough*/,
false/*core on error*/)) {
if(!repair()) return false;
m_needsSave = true;
}
return true;
}
// Rebuild the table from scratch: detach the current master buffer,
// allocate a fresh table, re-add every record from the old buckets in
// repair mode (buckets filled completely), then free the old buffer.
// Returns false (and logs) if the new table cannot be allocated or a record
// cannot be re-added.
// Fixes:
//  . records carry the data *pointer* in the slot after the key. The old
//    code passed the slot's address (currRec + m_ks), so the rebuilt
//    records pointed into the old master buffer, which is freed at the end;
//  . the old buffer was leaked and m_repairMode left set on failure paths.
bool RdbBuckets::repair() {
	if(m_numBuckets == 0 &&
	   (m_numKeysApprox != 0 || m_numNegKeys != 0)) {
		m_numKeysApprox = 0;
		m_numNegKeys = 0;
		log("db: RdbBuckets repaired approx key count to reflect "
		    "true number of keys.");
	}
	// detach the old table so reset() neither frees it nor resets its
	// buckets; its records are read back below
	int32_t tmpMasterSize = m_masterSize;
	char *tmpMasterPtr = m_masterPtr;
	RdbBucket **tmpBucketPtrs = m_buckets;
	int32_t tmpNumBuckets = m_numBuckets;
	m_masterPtr = NULL;
	m_masterSize = 0;
	m_numBuckets = 0;
	reset();
	if(!resizeTable(INIT_SIZE)) {
		log("db: RdbBuckets could not alloc enough memory to repair "
		    "corruption.");
		if(tmpMasterPtr) mfree(tmpMasterPtr, tmpMasterSize, m_allocName);
		g_errno = ENOMEM;
		return false;
	}
	m_repairMode = true;
	for(int32_t j = 0; j < tmpNumBuckets; j++) {
		collnum_t collnum = tmpBucketPtrs[j]->getCollnum();
		for(int32_t i = 0; i < tmpBucketPtrs[j]->getNumKeys(); i++) {
			char* currRec = tmpBucketPtrs[j]->getKeys() +
				m_recSize * i;
			char* data = NULL;
			int32_t dataSize = m_fixedDataSize;
			if(m_fixedDataSize != 0) {
				// re-add the stored data pointer itself
				data = *(char**)(currRec + m_ks);
				if(m_fixedDataSize == -1)
					dataSize = *(int32_t*)(currRec + m_ks +
							       sizeof(char*));
			}
			if(addNode(collnum, currRec, data, dataSize) < 0) {
				log(LOG_WARN, "db: got unrepairable error in "
				    "RdbBuckets, could not re-add data");
				m_repairMode = false;
				if(tmpMasterPtr)
					mfree(tmpMasterPtr, tmpMasterSize,
					      m_allocName);
				return false;
			}
		}
	}
	m_repairMode = false;
	if(tmpMasterPtr) mfree(tmpMasterPtr, tmpMasterSize, m_allocName);
	log("db: RdbBuckets repair for %"INT32" keys complete", m_numKeysApprox);
	return true;
}
bool RdbBuckets::selfTest(bool thorough, bool core) {
if(m_numBuckets == 0 && m_numKeysApprox != 0) return false;
int32_t totalNumKeys = 0;
char* last = NULL;
collnum_t lastcoll = -1;
int32_t numColls = 0;
for(int32_t i = 0; i < m_numBuckets; i++) {
RdbBucket* b = m_buckets[i];
if(lastcoll != b->getCollnum()) {
last = NULL;
numColls++;
}
if(thorough) {
if(!b->selfTest (last)) {
if(!core) return false;
char* xx = NULL; *xx = 0;
}
}
totalNumKeys += b->getNumKeys();
char* kk = b->getEndKey();
//log(LOG_WARN, "rdbbuckets last key: ""%016"XINT64"%08"XINT32" "
//"num keys: %"INT32"",
//*(int64_t*)(kk+(sizeof(int32_t))),*(int32_t*)kk,b->getNumKeys());
if(i > 0 &&
lastcoll == b->getCollnum() &&
KEYCMPNEGEQ(last, kk,m_ks) >= 0) {
log(LOG_WARN, "rdbbuckets last key: "
"%016"XINT64"%08"XINT32" num keys: %"INT32"",
*(int64_t*)(kk+(sizeof(int32_t))),
*(int32_t*)kk, b->getNumKeys());
log(LOG_WARN, "rdbbuckets last key was out "
"of order!!!!!");
if(!core) return false;
char* xx = NULL; *xx = 0;
}
last = kk;
lastcoll = b->getCollnum();
}
if ( totalNumKeys != m_numKeysApprox )
log(LOG_WARN, "db have %"INT32" keys, should have %"INT32". "
"%"INT32" buckets in %"INT32" colls for db %s",
totalNumKeys, m_numKeysApprox, m_numBuckets,
numColls, m_dbname);
if(thorough && totalNumKeys != m_numKeysApprox) {
return false;
}
return true;
}
// Order (collnum, key) pairs: by collnum first, then by KEYCMPNEGEQ on the
// keys. Returns negative, zero or positive.
char RdbBuckets::bucketCmp(char *akey, collnum_t acoll,
			   char *bkey, collnum_t bcoll) {
	if (acoll != bcoll) return (acoll < bcoll) ? -1 : 1;
	return KEYCMPNEGEQ(akey, bkey, m_ks);
}
char RdbBuckets::bucketCmp(char *akey, collnum_t acoll,
RdbBucket* b) {
if (acoll == b->getCollnum())
return KEYCMPNEGEQ(akey, b->getEndKey(), m_ks);
if (acoll < b->getCollnum()) return -1;
return 1;
}
// Return the index of the first bucket whose (collnum, end key) is >= the
// given (collnum, key), or m_numBuckets if none is. Tables with fewer than
// 10 buckets are scanned linearly. Larger ones are binary-searched, which
// returns early on an exact match and otherwise adjusts the last probe.
int32_t RdbBuckets::getBucketNum(char* key, collnum_t collnum) {
	if(m_numBuckets < 10) {
		int32_t n = 0;
		while(n < m_numBuckets &&
		      bucketCmp(key, collnum, m_buckets[n]) > 0)
			n++;
		return n;
	}
	int32_t lo = 0;
	int32_t hi = m_numBuckets - 1;
	int32_t mid = 0;
	RdbBucket* probe = NULL;
	while(lo <= hi) {
		mid = lo + ((hi - lo) >> 1);
		probe = m_buckets[mid];
		char cmp = bucketCmp(key, collnum, probe);
		if(cmp == 0) return mid;
		if(cmp < 0) hi = mid - 1;
		else        lo = mid + 1;
	}
	// the last probe is the answer, or its successor if key is past it
	if(bucketCmp(key, collnum, probe) > 0) mid++;
	return mid;
}
// True if any bucket belongs to `collnum`. The scan stops at the first
// bucket whose collnum is >= collnum, relying on buckets being ordered by
// collnum.
bool RdbBuckets::collExists(collnum_t collnum) {
	for(int32_t i = 0; i < m_numBuckets; i++) {
		collnum_t c = m_buckets[i]->getCollnum();
		if(c >= collnum) return c == collnum;
	}
	return false;
}
// Exact number of keys stored for `collnum`, summed over its buckets
// (stops once past that collnum, relying on collnum ordering).
int32_t RdbBuckets::getNumKeys(collnum_t collnum) {
	int32_t total = 0;
	for(int32_t i = 0; i < m_numBuckets; i++) {
		RdbBucket *b = m_buckets[i];
		collnum_t c = b->getCollnum();
		if(c > collnum) break;
		if(c == collnum) total += b->getNumKeys();
	}
	return total;
}
// Running (approximate) key count across all collections.
int32_t RdbBuckets::getNumKeys() {
	const int32_t approx = m_numKeysApprox;
	return approx;
}
// int32_t RdbBuckets::getNumNegativeKeys ( collnum_t collnum ) {
// return m_numNegKeys;
// }
// int32_t RdbBuckets::getNumPositiveKeys ( collnum_t collnum ) {
// return getNumKeys(collnum) - getNumNegativeKeys(collnum);
// }
// Running count of negative keys across all collections.
int32_t RdbBuckets::getNumNegativeKeys ( ) {
	const int32_t negatives = m_numNegKeys;
	return negatives;
}
// Positive keys = all keys minus negative keys (both running counts).
int32_t RdbBuckets::getNumPositiveKeys ( ) {
	int32_t total = getNumKeys();
	int32_t negatives = getNumNegativeKeys ( );
	return total - negatives;
}
// Find the record for (collnum, key) in the bucket that would hold it and
// return it via RdbBucket::getKeyVal, or NULL if no such bucket exists.
char* RdbBuckets::getKeyVal ( collnum_t collnum , char *key ,
			      char **data , int32_t* dataSize ) {
	int32_t n = getBucketNum(key, collnum);
	if(n == m_numBuckets) return NULL;
	RdbBucket *b = m_buckets[n];
	if(b->getCollnum() != collnum) return NULL;
	return b->getKeyVal(key, data, dataSize);
}
// Apply signed deltas to the key count, tracked data bytes and negative-key
// count.
void RdbBuckets::updateNumRecs(int32_t n, int32_t bytes, int32_t numNeg) {
	m_numNegKeys      += numNeg;
	m_dataMemOccupied += bytes;
	m_numKeysApprox   += n;
}
// Sort the bucket (which may swap m_keys for the swap buffer) and return a
// pointer to its smallest record.
char *RdbBucket::getFirstKey() {
	sort();
	char *first = m_keys;
	return first;
}
// Count the records in this bucket whose key tests KEYNEG.
int32_t RdbBucket::getNumNegativeKeys ( ) {
	int32_t recSize = m_parent->getRecSize();
	int32_t count = 0;
	for(int32_t i = 0; i < m_numKeys; i++) {
		char *k = m_keys + (i * recSize);
		if ( KEYNEG(k) ) count++;
	}
	return count;
}
// Appends the records of this bucket whose keys lie in [startKey, endKey]
// to "list".
// . a NULL startKey or endKey leaves that side of the range open
// . stops adding as soon as list->getListSize() reaches minRecSizes
// . ADDS (does not assign) the number of negative/positive records appended
//   to *numNegRecs / *numPosRecs when those are non-NULL, because the caller
//   may accumulate over several buckets
// . if minRecSizes was reached, the list's endKey is pulled back to the last
//   key appended so the caller can resume after it
// . a record slot is getRecSize() bytes: the key, then (for rdbs with data)
//   a char* to the record data, then (for variable-size data, fixedDataSize
//   == -1) the int32 data size
// . returns false if list->addRecord() fails, true otherwise
bool RdbBucket::getList(RdbList* list,
			char* startKey,
			char* endKey,
			int32_t minRecSizes,
			int32_t *numPosRecs,
			int32_t *numNegRecs,
			bool useHalfKeys) {
	sort();
	//get our bounds within the bucket:
	uint8_t ks = m_parent->getKeySize();
	int32_t recSize = m_parent->getRecSize();
	int32_t start = 0;
	int32_t end = m_numKeys - 1;
	char v;
	char* kk = NULL;
	if(startKey) {
		// binary search for startKey; it may stop one slot off
		int32_t low = 0;
		int32_t high = m_numKeys - 1;
		while(low <= high) {
			int32_t delta = high - low;
			start = low + (delta >> 1);
			kk = m_keys + (recSize * start);
			v = KEYCMP(startKey,kk,ks);
			if(v < 0) {
				high = start - 1;
				continue;
			}
			else if(v > 0) {
				low = start + 1;
				continue;
			}
			else break;
		}
		//now back up or move forward s.t. startKey
		//is <= start
		while(start < m_numKeys) {
			kk = m_keys + (recSize * start);
			v = KEYCMP(startKey, kk, ks);
			if(v > 0) start++;
			else break;
		}
	}
	else start = 0;
	if(endKey) {
		// binary search for endKey, never before "start"
		int32_t low = start;
		int32_t high = m_numKeys - 1;
		while(low <= high) {
			int32_t delta = high - low;
			end = low + (delta >> 1);
			kk = m_keys + (recSize * end);
			v = KEYCMP(endKey,kk,ks);
			if(v < 0) {
				high = end - 1;
				continue;
			}
			else if(v > 0) {
				low = end + 1;
				continue;
			}
			else break;
		}
		// back up to the last key <= endKey
		while(end > 0) {
			kk = m_keys + (recSize * end);
			v = KEYCMP(endKey, kk, ks);
			if(v < 0) end--;
			else break;
		}
	}
	else end = m_numKeys - 1;
	//keep track of our negative a positive recs
	int32_t numNeg = 0;
	int32_t numPos = 0;
	int32_t fixedDataSize = m_parent->getFixedDataSize();
	char* currKey = m_keys + (start * recSize);
	//bail now if there is only one key and it is out of range.
	if(start == end &&
	   ((startKey && KEYCMP(currKey, startKey, ks) < 0) ||
	    (endKey && KEYCMP(currKey, endKey, ks) > 0))) {
		return true;
	}
	char* lastKey = NULL;
	for(int32_t i = start;
	    i <= end && list->getListSize() < minRecSizes;
	    i++, currKey += recSize) {
		if ( fixedDataSize == 0 ) {
			if ( ! list->addRecord(currKey, 0, NULL))
				return log("db: Failed to add record "
					   "to list for %s: %s. "
					   "Fix the growList algo.",
					   m_parent->getDbname(),
					   mstrerror(g_errno));
		}
		else {
			int32_t dataSize = fixedDataSize;
			if ( fixedDataSize == -1 )
				dataSize = *(int32_t*)(currKey +
						    ks + sizeof(char*));
			// the slot holds a POINTER to the record data right
			// after the key. pass that pointer, not the address
			// of the slot field that stores it (the old code
			// copied the pointer bytes instead of the data).
			char *data = *(char **)(currKey + ks);
			if ( ! list->addRecord ( currKey ,
						 dataSize,
						 data ) )
				return log("db: Failed to add record "
					   "to list for %s: %s. "
					   "Fix the growList algo.",
					   m_parent->getDbname(),
					   mstrerror(g_errno));
		}
		if ( KEYNEG(currKey) ) numNeg++;
		else numPos++;
		lastKey = currKey;
#ifdef GBSANITYCHECK
		//sanity, remove for production
		if(startKey && KEYCMP(currKey, startKey, ks) < 0) {
			log("db: key is outside the "
			    "keyrange given for getList."
			    " it is < startkey."
			    " %016"XINT64"%08"XINT32" %016"XINT64"%08"XINT32"."
			    " getting keys %"INT32" to %"INT32" for list"
			    "bounded by %016"XINT64"%08"XINT32" %016"XINT64"%08"XINT32"",
			    *(int64_t*)(startKey+(sizeof(int32_t))),
			    *(int32_t*)startKey,
			    *(int64_t*)(currKey+(sizeof(int32_t))),
			    *(int32_t*)currKey,
			    start, end,
			    *(int64_t*)(startKey+(sizeof(int32_t))),
			    *(int32_t*)startKey,
			    *(int64_t*)(endKey+(sizeof(int32_t))),
			    *(int32_t*)endKey);
			printBucket();
			char* xx=NULL; *xx=0;
		}
		if(endKey && KEYCMP(currKey, endKey, ks) > 0) {
			log("db: key is outside the "
			    "keyrange given for getList."
			    " it is > endkey"
			    " %016"XINT64"%08"XINT32" %016"XINT64"%08"XINT32"."
			    " getting keys %"INT32" to %"INT32" for list"
			    "bounded by %016"XINT64"%08"XINT32" %016"XINT64"%08"XINT32"",
			    *(int64_t*)(currKey+(sizeof(int32_t))),
			    *(int32_t*)currKey,
			    *(int64_t*)(endKey+(sizeof(int32_t))),
			    *(int32_t*)endKey,
			    start, end,
			    *(int64_t*)(startKey+(sizeof(int32_t))),
			    *(int32_t*)startKey,
			    *(int64_t*)(endKey+(sizeof(int32_t))),
			    *(int32_t*)endKey);
			printBucket();
			char* xx=NULL; *xx=0;
		}
#endif
	}
	// set counts to pass back, we may be accumulating over multiple
	// buckets so add it to the count
	if ( numNegRecs ) *numNegRecs += numNeg;
	if ( numPosRecs ) *numPosRecs += numPos;
	//if we don't have an end key, we were not the last bucket, so don't
	//finalize the list... yes do, because we might've hit min rec sizes
	if(endKey == NULL && list->getListSize() < minRecSizes) return true;
	if ( lastKey != NULL ) list->setLastKey ( lastKey );
	// reset the list's endKey if we hit the minRecSizes barrier cuz
	// there may be more records before endKey than we put in "list"
	if ( list->getListSize() >= minRecSizes && lastKey != NULL ) {
		// use the last key we read as the new endKey
		char newEndKey[MAX_KEY_BYTES];
		KEYSET(newEndKey, lastKey, ks);
		// . if he's negative, boost new endKey by 1 because endKey's
		//   aren't allowed to be negative
		// . we're assured there's no positive counterpart to him
		//   since Rdb::addRecord() doesn't allow both to exist in
		//   the tree at the same time
		// . if by some chance his positive counterpart is in the
		//   tree, then it's ok because we'd annihilate him anyway,
		//   so we might as well ignore him
		// we are little endian
		if ( KEYNEG(newEndKey,0,ks) ) KEYADD(newEndKey,1,ks);
		// if we're using half keys set his half key bit
		if ( useHalfKeys ) KEYOR(newEndKey,0x02);
		if ( m_parent->m_rdbId == RDB_POSDB ||
		     m_parent->m_rdbId == RDB2_POSDB2 )
			newEndKey[0] |= 0x04;
		// tell list his new endKey now
		list->setEndKey ( newEndKey );
	}
	// reset list ptr to point to first record
	list->resetListPtr();
	// success
	return true;
}
// Returns the number of bytes occupied by the KEYS of this bucket that fall
// within [startKey, endKey]: (number of such keys) * keySize. Record data
// is not counted.
// . startKey and endKey are both dereferenced by the searches below, so
//   callers must pass non-NULL keys
int RdbBucket::getListSizeExact (char* startKey, char* endKey ) {
	sort();
	//get our bounds within the bucket:
	uint8_t ks = m_parent->getKeySize();
	int32_t recSize = m_parent->getRecSize();
	int32_t start = 0;
	int32_t end = m_numKeys - 1;
	char v;
	char* kk = NULL;
	// binary search for startKey; it may stop one slot off
	int32_t low = 0;
	int32_t high = m_numKeys - 1;
	while(low <= high) {
		int32_t delta = high - low;
		start = low + (delta >> 1);
		kk = m_keys + (recSize * start);
		v = KEYCMP(startKey,kk,ks);
		if(v < 0) {
			high = start - 1;
			continue;
		}
		else if(v > 0) {
			low = start + 1;
			continue;
		}
		else break;
	}
	//now back up or move forward s.t. startKey
	//is <= start
	while(start < m_numKeys) {
		kk = m_keys + (recSize * start);
		v = KEYCMP(startKey, kk, ks);
		if(v > 0) start++;
		else break;
	}
	// binary search for endKey, never before "start"
	low = start;
	high = m_numKeys - 1;
	while(low <= high) {
		int32_t delta = high - low;
		end = low + (delta >> 1);
		kk = m_keys + (recSize * end);
		v = KEYCMP(endKey,kk,ks);
		if(v < 0) {
			high = end - 1;
			continue;
		}
		else if(v > 0) {
			low = end + 1;
			continue;
		}
		else break;
	}
	// back up to the last key <= endKey
	while(end > 0) {
		kk = m_keys + (recSize * end);
		v = KEYCMP(endKey, kk, ks);
		if(v < 0) end--;
		else break;
	}
	char* currKey = m_keys + (start * recSize);
	//bail now if there is only one key and it is out of range.
	if(start == end &&
	   ((startKey && KEYCMP(currKey, startKey, ks) < 0) ||
	    (endKey && KEYCMP(currKey, endKey, ks) > 0))) {
		return 0;
	}
	// no key in range at all (empty bucket, or every key < startKey)
	if ( end < start ) return 0;
	// keys start..end INCLUSIVE are in range. the old fixedDataSize == 0
	// shortcut returned (end - start) * ks, which under-counted by one
	// key and went negative when nothing was in range, while the data
	// path counted end - start + 1 keys. both now agree.
	return (end - start + 1) * ks;
}
// Removes every key of "list" from the buckets of collection "collnum".
// . returns true right away for an empty list, and also when no bucket of
//   collnum can hold the list's start key
// . returns false with g_errno = EAGAIN while the buckets are not writable
//   or are being saved; nothing is removed in that case
// . buckets emptied by the deletion are reset and compacted out of
//   m_buckets
bool RdbBuckets::deleteList(collnum_t collnum, RdbList *list) {
	if ( list->getListSize() == 0 ) return true;
	if ( ! m_isWritable || m_isSaving ) {
		g_errno = EAGAIN;
		return false;
	}
	// flag the save now; the head bucket needs to know we need saving
	m_needsSave = true;

	char startKey [ MAX_KEY_BYTES ];
	char endKey   [ MAX_KEY_BYTES ];
	list->getStartKey ( startKey );
	list->getEndKey   ( endKey   );

	int32_t first = getBucketNum ( startKey , collnum );
	if ( first > 0 &&
	     bucketCmp ( startKey , collnum , m_buckets[first-1] ) < 0 )
		first--;
	// startKey is past our last bucket for this coll: nothing to delete
	if ( first == m_numBuckets ) return true;
	if ( m_buckets[first]->getCollnum() != collnum ) return true;

	int32_t last = getBucketNum ( endKey , collnum );
	bool lastOutside = ( last == m_numBuckets ||
			     m_buckets[last]->getCollnum() != collnum );
	if ( lastOutside ) last--;

	list->resetListPtr();
	for ( int32_t b = first ; b <= last ; b++ ) {
		if ( list->isExhausted() ) break;
		// a bucket returns false once it has no keys left
		if ( m_buckets[b]->deleteList ( list ) ) continue;
		m_buckets[b]->reset();
		m_buckets[b] = NULL;
	}

	// squeeze out the buckets we just emptied
	int32_t kept = 0;
	for ( int32_t b = 0 ; b < m_numBuckets ; b++ ) {
		RdbBucket *bucket = m_buckets[b];
		if ( bucket == NULL ) continue;
		m_buckets[kept++] = bucket;
	}
	m_numBuckets = kept;

	if ( m_numBuckets != 0 ) return true;
	//did we delete the whole darn thing?
	if ( m_numKeysApprox != 0 ) {
		log("db: bucket's number of keys is getting off by %"INT32""
		    " after deleting a list", m_numKeysApprox);
		char *xx = NULL; *xx = 0;
	}
	m_firstOpenSlot = 0;
	return true;
}
// Removes from this bucket every key that also appears in "list", starting
// at the list's current record.
// . the sorted bucket and the list are walked in step, so this assumes the
//   list's keys are in ascending order too
// . kept records are slid down in place over the removed ones
// . the parent's key/byte/negative counters are adjusted via
//   updateNumRecs()
// . returns true if keys remain, false if the bucket is now empty (the
//   parent is expected to free it)
bool RdbBucket::deleteList(RdbList *list) {
	sort();
	const uint8_t ks            = m_parent->getKeySize();
	const int32_t recSize       = m_parent->getRecSize();
	const int32_t fixedDataSize = m_parent->getFixedDataSize();
	// offset of the int32 data size, stored after the key and data ptr
	const int32_t sizeOff = ks + sizeof(char*);
	char *readPtr  = m_keys;
	char *writePtr = m_keys;
	char *stop     = m_keys + (m_numKeys * recSize);
	int32_t bytesFreed = 0;
	int32_t negFreed   = 0;
	char delKey[MAX_KEY_BYTES];
	list->getCurrentKey ( delKey );
	while ( readPtr < stop ) {
		char cmp = KEYCMP ( readPtr , delKey , ks );
		if ( cmp < 0 ) {
			// not in the delete list: keep it. writePtr trails
			// readPtr by whole records, so one record never
			// overlaps its destination
			if ( writePtr != readPtr )
				gbmemcpy ( writePtr , readPtr , recSize );
			writePtr += recSize;
			readPtr  += recSize;
			continue;
		}
		if ( cmp == 0 ) {
			// drop it: advance the read ptr but not the write ptr
			if ( fixedDataSize == -1 )
				bytesFreed += *(int32_t*)(readPtr + sizeOff);
			else if ( fixedDataSize != 0 )
				bytesFreed += fixedDataSize;
			if ( KEYNEG ( readPtr ) ) negFreed++;
			readPtr += recSize;
		}
		// on a match, or when the delete key is behind us, move the
		// delete list forward
		if ( ! list->skipCurrentRecord() ) break;
		list->getCurrentKey ( delKey );
	}
	// slide the untouched tail down. source and destination can overlap
	// (or be identical when nothing was removed), so memmove is required
	// here; memcpy on overlapping regions is undefined behavior.
	if ( readPtr < stop ) {
		int32_t rest = stop - readPtr;
		memmove ( writePtr , readPtr , rest );
		writePtr += rest;
	}
	if ( writePtr <= m_keys ) {
		//we deleted the entire bucket, let our parent know to free us
		m_parent->updateNumRecs ( - m_numKeys , - bytesFreed , - negFreed );
		return false;
	}
	int32_t remaining = (writePtr - m_keys) / recSize;
	m_parent->updateNumRecs ( remaining - m_numKeys , - bytesFreed , - negFreed );
	m_numKeys    = remaining;
	m_lastSorted = m_numKeys;
	m_endKey     = m_keys + ((m_numKeys - 1) * recSize);
	return true;
}
// remove keys from any non-existent collection
// . a bucket's collnum is invalid when it lies outside
//   [0, g_collectiondb.m_numRecs) or its m_recs[] slot is NULL
// . every key of such a collnum is removed with delColl()
void RdbBuckets::cleanBuckets ( ) {
	// what buckets have -1 rdbid???
	if ( m_rdbId < 0 ) return;
	// the liberation count
	int32_t count = 0;
	int32_t i = 0;
	while ( i < m_numBuckets ) {
		collnum_t collnum = m_buckets[i]->getCollnum();
		CollectionRec *cr = NULL;
		// guard both ends: a negative collnum would read before
		// m_recs[] just as one >= m_numRecs would read past it
		if ( collnum >= 0 && collnum < g_collectiondb.m_numRecs )
			cr = g_collectiondb.m_recs[collnum];
		if ( cr ) { i++; continue; }
		// count ALL keys of this collnum, not just this bucket's
		int32_t collKeys = 0;
		for ( int32_t j = 0 ; j < m_numBuckets ; j++ )
			if ( m_buckets[j]->getCollnum() == collnum )
				collKeys += m_buckets[j]->getNumKeys();
		int32_t bucketsBefore = m_numBuckets;
		// delete that coll. if it fails, or removes no bucket, a
		// rescan would hit the same bucket again and loop forever
		if ( ! delColl ( collnum ) || m_numBuckets >= bucketsBefore ) {
			log("db: Failed to remove records of invalid collnum "
			    "%"INT32" from %s buckets.",
			    (int32_t)collnum,m_dbname);
			break;
		}
		count += collKeys;
		// delColl() compacted m_buckets, so restart the scan
		i = 0;
	}
	// print it
	if ( count == 0 ) return;
	log(LOG_LOGIC,"db: Removed %"INT32" records from %s buckets "
	    "for invalid collection numbers.",count,m_dbname);
}
// Deletes every key belonging to "collnum" from the buckets.
// . repeatedly fetches up to minRecSizes bytes of the collection's keys with
//   getList() and removes them with deleteList() until none are left
// . if getList() fails with ENOMEM, the chunk size is halved while it is
//   above 1024 bytes
// . returns false if keys could not be fetched or deleted, leaving g_errno
//   as set by the failing call; some keys of the collection may remain
bool RdbBuckets::delColl(collnum_t collnum) {
	m_needsSave = true;
	RdbList list;
	int32_t minRecSizes = 1024*1024;
	int32_t numPosRecs = 0;
	int32_t numNegRecs = 0;
	while (1) {
		if(!getList(collnum, KEYMIN(), KEYMAX(), minRecSizes ,
			    &list , &numPosRecs , &numNegRecs, false )) {
			if(g_errno == ENOMEM && minRecSizes > 1024) {
				minRecSizes /= 2;
				continue;
			}
			// getList() reports through g_errno, not errno
			log("db: buckets could not delete "
			    "collection: %s.",
			    mstrerror(g_errno));
			return false;
		}
		if(list.isEmpty()) break;
		// deleteList() fails (EAGAIN while saving / not writable)
		// without removing anything, so fetching the same keys again
		// would spin forever
		if ( ! deleteList(collnum, &list) ) {
			log("db: buckets could not delete "
			    "collection: %s.",
			    mstrerror(g_errno));
			return false;
		}
	}
	log("buckets: deleted all keys for collnum %"INT32,(int32_t)collnum);
	return true;
}
// Copies the nodes of tree "rt", in tree order, into the buckets.
// . record data is copied too when this rdb has data (m_fixedDataSize != 0)
// . stops at the first node addNode() rejects
// . returns how many nodes were added
int32_t RdbBuckets::addTree(RdbTree* rt) {
	int32_t added = 0;
	char *data = NULL;
	int32_t dataSize = m_fixedDataSize;
	for ( int32_t node = rt->getFirstNode() ; node >= 0 ;
	      node = rt->getNextNode ( node ) ) {
		if ( m_fixedDataSize != 0 ) {
			data = rt->getData ( node );
			if ( m_fixedDataSize == -1 )
				dataSize = rt->getDataSize ( node );
		}
		if ( addNode ( rt->getCollnum ( node ) ,
			       rt->getKey ( node ) ,
			       data , dataSize ) < 0 )
			break;
		added++;
	}
	log("db: added %"INT32" keys from tree to buckets for %s.",added, m_dbname);
	return added;
}
//this could be sped up a lot, but it is only called from repair at
//the moment.
// Adds every record of "list" to the buckets under "collnum", one addNode()
// per record. Returns false as soon as addNode() rejects a record; records
// added before that stay in the buckets.
bool RdbBuckets::addList(RdbList* list, collnum_t collnum) {
	char key[MAX_KEY_BYTES];
	list->resetListPtr();
	while ( ! list->isExhausted() ) {
		list->getCurrentKey ( key );
		if ( addNode ( collnum ,
			       key ,
			       list->getCurrentData() ,
			       list->getCurrentDataSize() ) < 0 )
			return false;
		list->skipCurrentRecord();
	}
	return true;
}
//return the total bytes of the list bookended by startKey and endKey
// . this is an estimate at bucket granularity: every key of every bucket
//   from the start bucket through the end bucket counts as m_recSize bytes
// . returns 0 when no bucket of "collnum" can hold startKey
// NOTE(review): minKey/maxKey are only seeded (minKey = endKey,
// maxKey = startKey) and never narrowed to the keys actually found.
// Confirm that callers do not rely on them.
int64_t RdbBuckets::getListSize ( collnum_t collnum,
				  char *startKey , char *endKey ,
				  char *minKey , char *maxKey ) {
	// seed the extremes inverted, i.e. "nothing found yet"
	if ( minKey ) KEYSET ( minKey , endKey , m_ks );
	if ( maxKey ) KEYSET ( maxKey , startKey , m_ks );

	int32_t first = getBucketNum ( startKey , collnum );
	if ( first > 0 &&
	     bucketCmp ( startKey , collnum , m_buckets[first-1] ) < 0 )
		first--;
	if ( first == m_numBuckets ) return 0;
	if ( m_buckets[first]->getCollnum() != collnum ) return 0;

	int32_t last = getBucketNum ( endKey , collnum );
	bool lastOutside = ( last == m_numBuckets ||
			     m_buckets[last]->getCollnum() != collnum );
	if ( lastOutside ) last--;

	int64_t keys = 0;
	for ( int32_t b = first ; b <= last ; b++ )
		keys += m_buckets[b]->getNumKeys();
	return keys * m_recSize;
}
void *saveBucketsWrapper ( void *state , ThreadEntry *t ) ;
void threadDoneBucketsWrapper ( void *state , ThreadEntry *t ) ;
// . caller should call f->set() himself
// . we'll open it here
// . returns false if a save is already in progress, true otherwise
// . sets g_errno (from m_saveErrno) on error
// . the threaded path via saveBucketsWrapper is disabled: the save runs
//   synchronously inside this call, so "useThread" is currently ignored
bool RdbBuckets::fastSave ( char *dir ,
			    bool useThread ,
			    void *state ,
			    void (* callback) (void *state) ) {
	if ( g_conf.m_readOnlyMode ) return true;
	// we do not need a save
	if ( ! m_needsSave ) return true;
	// return false if already in the middle of saving
	if ( m_isSaving ) return false;
	// save parms
	m_dir      = dir;
	m_state    = state;
	m_callback = callback;
	// assume no error
	m_saveErrno = 0;
	// no adding to the buckets now
	m_isSaving = true;
	// . records any error in m_saveErrno
	// . must now lock for each bucket when saving that bucket, but
	//   release lock to breathe between buckets
	fastSave_r ();
	// store save error into g_errno
	g_errno = m_saveErrno;
	// resume adding to the buckets
	m_isSaving = false;
	// only clear the dirty flag if the save really succeeded. otherwise
	// the next save request would be skipped and unsaved keys lost.
	if ( m_saveErrno == 0 ) m_needsSave = false;
	// we did not block
	return true;
}
void *saveBucketsWrapper ( void *state , ThreadEntry *t ) {
// get this class
RdbBuckets *THIS = (RdbBuckets *)state;
// this returns false and sets g_errno on error
THIS->fastSave_r();
// now exit the thread, bogus return
return NULL;
}
// we come here after thread exits
// . publishes the save thread's error into g_errno, re-enables adds, logs
//   any error and then calls the caller's callback
void threadDoneBucketsWrapper ( void *state , ThreadEntry *t ) {
	// get this class
	RdbBuckets *THIS = (RdbBuckets *)state;
	// store save error into g_errno
	g_errno = THIS->m_saveErrno;
	// . resume adding to the tree
	// . this will also allow other threads to be queued
	// . if we did this at the end of the thread we could end up with
	//   an overflow of queued SAVETHREADs
	THIS->m_isSaving = false;
	// keep the dirty flag if the save failed so a later save retries
	// instead of silently skipping the unsaved keys
	if ( ! g_errno ) THIS->m_needsSave = false;
	// g_errno should be preserved from the thread so if fastSave_r()
	// had an error it will be set
	if ( g_errno )
		log("db: Had error saving tree to disk for %s: %s.",
		    THIS->m_dbname,mstrerror(g_errno));
	// . call callback
	if ( THIS->m_callback ) THIS->m_callback ( THIS->m_state );
}
// . returns false and sets m_saveErrno (never g_errno) on error
// . NO USING g_errno IN A DAMN THREAD!!!!!!!!!!!!!!!!!!!!!!!!!
// . writes "<m_dir>/<m_dbname>-buckets-saving.dat" and renames it to
//   "<m_dir>/<m_dbname>-buckets-saved.dat" only after everything was
//   written, so a failed save leaves the previous saved file untouched
bool RdbBuckets::fastSave_r() {
	if ( g_conf.m_readOnlyMode ) return true;
	// cannot use the BigFile class, since we may be in a thread and it
	// messes with g_errno
	char s[1024];
	snprintf ( s , sizeof(s) , "%s/%s-buckets-saving.dat", m_dir , m_dbname );
	int fd = ::open ( s ,
			  O_RDWR | O_CREAT | O_TRUNC , S_IRUSR | S_IWUSR |
			  S_IRGRP | S_IWGRP | S_IROTH);
	if ( fd < 0 ) {
		m_saveErrno = errno;
		return log("db: Could not open %s for writing: %s.",
			   s,mstrerror(errno));
	}
	// clear our own errno
	errno = 0;
	// write from byte 0 of the freshly truncated file
	int64_t offset = fastSaveColl_r ( fd , 0 );
	if ( offset < 0 ) {
		// fastSaveColl_r() has already closed fd and logged; closing
		// it again here could close an unrelated, newly opened fd
		if ( m_saveErrno == 0 ) m_saveErrno = EIO;
		// never rename a partial file over the last good save
		::unlink ( s );
		return false;
	}
	// close it up
	close ( fd );
	char s2[1024];
	snprintf ( s2 , sizeof(s2) , "%s/%s-buckets-saved.dat", m_dir , m_dbname );
	if ( ::rename ( s , s2 ) != 0 ) {
		m_saveErrno = errno;
		return log("db: Could not rename %s to %s: %s.",
			   s,s2,mstrerror(m_saveErrno));
	}
	// info
	log("db RdbBuckets saved %"INT32" keys, %"INT64" bytes for %s",
	    getNumKeys(), offset, m_dbname);
	return true;
}
int64_t RdbBuckets::fastSaveColl_r(int fd, int64_t offset) {
if(m_numKeysApprox == 0) return offset;
int32_t version = SAVE_VERSION;
int32_t err = 0;
if ( pwrite ( fd , &version, sizeof(int32_t) , offset ) != 4 ) err=errno;
offset += sizeof(int32_t);
if ( pwrite ( fd , &m_numBuckets, sizeof(int32_t) , offset)!=4)err=errno;
offset += sizeof(int32_t);
if ( pwrite ( fd , &m_maxBuckets, sizeof(int32_t) , offset)!=4)err=errno;
offset += sizeof(int32_t);
if ( pwrite ( fd , &m_ks, sizeof(uint8_t) , offset ) != 1) err=errno;
offset += sizeof(uint8_t);
if ( pwrite ( fd , &m_fixedDataSize,sizeof(int32_t),offset)!=4) err=errno;
offset += sizeof(int32_t);
if ( pwrite ( fd , &m_recSize, sizeof(int32_t) , offset ) != 4) err=errno;
offset += sizeof(int32_t);
if ( pwrite ( fd , &m_numKeysApprox,sizeof(int32_t),offset) !=4)err=errno;
offset += sizeof(int32_t);
if ( pwrite ( fd , &m_numNegKeys,sizeof(int32_t),offset) != 4 ) err=errno;
offset += sizeof(int32_t);
if ( pwrite ( fd,&m_dataMemOccupied,sizeof(int32_t),offset)!=4)err=errno;
offset += sizeof(int32_t);
int32_t tmp = BUCKET_SIZE;
if ( pwrite ( fd , &tmp, sizeof(int32_t) , offset ) != 4 ) err=errno;
offset += sizeof(int32_t);
// int32_t len = gbstrlen(m_dbname) + 1;
// pwrite ( fd , &m_dbname, len , offset );
// offset += len;
// set it
if ( err ) errno = err;
// bitch on error
if ( errno ) {
m_saveErrno = errno;
close ( fd );
log("db: Failed to save buckets for %s: %s.",
m_dbname,mstrerror(errno));
return -1;
}
// position to store into m_keys, ...
for (int32_t i = 0; i < m_numBuckets; i++ ) {
offset = m_buckets[i]->fastSave_r(fd, offset);
// returns -1 on error
if ( offset < 0 ) {
close ( fd );
m_saveErrno = errno;
log("db: Failed to save buckets for %s: %s.",
m_dbname,mstrerror(errno));
return -1;
}
}
return offset;
}
// Loads "<g_hostdb.m_dir>/<dbname>-buckets-saved.dat" (or "./..." when the
// host dir is empty).
// . returns true when no such file exists, i.e. there is nothing to load
// . otherwise returns fastLoad()'s result, closing the file afterwards
bool RdbBuckets::loadBuckets ( char* dbname) {
	char *dir = g_hostdb.m_dir;
	if ( dir[0] == '\0' ) dir = ".";
	char filename[256];
	sprintf ( filename , "%s-buckets-saved.dat" , dbname );
	BigFile file;
	file.set ( dir , filename , NULL );
	if ( file.doesExist() <= 0 ) return true;
	bool ok = fastLoad ( &file , dbname );
	file.close();
	return ok;
}
// Opens "f" read-only and loads the saved buckets from it.
// . an empty file means nothing was saved and counts as success
// . returns false on error, with g_errno as set by the failing read/open
bool RdbBuckets::fastLoad ( BigFile *f , char* dbname) {
	// msg
	log(LOG_INIT,"db: Loading %s.",f->getFilename());
	// open it up
	if ( ! f->open ( O_RDONLY ) ) return false;
	// keep the full 64-bit size; do not truncate big files to 32 bits
	int64_t fsize = f->getFileSize();
	if ( fsize == 0 ) return true;
	int64_t offset = fastLoadColl ( f , dbname , 0 );
	if ( offset < 0 ) {
		// log the dbname argument: m_dbname is not assigned yet
		// when fastLoadColl() bails out early
		log("db: Failed to load buckets for %s: %s.",
		    dbname,mstrerror(g_errno));
		return false;
	}
	return true;
}
// Reads the buckets header written by fastSaveColl_r(), then every saved
// bucket, from "f" starting at "offset".
// . returns the offset just past the data read, or -1 on error
// . crashes on purpose if the file was saved with a different BUCKET_SIZE
int64_t RdbBuckets::fastLoadColl( BigFile *f,
				  char *dbname,
				  int64_t offset ) {
	// set this first so every log below can name the rdb
	m_dbname = dbname;
	int32_t maxBuckets;
	int32_t numBuckets;
	int32_t version;
	f->read ( &version,sizeof(int32_t), offset );
	offset += sizeof(int32_t);
	// a failed read leaves "version" uninitialized; do not look at it
	if ( g_errno ) return -1;
	if(version > SAVE_VERSION) {
		log("db: Failed to load buckets for %s: "
		    "saved version is in the future or is corrupt, "
		    "please restart old executable and do a ddump.",
		    m_dbname);
		return -1;
	}
	f->read ( &numBuckets,sizeof(int32_t), offset );
	offset += sizeof(int32_t);
	// saved capacity; not used when loading
	f->read ( &maxBuckets,sizeof(int32_t), offset );
	offset += sizeof(int32_t);
	f->read ( &m_ks,sizeof(uint8_t), offset );
	offset += sizeof(uint8_t);
	f->read ( &m_fixedDataSize,sizeof(int32_t), offset );
	offset += sizeof(int32_t);
	f->read ( &m_recSize,sizeof(int32_t), offset );
	offset += sizeof(int32_t);
	f->read ( &m_numKeysApprox,sizeof(int32_t), offset );
	offset += sizeof(int32_t);
	f->read ( &m_numNegKeys,sizeof(int32_t), offset );
	offset += sizeof(int32_t);
	f->read ( &m_dataMemOccupied, sizeof(int32_t), offset );
	offset += sizeof(int32_t);
	int32_t bucketSize;
	f->read ( &bucketSize, sizeof(int32_t), offset );
	offset += sizeof(int32_t);
	// a failed header read must be reported as an error, not fed into
	// the bucket-size check below (garbage there cores the process)
	if ( g_errno )
		return -1;
	if(bucketSize != BUCKET_SIZE) {
		log("db: It appears you have changed the bucket size "
		    "please restart the old executable and dump "
		    "buckets to disk. old=%"INT32" new=%"INT32"",
		    bucketSize, (int32_t)BUCKET_SIZE);
		char *xx = NULL; *xx = 0;
	}
	for (int32_t i = 0; i < numBuckets; i++ ) {
		m_buckets[i] = bucketFactory();
		if(m_buckets[i] == NULL) return -1;
		offset = m_buckets[i]->fastLoad(f, offset);
		// returns -1 on error
		if ( offset < 0 )
			return -1;
		m_numBuckets++;
	}
	return offset;
}
// max key size -- posdb, 18 bytes, so use 18 here
#define BTMP_SIZE (BUCKET_SIZE*18+1000)
// Serializes this bucket into a stack buffer and writes it to "fd" at
// "offset" with a single pwrite(). The buffer copy means the bucket cannot
// change while it is being written.
// . layout: collnum_t collnum, int32 numKeys, int32 lastSorted,
//   int32 endKey offset, then numKeys * recSize bytes of records
// . returns the offset just past the bucket, or -1 on a failed/short write
int64_t RdbBucket::fastSave_r(int fd, int64_t offset) {
	int32_t recSize  = m_parent->getRecSize();
	int32_t keyBytes = recSize * m_numKeys;
	int32_t size = sizeof(collnum_t) + sizeof(m_numKeys) +
		sizeof(m_lastSorted) + sizeof(int32_t) + keyBytes;
	// check BEFORE copying. the old code tested this only after the
	// copies, i.e. after the stack buffer had already overflowed.
	if ( size > BTMP_SIZE ) {
		log("buckets: btmp_size too small. keysize>18 bytes?");
		char *xx=NULL;*xx=0;
	}
	// first copy to a buf before saving so we can unlock!
	char tmp[BTMP_SIZE];
	char *p = tmp;
	gbmemcpy ( p , &m_collnum, sizeof(collnum_t) );
	p += sizeof(collnum_t);
	gbmemcpy ( p , &m_numKeys, sizeof(int32_t) );
	p += sizeof(m_numKeys);
	gbmemcpy ( p , &m_lastSorted, sizeof(int32_t) );
	p += sizeof(m_lastSorted);
	int32_t endKeyOffset = m_endKey - m_keys;
	gbmemcpy ( p , &endKeyOffset, sizeof(int32_t) );
	p += sizeof(int32_t);
	gbmemcpy ( p , m_keys, keyBytes );
	p += keyBytes;
	// now we can save it without fear of being interrupted and having
	// the bucket altered
	errno = 0;
	if ( pwrite ( fd , tmp , size , offset ) != size ) {
		log("db:fastSave_r: %s.",mstrerror(errno));
		return -1;
	}
	return offset + size;
}
// Reads one bucket, as written by RdbBucket::fastSave_r(), from "f" at
// "offset". Returns the offset just past the bucket, or -1 on error.
int64_t RdbBucket::fastLoad(BigFile *f, int64_t offset) {
	f->read ( &m_collnum,sizeof(collnum_t), offset );
	offset += sizeof(collnum_t);
	f->read ( &m_numKeys,sizeof(int32_t), offset );
	offset += sizeof(int32_t);
	f->read ( &m_lastSorted,sizeof(int32_t), offset );
	offset += sizeof(int32_t);
	int32_t endKeyOffset;
	f->read ( &endKeyOffset,sizeof(int32_t), offset );
	offset += sizeof(int32_t);
	// do not size the key read below from a header we failed to read
	if ( g_errno ) {
		log("bucket: fastload %s",mstrerror(g_errno));
		return -1;
	}
	int32_t recSize = m_parent->getRecSize();
	// fastSave_r() never writes more than BTMP_SIZE bytes per bucket, so
	// a bigger (or negative) key count means a corrupt file. reading it
	// would overrun m_keys.
	if ( m_numKeys < 0 ||
	     (int64_t)recSize * m_numKeys > (int64_t)BTMP_SIZE ) {
		log("bucket: fastload: corrupt key count %"INT32"",m_numKeys);
		g_errno = EINVAL;
		return -1;
	}
	f->read ( m_keys,recSize*m_numKeys, offset );
	offset += recSize*m_numKeys;
	m_endKey = m_keys + endKeyOffset;
	if ( g_errno ) {
		log("bucket: fastload %s",mstrerror(g_errno));
		return -1;
	}
	return offset;
}