new import code copiling. now needs runtime testing and

qa tests.
This commit is contained in:
mwells 2014-09-20 20:12:28 -07:00
parent 0ace1e3426
commit 2ca303b7d7
10 changed files with 555 additions and 435 deletions

View File

@ -551,9 +551,11 @@ class CollectionRec {
// for Spider.cpp
long m_updateRoundNum;
// IMPORT PARMS
char m_importEnabled;
SafeBuf m_importDir;
long m_importInjects;
long m_numImportInjects;
// from Conf.h
long m_posdbMinFilesToMerge ;

View File

@ -1,401 +0,0 @@
//////
//
// BEGIN IMPORT TITLEDB FUNCTIONS
//
//////
// . injecting titledb files from other gb clusters into your collection
// . select the 'import' tab in the admin gui and enter the directory of
// the titledb files you want to import/inject.
// . it will scan that directory for all titledb files.
// . you can also set max simultaneous injections. set to auto so it
// will do 10 per host, up to like 100 max.
#define MAXINJECTSOUT 100
class ImportState {
public:
// available msg7s to use
Msg7 **m_ptrs;
long m_numPtrs;
// collection we are importing INTO
collnum_t collnum;
long long m_numIn;
long long m_numOut;
ImportState() ;
ImportState~() { reset(); }
void reset();
};
ImportState::ImportState () {
m_numIn = 0 ;
m_numOut = 0;
m_ptrs = NULL;
m_numPtrs=0;
}
ImportState::reset() {
for ( long i = 0 ; i < numPtrs ; i++ ) {
Msg7 *msg7 = m_ptrs[i];
if ( ! msg7 ) continue;
msg7->reset();
delete ( msg7 );
mdelete ( msg7 );
m_ptrs[i] = NULL;
}
mfree ( m_ptrs , MAXINJECTSOUT * sizeof(Msg7 *) , "ism7f" );
m_ptrs = NULL;
m_numPtrs = 0;
}
// . call this when gb startsup
// . scan collections to see if any imports were active
bool resumeImports ( ) {
for ( long i = 0 ; i < g_collectiondb.m_numRecs ; i++ ) {
CollectionRec *cr = g_collectiondb.m_recs[i];
if ( ! cr ) continue;
if ( ! cr->m_importEnabled ) continue;
// each import has its own state
ImportState *is;
// and collnum
is->m_collnum = cr->m_collnum;
// resume the import
is->importLoop ( );
}
}
void gotMsg7ReplyWrapper ( void *state ) {
Msg7 *msg7 = (Msg7 *)state;
msg7->m_numIn++;
log("tdbinject: injected %lli docs",m_numIn);
// if we were the least far ahead of scanning the files
// then save our position in case server crashes so we can
// resume
saveFileBookMark ( msg7 ) ;
importLoop();
}
//
// . ENTRY POINT FOR IMPORTING TITLEDB RECS FROM ANOTHER CLUSTER
// . when user clicks 'begin' in import page we come here..
// . so when that parm changes in Parms.cpp we sense that and call
// beginImport(CollectionRec *cr)
// . or on startup we call resumeImports to check each coll for
// an import in progress.
// . search for files named titledb*.dat
// . if none found just return
// . when msg7 inject competes it calls this
// . call this from sleep wrapper in Process.cpp
bool ImportState::importLoop ( ) {
CollectionRec *cr = g_collectiondb.getRec ( m_collnum );
if ( ! cr ) {
// if coll was deleted!
log("import: collnum %li deleted while importing into",
(long)m_collnum);
//if ( m_numOut > m_numIn ) return true;
// delete the import state i guess
delete ( this );
mdelete ( this );
return true;
}
INJECTLOOP:
// scan each titledb file scanning titledb0001.dat first,
// titledb0003.dat second etc.
long long offset = -1;
// when offset is too big for current s_bigFile file then
// we go to the next and set offset to 0.
BigFile *bf = getCurrentTitleFileAndOffset ( &offset );
// this is -1 if none remain!
if ( offset == -1 ) return true;
long need = 12;
long dataSize = -1;
// read in title rec key and data size
long n = bf->read ( &tkey, 12 , s_fileOffset );
if ( n != 12 ) goto nextFile;
// if non-negative then read in size
if ( tkey.n0 & 0x01 ) {
n = bf->read ( &dataSize , 4 , s_fileOffset );
if ( n != 4 ) goto nextFile;
need += 4;
need += dataSize;
if ( dataSize < 0 || dataSize > 500000000 ) {
log("main: could not scan in titledb rec of "
"corrupt dataSize of %li. BAILING ENTIRE "
"SCAN of file %s",s_titFilename);
goto nextFile;
}
}
// point to start of buf
sbuf.reset();
// ensure we have enough room
sbuf.reserve ( need );
// store title key
sbuf.safeMemcpy ( &tkey , sizeof(key_t) );
// then datasize if any. neg rec will have -1 datasize
if ( dataSize >= 0 )
sbuf.pushLong ( dataSize );
// then read data rec itself into it, compressed titlerec part
if ( dataSize > 0 ) {
// read in the titlerec after the key/datasize
n = bf->read ( sbuf.m_buf + sbuf.m_length ,
dataSize ,
s_fileOffset );
if ( n != dataSize ) {
log("main: failed to read in title rec "
"file. %li != %li. Skipping file %s",
n,dataSize,s_titFilename);
goto nextFile;
}
// it's good, count it
sbuf.m_length += n;
}
//XmlDoc *xd = getAvailXmlDoc();
Msg7 *msg7 = getAvailMsg7();
// if none, must have to wait for some to come back to us
if ( ! msg7 ) return false;
// set xmldoc from the title rec
//xd->set ( sbuf.getBufStart() );
//xd->m_masterState = NULL;
//xd->m_masterCallback ( titledbInjectLoop );
msg7->m_hackFileOff = s_fileOffset;
msg7->m_hackFileId = s_fileId;
GigablastRequest *gr = &msg7->m_gr;
// inject a title rec buf!!
gr->m_titleRecBuf = &sbuf;
//
// point to next doc in the titledb file
//
s_fileOffset += need;
msg7->m_numOut++;
// then index it. master callback will be called
//if ( ! xd->index() ) return false;
// TODO: make this forward the request to an appropriate host!!
if ( msg7->inject ( msg7 , // state
gotMsg7ReplyWrapper ) ) // callback
// it didn't block somehow...
m_numIn++;
goto INJECTLOOP;
nextFile:
// invalidate this flag
s_offIsValid = false;
// and call this function. we add one to s_bfFileId so we
// do not re-get the file we just injected.
bf = getCurrentTitleFileAndOffset ( &offset , s_bfFileId+1 );
// still going on? bf should be new and offset should be 0
if ( bf )
goto INJECTLOOP;
// if it returns NULL we are done!
log("main: titledb injection loop completed. waiting for "
"outstanding injects to return.");
if ( msg7->m_numOut > msg7->m_numIn )
return false;
log("main: all injects have returned. DONE.");
// dummy return
return true;
}
// return NULL with g_errno set on error
Msg7 *ImportLoop::getAvailMsg7 ( ) {
static XmlDoc **s_ptrs = NULL;
if ( ! s_ptrs ) {
s_ptrs = mmalloc ( sizeof(XmlDoc *) * MAXINJECTSOUT,"sxdp");
if ( ! s_ptrs ) return NULL;
}
// respect the user limit for this coll
long long out = s_numOut - s_numIn;
if ( out >= cr->m_importInjects ) {
g_errno = 0;
return NULL;
}
// find one not in use and return it
for ( long i = 0 ; i < (long)MAXINJECTSOUT ; i++ ) {
// point to it
XmlDoc *xd = s_ptrs[i];
// if one is there already and not in use, recycle it
if ( xd ) {
if ( xd->m_docInUse ) continue;
xd->m_docInUse = true;
return true;
}
// otherwise, make a new one
try { xd = new (XmlDoc); }
catch ( ... ) {
g_errno = ENOMEM;
log("PageInject: new(%i): %s",
(int)sizeof(XmlDoc),mstrerror(g_errno));
return NULL;
}
mnew ( xd, sizeof(XmlDoc) , "PageImport" );
s_ptrs[i] = xd;
xd->m_docInUse = true;
return xd;
}
// none avail
g_errno = 0;
return NULL;
}
static long s_fileId = -1;
static long long s_fileOffset = 0LL;
static long s_bfFileId = -2;
static BigFile s_bf;
static bool s_offIsValid = false;
BigFile *getCurrentTitleFileAndOffset ( long long *off , long minFileId ) {
if ( s_offIsValid ) {
*off = s_fileOffset;
return &s_bf;
}
s_offIsValid = true;
// look for titledb0001.dat etc. files in the
// workingDir/inject/ subdir
SafeBuf ddd;
ddd.safePrintf("%sinject",cr->m_importDir);
// now use the one provided. we should also provide the # of threads
if ( g_conf.m_importDir && g_conf.m_importDir[0] ) {
ddd.reset();
ddd.safeStrcpy ( g_conf.m_importDir );
}
//
// assume we are the first filename
// set s_fileId to the minimum
//
Dir dir;
dir.set(ddd.getBufStart());
// getNextFilename() writes into this
char pattern[8]; strcpy ( pattern , "titledb*.dat*" );
char *filename;
while ( ( filename = dir.getNextFilename ( pattern ) ) ) {
// filename must be a certain length
long filenameLen = gbstrlen(filename);
// we need at least "titledb0001.dat"
if ( filenameLen < 15 ) continue;
// ensure filename starts w/ our m_dbname
if ( strncmp ( filename , "titledb", 7 ) != 0 )
continue;
// then a 4 digit number should follow
char *s = filename + 7;
if ( ! isdigit(*(s+0)) ) continue;
if ( ! isdigit(*(s+1)) ) continue;
if ( ! isdigit(*(s+2)) ) continue;
if ( ! isdigit(*(s+3)) ) continue;
// convert digit to id
long id = atol(s);
// do not accept files we've already processed
if ( id < minFileId ) continue;
// the min of those we haven't yet processed/injected
if ( id < s_fileId ) s_fileId = id;
}
// get where we left off
static bool s_loadedPlaceHolder = false;
if ( ! s_loadedPlaceHolder ) {
// read where we left off from file if possible
char fname[256];
sprintf(fname,"%slasttitledbinjectinfo.dat",g_hostdb.m_dir);
SafeBuf ff;
ff.fillFromFile(fname);
if ( ff.length() < 1 ) goto noplaceholder;
// get the placeholder
sscanf ( ff.getBufStart()
, "%llu,%lu"
, &s_fileOffset
, &s_fileId
);
}
// if no files!
if ( s_fileId == -1 ) return NULL;
// set up s_bf then
if ( s_bfFileId != s_fileId ) {
SafeBuf tmp;
tmp.safePrintf("%sinject/titledb%04li.dat");
s_bf.set ( tmp );
s_bfFileId = s_fileId;
}
return &s_bf;
}
// "xd" is the XmlDoc thtat just completed injecting
void saveFileBookMark ( XmlDoc *xd ) {
long fileId = xd->m_hackFileId;
long fileOff = xd->m_hackFileOff;
// if there is one outstanding the preceeded us, we can't update
// the bookmark just yet.
for ( long i = 0 ; i < (long)MAXINJECTSOUT ; i++ ) {
XmlDoc *od = &s_xmlDocPtr[i];
if ( od == xd ) continue;
if ( ! od->m_docInUse ) continue;
if ( od->m_hackFileId < fileId ) return;
if ( od->m_hackFileId == fileId &&
od->m_hackFileOff < fileOff ) return;
}
char fname[256];
sprintf(fname,"%slasttitledbinjectinfo.dat",g_hostdb.m_dir);
SafeBuf ff;
ff.safePrintf("%llu,%lu",s_fileOffset,s_fileId);
ff.save ( fname );
}

View File

@ -1,6 +0,0 @@
#ifndef IMPORT_H
#define IMPORT_H
void resumeImports();
#endif

View File

@ -60,7 +60,7 @@ OBJS = UdpSlot.o Rebalance.o \
Dates.o Sections.o SiteGetter.o Syncdb.o qa.o \
Placedb.o Address.o Test.o GeoIP.o GeoIPCity.o Synonyms.o \
Cachedb.o Monitordb.o dlstubs.o PageCrawlBot.o Json.o PageBasic.o \
Import.o Version.o
Version.o
CHECKFORMATSTRING = -D_CHECK_FORMAT_STRING_

View File

@ -31,13 +31,15 @@ static void sendReplyWrapper ( void *state ) {
// HttpServer::sendReply() so we gotta copy it here
bool sendPageInject ( TcpSocket *sock , HttpRequest *hr ) {
if ( ! g_conf.m_injectionEnabled ) {
g_errno = EBADENGINEER;
log("inject: injection disabled");
return g_httpServer.sendErrorReply(sock,500,"injection is disabled by "
"the administrator in the master "
"controls");
}
if ( ! g_conf.m_injectionEnabled ) {
g_errno = EBADENGINEER;
log("inject: injection disabled");
return g_httpServer.sendErrorReply(sock,500,"injection is "
"disabled by "
"the administrator in "
"the master "
"controls");
}
@ -318,6 +320,17 @@ bool sendReply ( void *state ) {
Msg7::Msg7 () {
reset();
}
Msg7::~Msg7 () {
}
void Msg7::constructor () {
reset();
}
void Msg7::reset() {
m_round = 0;
m_firstTime = true;
m_fixMe = false;
@ -325,14 +338,13 @@ Msg7::Msg7 () {
m_start = NULL;
}
Msg7::~Msg7 () {
}
// when XmlDoc::inject() complets it calls this
void doneInjectingWrapper9 ( void *state ) {
Msg7 *msg7 = (Msg7 *)state;
msg7->m_inUse = false;
// shortcut
XmlDoc *xd = &msg7->m_xd;
@ -515,6 +527,8 @@ bool Msg7::inject ( void *state ,
// count them
m_injectCount++;
m_inUse = true;
if ( ! xd->injectDoc ( m_injectUrlBuf.getBufStart() ,
cr ,
start , // content ,
@ -533,6 +547,9 @@ bool Msg7::inject ( void *state ,
// we blocked...
return false;
m_inUse = false;
return true;
}
@ -710,3 +727,499 @@ bool Msg7::scrapeQuery ( ) {
//printReply();
return true;
}
///////////////////////////////////////
///////////////////////////////////////
// IMPORT CODE
///////////////////////////////////////
///////////////////////////////////////
//////
//
// BEGIN IMPORT TITLEDB FUNCTIONS
//
//////
// . injecting titledb files from other gb clusters into your collection
// . select the 'import' tab in the admin gui and enter the directory of
// the titledb files you want to import/inject.
// . it will scan that directory for all titledb files.
// . you can also set max simultaneous injections. set to auto so it
// will do 10 per host, up to like 100 max.
#define MAXINJECTSOUT 100
class ImportState {
public:
// available msg7s to use
class Msg7 *m_ptrs;
long m_numPtrs;
// collection we are importing INTO
collnum_t m_collnum;
long long m_numIn;
long long m_numOut;
// bookmarking helpers
long m_fileId;
long long m_fileOffset;
long m_bfFileId;
BigFile m_bf;
bool m_offIsValid;
bool m_loadedPlaceHolder;
class Msg7 *getAvailMsg7();
void saveFileBookMark ( class Msg7 *msg7 );
BigFile *getCurrentTitleFileAndOffset ( CollectionRec *cr ,
long long *off ,
long minFileId );
ImportState() ;
~ImportState() { reset(); }
bool importLoop();
void reset();
};
ImportState::ImportState () {
m_numIn = 0 ;
m_numOut = 0;
m_ptrs = NULL;
m_numPtrs=0;
}
void ImportState::reset() {
for ( long i = 0 ; i < m_numPtrs ; i++ ) {
Msg7 *msg7 = &m_ptrs[i];
if ( ! msg7 ) continue;
msg7->reset();
mdelete ( msg7, sizeof(Msg7) , "PageInject" );
delete (msg7);
//m_ptrs[i] = NULL;
}
mfree ( m_ptrs , MAXINJECTSOUT * sizeof(Msg7 *) , "ism7f" );
m_ptrs = NULL;
m_numPtrs = 0;
m_fileId = -1;
m_fileOffset = 0LL;
m_bfFileId = -2;
m_offIsValid = false;
m_loadedPlaceHolder = false;
}
// . call this when gb startsup
// . scan collections to see if any imports were active
bool resumeImports ( ) {
for ( long i = 0 ; i < g_collectiondb.m_numRecs ; i++ ) {
CollectionRec *cr = g_collectiondb.m_recs[i];
if ( ! cr ) continue;
if ( ! cr->m_importEnabled ) continue;
// each import has its own state
// it contains a sequence of msg7s to do simulataneous
// injections
ImportState *is;
try { is = new (ImportState); }
catch ( ... ) {
g_errno = ENOMEM;
log("PageInject: new(%i): %s",
(int)sizeof(ImportState),mstrerror(g_errno));
return NULL;
}
mnew ( is, sizeof(ImportState) , "isstate");
// assign to cr as well
//cr->m_importState = is;
// and collnum
is->m_collnum = cr->m_collnum;
// resume the import
is->importLoop ( );
}
return true;
}
BigFile *ImportState::getCurrentTitleFileAndOffset ( CollectionRec *cr ,
long long *off ,
long minFileId ) {
if ( m_offIsValid ) {
*off = m_fileOffset;
return &m_bf;
}
m_offIsValid = true;
// look for titledb0001.dat etc. files in the
// workingDir/inject/ subdir
SafeBuf ddd;
ddd.safePrintf("%sinject",cr->m_importDir.getBufStart());
// now use the one provided. we should also provide the # of threads
if ( cr->m_importDir.getBufStart() &&
cr->m_importDir.getBufStart()[0] ) {
ddd.reset();
ddd.safeStrcpy ( cr->m_importDir.getBufStart() );
}
//
// assume we are the first filename
// set s_fileId to the minimum
//
Dir dir;
dir.set(ddd.getBufStart());
// getNextFilename() writes into this
char pattern[8]; strcpy ( pattern , "titledb*.dat*" );
char *filename;
while ( ( filename = dir.getNextFilename ( pattern ) ) ) {
// filename must be a certain length
long filenameLen = gbstrlen(filename);
// we need at least "titledb0001.dat"
if ( filenameLen < 15 ) continue;
// ensure filename starts w/ our m_dbname
if ( strncmp ( filename , "titledb", 7 ) != 0 )
continue;
// then a 4 digit number should follow
char *s = filename + 7;
if ( ! isdigit(*(s+0)) ) continue;
if ( ! isdigit(*(s+1)) ) continue;
if ( ! isdigit(*(s+2)) ) continue;
if ( ! isdigit(*(s+3)) ) continue;
// convert digit to id
long id = atol(s);
// do not accept files we've already processed
if ( id < minFileId ) continue;
// the min of those we haven't yet processed/injected
if ( id < m_fileId ) m_fileId = id;
}
// get where we left off
if ( ! m_loadedPlaceHolder ) {
// read where we left off from file if possible
char fname[256];
sprintf(fname,"%slasttitledbinjectinfo.dat",g_hostdb.m_dir);
SafeBuf ff;
ff.fillFromFile(fname);
if ( ff.length() > 1 ) {
m_loadedPlaceHolder = true;
// get the placeholder
sscanf ( ff.getBufStart()
, "%llu,%lu"
, &m_fileOffset
, &m_fileId
);
}
}
// if no files!
if ( m_fileId == -1 ) return NULL;
// set up s_bf then
if ( m_bfFileId != m_fileId ) {
SafeBuf tmp;
tmp.safePrintf("inject/titledb%04li.dat"
,m_fileId);
m_bf.set ( g_hostdb.m_dir, tmp.getBufStart() );
m_bfFileId = m_fileId;
}
return &m_bf;
}
void gotMsg7ReplyWrapper ( void *state ) ;
//
// . ENTRY POINT FOR IMPORTING TITLEDB RECS FROM ANOTHER CLUSTER
// . when user clicks 'begin' in import page we come here..
// . so when that parm changes in Parms.cpp we sense that and call
// beginImport(CollectionRec *cr)
// . or on startup we call resumeImports to check each coll for
// an import in progress.
// . search for files named titledb*.dat
// . if none found just return
// . when msg7 inject competes it calls this
// . call this from sleep wrapper in Process.cpp
bool ImportState::importLoop ( ) {
CollectionRec *cr = g_collectiondb.getRec ( m_collnum );
if ( ! cr ) {
// if coll was deleted!
log("import: collnum %li deleted while importing into",
(long)m_collnum);
//if ( m_numOut > m_numIn ) return true;
// delete the entire import state i guess
// what happens if we have a msg7 reply come back in?
// it should see the collrec is NULL and just fail.
mdelete ( this, sizeof(ImportState) , "impstate");
delete (this);
return true;
}
INJECTLOOP:
// scan each titledb file scanning titledb0001.dat first,
// titledb0003.dat second etc.
long long offset = -1;
// when offset is too big for current m_bigFile file then
// we go to the next and set offset to 0.
BigFile *bf = getCurrentTitleFileAndOffset ( cr , &offset , -1 );
// this is -1 if none remain!
if ( offset == -1 ) return true;
Msg7 *msg7;
GigablastRequest *gr;
SafeBuf sbuf;
long need = 12;
long dataSize = -1;
XmlDoc xd;
// read in title rec key and data size
key128_t tkey;
long n = bf->read ( &tkey, 12 , m_fileOffset );
if ( n != 12 ) goto nextFile;
// if non-negative then read in size
if ( tkey.n0 & 0x01 ) {
n = bf->read ( &dataSize , 4 , m_fileOffset );
if ( n != 4 ) goto nextFile;
need += 4;
need += dataSize;
if ( dataSize < 0 || dataSize > 500000000 ) {
log("main: could not scan in titledb rec of "
"corrupt dataSize of %li. BAILING ENTIRE "
"SCAN of file %s",dataSize,bf->getFilename());
goto nextFile;
}
}
// point to start of buf
sbuf.reset();
// ensure we have enough room
sbuf.reserve ( need );
// store title key
sbuf.safeMemcpy ( &tkey , sizeof(key_t) );
// then datasize if any. neg rec will have -1 datasize
if ( dataSize >= 0 )
sbuf.pushLong ( dataSize );
// then read data rec itself into it, compressed titlerec part
if ( dataSize > 0 ) {
// read in the titlerec after the key/datasize
n = bf->read ( sbuf.getBuf() ,
dataSize ,
m_fileOffset );
if ( n != dataSize ) {
log("main: failed to read in title rec "
"file. %li != %li. Skipping file %s",
n,dataSize,bf->getFilename());
goto nextFile;
}
// it's good, count it
sbuf.m_length += n;
}
//XmlDoc *xd = getAvailXmlDoc();
msg7 = getAvailMsg7();
// if none, must have to wait for some to come back to us
if ( ! msg7 ) return false;
// set xmldoc from the title rec
//xd->set ( sbuf.getBufStart() );
//xd->m_masterState = NULL;
//xd->m_masterCallback ( titledbInjectLoop );
// we use this so we know where the doc we are injecting
// was in the foregien titledb file. so we can update our bookmark
// code.
msg7->m_hackFileOff = m_fileOffset;
msg7->m_hackFileId = m_fileId;
gr = &msg7->m_gr;
//
// inject a title rec buf this time, we are doing an import
// FROM A TITLEDB FILE!!!
//
//gr->m_titleRecBuf = &sbuf;
// break it down into gw
xd.set2 ( sbuf.getBufStart() ,
sbuf.length() , // max size
cr->m_coll, // use our coll
NULL , // pbuf for page parser
1 , // niceness
NULL ); //sreq );
// now we can set gr for the injection
gr->m_url = xd.getFirstUrl()->getUrl();
gr->m_queryToScrape = NULL;
gr->m_contentDelim = 0;
gr->m_contentTypeStr = g_contentTypeStrings [xd.m_contentType];
gr->m_contentFile = NULL;
gr->m_content = xd.ptr_utf8Content;
gr->m_diffbotReply = NULL;
gr->m_injectLinks = false;
gr->m_spiderLinks = true;
gr->m_shortReply = false;
gr->m_newOnly = false;
gr->m_deleteUrl = false;
gr->m_recycle = true; // recycle content? or sitelinks?
gr->m_dedup = false;
gr->m_hasMime = false;
gr->m_doConsistencyTesting = false;
gr->m_getSections = false;
gr->m_gotSections = false;
gr->m_charset = xd.m_charset;
gr->m_hopCount = xd.m_hopCount;
//
// point to next doc in the titledb file
//
m_fileOffset += need;
m_numOut++;
// then index it. master callback will be called
//if ( ! xd->index() ) return false;
// TODO: make this forward the request to an appropriate host!!
if ( msg7->inject ( msg7 , // state
gotMsg7ReplyWrapper ) ) // callback
// it didn't block somehow...
m_numIn++;
goto INJECTLOOP;
nextFile:
// invalidate this flag
m_offIsValid = false;
// and call this function. we add one to m_bfFileId so we
// do not re-get the file we just injected.
bf = getCurrentTitleFileAndOffset ( cr , &offset , m_bfFileId+1 );
// still going on? bf should be new and offset should be 0
if ( bf )
goto INJECTLOOP;
// if it returns NULL we are done!
log("main: titledb injection loop completed. waiting for "
"outstanding injects to return.");
if ( m_numOut > m_numIn )
return false;
log("main: all injects have returned. DONE.");
// dummy return
return true;
}
void gotMsg7ReplyWrapper ( void *state ) {
Msg7 *msg7 = (Msg7 *)state;
if ( msg7->m_inUse ) { char *xx=NULL;*xx=0; }
ImportState *is = msg7->m_importState;
is->m_numIn++;
log("tdbinject: injected %lli docs",is->m_numIn);
// if we were the least far ahead of scanning the files
// then save our position in case server crashes so we can
// resume
is->saveFileBookMark ( msg7 );
is->importLoop();
}
// . return NULL with g_errno set on error
// . importLoop() calls this to get a msg7 to inject a doc from the foreign
// titledb file into our local collection
Msg7 *ImportState::getAvailMsg7 ( ) {
//static XmlDoc **s_ptrs = NULL;
// this is legit because parent checks for it
CollectionRec *cr = g_collectiondb.getRec ( m_collnum );
// each msg7 has an xmldoc doc in it
if ( ! m_ptrs ) {
m_ptrs = (Msg7 *)mmalloc(sizeof(Msg7)* MAXINJECTSOUT,"sxdp");
if ( ! m_ptrs ) return NULL;
for ( long i = 0 ; i < MAXINJECTSOUT ;i++ )
m_ptrs[i].constructor();
}
// respect the user limit for this coll
long long out = m_numOut - m_numIn;
if ( out >= cr->m_numImportInjects ) {
g_errno = 0;
return NULL;
}
// find one not in use and return it
for ( long i = 0 ; i < (long)MAXINJECTSOUT ; i++ ) {
// point to it
Msg7 *m7 = &m_ptrs[i];
if ( m7->m_inUse ) continue;
m7->m_inUse = true;
m7->m_importState = this;
return m7;
}
// none avail
g_errno = 0;
return NULL;
}
// "xd" is the XmlDoc that just completed injecting
void ImportState::saveFileBookMark ( Msg7 *msg7 ) {
long fileId = msg7->m_hackFileId;
long long fileOff = msg7->m_hackFileOff;
// if there is one outstanding the preceeded us, we can't update
// the bookmark just yet.
for ( long i = 0 ; i < (long)MAXINJECTSOUT ; i++ ) {
Msg7 *m7 = &m_ptrs[i];
// skip if us
if ( m7 == msg7 ) continue;
// if not in use it has never launched? or is back...
if ( m7 != msg7 && ! msg7->m_inUse ) continue;
// return if we are NOT the earliest launched that just
// came back
if ( msg7->m_hackFileId < fileId ) return;
if ( msg7->m_hackFileId == fileId &&
msg7->m_hackFileOff < fileOff ) return;
}
char fname[256];
sprintf(fname,"%slasttitledbinjectinfo.dat",g_hostdb.m_dir);
SafeBuf ff;
ff.safePrintf("%llu,%lu",m_fileOffset,m_fileId);
ff.save ( fname );
}

View File

@ -3,6 +3,8 @@
bool sendPageInject ( class TcpSocket *s, class HttpRequest *hr );
bool resumeImports ( ) ;
#include "XmlDoc.h"
#include "Users.h"
#include "Parms.h" // GigablastRequest
@ -29,10 +31,19 @@ public:
void *m_state;
void (* m_callback )(void *state);
long long m_hackFileOff;
long m_hackFileId;
//long m_crawlbotAPI;
class ImportState *m_importState;
void constructor();
Msg7 ();
~Msg7 ();
bool m_inUse;
void reset();
bool scrapeQuery ( );

View File

@ -14214,17 +14214,17 @@ void Parms::init ( ) {
m->m_flags = PF_API;
m++;
m->m_title = "collection";
m->m_desc = "Collection to import documents into.";
m->m_cgi = "c";
m->m_page = PAGE_IMPORT;
m->m_obj = OBJ_GBREQUEST;
m->m_off = (char *)&gr.m_coll - (char *)&gr;
m->m_type = TYPE_CHARPTR;
m->m_def = NULL;
// PF_COLLDEFAULT: so it gets set to default coll on html page
m->m_flags = PF_API|PF_REQUIRED|PF_NOHTML;
m++;
// m->m_title = "collection";
// m->m_desc = "Collection to import documents into.";
// m->m_cgi = "c";
// m->m_page = PAGE_IMPORT;
// m->m_obj = OBJ_GBREQUEST;
// m->m_off = (char *)&cr.m_imcoll - (char *)&gr;
// m->m_type = TYPE_CHARPTR;
// m->m_def = NULL;
// // PF_COLLDEFAULT: so it gets set to default coll on html page
// m->m_flags = PF_API|PF_REQUIRED|PF_NOHTML;
// m++;
m->m_title = "directory containing titledb files";
m->m_desc = "Import documents contained in titledb files in this "
@ -14241,10 +14241,10 @@ void Parms::init ( ) {
m->m_title = "number of simultaneous injections";
m->m_desc = "Typically try one or two injections per host in "
"your cluster.";
m->m_cgi = "importinjects";
m->m_cgi = "numimportinjects";
m->m_page = PAGE_IMPORT;
m->m_obj = OBJ_COLL;
m->m_off = (char *)&cr.m_importInjects - x;
m->m_off = (char *)&cr.m_numImportInjects - x;
m->m_type = TYPE_LONG;
m->m_def = "2";
m->m_flags = PF_API;

View File

@ -141,6 +141,7 @@ class GigablastRequest {
long m_charset;
long m_hopCount;
///////////
//
// /admin/import parms

View File

@ -45,7 +45,7 @@
#include "Proxy.h"
#include "Rebalance.h"
#include "SpiderProxy.h"
#include "Import.h"
#include "PageInject.h"
// the query log hashtable defined in XmlDoc.cpp
//extern HashTableX g_qt;

View File

@ -1,5 +1,5 @@
#ifndef UNICODE_H__
#define UNICODE_H__
#ifndef UNICODEH
#define UNICODEH
#include <sys/types.h>
#include <limits.h>