fix collection resetting.

Matt Wells 2013-10-18 15:21:00 -07:00
parent 50313a815f
commit b589b17e63
15 changed files with 4240 additions and 303 deletions

View File

@ -66,6 +66,8 @@ CollectionRec::CollectionRec() {
m_replies = 0;
m_doingCallbacks = false;
m_lastResetCount = 0;
// for diffbot caching the global spider stats
reset();
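
The constructor now zeroes m_lastResetCount and finishes with reset(); together with the XmlDoc changes further down, the counter lets a long-running spider operation notice that its collection was reset after launch. A minimal standalone sketch of that guard, with hypothetical class names rather than the real Gigablast types:

// Sketch of the reset-counter guard. Hypothetical names throughout.
#include <cstdio>

struct Collection {
    long m_lastResetCount;            // bumped on every reset
    Collection() : m_lastResetCount(0) {}
    void reset() { m_lastResetCount++; }
};

struct SpiderOp {
    Collection *m_coll;
    long m_startResetCount;           // snapshot taken at launch

    void launch(Collection *c) {
        m_coll = c;
        m_startResetCount = c->m_lastResetCount;
    }
    // before touching collection state, verify no reset happened
    bool collStillValid() const {
        return m_coll->m_lastResetCount == m_startResetCount;
    }
};

int main() {
    Collection coll;
    SpiderOp op;
    op.launch(&coll);
    coll.reset();                     // collection reset while op in flight
    if (!op.collStillValid())
        printf("collection was reset; abandoning stale work\n");
    return 0;
}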

View File

@ -362,6 +362,8 @@ class CollectionRec {
long m_maxNumSpiders ; // per local spider host
float m_spiderNewPct ; // appx. percentage new documents
long m_lastResetCount;
// . in seconds
// . shift all spiderTimes for urls in spider queue down this many secs
//long m_spiderTimeShift;

View File

@ -550,26 +550,6 @@ bool Collectiondb::deleteRec ( char *coll , bool deleteTurkdb ) {
log("coll: deleting coll \"%s\"",cr->m_coll);
// we need a save
m_needsSave = true;
// nuke it on disk
char oldname[1024];
sprintf(oldname, "%scoll.%s.%li/",g_hostdb.m_dir,cr->m_coll,
(long)cr->m_collnum);
char newname[1024];
sprintf(newname, "%strash/coll.%s.%li.%lli/",g_hostdb.m_dir,cr->m_coll,
(long)cr->m_collnum,gettimeofdayInMilliseconds());
//Dir d; d.set ( dname );
// ensure ./trash dir is there
char trash[1024];
sprintf(trash, "%strash/",g_hostdb.m_dir);
::mkdir ( trash,
S_IRUSR | S_IWUSR | S_IXUSR |
S_IRGRP | S_IWGRP | S_IXGRP |
S_IROTH | S_IXOTH ) ;
// move into that dir
::rename ( oldname , newname );
// debug message
logf ( LOG_INFO, "admin: deleted coll \"%s\" (%li).",
coll,(long)collnum );
// nuke doleiptable and waitingtree and waitingtable
/*
@ -666,6 +646,23 @@ bool Collectiondb::resetColl ( char *coll , bool resetTurkdb ) {
char *xx=NULL;*xx=0;
}
// so XmlDoc.cpp can detect if the collection was reset since it
// launched its spider:
cr->m_lastResetCount++;
collnum_t collnum = cr->m_collnum;
// . unlink all the *.dat and *.map files for this coll in its subdir
// . remove all recs from this collnum from m_tree/m_buckets
g_posdb.getRdb()->resetColl ( collnum );
g_titledb.getRdb()->resetColl ( collnum );
g_tagdb.getRdb()->resetColl ( collnum );
g_spiderdb.getRdb()->resetColl ( collnum );
g_doledb.getRdb()->resetColl ( collnum );
g_clusterdb.getRdb()->resetColl ( collnum );
g_linkdb.getRdb()->resetColl ( collnum );
/*
// make sure an update not in progress
if ( cr->m_inProgress ) { char *xx=NULL;*xx=0; }
@ -721,15 +718,16 @@ bool Collectiondb::resetColl ( char *coll , bool resetTurkdb ) {
// memcpy and their ptrs are now stored in "tmp"'s SafeBufs and will
// be passed on to the new rec.
g_parms.detachSafeBufs( &tmp );
*/
// let's reset crawlinfo crap
nr->m_globalCrawlInfo.reset();
nr->m_localCrawlInfo.reset();
cr->m_globalCrawlInfo.reset();
cr->m_localCrawlInfo.reset();
// . save it again after copy
// . no, i think addRec above saves it, so don't double save
// . ah just double save since we copied "tmp" back to it above
nr->save();
cr->save();
// and clear the robots.txt cache in case we recently spidered a
@ -737,8 +735,8 @@ bool Collectiondb::resetColl ( char *coll , bool resetTurkdb ) {
// have in the test-parser subdir so we are consistent
RdbCache *robots = Msg13::getHttpCacheRobots();
RdbCache *others = Msg13::getHttpCacheOthers();
robots->clear ( cn );
others->clear ( cn );
robots->clear ( collnum );
others->clear ( collnum );
//g_templateTable.reset();
//g_templateTable.save( g_hostdb.m_dir , "turkedtemplates.dat" );
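
The block removed here reappears inside Rdb::resetColl later in this commit: a collection's data directory is retired by renaming it into a trash/ subdirectory stamped with the current time in milliseconds. A standalone sketch of that move, with hypothetical paths, plus the mkdir/rename error checks the original skips:

// Sketch: move a collection dir into ./trash/, POSIX style.
// Paths and names are hypothetical, not the actual Gigablast layout.
#include <cstdio>
#include <cstring>
#include <cerrno>
#include <cstdint>
#include <sys/stat.h>
#include <sys/time.h>

static int64_t nowMs() {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return (int64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
}

// workDir is assumed to end in '/', like g_hostdb.m_dir does
bool trashCollDir(const char *workDir, const char *coll, long collnum) {
    char oldname[1024], newname[1024], trash[1024];
    snprintf(oldname, sizeof(oldname), "%scoll.%s.%li/", workDir, coll, collnum);
    snprintf(trash,   sizeof(trash),   "%strash/",       workDir);
    snprintf(newname, sizeof(newname), "%strash/coll.%s.%li.%lli/",
             workDir, coll, collnum, (long long)nowMs());
    // ensure ./trash exists; an already-existing dir is fine
    if (::mkdir(trash, 0755) != 0 && errno != EEXIST) {
        fprintf(stderr, "mkdir %s: %s\n", trash, strerror(errno));
        return false;
    }
    // rename is atomic when src and dst are on the same filesystem
    if (::rename(oldname, newname) != 0) {
        fprintf(stderr, "rename %s -> %s: %s\n",
                oldname, newname, strerror(errno));
        return false;
    }
    return true;
}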

File diff suppressed because it is too large

View File

@ -1781,6 +1781,9 @@ bool sendErrorReply2 ( TcpSocket *socket , long fmt , char *msg ) {
"</body></html>"
, msg );
// log it
log("crawlbot: %s",msg );
//return g_httpServer.sendErrorReply(socket,500,sb.getBufStart());
return g_httpServer.sendDynamicPage (socket,
sb.getBufStart(),
@ -1810,7 +1813,7 @@ void addedUrlsToSpiderdbWrapper ( void *state ) {
delete st;
mdelete ( st , sizeof(StateCD) , "stcd" );
}
/*
void injectedUrlWrapper ( void *state ) {
StateCD *st = (StateCD *)state;
@ -1886,7 +1889,7 @@ void injectedUrlWrapper ( void *state ) {
delete st;
mdelete ( st , sizeof(StateCD) , "stcd" );
}
*/
class HelpItem {
public:
@ -2061,7 +2064,7 @@ bool sendPageCrawlbot ( TcpSocket *socket , HttpRequest *hr ) {
}
if ( gbstrlen(token) > 32 ) {
log("crawlbot: token is over 32 chars");
//log("crawlbot: token is over 32 chars");
char *msg = "crawlbot: token is over 32 chars";
return sendErrorReply2 (socket,fmt,msg);
}
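
This hunk and the ones that follow comment out per-call-site log() lines: sendErrorReply2, patched above to log the message itself, now covers every caller. A minimal sketch of that centralization, with hypothetical names:

// Sketch: log once inside the error-reply helper instead of at
// every call site. Names are hypothetical, not the real handler.
#include <cstdio>

bool sendErrorReply2(int socket, long fmt, const char *msg) {
    // one log line covers every caller
    fprintf(stderr, "crawlbot: %s\n", msg);
    // ... format the reply per fmt (json/html) and write to socket ...
    (void)socket; (void)fmt;
    return true;
}

bool handleRequest(int socket, long fmt, const char *token) {
    if (!token)  // no duplicate log() at the call site anymore
        return sendErrorReply2(socket, fmt, "invalid or missing token");
    return true;
}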
@ -2118,14 +2121,14 @@ bool sendPageCrawlbot ( TcpSocket *socket , HttpRequest *hr ) {
}
if ( ! name ) {
log("crawlbot: no crawl name given");
//log("crawlbot: no crawl name given");
char *msg = "invalid or missing name";
return sendErrorReply2 (socket,fmt,msg);
}
if ( gbstrlen(name) > 30 ) {
log("crawlbot: name is over 30 chars");
//log("crawlbot: name is over 30 chars");
char *msg = "crawlbot: name is over 30 chars";
return sendErrorReply2 (socket,fmt,msg);
}
@ -2157,7 +2160,7 @@ bool sendPageCrawlbot ( TcpSocket *socket , HttpRequest *hr ) {
//if ( JS.getInputString("resetCrawl") ) resetColl = true;
if ( resetColl && ! cr ) {
log("crawlbot: no collection found to reset.");
//log("crawlbot: no collection found to reset.");
char *msg = "Could not find crawl to reset.";
return sendErrorReply2 (socket,fmt,msg);
}
@ -2217,7 +2220,7 @@ bool sendPageCrawlbot ( TcpSocket *socket , HttpRequest *hr ) {
// send back error
char *msg = "Collection add failed";
// log it
log("crawlbot: %s",msg);
//log("crawlbot: %s",msg);
// make sure this returns in json if required
return sendErrorReply2(socket,fmt,msg);
}
@ -2445,6 +2448,12 @@ bool printCrawlBotPage2 ( TcpSocket *socket ,
CollectionRec *cr = g_collectiondb.m_recs[collnum];
// was coll deleted while adding urls to spiderdb?
if ( ! cr ) {
g_errno = EBADREQUEST;
char *msg = "invalid crawl. crawl was deleted.";
return sendErrorReply2(socket,fmt,msg);
}
char *token = cr->m_diffbotToken.getBufStart();
char *name = cr->m_diffbotCrawlName.getBufStart();
@ -2769,7 +2778,8 @@ bool printCrawlBotPage2 ( TcpSocket *socket ,
// sanity check
if ( ! xd->m_oldsrValid ) { char *xx=NULL;*xx=0; }
// skip if not our coll rec!
if ( xd->m_cr != cr ) continue;
//if ( xd->m_cr != cr ) continue;
if ( xd->m_collnum != cr->m_collnum ) continue;
// grab it
SpiderRequest *oldsr = &xd->m_oldsr;
// get status
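
The change above replaces a pointer-identity test (xd->m_cr != cr) with a collnum comparison. Once a collection can be deleted or reset while a document is in flight, a cached CollectionRec* may dangle, while the numeric collnum remains a safe handle. A sketch of the id-based check, hypothetical types:

// Sketch: compare stable ids, not cached pointers. Hypothetical types.
struct CollRec {
    short m_collnum;   // stable numeric id for the collection
};

struct Doc {
    short m_collnum;   // safe to keep across collection resets
    // CollRec *m_cr;  // cached pointer: dangles after delete/reset
};

bool sameColl(const Doc &d, const CollRec *cr) {
    // survives the record being freed and re-added under the same name
    return cr && d.m_collnum == cr->m_collnum;
}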

View File

@ -510,7 +510,8 @@ bool processLoop ( void *state ) {
// . save the ips.txt file if we are the test coll
// . saveTestBuf() is a function in Msge1.cpp
if ( xd && xd->m_coll && ! strcmp ( xd->m_coll , "test"))
CollectionRec *cr = xd->getCollRec();
if ( xd && cr && cr->m_coll && ! strcmp ( cr->m_coll , "test") )
// use same dir that XmlDoc::getTestDir() would use
saveTestBuf ( "test-page-parser" );

View File

@ -2922,6 +2922,10 @@ bool sendEmailThroughMandrill ( class EmailInfo *ei ) {
char *to = ei->m_toAddress.getBufStart();
char *from = ei->m_fromAddress.getBufStart();
char *crawl = "unknown2";
CollectionRec *cr = g_collectiondb.m_recs[ei->m_collnum];
if ( cr ) crawl = cr->m_diffbotCrawlName.getBufStart();
SafeBuf ub;
ub.safePrintf( "{\"key\":\"GhWT0UpcVBl7kmumrt9dqg\","
"\"template_name\":\"crawl-finished\","
@ -2950,7 +2954,7 @@ bool sendEmailThroughMandrill ( class EmailInfo *ei ) {
, from
, from
, from
, ei->m_cr->m_diffbotCrawlName.getBufStart()//coll
, crawl
);
// this is not for application/json content type in POST request
//ub.urlEncode();
@ -3047,24 +3051,30 @@ bool sendNotification ( EmailInfo *ei ) {
if ( ei->m_inUse ) { char *xx=NULL;*xx=0; }
// caller must set this, as well as m_finalCallback/m_finalState
CollectionRec *cr = ei->m_cr;
CollectionRec *cr = g_collectiondb.m_recs[ei->m_collnum];
char *email = cr->m_notifyEmail.getBufStart();
char *url = cr->m_notifyUrl.getBufStart();
char *email = "";
char *url = "";
char *crawl = "unknown2";
if ( cr ) email = cr->m_notifyEmail.getBufStart();
if ( cr ) url = cr->m_notifyUrl.getBufStart();
if ( cr ) crawl = cr->m_diffbotCrawlName.getBufStart();
// sanity check, can only call once
if ( ei->m_notifyBlocked != 0 ) { char *xx=NULL;*xx=0; }
ei->m_inUse = true;
if ( email && email[0] ) {
log("build: sending email notification to %s for coll \"%s\"",
email,cr->m_coll);
log("build: sending email notification to %s for crawl \"%s\"",
email,crawl);
SafeBuf msg;
msg.safePrintf("Your crawl \"%s\" "
"has hit a limitation and has "
"been paused."
, cr->m_coll);
, crawl );
// use this
ei->m_toAddress.safeStrcpy ( email );
ei->m_toAddress.nullTerm();
@ -3086,7 +3096,7 @@ bool sendNotification ( EmailInfo *ei ) {
if ( url && url[0] ) {
log("build: sending url notification to %s for coll \"%s\"",
url,cr->m_coll);
url,crawl);
// GET request
if ( ! g_httpServer.getDoc ( url ,
0 , // ip
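
sendNotification now re-resolves the CollectionRec from ei->m_collnum and substitutes placeholder strings when the lookup fails, so a crawl deleted mid-notification no longer dereferences a stale pointer. A sketch of that guard, with hypothetical names and a stub lookup:

// Sketch: re-resolve by id and degrade gracefully. Hypothetical names.
#include <cstdio>

struct CollRec { const char *m_notifyEmail; const char *m_crawlName; };

// stand-in for g_collectiondb.m_recs[collnum]; NULL means deleted
static CollRec *lookupColl(short collnum) { (void)collnum; return 0; }

void sendNotificationSketch(short collnum) {
    CollRec *cr = lookupColl(collnum);
    const char *email = "";
    const char *crawl = "unknown";
    if (cr) email = cr->m_notifyEmail;   // only touch cr if still alive
    if (cr) crawl = cr->m_crawlName;
    if (email && email[0])
        printf("build: sending email notification to %s for crawl \"%s\"\n",
               email, crawl);
}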

View File

@ -16,7 +16,8 @@ public:
SafeBuf m_fromAddress;
SafeBuf m_subject;
SafeBuf m_body;
CollectionRec *m_cr;
//CollectionRec *m_cr;
collnum_t m_collnum;
char *m_dom; // ref into m_toAddress of the domain in email addr
SafeBuf m_mxDomain; // just the domain with a "gbmxrec-" prepended
void *m_state;

Rdb.cpp
View File

@ -535,6 +535,44 @@ bool Rdb::addColl ( char *coll ) {
return true;
}
bool Rdb::resetColl ( collnum_t collnum ) {
char *coll = g_collectiondb.m_recs[collnum]->m_coll;
// remove these collnums from tree
if(m_useTree) m_tree.delColl ( collnum );
else m_buckets.delColl ( collnum );
// . close all files, set m_numFiles to 0 in RdbBase
// . TODO: what about outstanding merge or dump operations?
RdbBase *base = getBase ( collnum );
base->reset();
// move the files into trash
// nuke it on disk
char oldname[1024];
sprintf(oldname, "%scoll.%s.%li/",g_hostdb.m_dir,coll,
(long)collnum);
char newname[1024];
sprintf(newname, "%strash/coll.%s.%li.%lli/",g_hostdb.m_dir,coll,
(long)collnum,gettimeofdayInMilliseconds());
//Dir d; d.set ( dname );
// ensure ./trash dir is there
char trash[1024];
sprintf(trash, "%strash/",g_hostdb.m_dir);
::mkdir ( trash,
S_IRUSR | S_IWUSR | S_IXUSR |
S_IRGRP | S_IWGRP | S_IXGRP |
S_IROTH | S_IXOTH ) ;
// move into that dir
::rename ( oldname , newname );
logf ( LOG_INFO, "admin: cleared data for coll \"%s\" (%li) rdb=%s.",
coll,(long)collnum ,getDbnameFromId(m_rdbId));
return true;
}
// returns false and sets g_errno on error, returns true on success
bool Rdb::delColl ( char *coll ) {
collnum_t collnum = g_collectiondb.getCollnum ( coll );
@ -545,21 +583,26 @@ bool Rdb::delColl ( char *coll ) {
return log("db: %s: Failed to delete collection #%i. Does "
"not exist.", m_dbname,collnum);
}
// move all files to trash and clear the tree/buckets
resetColl ( collnum );
mdelete (base, sizeof(RdbBase), "Rdb Coll");
delete (base);
CollectionRec *cr = g_collectiondb.getRec(collnum);
//m_bases[collnum] = NULL;
log("rdb: deleted base from collrec "
"rdb=%s rdbid=%li coll=%s collnum=%li base=0x%lx",
m_dbname,(long)m_rdbId,coll,(long)collnum,(long)base);
CollectionRec *cr = g_collectiondb.getRec(collnum);
// NULL it out...
cr->m_bases[(unsigned char)m_rdbId] = NULL;
log("rdb: deleted base from collrec "
"rdb=%s rdbid=%li coll=%s collnum=%li base=0x%lx",
m_dbname,(long)m_rdbId,coll,(long)collnum,(long)base);
// remove these collnums from tree
if(m_useTree) m_tree.delColl ( collnum );
else m_buckets.delColl ( collnum );
//if(m_useTree) m_tree.delColl ( collnum );
//else m_buckets.delColl ( collnum );
// don't forget to save the tree to disk
//m_needsSave = true;
// and from cache, just clear everything out
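
With resetColl factored out, delColl becomes reset-plus-teardown: move the files to trash, purge the tree, then free the per-collection RdbBase and NULL out its slot. A compressed sketch of that division of labor, hypothetical types:

// Sketch of the reset/delete split. Hypothetical types.
struct Base {
    void reset() { /* close files, move them into trash/ on disk */ }
};

struct MiniRdb {
    enum { MAXCOLLS = 64 };          // hypothetical cap
    Base *m_bases[MAXCOLLS];

    bool resetColl(int collnum) {
        // drop this collnum's records from the in-memory tree/buckets,
        // then close and trash the on-disk files via the base
        if (m_bases[collnum]) m_bases[collnum]->reset();
        return true;
    }
    bool delColl(int collnum) {
        resetColl(collnum);          // shared wipe logic lives in one place
        delete m_bases[collnum];     // then free the per-coll base outright
        m_bases[collnum] = 0;        // so later lookups see the coll is gone
        return true;
    }
};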

Rdb.h
View File

@ -87,6 +87,8 @@ class Rdb {
bool addColl ( char *coll );
bool delColl ( char *coll );
bool resetColl ( collnum_t collnum ) ;
bool init ( char *dir , // working directory
char *dbname , // "indexdb","tagdb",...
bool dedup , //= true ,

View File

@ -1213,12 +1213,23 @@ key_t makeWaitingTreeKey ( uint64_t spiderTimeMS , long firstIp ) {
return wk;
}
CollectionRec *SpiderColl::getCollRec() {
CollectionRec *cr = g_collectiondb.m_recs[m_collnum];
if ( ! cr ) log("spider: lost coll rec");
return cr;
}
char *SpiderColl::getCollName() {
CollectionRec *cr = getCollRec();
if ( ! cr ) return "lostcollection";
return cr->m_coll;
}
// . call this when changing the url filters
// . will make all entries in waiting tree have zero time basically
void SpiderColl::urlFiltersChanged ( ) {
// log it
log("spider: rebuilding waiting tree for coll=%s",m_cr->m_coll);
log("spider: rebuilding waiting tree for coll=%s",getCollName());
m_lastUrlFiltersUpdate = getTimeGlobal();
// need to recompute this!
m_ufnMapValid = false;
@ -5013,15 +5024,15 @@ bool SpiderLoop::indexedDoc ( XmlDoc *xd ) {
m_numSpidersOut--;
// get coll
collnum_t collnum = g_collectiondb.getCollnum ( xd->m_coll );
collnum_t collnum = xd->m_collnum;//tiondb.getCollnum ( xd->m_coll );
// get it
SpiderColl *sc = g_spiderCache.getSpiderColl(collnum);
// decrement this
sc->m_spidersOut--;
// get the original request from xmldoc
SpiderRequest *sreq = &xd->m_oldsr;
// update this
sc->m_outstandingSpiders[(unsigned char)sreq->m_priority]--;
// update this. if coll was deleted while spidering, sc will be NULL
if ( sc ) sc->m_outstandingSpiders[(unsigned char)sreq->m_priority]--;
// debug log
//log("XXX: decremented count to %li for %s",
@ -9499,8 +9510,15 @@ bool updateCrawlInfo ( CollectionRec *cr ,
void doneSendingNotification ( void *state ) {
EmailInfo *ei = (EmailInfo *)state;
CollectionRec *cr = ei->m_cr;
log("spider: done sending notifications for coll=%s", cr->m_coll);
collnum_t collnum = ei->m_collnum;
CollectionRec *cr = g_collectiondb.m_recs[collnum];
char *coll = "lostcoll";
if ( cr ) coll = cr->m_coll;
log("spider: done sending notifications for coll=%s", coll);
// all done if collection was deleted from under us
if ( ! cr ) return;
// mark it as sent. anytime a new url is spidered will mark this
// as false again! use LOCAL crawlInfo, since global is reset often.
cr->m_localCrawlInfo.m_sentCrawlDoneAlert = 1;
@ -9665,7 +9683,7 @@ void gotCrawlInfoReply ( void *state , UdpSlot *slot ) {
// set it up
ei->m_finalCallback = doneSendingNotification;
ei->m_finalState = ei;
ei->m_cr = cr;
ei->m_collnum = cr->m_collnum;
sendNotification ( ei );
}
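
Spider.cpp now funnels collection access through the new getCollRec()/getCollName() accessors, which re-resolve the record from m_collnum on every use and tolerate a NULL (deleted) record instead of trusting a cached m_cr. A sketch of the accessor pattern, with a stub lookup standing in for g_collectiondb:

// Sketch: accessors that re-resolve the record per use. Hypothetical types.
#include <cstdio>

struct CollRec { const char *m_coll; };

struct SpiderCollLite {
    short m_collnum;

    CollRec *getCollRec() {
        CollRec *cr = lookup(m_collnum);
        if (!cr) fprintf(stderr, "spider: lost coll rec\n");
        return cr;   // callers must handle NULL
    }
    const char *getCollName() {
        CollRec *cr = getCollRec();
        return cr ? cr->m_coll : "lostcollection";
    }
    static CollRec *lookup(short collnum);
};

// stand-in for g_collectiondb.m_recs[collnum]; NULL means deleted
CollRec *SpiderCollLite::lookup(short collnum) { (void)collnum; return 0; }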

View File

@ -1061,7 +1061,9 @@ class SpiderColl {
// . 0 for main collection
collnum_t m_collnum;
char m_coll [ MAX_COLL_LEN + 1 ] ;
class CollectionRec *getCollRec();
class CollectionRec *m_cr;
char *getCollName();
bool m_isTestColl;
HashTableX m_doleIpTable;

File diff suppressed because it is too large

View File

@ -479,7 +479,6 @@ class XmlDoc {
void nukeDoc ( class XmlDoc *);
void reset ( ) ;
bool setFirstUrl ( char *u , bool addWWW , Url *base = NULL ) ;
class CollectionRec *getCollRec ( );
bool setRedirUrl ( char *u , bool addWWW ) ;
void setStatus ( char *s ) ;
void setCallback ( void *state, void (*callback) (void *state) ) ;
@ -922,8 +921,16 @@ class XmlDoc {
long long m_firstUrlHash48;
long long m_firstUrlHash64;
Url m_currentUrl;
char *m_coll;
char m_collBuf[MAX_COLL_LEN+1]; // include \0
//char *m_coll;
//char m_collBuf[MAX_COLL_LEN+1]; // include \0
CollectionRec *m_lastcr;
collnum_t m_collnum;
long m_lastCollRecResetCount;
class CollectionRec *getCollRec ( ) ;
bool setCollNum ( char *coll ) ;
char *m_content;
long m_contentLen;
@ -1044,7 +1051,7 @@ class XmlDoc {
char m_firstUrlHash64Valid;
char m_lastUrlValid;
char m_docIdValid;
char m_collValid;
//char m_collValid;
char m_tagRecValid;
char m_robotsTxtLenValid;
char m_tagRecDataValid;
@ -1285,6 +1292,7 @@ class XmlDoc {
bool m_matchesValid;
bool m_dbufValid;
bool m_titleValid;
bool m_collnumValid;
//bool m_twidsValid;
bool m_termId32BufValid;
bool m_termInfoBufValid;
@ -1365,7 +1373,7 @@ class XmlDoc {
Msg22 m_msg22d;
Msg22 m_msg22e;
Msg22 m_msg22f;
long m_collLen;
//long m_collLen;
uint32_t m_gigabitVectorHash;
char m_gigabitQuery [XD_GQ_MAX_SIZE];
long m_gigabitHashes [XD_MAX_GIGABIT_HASHES];
@ -1510,7 +1518,7 @@ class XmlDoc {
long m_filteredContentMaxSize;
char m_calledThread;
long m_errno;
class CollectionRec *m_cr;
//class CollectionRec *m_cr;
//long m_utf8ContentAllocSize;
long m_hostHash32a;
long m_hostHash32b;
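
XmlDoc drops its cached m_cr/m_coll in favor of m_collnum plus m_lastCollRecResetCount, the snapshot that pairs with CollectionRec::m_lastResetCount from the first hunk. How getCollRec() uses the pair is not shown in this diff; a sketch of one plausible validation, with hypothetical names:

// Sketch: getCollRec that detects a reset-in-flight. Hypothetical names.
#include <cstdio>

struct CollRec { long m_lastResetCount; };

struct XmlDocLite {
    short m_collnum;
    long  m_lastCollRecResetCount;    // snapshot at spider launch

    CollRec *getCollRec() {
        CollRec *cr = lookup(m_collnum);
        if (!cr) return 0;                            // coll deleted
        if (cr->m_lastResetCount != m_lastCollRecResetCount) {
            fprintf(stderr, "xmldoc: coll was reset; abandoning doc\n");
            return 0;                                 // coll reset under us
        }
        return cr;
    }
    static CollRec *lookup(short collnum);
};

// stand-in for g_collectiondb.m_recs[collnum]; NULL means deleted
CollRec *XmlDocLite::lookup(short collnum) { (void)collnum; return 0; }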

View File

@ -3173,7 +3173,8 @@ int main ( int argc , char *argv[] ) {
if ( testMandrill ) {
static EmailInfo ei;
ei.m_cr = g_collectiondb.getRec(1);
//ei.m_cr = g_collectiondb.getRec(1);
ei.m_collnum = 1;
ei.m_fromAddress.safePrintf("support@diffbot.com");
ei.m_toAddress.safePrintf("matt@diffbot.com");
ei.m_callback = exitWrapper;