#include "gb-include.h" #undef _XOPEN_SOURCE #define _XOPEN_SOURCE 500 #include #include "Threads.h" #include "RdbCache.h" #include "Collectiondb.h" #include "Loop.h" #include "Msg17.h" //#include "Dns.h" // g_dns //#include "Msg36.h" // g_qtable #include "Msg13.h" //#include "Msg10.h" // g_deadWaitCache #include "Dns.h" #include "BigFile.h" #include "Spider.h" bool g_cacheWritesEnabled = true; RdbCache::RdbCache () { m_totalBufSize = 0; m_numBufs = 0; m_ptrs = NULL; m_maxMem = 0; m_numPtrsMax = 0; reset(); m_needsSave = false; m_corruptionDetected = false; } RdbCache::~RdbCache ( ) { reset (); } #define BUFSIZE (128*1024*1024) //#define BUFSIZE (100000) //#define BUFSIZE (200000) void RdbCache::reset ( ) { //if ( m_numBufs > 0 ) // log("db: resetting record cache"); m_offset = 0; m_tail = 0; for ( int32_t i = 0 ; i < m_numBufs ; i++ ) // all bufs, but not necessarily last, are BUFSIZE bytes big mfree ( m_bufs[i] , m_bufSizes[i] , "RdbCache" ); m_numBufs = 0; m_totalBufSize= 0; if ( m_ptrs ) mfree ( m_ptrs , m_numPtrsMax*sizeof(char *),"RdbCache"); m_ptrs = NULL; m_numPtrsUsed = 0; // can't reset this, breaks the load! //m_numPtrsMax = 0; m_memOccupied = 0; m_memAlloced = 0; m_numHits = 0; m_numMisses = 0; //m_wrapped = false; m_adds = 0; m_deletes = 0; // assume no need to call convertCache() m_convert = false; m_isSaving = false; } bool RdbCache::init ( int32_t maxMem , int32_t fixedDataSize , bool supportLists , int32_t maxRecs , bool useHalfKeys , char *dbname , bool loadFromDisk , char cacheKeySize , char dataKeySize , int32_t numPtrsMax ) { // reset all reset(); // watch out if ( maxMem < 0 ) return log(LOG_LOGIC,"db: cache for %s had " "negative maxMem." , dbname); if ( maxRecs < 0 ) return log(LOG_LOGIC,"db: cache for %s had " "negative maxRecs.", dbname); // don't use more mem than this m_maxMem = maxMem; m_maxColls = (1LL << (sizeof(collnum_t)*8)); RdbCache *robots = Msg13::getHttpCacheRobots(); RdbCache *others = Msg13::getHttpCacheOthers(); // do not use some cache if we are the tmp cluster RdbCache *th = NULL; if ( g_hostdb.m_useTmpCluster ) th = this; //if ( th == &g_genericCache[SITEQUALITY_CACHEID] ) maxMem = 0; if ( th == g_dns.getCache () ) maxMem = 0; if ( th == g_dns.getCacheLocal () ) maxMem = 0; if ( th == robots ) maxMem = 0; if ( th == others ) maxMem = 0; //if ( th == &g_forcedCache ) maxMem = 0; //if ( th == &g_alreadyAddedCache ) maxMem = 0; // this is the fixed dataSize of all records in a list, not the // fixed dataSize of a list itself. Note that. m_fixedDataSize = fixedDataSize; m_supportLists = supportLists; m_useHalfKeys = useHalfKeys; m_useDisk = loadFromDisk; m_dbname = dbname; m_dks = dataKeySize; m_cks = cacheKeySize; // if maxMem is zero just return true if ( m_maxMem <= 0 ) return true; // assume no need to call convertCache() m_convert = false; // . double what they had so hash table is somewhat sparse // . ::load() uses this so set it here m_numPtrsMax = maxRecs * 2; // it might have been provided though, too if ( numPtrsMax > 0 ) m_numPtrsMax = numPtrsMax; // try loading from disk before anything else if ( m_useDisk ) { if ( load ( m_dbname ) ) return true; //log("RdbCache::init: cache load failed"); g_errno = 0; } // . make our hash table, zero it out // . don't allow it more than 50% full for performance m_threshold = maxRecs; // (maxRecs * 50 ) / 100; if ( m_threshold == m_numPtrsMax ) m_threshold--; char ttt[128]; sprintf(ttt,"cptrs-%s",m_dbname); m_ptrs = (char **) mcalloc (sizeof(char *)*m_numPtrsMax , ttt ); if ( ! m_ptrs ) return log("RdbCache::init: %s", mstrerror(g_errno)); // debug testing -- remove later //m_crcs=(int32_t *)mcalloc(4*m_numPtrsMax,"RdbCache"); //if (!m_crcs)return log("RdbCache::init: %s", mstrerror(g_errno)); // update OUR mem alloced m_memAlloced = m_numPtrsMax * sizeof(char *); // include this m_memOccupied = 0; // m_memOccupied = m_memAlloced; sprintf(ttt,"cbuf-%s",m_dbname); // . make the 128MB buffers // . if we do more than 128MB per buf then pthread_create() will fail int32_t bufMem = m_maxMem - m_memAlloced; if( bufMem <= 0 ) { log("rdbcache: cache for %s does not have enough mem. fix " "by increasing maxmem or number of recs, etc.",m_dbname); return false; char *xx=NULL;*xx=0; } if ( bufMem && m_fixedDataSize > 0 && bufMem / m_fixedDataSize < maxRecs / 2 ) { log("cache: warning. " "cache for %s can have %i ptrs but buf mem " "can only hold %i objects" ,m_dbname ,(int)maxRecs ,(int)(bufMem/m_fixedDataSize)); } m_totalBufSize = 0LL; m_offset = 0LL; while ( bufMem > 0 && m_numBufs < 32 ) { int32_t size = bufMem; if ( size > BUFSIZE ) size = BUFSIZE; m_bufSizes [ m_numBufs ] = size; m_bufs [ m_numBufs ] = (char *)mcalloc(size,ttt); //m_bufEnds [ m_numBufs ] = NULL; if ( ! m_bufs [ m_numBufs ] ) { reset(); return log("db: Could not allocate %"INT32" bytes for " "cache for %s.",size,dbname); } m_numBufs++; bufMem -= size; m_memAlloced += size; m_totalBufSize += size; } // now fill ourselves up if ( m_convert ) convertCache ( m_convertNumPtrsMax , m_convertMaxMem ); return true; } //bool RdbCache::isInCache ( collnum_t collnum, key_t cacheKey, int32_t maxAge ) { bool RdbCache::isInCache ( collnum_t collnum, char *cacheKey, int32_t maxAge ) { // maxAge of 0 means don't check cache if ( maxAge == 0 ) return false; // bail if no cache if ( m_numPtrsMax <= 0 ) return false; // look up in hash table //int32_t n=(cacheKey.n0 + (uint64_t)cacheKey.n1)% m_numPtrsMax; int32_t n = hash32 ( cacheKey , m_cks ) % m_numPtrsMax; // chain while ( m_ptrs[n] && ( *(collnum_t *)(m_ptrs[n]+0 ) != collnum || //*(key_t *)(m_ptrs[n]+sizeof(collnum_t)) != cacheKey ) ) KEYCMP(m_ptrs[n]+sizeof(collnum_t),cacheKey,m_cks) != 0 ) ) if ( ++n >= m_numPtrsMax ) n = 0; // return false if not found if ( ! m_ptrs[n] ) return false; // get timestamp char *p = m_ptrs[n]; // skip over collnum_t and key //p += sizeof(collnum_t) + sizeof(key_t); p += sizeof(collnum_t) + m_cks; // get time stamp int32_t timestamp = *(int32_t *)p; // return false if too old if ( maxAge > 0 && getTimeLocal() - timestamp > maxAge ) return false; // return true if found return true; } // . a quick hack for SpiderCache.cpp // . if your record is always a 4 byte int32_t call this // . returns -1 if not found, so don't store -1 in there then int64_t RdbCache::getLongLong ( collnum_t collnum , uint32_t key , int32_t maxAge , bool promoteRecord ) { char *rec; int32_t recSize; key_t k; k.n0 = 0; k.n1 = (uint64_t)key; // sanity check //if ( m_cks != 4 ) { char *xx = NULL; *xx = 0; } // return -1 if not found if ( ! getRecord ( collnum , //k , //(char *)&key , (char *)&k, &rec , &recSize , false , maxAge , // in seconds, -1 means none true , // incCounts? NULL , // cacheTime ptr promoteRecord ) ) return -1LL; if ( recSize != 8 ) { log(LOG_LOGIC,"db: cache: Bad engineer. RecSize = %"INT32".", recSize); return -1LL; } // otherwise, it was found and the right length, so return it return *(int64_t *)rec; } // both key and returned value are int64_ts for this int64_t RdbCache::getLongLong2 ( collnum_t collnum , uint64_t key , int32_t maxAge , bool promoteRecord ) { char *rec; int32_t recSize; key_t k; k.n0 = (uint64_t)key; k.n1 = 0; // sanity check if ( m_cks != 8 ) { char *xx = NULL; *xx = 0; } if ( m_dks != 0 ) { char *xx = NULL; *xx = 0; } // return -1 if not found if ( ! getRecord ( collnum , (char *)&k, &rec , &recSize , false , maxAge , // in seconds, -1 means none true , // incCounts? NULL , // cacheTime ptr promoteRecord ) ) return -1LL; if ( recSize != 8 ) { log(LOG_LOGIC,"db: cache: Bad engineer. RecSize = %"INT32".", recSize); return -1LL; } // otherwise, it was found and the right length, so return it return *(int64_t *)rec; } // this puts a int32_t in there void RdbCache::addLongLong2 ( collnum_t collnum , uint64_t key , int64_t value , char **retRecPtr ) { key_t k; k.n0 = (uint64_t)key; k.n1 = 0; // sanity check if ( m_cks != 8 ) { char *xx = NULL; *xx = 0; } if ( m_dks != 0 ) { char *xx = NULL; *xx = 0; } addRecord ( collnum , (char *)&k , NULL , 0 , (char *)&value , 8 , 0 , // timestamp=now retRecPtr ); // clear error in case addRecord set it g_errno = 0; } // this puts a int32_t in there void RdbCache::addLongLong ( collnum_t collnum , uint32_t key , int64_t value , char **retRecPtr ) { key_t k; k.n0 = 0; k.n1 = (uint64_t)key; // sanity check //if ( m_cks != 4 ) { char *xx = NULL; *xx = 0; } // sanity check if ( m_cks > (int32_t)sizeof(key_t) ) { char *xx = NULL; *xx = 0; } //if ( m_dks != 0 ) { char *xx = NULL; *xx = 0; } //addRecord ( collnum , k , NULL , 0 , (char *)&value , 8 , //addRecord ( collnum , (char *)&key , NULL , 0 , (char *)&value , 8 , addRecord ( collnum , (char *)&k , NULL , 0 , (char *)&value , 8 , 0 , // timestamp=now retRecPtr ); // clear error in case addRecord set it g_errno = 0; } int32_t RdbCache::getLong ( collnum_t collnum , uint64_t key , int32_t maxAge , bool promoteRecord ) { char *rec; int32_t recSize; key_t k; // TODO: fix this!?! k.n0 = key, k.n1 = 0? k.n0 = 0; k.n1 = key; // return -1 if not found if ( ! getRecord ( collnum , (char *)&k, &rec , &recSize , false , // do copy? maxAge , // in seconds, -1 means none true , // incCounts? NULL , // cacheTime ptr promoteRecord ) ) return -1; if ( recSize != 4 ) { log(LOG_LOGIC,"db: cache: Bad engineer. RecSize = %"INT32".", recSize); return -1; } // otherwise, it was found and the right length, so return it return *(int32_t *)rec; } // this puts a int32_t in there void RdbCache::addLong ( collnum_t collnum , uint64_t key , int32_t value , char **retRecPtr ) { key_t k; k.n0 = 0; k.n1 = key; // sanity check if ( m_cks > (int32_t)sizeof(key_t) ) { char *xx = NULL; *xx = 0; } addRecord ( collnum , (char *)&k , NULL , 0 , (char *)&value , // by long we really mean 32 bits! 4,//sizeof(char *), // 4 , now 8 for 64 bit archs 0 , // timestamp=now retRecPtr ); // clear error in case addRecord set it g_errno = 0; } bool RdbCache::getRecord ( char *coll , //key_t cacheKey , char *cacheKey , char **rec , int32_t *recSize , bool doCopy , int32_t maxAge , bool incCounts , time_t *cachedTime , bool promoteRecord) { collnum_t collnum = g_collectiondb.getCollnum ( coll ); if ( collnum < (collnum_t) 0 ) { log("db: Could not get cache rec for collection \"%s\".",coll); return false; } return getRecord ( collnum , cacheKey , rec , recSize , doCopy , maxAge , incCounts , cachedTime, promoteRecord ); } // returns false if was not in the cache, true otherwise bool RdbCache::setTimeStamp ( collnum_t collnum , char *cacheKey , int32_t newTimeStamp ) { // return now if table empty if ( m_numPtrsMax <= 0 ) return false; // look up in hash table int32_t n = hash32 ( cacheKey , m_cks ) % m_numPtrsMax; // chain while ( m_ptrs[n] && ( *(collnum_t *)(m_ptrs[n]+0 ) != collnum || KEYCMP(m_ptrs[n]+sizeof(collnum_t),cacheKey,m_cks) != 0 ) ) if ( ++n >= m_numPtrsMax ) n = 0; // return ptr to rec char *p = m_ptrs[n]; // return false if not found if ( ! p ) return false; // skip over collnum and key p += sizeof(collnum_t) + m_cks; // set the timestamp *(int32_t *)p = newTimeStamp; return true; } // . returns true if found, false if not found in cache // . sets *rec and *recSize iff found bool RdbCache::getRecord ( collnum_t collnum , //key_t cacheKey , char *cacheKey , char **rec , int32_t *recSize , bool doCopy , int32_t maxAge , bool incCounts , time_t *cachedTime , bool promoteRecord ) { // maxAge of 0 means don't check cache if ( maxAge == 0 ) return false; // bail if no cache if ( m_numPtrsMax <= 0 ) return false; // if init() called failed because of oom... if ( ! m_ptrs ) //return log("cache: getRecord: failed because oom"); return false; // time it -- debug int64_t t = 0LL ; if ( g_conf.m_logTimingDb ) t = gettimeofdayInMillisecondsLocal(); // reset this if ( cachedTime ) *cachedTime = 0; // only do copy supported //if ( ! doCopy ) // return log("RdbCache::getRecord: only doCopy supported"); // look up in hash table //int32_t n =(cacheKey.n0 + (uint64_t)cacheKey.n1)%m_numPtrsMax; int32_t n = hash32 ( cacheKey , m_cks ) % m_numPtrsMax; //int32_t n = cacheKey.n0 % m_numPtrsMax; // chain while ( m_ptrs[n] && ( *(collnum_t *)(m_ptrs[n]+0 ) != collnum || //*(key_t *)(m_ptrs[n]+sizeof(collnum_t)) != cacheKey ) ) KEYCMP(m_ptrs[n]+sizeof(collnum_t),cacheKey,m_cks) != 0 ) ) if ( ++n >= m_numPtrsMax ) n = 0; //while ( m_ptrs[n] && *(key_t *)m_ptrs[n] != cacheKey ) // if ( ++n >= m_numPtrsMax ) n = 0; // return false if not found if ( ! m_ptrs[n] ) { if ( incCounts ) m_numMisses++; return false; } // return ptr to rec char *p = m_ptrs[n]; // if collnum is -1 then that means we set it to that in // RdbCache::clear(). this is kinda hacky. if ( *(collnum_t *)p == (collnum_t)-1 ) { if ( incCounts ) m_numMisses++; return false; } // skip over collnum and key //p += sizeof(collnum_t) + sizeof(key_t); p += sizeof(collnum_t) + m_cks; // skip over time stamp int32_t timestamp = *(int32_t *)p; if ( cachedTime ) *cachedTime = timestamp; // return false if too old if ( maxAge > 0 && getTimeLocal() - timestamp > maxAge ) { // debug msg // don't print for tagdb, however, spider prints it // too much and i don't care about it if ( m_dbname[0]!='s' || m_dbname[1]!='i' ) log(LOG_DEBUG,"db: Found rec in cache for %s, " "but elapsed time of %"INT32" is greater " "than %"INT32".", m_dbname, (int32_t)(getTimeLocal() - timestamp) , maxAge ); if ( incCounts ) m_numMisses++; return false; } // skip timestamp p += 4; // store data size if our recs are var length or we cache lists of // fixed length recs, and those lists need a dataSize if ( m_fixedDataSize == -1 || m_supportLists ) { *recSize = *(int32_t *)p; p += 4; } else *recSize = m_fixedDataSize; // . debug testing -- remove later // . get checksum //char *s = m_ptrs[n]; //char *send = s + (p - s) + *recSize - 3; //int32_t crc = 0; //while ( s < send ) { crc += *(int32_t *)s; s += 4; } //if ( crc != m_crcs[n] ) { // log("BAD ENGINNEER. CRC MISMATCH."); // char *pp = NULL; // *pp = 1; // sleep (10000); //} // set it for return *rec = p; // copy the data and set "list" with it iff "doCopy" is true if ( doCopy && *recSize > 0 ) { *rec = mdup ( p , *recSize , "RdbCache3" ); if ( ! *rec ) { return log("db: Could not allocate space for " "cached record for %s of %"INT32" bytes.", m_dbname,*recSize); } } RdbCache *robots = Msg13::getHttpCacheRobots(); RdbCache *others = Msg13::getHttpCacheOthers(); // // now we only promote the record if it is near the delete head // in order to avoid having massive duplicity. if it is with 10% // of the delete head's space i guess. // i do this for all caches now... what are the downsides? i forget. // bool check = true;//false; //if ( this == &g_genericCache[SITEQUALITY_CACHEID] ) check = true; if ( this == g_dns.getCache () ) check = true; if ( this == g_dns.getCacheLocal () ) check = true; if ( this == robots ) check = true; if ( this == others ) check = true; //if ( this == &g_deadWaitCache ) check = true; //if ( this == &g_forcedCache ) check = true; //if ( this == &g_alreadyAddedCache ) check = true; // this algo seems to have issues with really large recs // because spaces.live.com list was 570k and the data was foobar // so just don't do promotion on it ever... //if ( this == &g_tagdb.m_listCache ) check = true; // the exact count cache... //if ( this == &g_qtable ) check = true; //if ( m_totalBufSize < 20000 ) check = false; if ( check ) promoteRecord = false; // sanity check, do not allow the site quality cache or dns cache to // be > 128MB, that just does not make sense and it complicates things //if(check && m_totalBufSize > BUFSIZE ) { char *xx = NULL; *xx = 0; } // sanity check if ( m_tail < 0 || m_tail > m_totalBufSize ) { char *xx = NULL; *xx = 0; } // get the window of promotion int32_t tenPercent = (int32_t)(((float)m_totalBufSize) * .10); char *start1 = m_bufs[0] + m_tail ; char *end1 = start1 + tenPercent; char *start2 = NULL; char *end2 = NULL; char *max = m_bufs[0] + m_totalBufSize; if ( end1 > max ) { start2 = m_bufs[0]; end2 = start2 + (end1 - max); end1 = max; } // are we in 10% range? if so, promote to head of the ring buffer // to avoid losing it in a delete operation if ( check && *rec >= start1 && *rec <= end1 ) promoteRecord = true; if ( check && *rec >= start2 && *rec <= end2 ) promoteRecord = true; // debug //if ( check ) // logf(LOG_DEBUG, // "db: promote=%"UINT32" " // "start1=%"UINT32" end1=%"UINT32" " // "start2=%"UINT32" end2=%"UINT32" " // "rec=%"UINT32" m_tail=%"UINT32" bufs[0]=%"UINT32" total=%"UINT32"", // (int32_t)promoteRecord , // (int32_t)start1,(int32_t)end1,(int32_t)start2,(int32_t)end2, // (int32_t)*rec,(int32_t)m_tail,(int32_t)m_bufs[0],m_totalBufSize); // . now promote the record, same as adding (this always copies) // . do this after mdup as there is a chance it will overwrite // the original record with the copy of the same record // . Process.cpp turns off g_cacheWritesEnabled while it saves them if ( promoteRecord && ! m_isSaving && g_cacheWritesEnabled ) { //char *ptr = m_ptrs[n]; //removeKey ( collnum , cacheKey , ptr ); //markDeletedRecord(ptr); //int32_t n = hash32 ( cacheKey , m_cks ) % m_numPtrsMax; //if ( this == &g_robotdb.m_rdbCache ) // if ( this == &g_spiderLoop.m_winnerListCache ) { // logf(LOG_DEBUG, "db: cachebug: promoting record " // "k.n0=0x%"XINT64" n=%"INT32"", // ((key_t *)cacheKey)->n0, // *recSize); // } char *retRec = NULL; addRecord ( collnum , cacheKey , *rec , *recSize , timestamp , &retRec ); // update our rec, it might have been deleted then re-added // and we have to be careful of that delimter clobbering // memset() below if ( ! doCopy ) *rec = retRec; } // keep track of cache stats if we should if ( incCounts ) { m_numHits++; m_hitBytes += *recSize; } // debug msg time if ( g_conf.m_logTimingDb ) log(LOG_TIMING,"db: cache: %s getRecord %"INT32" bytes took %"INT64" " "ms.",m_dbname,*recSize, gettimeofdayInMillisecondsLocal()-t); // it was found, so return true return true; } // . returns true if found, // . returns false if not found or on error // . sets errno on error // . list's data may be empty if it's a cached not found // . we use "endKey" so we know if the FULL list was needed or not // . if "incCounts" is true and we hit we inc the hit count // . if "incCounts" is true and we miss we inc the miss count bool RdbCache::getList ( collnum_t collnum , //key_t cacheKey , //key_t startKey , char *cacheKey , char *startKey , RdbList *list , bool doCopy , int32_t maxAge , bool incCounts ) { // reset the list list->reset(); // maxAge of 0 means don't check cache if ( maxAge == 0 ) return false; // get pure record int32_t recSize; char *rec; // return false right away if not found if ( ! getRecord ( collnum , cacheKey , &rec , &recSize , doCopy , maxAge , incCounts , NULL ) ) return false; // first 2 keys of bytes are the start and end keys //key_t endKey = *(key_t *)rec; //char *data = rec + sizeof(key_t); //int32_t dataSize = recSize - sizeof(key_t); char *endKey = rec; char *data = rec + m_dks; int32_t dataSize = recSize - m_dks; // use NULL if empty if ( dataSize == 0 ) data = NULL; // how could this happen if ( dataSize < 0 ) return log(LOG_LOGIC,"db: cache: getList: " "Bad data size."); // . set the list! // . data is NULL if it's a cached not found (empty list) list->set ( data , dataSize , rec , // alloc recSize , // alloc size startKey , endKey , m_fixedDataSize , doCopy , // ownData? m_useHalfKeys , m_dks ); // sanity check //bool ok = list->checkList_r ( false , true ); //if ( ! ok ) log("RDBCACHE::GETLIST had problem"); // break out //if ( ! ok ) { char *xx = NULL; *xx = 0; } return true; } // returns false and sets errno on error //bool RdbCache::addList ( char *coll , key_t cacheKey , RdbList *list ) { bool RdbCache::addList ( char *coll , char *cacheKey , RdbList *list ) { collnum_t collnum = g_collectiondb.getCollnum ( coll ); if ( collnum < 0 ) { log("query: Collection %s does not exist. Cache failed.", coll); return false; } return addList ( collnum , cacheKey , list ); } // returns false and sets errno on error //bool RdbCache::addList ( collnum_t collnum , key_t cacheKey , RdbList *list){ bool RdbCache::addList ( collnum_t collnum , char *cacheKey , RdbList *list ) { // . sanity check // . msg2 sometimes fails this check when it adds to the cache if ( list->m_ks != m_dks ) { //g_errno = EBADENGINEER; return log("cache: key size %"INT32" != %"INT32"", (int32_t)list->m_ks,(int32_t)m_dks); //char *xx = NULL; *xx = 0; } } // store endkey then list data in the record data slot //key_t k; //k = list->getLastKey (); char *k = list->getLastKey (); // just to make sure char *data = list->getList(); int32_t dataSize = list->getListSize(); if ( ! data ) dataSize = 0; // . add as a record // . key is combo of startKey/endKey // . return false on error (and set errno), false otherwise return addRecord ( collnum , cacheKey , //(char *)&k , sizeof(key_t) , k , m_dks , list->getList() , list->getListSize() , 0 ); } // . basically adding a list of only 1 record // . used by dns/Dns.cpp to store ip's whose key is the hash of a hostname // . "rec" may be a raw RdbList (format=key/recSize/rec) or may just be data // . we do not copy "rec" so caller must malloc it // . returns -1 on error and sets errno // . returns node # in tree we added it to on success bool RdbCache::addRecord ( collnum_t collnum , //key_t cacheKey , char *cacheKey , char *rec , int32_t recSize , int32_t timestamp , char **retRecPtr ) { return addRecord (collnum, cacheKey, NULL, 0, rec, recSize, timestamp, retRecPtr); } bool RdbCache::addRecord ( char *coll , //key_t cacheKey , char *cacheKey , char *rec , int32_t recSize , int32_t timestamp ) { collnum_t collnum = g_collectiondb.getCollnum ( coll ); if ( collnum < (collnum_t) 0 ) { log("db: Could not cache rec for collection \"%s\".",coll); return false; } return addRecord (collnum, cacheKey, NULL, 0, rec, recSize, timestamp); } bool RdbCache::addRecord ( collnum_t collnum , //key_t cacheKey , char *cacheKey , char *rec1 , int32_t recSize1 , char *rec2 , int32_t recSize2 , int32_t timestamp , char **retRecPtr ) { // bail if cache empty. maybe m_maxMem is 0. if ( m_totalBufSize <= 0 ) return true; //int64_t startTime = gettimeofdayInMillisecondsLocal(); if ( collnum < (collnum_t)0) {char *xx=NULL;*xx=0; } if ( collnum >= m_maxColls ) {char *xx=NULL;*xx=0; } // full key not allowed because we use that in markDeletedRecord() if ( KEYCMP(cacheKey,KEYMAX(),m_cks) == 0 ) { char *xx=NULL;*xx=0; } // debug msg int64_t t = 0LL ; if ( g_conf.m_logTimingDb ) t = gettimeofdayInMillisecondsLocal(); // need space for record data int32_t need = recSize1 + recSize2; // are we bad? if (m_fixedDataSize>=0 && ! m_supportLists && need != m_fixedDataSize){ char *xx=NULL;*xx=0; return log(LOG_LOGIC,"db: cache: addRecord: %"INT32" != %"INT32".", need,m_fixedDataSize); } // don't allow 0 timestamps, those are special indicators if ( timestamp == 0 ) timestamp = getTimeLocal(); //if ( timestamp == 0 && cacheKey.n0 == 0LL && cacheKey.n1 == 0 ) if ( timestamp == 0 && KEYCMP(cacheKey,KEYMIN(),m_cks)==0 ) return log(LOG_LOGIC,"db: cache: addRecord: Bad " "key/timestamp."); // bail if no writing ops allowed now if ( ! g_cacheWritesEnabled ) return false; if ( m_isSaving ) return false; // collnum_t and cache key //need += sizeof(collnum_t) + sizeof(key_t); need += sizeof(collnum_t) + m_cks; // timestamp need += 4; // . trailing 0 collnum_t, key and trailing time stamp // . this DELIMETER tells us to go to the next buf //need += sizeof(collnum_t) + sizeof(key_t) + 4 ; // timestamp need += sizeof(collnum_t) + m_cks + 4 ; // and size, if not fixed or we support lists if ( m_fixedDataSize == -1 || m_supportLists ) need += 4; // watch out if ( need >= m_totalBufSize ) return log(LOG_INFO, "db: Could not fit record of %"INT32" bytes into %s " "cache. Max size is %"INT32".",need,m_dbname, m_totalBufSize); if ( need >= BUFSIZE ) return log(LOG_INFO, "db: Could not fit record of %"INT32" bytes into %s " "cache. Max size is %i.",need,m_dbname,BUFSIZE); // if too many slots in hash table used free one up while ( m_numPtrsUsed >= m_threshold ) { if ( ! deleteRec() ) { return false; } } // . do NOT split across buffers, align on a boundary if we need to // . "i1" is where we PLAN to store the record int32_t i1 = m_offset; int32_t bufNum1 = i1 / BUFSIZE; // what buffer does the byte AFTER our last byte fall into? int32_t i2 = m_offset + need; int32_t bufNum2 = i2 / BUFSIZE; // BUT if bufNum1 is the last buffer, it will most likely be SMALLER // than "BUFSIZE" byts, so do a special check to see if "i2" falls // outside of it! if ( i2 >= m_totalBufSize ) bufNum2 = bufNum1 + 1; // . "i1b" is offset of where we REALLY store the record // . "i2b" is the offset of the byte after the last byte that we store int32_t i1b = i1; int32_t i2b = i2; if ( bufNum1 != bufNum2 ) { // advance to first byte of the next buffer if not enough room // in bufNum1 to FULLY contain the record i1b = bufNum2 * BUFSIZE; i2b = i1b + need; } // . no, "i1c" is where we "really really" store it // . and "i2c" is the offset of the byte after the last we store int32_t i1c = i1b; int32_t i2c = i2b; if ( i2b >= m_totalBufSize ) { // reset back to the very beginning... i1c = 0; i2c = i1c + need; } // save for debug //int32_t saved = m_tail; // . increase m_tail so it is NOT in the range: [i1,i2b) // . NEVER do this if we are the first rec added though, because // m_tail will equal i1 at that point... while ( m_numPtrsUsed!=0 && m_tail>=i1 && m_tail<=i2 ) deleteRec(); while ( m_numPtrsUsed!=0 && m_tail>=i1b && m_tail<=i2b ) deleteRec(); while ( m_numPtrsUsed!=0 && m_tail>=i1c && m_tail<=i2c ) deleteRec(); // store rec at "start" int32_t bufNumStart = i1c / BUFSIZE; char *start = m_bufs[bufNumStart] + i1c % BUFSIZE; // point to storage area char *p = start; // before we start writing over possible record data, // if we are promoting a rec, "rec" may actually point // somewhere into here, so be careful! //if ( rec2 <= start && rec2+recSize2 > start ) { char*xx=NULL;*xx=0;} //if ( start <= rec2 && start+32>= rec2 ) { char*xx=NULL;*xx=0;} //if ( this == &g_robotdb.m_rdbCache ) // if ( this == &g_spiderLoop.m_winnerListCache ) // logf(LOG_DEBUG, "db: cachebug: adding rec k.n0=0x%"XINT64" " // "rs=%"INT32" " // "off=%"INT32" bufNum=%"INT32" ptr=0x%"PTRFMT" " // "oldtail=%"INT32" " // "newtail=%"INT32" " // "numPtrs=%"INT32"", // ((key_t *)cacheKey)->n0,recSize1+recSize2, // i1c,bufNumStart,(PTRTYPE)p,saved,m_tail,m_numPtrsUsed); // if we wiped out all recs then reset tail to m_offset if ( m_numPtrsUsed == 0 ) { //if ( this == &g_robotdb.m_rdbCache ) // if ( this == &g_spiderLoop.m_winnerListCache ) // logf(LOG_DEBUG,"db: cachebug: full tail reset. " // "tail=0"); m_tail = 0; } // store collnum *(collnum_t *)p = collnum; p += sizeof(collnum_t); // store key //*(key_t *)p = cacheKey; p += sizeof(key_t); KEYSET(p,cacheKey,m_cks); p += m_cks; // store timestamp *(int32_t *)p = timestamp; p += 4; // then dataSize if we need to if ( m_fixedDataSize == -1 || m_supportLists ) { *(int32_t *)p = recSize1+recSize2; p +=4; } //datasize // sanity : check if the recSizes add up right else if ( m_fixedDataSize != recSize1 + recSize2 ){ char *xx = NULL; *xx = 0; } // save for returning if ( retRecPtr ) *retRecPtr = p; // sanity check //if ( rec1 < p && rec1 + recSize1 > p ) { char*xx=NULL;*xx=0;} //if ( rec2 < p && rec2 + recSize2 > p ) { char*xx=NULL;*xx=0;} //if ( rec1 >= p && rec1 < p + need ) { char*xx=NULL;*xx=0;} //if ( rec2 >= p && rec2 < p + need ) { // log("cache: poop");}//char*xx=NULL;*xx=0;} // then data gbmemcpy ( p , rec1 , recSize1 ); p += recSize1; gbmemcpy ( p , rec2 , recSize2 ); p += recSize2; // . store 0 collnum, key AND timestamp at end of record --> delimeter // . CAUTION: if doing a "promote" we can end up deleting the rec // we are pointing to, then clobbering it with this memset! //memset ( p , 0 , sizeof(collnum_t) + 16 /*key+timestamp*/); memset ( p , 0 , sizeof(collnum_t) + m_cks + 4 /*key+timestamp*/); // count the occupied memory, excluding the terminating 0 key m_memOccupied += ( p - start ); // debug msg (MDW) // if ( this == &g_spiderLoop.m_winnerListCache ) { // log("cache: adding rec @ %"UINT32" size=%i tail=%"INT32"", // i1c,(int)(p-start),m_tail); // log("cache: stored k.n1=%"UINT32" k.n0=%"UINT64" %"INT32" bytes @ %"UINT32" tail=%"UINT32"", // ((key_t *)cacheKey)->n1, // ((key_t *)cacheKey)->n0,(int)(p-start),i1c,m_tail); // } //if ( m_cks == 4 ) // log("stored k=%"XINT32" %"INT32" bytes @ %"UINT32"", // *(int32_t *)cacheKey,p-start,i);//(uint32_t)start); // update offset, excluding the terminating 0 key m_offset = i1c + ( p - start ); // . debug testing -- remove later // . get the crc of the whole thing //char *s = start; //char *send = p - 3; //int32_t crc = 0; //while ( s < send ) { crc += *(int32_t *)s; s += 4; } // . add to hash table // . if we are already in there, preserve the addKey ( collnum , cacheKey , start ); // , crc ); // debug test // debug msg time log(LOG_TIMING,"db: cache: %s addRecord %"INT32" bytes took %"INT64" " "ms this=0x%"PTRFMT" key.n1=%"UINT32" n0=%"UINT64"", m_dbname, (int32_t)(p - start) , gettimeofdayInMillisecondsLocal()-t, (PTRTYPE)this, ((key_t *)(&cacheKey))->n1 , ((key_t *)(&cacheKey))->n0 ); //log("%s addRecord %"INT32" bytes @ offset=%"INT32" k.n1=%"UINT32" n0=%"UINT64" " // "TOOK %"INT64" ms" , // m_dbname , need , i , // cacheKey.n1 , cacheKey.n0 , m_adds++; //int64_t now = gettimeofdayInMillisecondsLocal(); //int64_t took = now - startTime; //if(took > 10) // log(LOG_INFO, "admin: adding to RdbCache %s of %"INT32" bytes " // "took %"INT64" ms.",m_dbname,recSize1+recSize2,took); m_needsSave = true; return true; } // delete the rec at m_tail from the hashtable bool RdbCache::deleteRec ( ) { // sanity. if ( m_tail < 0 || m_tail >= m_totalBufSize ) { char *xx = NULL; *xx = 0;} // don't do anything if we're empty // ...fix...we ned to make sure the head doesn't eat the tail, so // don't ever skip this stuff //if ( m_numPtrsUsed <= 0 ) return; //if ( b == 36887 ) // log("hey"); // delete all recs in [a,b] //if (b > m_totalBufSize) b = m_totalBufSize; //while ( m_tail < b ) { // get ptr from offset int32_t bufNum = m_tail / BUFSIZE; char *p = m_bufs[bufNum] + m_tail % BUFSIZE; top: // get ptr to where tail is currently char *start = p; // get collnum collnum_t collnum = *(collnum_t *)p; p += sizeof(collnum_t); // NSD: trying to find the error where removeKey() doesn't // find the key even after going through all the records // I think that the data here is corrupted or not pointed right // . collnum can be 0 in case we have to go to next buffer // . allow -1 collnum to exist, seems to happen in robots.txt cache // sometimes, maybe for delete collnum... not sure, but the timestamp // seems to be legit if ( collnum >= m_maxColls || collnum < -1 // we now call ::reset(oldcollnum) // when resetting a collection in // Collectiondb::resetColl() which calls // SpiderColl::clear() which calls // lastDownloadTime.reset(oldcollnum) // and then we nuke the collrec so it was // triggering this. so check m_ptrs[i]==-1 //|| !g_collectiondb.m_recs[collnum] ) { log (LOG_WARN,"db: cache: deleteRec: possible " "corruption, start=%"PTRFMT" collNum=%"INT32" " "maxCollNum=%"INT32" dbname=%s", (PTRTYPE)start, (int32_t)collnum, g_collectiondb.m_numRecsUsed, m_dbname); char *xx=NULL;*xx=0; // exception for gourav's bug (dbname=Users) // i am tired of it craping out every 2-3 wks //if ( m_dbname[0]=='U' ) return true; // some records might have been deleted m_needsSave = true; // but its corrupt so don't save to disk m_corruptionDetected = true; //char *xx=NULL;*xx=0; return false; } // get key //key_t k = *(key_t *)p ; p += sizeof(key_t); char *k = p ; p += m_cks; // get time stamp int32_t timestamp = *(int32_t *)p ; p += 4; // a timestamp of 0 and 0 key, means go to next buffer //if ( timestamp == 0 && k.n0 == 0LL && k.n1 == 0 ) { if ( timestamp == 0 && KEYCMP(k,KEYMIN(),m_cks)==0 ) { // if we wrap around back to first buffer then // change the "wrapped" state to false. that means // we are no longer directly in front of the write // head, but behind him again. if ( ++bufNum >= m_numBufs ) { bufNum = 0; //m_tail = 0; //m_wrapped = false; //return true; //continue; } // otherwise, point to the start of the next buffer p = m_bufs[bufNum]; m_tail = bufNum * BUFSIZE; // sanity //if ( m_tail < 0 || m_tail > m_totalBufSize ) { // char *xx = NULL; *xx = 0;} // if ( this == &g_spiderLoop.m_winnerListCache ) // logf(LOG_DEBUG, "db: cachebug: wrapping tail to 0"); //return true; // continue; goto top; } // get data size int32_t dataSize; // get dataSize and data if ( m_fixedDataSize == -1 || m_supportLists ) { dataSize = *(int32_t *)p; p += 4; } else dataSize = m_fixedDataSize; // sanity if ( dataSize < 0 || dataSize > m_totalBufSize ){ char *xx = NULL; *xx = 0; } //int32_t saved = m_tail; // debug msg (MDW) // if ( this == &g_spiderLoop.m_winnerListCache ) { // log("cache: deleting rec @ %"INT32" size=%"INT32"",m_tail, // dataSize+2+12+4+4); // } // skip over rest of rec p += dataSize; // remove this rec from the count, (4 bytes for ptr) //m_memOccupied -= (p - start) + 4; // otherwise, it's a simple advance m_tail += (p - start); // sanity. this must be failing due to a corrupt dataSize... if ( m_tail < 0 || m_tail +(int32_t)sizeof(collnum_t)+m_cks+4>m_totalBufSize){ char *xx = NULL; *xx = 0;} // if ( this == &g_spiderLoop.m_winnerListCache ) // log("spider: rdbcache: removing tail rec collnum=%i", // (int)collnum); // delete key from hash table, iff is for THIS record // but if it has not already been voided. // we set key to KEYMAX() in markDeletedRecord() if ( KEYCMP(k,KEYMAX(),m_cks) != 0 ){ removeKey ( collnum , k , start ); markDeletedRecord(start); } //if ( this == &g_robotdb.m_rdbCache ) // if ( this == &g_spiderLoop.m_winnerListCache ) // logf(LOG_DEBUG, "db: cachebug: removing k.n0=0x%"XINT64" " // "oldtail=%"INT32" newtail=%"INT32" ds=%"INT32"", // ((key_t *)k)->n0,saved,m_tail,dataSize); //else // logf(LOG_DEBUG,"test: oops"); // count as a delete m_deletes++; // void this key in the buffer, // so it doesn't try to delete it later from // the hash table // memset(start+sizeof(collnum_t), 0xff, m_cks); // debug msg //log("%s m_tail = %"INT32", #ptrs=%"INT32"", // m_dbname,m_tail,m_numPtrsUsed); //} // debug msg //log("%s m_tail = %"INT32", #ptrs=%"INT32"",m_dbname,m_tail,m_numPtrsUsed); m_needsSave = true; return true; } // mark a record in the buffer deleted to ensure that we reclaim the memory // and attempt to delete the key only once. void RdbCache::markDeletedRecord(char *ptr){ int32_t dataSize = sizeof(collnum_t)+m_cks+sizeof(int32_t); // debug it // if ( this == &g_spiderLoop.m_winnerListCache ) { //logf(LOG_DEBUG,"cache: makeDeleteRec ptr=0x%"PTRFMT" off=%"INT32"", // (PTRTYPE)ptr,(int32_t)(ptr-m_bufs[0])); // } // get dataSize and data if ( m_fixedDataSize == -1 || m_supportLists ) { dataSize += 4 + // size *(int32_t*)(ptr+ sizeof(collnum_t)+ // collnum m_cks+ // key sizeof(int32_t)); // timestamp } else dataSize += m_fixedDataSize; // mark newly freed mem m_memOccupied -= dataSize; // relabel old record key as 0xffffffff... so key is not removed // more than once. memset(ptr+sizeof(collnum_t), 0xff, m_cks); } // patch the hole so chaining still works //void RdbCache::removeKey ( collnum_t collnum , key_t key , char *rec ) { void RdbCache::removeKey ( collnum_t collnum , char *key , char *rec ) { //int32_t n = (key.n0 + (uint64_t)key.n1)% m_numPtrsMax; int32_t n = hash32 ( key , m_cks ) % m_numPtrsMax; // debug msg //if ( m_cks == 4) // log("remove first try = slot #%"INT32" (%"INT32")",n,m_numPtrsMax); // debug msg //log("%s removing key.n1=%"UINT32" key.n0=%"UINT64"",m_dbname,key.n1,key.n0); //if ( m_cks == 4 ) // log("removing k=%"XINT32"",*(int32_t *)key); // chain while ( m_ptrs[n] && ( *(collnum_t *)(m_ptrs[n]+0 ) != collnum || //*(key_t *)(m_ptrs[n]+sizeof(collnum_t)) != key ) ) KEYCMP(m_ptrs[n]+sizeof(collnum_t),key,m_cks) != 0 ) ) if ( ++n >= m_numPtrsMax ) n = 0; //while ( m_ptrs[n] && *(key_t *)m_ptrs[n] != key ) // if ( ++n >= m_numPtrsMax ) n = 0; // . return false if key not found // . this happens sometimes, if m_tail wraps to 0 and the new rec // the gets placed at the end of the last m_buf, changing m_bufEnds // then m_tail may revisit many recs it already removed from hashtbl if ( ! m_ptrs[n] ) { log(LOG_LOGIC,"db: cache: removeKey: Could not find key. " "Trying to scan whole table."); // try scanning whole table int32_t i; for ( i = 0 ; i < m_numPtrsMax ; i++ ) { // skip if empty if ( ! m_ptrs[i] ) continue; // skip if no match if (KEYCMP(m_ptrs[i]+sizeof(collnum_t),key,m_cks) != 0) continue; // got a match log(LOG_LOGIC,"db: cache: removeKey. Found key in " "linear scan. Wierd."); n = i; break; } if ( i >= m_numPtrsMax ) { log(LOG_LOGIC,"db: cache: removeKey: BAD ENGINEER. " "dbname=%s",m_dbname ); char *xx = NULL; *xx = 1; return; } } // if does not point to this rec , it is now pointing to the latest, // promoted copy of the rec, so do not delete if ( m_ptrs[n] != rec ) { // debug msg // This shouldn't happen anymore -partap char *xx = NULL; xx = 0; return; } // debug msg //key_t *k = (key_t *)(m_ptrs[n]+2); //log("cache: %s removing key.n1=%"UINT32" key.n0=%"UINT64" from slot #%"INT32"", // m_dbname,k->n1,k->n0,n); // all done if already cleared if ( ! m_ptrs[n] ) return; // clear it m_ptrs[n] = NULL; m_numPtrsUsed--; m_memOccupied -= sizeof(char *);//4; // advance through list after us now if ( ++n >= m_numPtrsMax ) n = 0; // keep looping until we hit an empty slot while ( m_ptrs[n] ) { char *ptr = m_ptrs[n]; // point to the key char *kptr = ptr + sizeof(collnum_t); // clear it m_ptrs[n] = NULL; // undo stats m_numPtrsUsed--; m_memOccupied -= sizeof(char *);//4; // re-hash it back to possibly fill the "gap" addKey ( *(collnum_t *)ptr , kptr , ptr ); if ( ++n >= m_numPtrsMax ) n = 0; } } //void RdbCache::addKey ( collnum_t collnum , key_t key , char *ptr ) { void RdbCache::addKey ( collnum_t collnum , char *key , char *ptr ) { // look up in hash table //int32_t n = (key.n0 + (uint64_t)key.n1)% m_numPtrsMax; int32_t n = hash32 ( key , m_cks ) % m_numPtrsMax; // save orig for debugging //int32_t n2 = n; // debug msg //log("add first try = slot #%"INT32" (%"INT32")",n,m_numPtrsMax); //int32_t n = key.n0 % m_numPtrsMax; // chain while ( m_ptrs[n] && ( *(collnum_t *)(m_ptrs[n]+0 ) != collnum || //*(key_t *)(m_ptrs[n]+sizeof(collnum_t)) != key ) ) KEYCMP(m_ptrs[n]+sizeof(collnum_t),key,m_cks) != 0 ) ) if ( ++n >= m_numPtrsMax ) n = 0; //while ( m_ptrs[n] && *(key_t *)m_ptrs[n] != key ) // if ( ++n >= m_numPtrsMax ) n = 0; // if already there don't inc the count if ( ! m_ptrs[n] ) { m_numPtrsUsed++; m_memOccupied += sizeof(char *); // debug msg //key_t *k = (key_t *)key; //log("cache: %s added key.n1=%"UINT32" key.n0=%"UINT64" to slot #%"INT32" " // "ptr=0x%"XINT32" off=%"INT32" size=%"INT32"", // m_dbname,k->n1,k->n0,n,ptr,ptr-m_bufs[0], // *(int32_t *)(ptr+2+12+4)); } // debug msg //else // log("%s update key.n1=%"UINT32" key.n0=%"UINT64" in slot #%"INT32"", // m_dbname,key.n1,key.n0,n); // If this pointer is already set, we may be replacing it from // Msg5::needRecall. We need to mark the old record as deleted if (m_ptrs[n]){ //char *xx = NULL; *xx = 0; markDeletedRecord(m_ptrs[n]); } // store the ptr m_ptrs[n] = ptr; // debug testing //m_crcs[n] = crc; //if ( this == &g_robotdb.m_rdbCache ) // if ( this == &g_spiderLoop.m_winnerListCache ) // logf(LOG_DEBUG,"db: cachebug: addkey slot #%"INT32" has " // "ptr=0x%"PTRFMT"",n,(PTRTYPE)ptr); } /* void RdbCache::clearAll ( ) { //if ( m_numBufs > 0 ) // log("db: resetting record cache"); m_offset = 0; m_tail = 0; //for ( int32_t i = 0 ; i < m_numBufs ; i++ ) // // all bufs, but not necessarily last, are BUFSIZE bytes big // mfree ( m_bufs[i] , m_bufSizes[i] , "RdbCache" ); //m_numBufs = 0; //m_totalBufSize= 0; //if(m_ptrs ) mfree ( m_ptrs , m_numPtrsMax*sizeof(char *),"RdbCache"); //m_ptrs = NULL; m_numPtrsUsed = 0; // can't reset this, breaks the load! //m_numPtrsMax = 0; m_memOccupied = 0; //m_memAlloced = 0; m_numHits = 0; m_numMisses = 0; //m_wrapped = false; m_adds = 0; m_deletes = 0; // assume no need to call convertCache() m_convert = false; m_isSaving = false; } */ // // . MDW: took out clear() for corruption suspicision... i think ninad's // corruption detection would panic on collnum_t's of -1 anyway... // // . this just clears the contents of the cache // . used when deleting a collection in Rdb::delColl() and used in // Rdb::updateToRebuild() when updating/setting the rdb to a rebuilt rdb // . try it again now with new 64-bit logic updates (MDW 2/10/2015) void RdbCache::clear ( collnum_t collnum ) { // bail if no writing ops allowed now if ( ! g_cacheWritesEnabled ) { char *xx=NULL;*xx=0; } if ( m_isSaving ) { char *xx=NULL;*xx=0; } for ( int32_t i = 0 ; i < m_numPtrsMax ; i++ ) { // skip if empty bucket if ( ! m_ptrs[i] ) continue; // skip if wrong collection if ( *(collnum_t *)m_ptrs[i] != collnum ) continue; // change to the -1 collection, nobody should use that and // it should get kicked out over time //*(collnum_t *)m_ptrs[i] = -1; // just change the collnum to something impossible // this is kinda hacky but hopefully will not cause corruption *(collnum_t *)m_ptrs[i] = (collnum_t)-1; } } bool RdbCache::load ( ) { return load ( m_dbname ); } static void *saveWrapper ( void *state , ThreadEntry *te ) ; static void threadDoneWrapper ( void *state , ThreadEntry *te ) ; // . just like RdbTree::fastSave() // . returns false if blocked and is saving bool RdbCache::save ( bool useThreads ) { if ( g_conf.m_readOnlyMode ) return true; // if we do not need it, don't bother if ( ! m_needsSave ) return true; // if corruption was detected, don't bother if ( m_corruptionDetected ) return true; // return true if already in the middle of saving if ( m_isSaving ) return false; // log log(LOG_INIT,"db: Saving %"INT32" bytes of cache to %s/%s.cache", m_memAlloced,g_hostdb.m_dir,m_dbname); // spawn the thread if ( useThreads ) { // lock cache while saving m_isSaving = true; // make a thread. returns true on success, in which case // we return false to indicate we blocked. if ( g_threads.call ( SAVETREE_THREAD , 1 , // niceness this , // state threadDoneWrapper , // callback saveWrapper ) ) return false; // crap had an error spawning thread if ( ! g_threads.m_disabled ) log("db: Error spawning cache write thread. " "Not using threads."); } // do it directly with no thread save_r(); // wrap it up threadDone (); return true; } void threadDoneWrapper ( void *state , ThreadEntry *te ) { RdbCache *THIS = (RdbCache *)state; THIS->threadDone ( ); } void RdbCache::threadDone ( ) { // allow cache to change now m_isSaving = false; // and we are in sync with that data saved on disk m_needsSave = false; // report if ( m_saveError ) log("db: Had error saving cache to disk for %s: %s.", m_dbname,mstrerror(m_saveError)); } void *saveWrapper ( void *state , ThreadEntry *te ) { RdbCache *THIS = (RdbCache *)state; // assume no error THIS->m_saveError = 0; // do it if ( THIS->save_r () ) return NULL; // we got an error, save it THIS->m_saveError = errno; return NULL; } // returns false withe rrno set on error bool RdbCache::save_r ( ) { // append .cache to "dbname" to get cache filename char filename [ 64 ]; if ( gbstrlen(m_dbname) > 50 ) return log("db: Dbname too long. Could not save cache."); sprintf ( filename , "%s%s.cache" , g_hostdb.m_dir , m_dbname ); //File f; //f.set ( g_hostdb.m_dir , filename ); // open the file //if ( ! f.open ( O_RDWR | O_CREAT ) ) int fd = open ( filename , O_RDWR | O_CREAT , getFileCreationFlags() ); if ( fd < 0 ) return log("db: Had opening file to save cache to: %s.", mstrerror(errno)); bool status = save2_r ( fd ); close ( fd ); return status; } bool RdbCache::save2_r ( int fd ) { int n; int32_t off = 0; // general info n = gbpwrite ( fd , &m_numPtrsMax , 4 , off ); off += 4; if ( n != 4 ) return false; n = gbpwrite ( fd , &m_maxMem , 4 , off ); off += 4; if ( n != 4 ) return false; // mem stuff n = gbpwrite ( fd , &m_memAlloced , 4 , off ); off += 4; if ( n != 4 ) return false; n = gbpwrite ( fd , &m_memOccupied , 4 , off ); off += 4; if ( n != 4 ) return false; // save the buffer stuff n = gbpwrite ( fd , &m_numBufs , 4 , off ); off += 4; if ( n != 4 ) return false; n = gbpwrite ( fd , &m_totalBufSize, 4 , off ); off += 4; if ( n != 4 ) return false; n = gbpwrite ( fd , &m_offset , 4 , off ); off += 4; if ( n != 4 ) return false; n = gbpwrite ( fd , &m_tail , 4 , off ); off += 4; if ( n != 4 ) return false; n = gbpwrite ( fd , &m_wrapped , 1 , off ); off += 1; if ( n!= 1 ) return false; // write each buf for ( int32_t i = 0 ; i < m_numBufs ; i++ ) { // write end relative //int32_t end = (m_bufEnds[i] - m_bufs[i]); //if ( end < 0 ) end = -1; //n = pwrite ( fd ,&end , 4 , off ); off += 4; //if ( n != 4 ) return false; // and buf size int32_t bufSize = m_bufSizes[i]; n = gbpwrite ( fd , &bufSize , 4 , off ); off += 4; if ( n != 4 ) return false; // then write contents of buffer #i n = gbpwrite ( fd, m_bufs[i] , bufSize , off ); off += bufSize; if ( n != bufSize ) return false; } // save the hash table stuff n = gbpwrite ( fd , &m_numPtrsUsed , 4 , off ); off += 4; if ( n != 4 ) return false; n = gbpwrite ( fd , &m_threshold , 4 , off ); off += 4; if ( n != 4 ) return false; // save 100k at a time int32_t i = 0; while ( i < m_numPtrsMax ) if ( ! saveSome_r ( fd, &i , &off) ) return false; //close ( fd ) ; return true; } #define SAVEBUFSIZE (256*1024) bool RdbCache::saveSome_r ( int fd , int32_t *iptr , int32_t *off ) { char buf[SAVEBUFSIZE]; char *bufEnd = buf + SAVEBUFSIZE; // point to buf char *bp = buf; int32_t used = 0; // make hash table ptrs relative to offset for ( ; *iptr < m_numPtrsMax && bp + 4 < bufEnd ; *iptr = *iptr + 1 ) { // resume at i char *p = m_ptrs[*iptr]; // if empty, write a -1 offset if ( ! p ) { //int32_t tt = -1; // store that as it is *(int32_t *)bp = -1; bp += 4; //n = pwrite ( fd ,&tt , 4 , off ) ; off += 4; //if ( n != 4 ) return false; continue; } // otherwise convert ptr to offset... bitch int32_t converted = -1; for ( int32_t j = 0 ; j < m_numBufs ; j++ ) // is p pointing into the jth buffer? if ( p >= m_bufs[j] && p < m_bufs[j] + m_bufSizes[j] ){ // if so, make it relative converted = p - m_bufs[j] + BUFSIZE*j ; break; } // bitch if not found if ( converted == -1 ) return log(LOG_LOGIC,"db: cache: save: Bad " "engineer"); // store that as it is *(int32_t *)bp = converted; bp += 4; used++; //n = pwrite ( fd ,&converted , 4 , off ) ; off += 4; //if ( n != 4 ) return false; } if ( used != m_numPtrsUsed ) { log("cache: error saving cache. %"INT32" != %"INT32"" , used , m_numPtrsUsed ); //char *xx=NULL;*xx=0; } return false; } // now write it all at once int32_t size = bp - buf; int32_t n = gbpwrite ( fd , buf , size , *off ); *off = *off + size; if ( n != size ) return false; return true; } bool RdbCache::load ( char *dbname ) { // append .cache to "dbname" to get cache filename char filename [ 64 ]; if ( gbstrlen(dbname) > 50 ) return log(LOG_LOGIC,"db: cache: load: dbname too long."); sprintf ( filename , "%s.cache" , dbname ); // does the file exist? File f; f.set ( g_hostdb.m_dir , filename ); // having cache file not existing on disk is not so bad, it's a cache if ( ! f.doesExist() ) return false; // return log("db: Could not load cache from %s: does not exist.", // f.getFilename()); // open the file if ( ! f.open ( O_RDWR ) ) return log("db: Could not open cache save file for %s: %s.", dbname,mstrerror(g_errno)); // log log(LOG_INIT,"db: Loading cache from %s/%s.cache", g_hostdb.m_dir,dbname); // clear everything reset(); int n; int32_t off = 0; // general info int32_t numPtrsMax ; int32_t maxMem ; n = f.read ( &numPtrsMax , 4 , off ); off += 4; if ( n != 4 ) return false; n = f.read ( &maxMem , 4 , off ); off += 4; if ( n != 4 ) return false; // . they need to match our current config to continue loading // . attempt to convert if not, because it is painful to rebuild // the site quality cache if ( numPtrsMax != m_numPtrsMax || maxMem != m_maxMem ) { log("db: Error while loading cache file %s. Does not match " "current cache config. " "current numPtrsMax=%"INT32" maxMem=%"INT32", " "ondisk numPtrsMax=%"INT32" maxMem=%"INT32". " "Attempting to convert.", //"Ignoring file.", f.getFilename() , m_numPtrsMax , m_maxMem , numPtrsMax , maxMem ); //log("RdbCache::load: not loading"); m_convert = true; m_convertNumPtrsMax = numPtrsMax ; m_convertMaxMem = maxMem ; return false; } // mem stuff n = f.read ( &m_memAlloced , 4 , off ); off += 4; if ( n != 4 ) return false; n = f.read ( &m_memOccupied , 4 , off ); off += 4; if ( n != 4 ) return false; // load the buffer stuff n = f.read ( &m_numBufs , 4 , off ); off += 4; if ( n != 4 ) return false; n = f.read ( &m_totalBufSize, 4 , off ); off += 4; if ( n != 4 ) return false; n = f.read ( &m_offset , 4 , off ); off += 4; if ( n != 4 ) return false; n = f.read ( &m_tail , 4 , off ); off += 4; if ( n != 4 ) return false; n = f.read ( &m_wrapped , 1 , off ); off += 1; if ( n != 1 ) return false; // load each buf for ( int32_t i = 0 ; i < m_numBufs ; i++ ) { // load end relative //int32_t end ; //= (m_bufEnds[i] - m_bufs[i]); //n = f.read ( &end , 4 , off ); off += 4; //if ( n != 4 ) return false; // and buf size int32_t bufSize ; //= m_bufSizes[i]; n = f.read ( &bufSize , 4 , off ); off += 4; if ( n != 4 ) return false; // alloc the buf char ttt[64]; sprintf(ttt,"clb-%s",m_dbname); m_bufs[i] = (char *) mcalloc ( bufSize , ttt ); if ( ! m_bufs[i] ) return false; m_bufSizes[i] = bufSize; //m_bufEnds [i] = m_bufs[i] + end; //if ( end < 0 ) m_bufEnds[i] = NULL; // then read contents of buffer #i n = f.read ( m_bufs[i] , bufSize , off ); off += bufSize; if ( n != bufSize ) return false; } // load the hash table stuff n = f.read ( &m_numPtrsUsed , 4 , off ); off += 4; if ( n != 4 ) return false; n = f.read ( &m_threshold , 4 , off ); off += 4; if ( n != 4 ) return false; // load the OFFSETS into "fix" int32_t total = sizeof(int32_t) * m_numPtrsMax ; SafeBuf fix; fix.reserve ( total ); //n = f.read ( m_ptrs , total , off ); off += total; n = f.read ( fix.getBufStart() , total , off ); off += total; if ( n != total ) return false; fix.setLength ( total ); int32_t *poff = (int32_t *)fix.getBufStart(); // ptrs can be 8 bytes each, if we are 64-bit m_ptrs = (char **) mcalloc (m_numPtrsMax * sizeof(char *),m_dbname); if ( ! m_ptrs ) return false; int32_t used = 0; // convert offsets into pointers for ( int32_t i = 0 ; i < m_numPtrsMax ; i++ , poff++ ) { //uint32_t j = (SPTRTYPE) m_ptrs[i]; // is it a NULL? //if ( j == -1 ) { m_ptrs[i] = NULL; continue; } if ( *poff == -1 ) { m_ptrs[i] = NULL; continue; } // sanity if ( *poff >= m_numBufs * BUFSIZE ) { char *xx=NULL;*xx=0;} // get buffer int32_t bufNum = (*poff) / BUFSIZE; char *p = m_bufs[bufNum] + (*poff) % BUFSIZE ; // re-assign m_ptrs[i] = p; // count it used++; // see what is there // debug msg //key_t kk = *(key_t *)p; //log("loaded k.n1=%"UINT32" k.n0=%"UINT64"",kk.n1,kk.n0); //if ( m_fixedDataSize || m_supportLists ) // log("loaded k.n1=%"UINT32" k.n0=%"UINT64" size=%"INT32"", // kk.n1,kk.n0, 20+*(int32_t *)(p+sizeof(key_t)+4)); //else // log("loaded k.n1=%"UINT32" k.n0=%"UINT64"", kk.n1,kk.n0); } if ( used != m_numPtrsUsed ) { log("cache: error loading cache. %"INT32" != %"INT32"" , used , m_numPtrsUsed ); return false; } m_needsSave = false; return true; } // remove a key range from the cache void RdbCache::removeKeyRange ( collnum_t collnum , char *startKey , char *endKey ) { //int32_t n = (key.n0 + (uint64_t)key.n1)% m_numPtrsMax; // unused now!! int32_t n = hash32 ( startKey , m_cks ) % m_numPtrsMax; int32_t startn = n; // chain for ( ; n+1 != startn; n++ ) { // check for wrap if ( n >= m_numPtrsMax ) n = 0; // make sure it's not null if ( !m_ptrs[n] ) return; // check collection number if ( *(collnum_t *)(m_ptrs[n]) != collnum ) continue; // check the range if ( KEYCMP ( m_ptrs[n]+sizeof(collnum_t), startKey, m_cks ) >= 0 && KEYCMP ( m_ptrs[n]+sizeof(collnum_t), endKey, m_cks ) <= 0 ) { // remove the key int32_t rem = n; m_ptrs[rem] = NULL; m_numPtrsUsed--; m_memOccupied -= sizeof(char *); if ( ++rem >= m_numPtrsMax ) rem = 0; // keep looping until we hit an empty slot while ( m_ptrs[rem] ) { char *ptr = m_ptrs[rem]; m_ptrs[rem] = NULL; m_numPtrsUsed--; m_memOccupied -= sizeof(char *); char k[MAX_KEY_BYTES]; KEYSET(k,ptr+sizeof(collnum_t),m_cks); addKey ( *(collnum_t *)ptr , k , ptr ); if ( ++rem >= m_numPtrsMax ) rem = 0; } } } m_needsSave = true; } bool RdbCache::convertCache ( int32_t numPtrsMax , int32_t maxMem ) { // divide numPtrsMax by 2 to get maxRecs (see above) int32_t maxRecs = numPtrsMax / 2; // load the cache stored on disk into the "tmp" cache RdbCache tmp; if ( ! tmp.init ( maxMem , m_fixedDataSize , m_supportLists , maxRecs , m_useHalfKeys , m_dbname , true , // loadFromDisk m_cks , // cacheKeySize m_dks , // dataKeySize numPtrsMax )) return false; // load it from disk //if ( ! tmp.load() ) return false; // copy its recs into our space int32_t failed = 0; int32_t success = 0; char key[16]; for ( int32_t i = 0 ; i < tmp.m_numPtrsMax ; i++ ) { // get ptr to slot in hash table char *p = tmp.m_ptrs[i]; // skip if empty bucket if ( ! p ) continue; // otherwise, get collnum collnum_t collnum = *(collnum_t *)p; // get key gbmemcpy ( key , p + sizeof(collnum_t), m_cks ); // now get the record proper bool found; char *rec; int32_t recSize; int32_t timestamp; found = tmp.getRecord ( collnum , key , &rec , &recSize , false , // do copy? -1 , // maxAge false , // inc counts? // when it was cached (time_t *)×tamp , false );// promote rec? // sanity check if ( ! found ) { log("db: key is in hash table but no rec."); continue; } if ( ! timestamp ) { log("db: has a timestamp of 0"); } // now add it to our table bool status; status = addRecord ( collnum , key , rec , recSize , timestamp ); if ( ! status ) failed++; else success++; } // log it logf(LOG_INFO,"db: Successfully converted %"INT32" recs from cache on disk " "for %s.", success,m_dbname); if ( failed > 0 ) logf(LOG_INFO,"db: Failed to convert %"INT32" recs from cache on " "disk for %s.", failed,m_dbname); return true; } // goes through all the pointers and checks the integrity of the data they // point to. Also checks if m_tail is pointing right or not void RdbCache::verify(){ bool foundTail = false; int32_t count = 0; logf(LOG_DEBUG,"db: cachebug: verifying"); for ( int32_t i = 0; i < m_numPtrsMax; i++ ){ char *start = m_ptrs[i]; if ( !start ) continue; if ( start == m_bufs[0] + m_tail ) foundTail = true; char *p = start; // get collnum collnum_t collnum = *(collnum_t *)p; p += sizeof(collnum_t); // -1 this means cleared! set in RdbCache::clear(collnum_t) // collnum can be 0 in case we have to go to next buffer if ( collnum != 0 && ( collnum >= m_maxColls || collnum <-1)){ // !g_collectiondb.m_recs[collnum] ) ) { char *xx = NULL; *xx = 0; } // get key //char *k = p ; p += m_cks; // get time stamp //int32_t timestamp = *(int32_t *)p ; p += 4; //logf(LOG_DEBUG, "db: cachebug: removing key. tail=%"INT32" ", // m_tail); // get data size int32_t dataSize; // get dataSize and data if ( m_fixedDataSize == -1 || m_supportLists ) { dataSize = *(int32_t *)p; p += 4; } else dataSize = m_fixedDataSize; // sanity if ( dataSize < 0 || dataSize > m_totalBufSize ){ char *xx = NULL; *xx = 0; } // count it count++; } if ( !foundTail && m_wrapped ){ char *xx = NULL; *xx = 0 ; } if ( count != m_numPtrsUsed ) { char *xx = NULL; *xx = 0 ; } }