mirror of
https://github.com/gigablast/open-source-search-engine.git
synced 2024-10-04 20:27:43 +03:00
a1ed368d82
it's useful to limit per process mem usage to prevent oom killer because we can't save if we get killed. overhaul diskpagecache to just use rdbcache. much simpler and faster, but disabled for now until debugged more. reduce min files to merge for crawlbot collections so they stay more tightly merged to conserve fds and mem. improved logDebugDisk msgs. overhauled File.cpp fd pool. now it is way faster and doesn't use any extra mem. much simpler too. although could be sped up a little by using a linked list, but probably is not significant enough to warrant doing right now. increase mem ptr table from 3M to 8M slots. should really make dynamic though. fix core from null msg20s[0]->m_r. only call attemptMergeAll once every 60 seconds really. do not attempt merge if already merging.
360 lines
12 KiB
C++
360 lines
12 KiB
C++
#include "gb-include.h"
|
|
|
|
#include "RdbScan.h"
|
|
#include "DiskPageCache.h"
|
|
#include "Rdb.h"
|
|
|
|
void gotListWrapper ( void *state ) ;
|
|
|
|
// . set up a read for a scan of slots in the RdbScan
|
|
// . returns false if blocked, true otherwise
|
|
// . sets errno on error
|
|
// . record the parameters of a read of "bytesToRead" bytes at "offset" in
//   "file", reset "list" to an empty list spanning [startKey,endKey], and
//   launch the read through BigFile::read()
// . file           : file to read from
// . fixedDataSize  : passed straight through to RdbList::set()
// . offset         : byte offset in "file" to start reading at
// . bytesToRead    : number of bytes to read; 0 means nothing to do
// . startKey/endKey: key range of the list; each is "keySize" bytes
// . list           : the list we fill up
// . state/callback : callback(state) is invoked by gotListWrapper() once a
//                    read that blocked has completed
// . useHalfKeys    : reserve 6 bytes in front of the data so a leading
//                    half key can be expanded in place by gotList()
// . rdbId          : posdb ids reserve 12 bytes instead (see below)
// . niceness, allowPageCache, hitDisk : passed through to BigFile::read()
// . returns false if the read blocked, true otherwise (including the
//   "nothing to read" cases, in which "list" is simply left empty)
bool RdbScan::setRead ( BigFile  *file           ,
			int32_t   fixedDataSize  ,
			int64_t   offset         ,
			int32_t   bytesToRead    ,
			char     *startKey       ,
			char     *endKey         ,
			char      keySize        ,
			RdbList  *list           , // we fill this up
			void     *state          ,
			void    (* callback) ( void *state ) ,
			bool      useHalfKeys    ,
			char      rdbId          ,
			int32_t   niceness       ,
			bool      allowPageCache ,
			bool      hitDisk        ) {
	// remember the list we are filling and start it out empty
	m_list = list;
	m_list->reset();
	// save the key size and rdb id
	m_ks    = keySize;
	m_rdbId = rdbId;
	// save the page cache / disk flags, they are handed to the read below
	m_allowPageCache = allowPageCache;
	m_hitDisk        = hitDisk;
	// . set the list to an empty, buffer-less list covering the key range
	// . gotList() attaches the real buffer once the read has completed
	m_list->set ( NULL          ,
		      0             ,
		      NULL          ,
		      0             ,
		      startKey      ,
		      endKey        ,
		      fixedDataSize ,
		      true          , // ownData?
		      useHalfKeys   ,
		      keySize       );
	// . don't do anything if startKey exceeds endKey
	// . often Msg3 will call us with this true because its page range
	//   is empty because the map knows without having to hit disk.
	//   therefore, just return silently now.
	// . Msg3 will not merge empty lists so don't worry about setting the
	//   list's startKey/endKey
	if ( KEYCMP(startKey,endKey,m_ks) > 0 ) return true;
	// don't bother doing anything if nothing needs to be read
	if ( bytesToRead == 0 ) return true;

	// . m_off is the headroom reserved in front of the data that is read
	// . 6 bytes if we use half keys, so a leading 6-byte key can be
	//   expanded into 12 bytes
	if ( useHalfKeys ) m_off = 6;
	else               m_off = 0;
	// posdb keys are 18 bytes but can be 12 or 6 bytes compressed, so
	// reserve enough to expand a 6 byte key into 18 bytes
	if ( m_rdbId == RDB_POSDB || m_rdbId == RDB2_POSDB2 ) m_off = 12;
	// . and a little extra padding in front of that in case read()
	//   writes before the buf
	// . i think a read overflow might be causing a segv in malloc
	// . NOTE: gotList() sanity checks the alloc offset against
	//   m_off + 16, so keep this in sync with it
	const int32_t pad = 16;

	// save caller's callback
	m_callback = callback;
	m_state    = state;
	// save the key range and the list parameters gotList() needs when it
	// sets the list on the buffer that was read
	KEYSET(m_startKey,startKey,m_ks);
	KEYSET(m_endKey,endKey,m_ks);
	m_fixedDataSize = fixedDataSize;
	m_useHalfKeys   = useHalfKeys;
	m_bytesToRead   = bytesToRead;
	// save file and offset for sanity check
	m_file   = file;
	m_offset = offset;
	// ensure we don't mess around
	m_fstate.m_allocBuf = NULL;
	m_fstate.m_buf      = NULL;

	// . do a threaded, non-blocking read
	// . we now pass in a NULL buffer so Threads.cpp will do the
	//   allocation right before launching the thread so we don't waste
	//   memory. i've seen like 19000 unlaunched threads each allocating
	//   32KB for a tfndb read, hogging up all the memory.
	if ( ! file->read ( NULL             ,
			    bytesToRead      ,
			    offset           ,
			    &m_fstate        ,
			    this             ,
			    gotListWrapper   ,
			    niceness         ,
			    m_allowPageCache ,
			    m_hitDisk        ,
			    pad + m_off      )) // allocOff, buf offset to read into
		return false;

	// sanity check: the file state recorded an error but g_errno is not
	// set. core deliberately.
	if ( m_fstate.m_errno && ! g_errno ) { char *xx=NULL;*xx=0; }

	// the read completed without blocking, so fix the list if we need to
	gotList();
	// we did not block
	return true;
}
|
|
|
|
void gotListWrapper ( void *state ) {
|
|
RdbScan *THIS = (RdbScan *)state;
|
|
THIS->gotList ();
|
|
// let caller know we're done
|
|
THIS->m_callback ( THIS->m_state );
|
|
}
|
|
|
|
#include "Threads.h"
|
|
|
|
void RdbScan::gotList ( ) {
|
|
char *allocBuf = m_fstate.m_allocBuf;
|
|
int32_t allocOff = m_fstate.m_allocOff; //buf=allocBuf+allocOff
|
|
int32_t allocSize = m_fstate.m_allocSize;
|
|
// do not free the allocated buf for when the actual thread
|
|
// does the read and finally completes in this case. we free it
|
|
// in Threads.cpp::ohcrap()
|
|
if ( m_fstate.m_errno == EDISKSTUCK )
|
|
return;
|
|
// just return on error, do nothing
|
|
if ( g_errno ) {
|
|
// free buffer though!! don't forget!
|
|
if ( allocBuf )
|
|
mfree ( allocBuf , allocSize , "RdbScan" );
|
|
m_fstate.m_allocBuf = NULL;
|
|
m_fstate.m_allocSize = 0;
|
|
return;
|
|
}
|
|
// . set our list here now since the buffer was allocated in
|
|
// DiskPageCache.cpp or Threads.cpp to save memory.
|
|
// . only set the list if there was a buffer. if not, it s probably
|
|
// due to a failed alloc and we'll just end up using the empty
|
|
// m_list we set way above.
|
|
if ( m_fstate.m_allocBuf ) {
|
|
// get the buffer info for setting the list
|
|
//char *allocBuf = m_fstate.m_allocBuf;
|
|
//int32_t allocSize = m_fstate.m_allocSize;
|
|
int32_t bytesDone = m_fstate.m_bytesDone;
|
|
// sanity checks
|
|
if ( bytesDone > allocSize ) {
|
|
char *xx = NULL; *xx = 0; }
|
|
if ( allocOff + m_bytesToRead != allocSize ) {
|
|
char *xx = NULL; *xx = 0; }
|
|
if ( allocOff != m_off + 16 ) {
|
|
char *xx = NULL; *xx = 0; }
|
|
// now set this list. this always succeeds.
|
|
m_list->set ( allocBuf + allocOff , // buf + pad + m_off ,
|
|
m_bytesToRead , // bytesToRead ,
|
|
allocBuf ,
|
|
allocSize ,
|
|
m_startKey ,
|
|
m_endKey ,
|
|
m_fixedDataSize ,
|
|
true , // ownData?
|
|
m_useHalfKeys ,
|
|
m_ks );
|
|
}
|
|
|
|
// this was bitching a lot when running on a multinode cluster,
|
|
// so i effectively disabled it by changing to _GBSANITYCHECK2_
|
|
//#ifdef GBSANITYCHECK2
|
|
// this first test, tests to make sure the read from cache worked
|
|
DiskPageCache *pc = m_file->getDiskPageCache();
|
|
if ( pc &&
|
|
! g_errno &&
|
|
g_conf.m_logDebugDiskPageCache &&
|
|
// if we got it from the page cache, verify with disk
|
|
m_fstate.m_inPageCache ) {
|
|
// ensure threads disabled
|
|
bool on = ! g_threads.areThreadsDisabled();
|
|
if ( on ) g_threads.disableThreads();
|
|
//pc->disableCache();
|
|
FileState fstate;
|
|
// ensure we don't mess around
|
|
fstate.m_allocBuf = NULL;
|
|
fstate.m_buf = NULL;
|
|
char *bb = (char *)mmalloc ( m_bytesToRead , "RS" );
|
|
if ( ! bb ) {
|
|
log("db: Failed to alloc mem for page cache verify.");
|
|
goto skip;
|
|
}
|
|
m_file->read ( bb , // NULL, // buf + pad + m_off
|
|
m_bytesToRead ,
|
|
m_offset ,
|
|
&fstate , // &m_fstate
|
|
NULL , // callback state
|
|
gotListWrapper , // FAKE callback
|
|
MAX_NICENESS , // niceness
|
|
false, // m_allowPageCache ,... not for test!
|
|
m_hitDisk ,
|
|
16 + m_off );
|
|
//char *allocBuf = fstate.m_allocBuf;
|
|
//int32_t allocSize = fstate.m_allocSize;
|
|
//char *bb = allocBuf + fstate.m_allocOff;
|
|
// if file got unlinked from under us, or whatever, we get
|
|
// an error
|
|
if ( ! g_errno ) {
|
|
char *buf = m_list->getList();
|
|
if ( memcmp ( bb , buf , m_bytesToRead) != 0 ) {
|
|
char *xx = NULL; *xx = 0; }
|
|
if ( m_bytesToRead != m_list->getListSize() ) {
|
|
char *xx = NULL; *xx = 0; }
|
|
}
|
|
// compare
|
|
if ( memcmp ( allocBuf+allocOff, bb , m_bytesToRead ) ) {
|
|
log("db: failed diskpagecache verify");
|
|
char *xx=NULL;*xx=0;
|
|
}
|
|
//mfree ( allocBuf , allocSize , "RS" );
|
|
mfree ( bb , m_bytesToRead , "RS" );
|
|
if ( on ) g_threads.enableThreads();
|
|
//pc->enableCache();
|
|
// . this test tests to make sure the page stores worked
|
|
// . go through each page in page cache and verify on disk
|
|
//pc->verifyData ( m_file );
|
|
}
|
|
skip:
|
|
//#endif
|
|
// assume we did not shift it
|
|
m_shifted = 0;//false;
|
|
// if we were doing a cache only read, and got nothing, bail now
|
|
if ( ! m_hitDisk && m_list->isEmpty() ) return;
|
|
// if first key in list is half, make it full
|
|
char *p = m_list->getList();
|
|
// . bitch if we read too much!
|
|
// . i think a read overflow might be causing a segv in malloc
|
|
// . NOTE: BigFile's call to DiskPageCache alters these values
|
|
if ( m_fstate.m_bytesDone != m_fstate.m_bytesToGo && m_hitDisk )
|
|
log(LOG_INFO,"disk: Read %"INT32" bytes but needed %"INT32".",
|
|
m_fstate.m_bytesDone , m_fstate.m_bytesToGo );
|
|
// adjust the list size for biased page cache if necessary
|
|
//if ( m_file->m_pc && m_allowPageCache &&
|
|
// m_file->m_pc->m_isOverriden &&
|
|
// m_fstate.m_bytesDone < m_list->m_listSize )
|
|
// m_list->m_listSize = m_fstate.m_bytesDone;
|
|
// bail if we don't do the 6 byte thing
|
|
if ( m_off == 0 ) return;
|
|
// posdb double compression?
|
|
if ( (m_rdbId == RDB_POSDB || m_rdbId == RDB2_POSDB2)
|
|
&& (p[0] & 0x04) ) {
|
|
// make it full
|
|
m_list->m_list -= 12;
|
|
m_list->m_listSize += 12;
|
|
p -= 12;
|
|
KEYSET(p,m_startKey,m_list->m_ks);
|
|
// clear the compression bits
|
|
*p &= 0xf9;
|
|
// let em know we shifted it so they can shift the hint offset
|
|
// up by 6
|
|
m_shifted = 12;
|
|
}
|
|
// if first key is already full (12 bytes) no need to do anything
|
|
else if ( m_list->isHalfBitOn ( p ) ) {
|
|
// otherwise, make it full
|
|
m_list->m_list -= 6;
|
|
m_list->m_listSize += 6;
|
|
p -= 6;
|
|
//*(key_t *)p = m_startKey;
|
|
KEYSET(p,m_startKey,m_list->m_ks);
|
|
// clear the half bit in case it is set
|
|
*p &= 0xfd;
|
|
// let em know we shifted it so they can shift the hint offset
|
|
// up by 6
|
|
m_shifted = 6; // true;
|
|
}
|
|
}
|