Merge branch 'testing'

Conflicts:
	Rdb.cpp
Matt 2015-11-14 10:57:27 -07:00
commit 6e12f96aea
38 changed files with 5345 additions and 510 deletions

View File

@ -581,6 +581,9 @@ bool Collectiondb::addNewColl ( char *coll ,
// reset the crawl stats
// always turn off gigabits so &s=1000 can do summary skipping
cr->m_docsToScanForTopics = 0;
// turn off link voting, etc. to speed up
cr->m_getLinkInfo = false;
cr->m_computeSiteNumInlinks = false;
}
// . this will core if a host was dead and then when it came

View File

@ -525,6 +525,7 @@ class CollectionRec {
char m_enforceNewQuotas ;
char m_doIpLookups ; // considered iff using proxy
char m_useRobotsTxt ;
char m_obeyRelNoFollowLinks ;
char m_forceUseFloaters ;
char m_automaticallyUseProxies ;
char m_automaticallyBackOff ;

View File

@ -131,8 +131,12 @@ void DailyMerge::dailyMergeLoop ( ) {
// get our dow
int32_t todayDOW = tt->tm_wday + 1;
// make sure 1 to 7
if ( todayDOW < 0 ) { char *xx=NULL;*xx=0; }
if ( todayDOW > 6 ) { char *xx=NULL;*xx=0; }
if ( todayDOW < 0 || todayDOW > 6 ) {
log("merge: bad today dow of %i for coll %s",
(int)todayDOW,cr->m_coll);
return;
}
//if ( todayDOW > 6 ) { char *xx=NULL;*xx=0; }
// skip if not a dayofweek to merge on
if ( dowCounts [ todayDOW ] == 0 ) continue;

View File

@ -2470,7 +2470,8 @@ Host *Dns::getResponsibleHost ( key_t key ) {
// get the hostNum that should handle this
int32_t hostId = key.n1 % hostdb->getNumHosts();
// return it if it is alive
if ( ! hostdb->isDead ( hostId ) ) return hostdb->getHost ( hostId );
Host* h = hostdb->getHost ( hostId );
if ( h->m_spiderEnabled && ! hostdb->isDead ( hostId ) ) return h;
// how many are up?
int32_t numAlive = hostdb->getNumHostsAlive();
// NULL if none
@ -2482,6 +2483,7 @@ Host *Dns::getResponsibleHost ( key_t key ) {
for ( int32_t i = 0 ; i < hostdb->m_numHosts ; i++ ) {
// get the ith host
Host *host = &hostdb->m_hosts[i];
if ( !host->m_spiderEnabled ) continue;
// skip him if he is dead
if ( hostdb->isDead ( host ) ) continue;
// count it if alive, continue if not our number

View File

@ -1652,6 +1652,45 @@ Host *Hostdb::getLiveHostInShard ( int32_t shardNum ) {
return &shard[0];
}
int32_t Hostdb::getHostIdWithSpideringEnabled ( uint32_t shardNum ) {
Host *hosts = g_hostdb.getShard ( shardNum);
int32_t numHosts = g_hostdb.getNumHostsPerShard();
int32_t hostNum = 0;
int32_t numTried = 0;
while( !hosts [ hostNum ].m_spiderEnabled && numTried < numHosts ) {
hostNum = (hostNum+1) % numHosts;
numTried++;
}
if( !hosts [ hostNum ].m_spiderEnabled) {
log("build: cannot spider when entire shard has nospider enabled");
char *xx = NULL; *xx = 0;
}
return hosts [ hostNum ].m_hostId ;
}
Host *Hostdb::getLeastLoadedInShard ( uint32_t shardNum ) {
int32_t minOutstandingRequests = 0x7fffffff;
int32_t minOutstandingRequestsIndex = -1;
Host *shard = getShard ( shardNum );
for(int32_t i = 0; i < m_numHostsPerShard; i++) {
Host *hh = &shard[i];
if(isDead(hh)) continue;
// log("host %"INT32 " numOutstanding is %"INT32, hh->m_hostId,
// hh->m_pingInfo.m_udpSlotsInUseIncoming);
if(hh->m_pingInfo.m_udpSlotsInUseIncoming > minOutstandingRequests) continue;
minOutstandingRequests = hh->m_pingInfo.m_udpSlotsInUseIncoming;
minOutstandingRequestsIndex = i;
}
if(minOutstandingRequestsIndex == -1) return shard;
return &shard[minOutstandingRequestsIndex];
}
// if all are dead just return host #0
Host *Hostdb::getFirstAliveHost ( ) {
for ( int32_t i = 0 ; i < m_numHosts ; i++ )

View File

@ -450,6 +450,9 @@ class Hostdb {
//Host *getLiveHostInGroup ( int32_t groupId );
Host *getLiveHostInShard ( int32_t shardNum );
Host *getLeastLoadedInShard ( uint32_t shardNum );
int32_t getHostIdWithSpideringEnabled ( uint32_t shardNum );
// in the entire cluster. return host #0 if its alive, otherwise
// host #1, etc.
@ -469,6 +472,7 @@ class Hostdb {
return &m_hosts[shardNum * m_numHostsPerShard];
};
//Host *getGroupFromGroupId ( uint32_t gid ) {
// return getGroup ( gid );
//};

View File

@ -603,6 +603,10 @@ bool getLinkInfo ( SafeBuf *reqBuf ,
Host *hosts = g_hostdb.getShard ( shardNum); // Group ( groupId );
if ( hostNum >= numHosts ) { char *xx = NULL; *xx = 0; }
int32_t hostId = hosts [ hostNum ].m_hostId ;
if( !hosts [ hostNum ].m_spiderEnabled) {
hostId = g_hostdb.getHostIdWithSpideringEnabled ( shardNum );
}
// . serialize the string buffers
// . use Msg25Request::m_buf[MAX_NEEDED]

View File

@ -2708,6 +2708,32 @@ void Loop::enableTimer() {
}
FILE* gbpopen(char* cmd) {
// Block everything from interrupting this system call because
// if there is an alarm or a child thread crashes (pdftohtml)
// then this will hang forever.
// We should actually write our own popen so that we do
// fork, close all fds in the child, then exec.
// These child processes can hold open the http server and
// prevent a new gb from running even after it has died.
g_loop.disableTimer();
sigset_t oldSigs;
sigset_t sigs;
sigfillset ( &sigs );
if ( sigprocmask ( SIG_BLOCK , &sigs, &oldSigs ) < 0 ) {
log("build: had error blocking signals for popen");
}
FILE* fh = popen(cmd, "r");
if ( sigprocmask ( SIG_SETMASK , &oldSigs, NULL ) < 0 ) {
log("build: had error unblocking signals for popen");
}
g_loop.enableTimer();
return fh;
}
//calling with a 0 niceness will turn off the timer interrupt
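The comment block at the top of gbpopen() explains why signals are blocked around popen() and sketches the longer-term fix: a hand-rolled popen that forks, closes every inherited descriptor in the child, and then execs, so a lingering child (e.g. pdftohtml) cannot keep the http server's listening socket open after gb dies. A minimal sketch of that idea follows; it is not part of this commit, and the function name and the fd-closing bound are chosen only for illustration:

#include <stdio.h>
#include <unistd.h>

// Hypothetical sketch of a fork/exec based popen replacement.
// The child dup2()s the pipe onto stdout, closes all other
// inherited descriptors, then execs the shell.
static FILE *gbpopen_forkexec ( char *cmd ) {
	int fds[2];
	if ( pipe ( fds ) < 0 ) return NULL;
	pid_t pid = fork();
	if ( pid < 0 ) { close(fds[0]); close(fds[1]); return NULL; }
	if ( pid == 0 ) {
		// child: route stdout into the write end of the pipe
		dup2 ( fds[1] , 1 );
		// close everything else so the child does not hold open
		// the http server's socket or any other parent fd
		for ( int fd = 3 ; fd < 1024 ; fd++ ) close ( fd );
		execl ( "/bin/sh" , "sh" , "-c" , cmd , (char *)NULL );
		_exit ( 127 );
	}
	// parent: hand back the read end as a FILE*
	close ( fds[1] );
	return fdopen ( fds[0] , "r" );
}

A custom implementation like this would likely still want the same signal blocking around the fork that gbpopen() does around popen(), as long as the alarm/timer handlers stay installed.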

Loop.h
View File

@ -19,6 +19,8 @@
#define QUICKPOLL_INTERVAL 10
int gbsystem(char *cmd);
FILE* gbpopen(char* cmd);
#define sleep(a) { char *xx=NULL;*xx=0; }
//#define sleep(a) logf(LOG_INFO,"sleep: sleep");

View File

@ -788,5 +788,5 @@ install-pkgs-local:
warcinjector:
-rm -r /home/zak/.pex/build/inject-*
-rm -r /home/zak/.pex/install/inject-*
cd script && pex -v . requests pyopenssl ndg-httpsclient pyasn1 multiprocessing flask -e inject -o warc-inject --inherit-path --no-wheel
cd script && pex -v . gevent gevent-socketio requests pyopenssl ndg-httpsclient pyasn1 multiprocessing -e inject -o warc-inject --inherit-path --no-wheel

View File

@ -295,8 +295,9 @@ bool Msg13::forwardRequest ( ) {
// get that host
//h = g_hostdb.getProxy ( hostId );;
h = g_hostdb.getHost ( hostId );
// stop if he is alive
if ( ! g_hostdb.isDead ( h ) ) break;
// stop if he is alive and able to spider
if ( h->m_spiderEnabled && ! g_hostdb.isDead ( h ) ) break;
// get the next otherwise
if ( ++hostId >= nh ) hostId = 0;
}
@ -1209,6 +1210,11 @@ bool ipWasBanned ( TcpSocket *ts , const char **msg , Msg13Request *r ) {
*msg = "status 999 request denied";
return true;
}
// let's add this new one
if ( httpStatus == 503 ) {
*msg = "status 503 service unavailable";
return true;
}
// if it has link to "google.com/recaptcha"
// TODO: use own gbstrstr so we can do QUICKPOLL(niceness)

View File

@ -189,9 +189,10 @@ bool Msg20::getSummary ( Msg20Request *req ) {
Host *hh = &allHosts[i];
// skip if dead
if ( g_hostdb.isDead(hh) ) continue;
// NEVER add a noquery host to the candidate list, even
// if the query host is dead
if ( ! hh->m_queryEnabled ) continue;
// Respect no-spider, no-query directives from hosts.conf
if ( !req->m_getLinkInfo && ! hh->m_queryEnabled ) continue;
if ( req->m_getLinkInfo && ! hh->m_spiderEnabled ) continue;
// add it if alive
cand[nc++] = hh;
}

View File

@ -158,45 +158,8 @@ bool Msg22::getTitleRec ( Msg22Request *r ,
firstHostId = hosts [ hostNum ].m_hostId ;
*/
// get our group
int32_t allNumHosts = g_hostdb.getNumHostsPerShard();
Host *allHosts = g_hostdb.getShard ( shardNum );//Group ( groupId );
int32_t firstHostId = g_hostdb.getLeastLoadedInShard ( shardNum )->m_hostId;
// put all alive hosts in this array
Host *cand[32];
int64_t nc = 0;
for ( int32_t i = 0 ; i < allNumHosts ; i++ ) {
// get that host
Host *hh = &allHosts[i];
// skip if dead
if ( g_hostdb.isDead(hh) ) continue;
// add it if alive
cand[nc++] = hh;
}
// if none alive, make them all candidates then
bool allDead = (nc == 0);
for ( int32_t i = 0 ; allDead && i < allNumHosts ; i++ )
cand[nc++] = &allHosts[i];
// route based on docid region, not parity, because we want to hit
// the urldb page cache as much as possible
int64_t sectionWidth =((128LL*1024*1024)/nc)+1;//(DOCID_MASK/nc)+1LL;
// we mod by 1MB since tied scores resort to sorting by docid
// so we don't want to overload the host responsible for the lowest
// range of docids. CAUTION: do this for msg22 too!
// in this way we should still ensure a pretty good biased urldb
// cache...
// . TODO: fix the urldb cache preload logic
int32_t hostNum = (docId % (128LL*1024*1024)) / sectionWidth;
if ( hostNum < 0 ) hostNum = 0; // watch out for negative docids
if ( hostNum >= nc ) { char *xx = NULL; *xx = 0; }
int32_t firstHostId = cand [ hostNum ]->m_hostId ;
// while this prevents tfndb seeks, it also causes bottlenecks
// if one host is particularly slow, because load balancing is
// bypassed.
//if ( ! g_conf.m_useBiasedTfndb ) firstHostId = -1;
// flag it
m_outstanding = true;
r->m_inUse = 1;

View File

@ -83,7 +83,7 @@ static bool gotSummaryWrapper ( void *state );
bool isSubDom(char *s , int32_t len);
Msg40::Msg40() {
m_firstTime = true;
m_calledFacets = false;
m_doneWithLookup = false;
m_socketHadError = 0;
m_buf = NULL;
@ -1762,7 +1762,7 @@ bool gotSummaryWrapper ( void *state ) {
THIS->m_numReplies,
THIS->m_msg3a.m_numDocIds);
// it returns false if we're still awaiting replies
if ( ! THIS->gotSummary ( ) ) return false;
if ( ! THIS->m_calledFacets && ! THIS->gotSummary ( ) ) return false;
// lookup facets
if ( THIS->m_si &&
! THIS->m_si->m_streamResults &&
@ -6285,8 +6285,8 @@ bool Msg40::lookupFacets ( ) {
if ( m_doneWithLookup ) return true;
if ( m_firstTime ) {
m_firstTime = false;
if ( !m_calledFacets ) {
m_calledFacets = true;
m_numMsg20sOut = 0;
m_numMsg20sIn = 0;
m_j = 0;

View File

@ -223,7 +223,7 @@ class Msg40 {
bool m_doneWithLookup;
HashTableX m_facetTextTable;
SafeBuf m_facetTextBuf;
bool m_firstTime;
bool m_calledFacets;
int32_t m_omitCount;
bool printFacetTables ( class SafeBuf *sb ) ;

View File

@ -607,6 +607,11 @@ loop:
// debug msg
//log("Multicast:: no hosts left to send to");
g_errno = ENOHOSTS; return false; }
// log("build: msg %x sent to host %"INT32 " first hostId is %"INT32
// " oustanding msgs %"INT32,
// m_msgType, i, firstHostId, m_hostPtrs[i]->m_numOutstandingRequests);
// . send to this guy, if we haven't yet
// . returns false and sets g_errno on error
// . if it returns true, we sent ok, so we should return true

View File

@ -760,6 +760,10 @@ void handleRequest7 ( UdpSlot *slot , int32_t netnice ) {
s_injectHead = xd;
s_injectTail = xd;
}
if(ir->ptr_content && ir->ptr_content[ir->size_content - 1]) {
// XmlDoc expects this buffer to be null terminated.
char *xx=NULL;*xx=0;
}
if ( ! xd->injectDoc ( ir->ptr_url , // m_injectUrlBuf.getBufStart() ,
cr ,
@ -790,7 +794,8 @@ void handleRequest7 ( UdpSlot *slot , int32_t netnice ) {
ir->m_injectDocIp ,
ir->ptr_contentDelim,
ir->ptr_metadata,
ir->size_metadata
ir->size_metadata,
ir->size_content - 1 // there should be a null in that last byte
) )
// we blocked...
return;

View File

@ -149,6 +149,15 @@ bool sendPageGraph ( TcpSocket *s, HttpRequest *r ) {
return true;
}
void genStatsDataset(SafeBuf *buf, StateStatsdb *st) {
if ( ! g_conf.m_useStatsdb ) {
buf->safePrintf("{\"error\":\"statsdb disabled\"}\n" );
return;
}
}
static void writeControls ( SafeBuf *buf, StateStatsdb *st ) ;
void genStatsGraphTable(SafeBuf *buf, StateStatsdb *st) {
if ( ! g_conf.m_useStatsdb )
@ -208,6 +217,10 @@ void sendReply ( void *state ) {
TcpSocket *s = st->m_socket;
if(st->m_request.getLong("json", 0)) {
//xxxxxxxxxxxxxxxxxxxxxxxxx
}
if(st->m_request.getLong("justgraph", 0)) {
SafeBuf buf( 1024*32 , "tmpbuf0" );
genStatsGraphTable(&buf, st);

View File

@ -10956,7 +10956,7 @@ void Parms::init ( ) {
m->m_cgi = "errstrone";
m->m_off = (char *)&g_conf.m_errstr1 - g;
m->m_type = TYPE_STRING;
m->m_def = "";
m->m_def = "I/O error";
m->m_size = MAX_URL_LEN;
m->m_priv = 2;
m->m_page = PAGE_MASTER;
@ -16560,9 +16560,10 @@ void Parms::init ( ) {
m->m_flags = PF_CLONE;
m++;
m->m_title = "use robots.txt";
m->m_title = "obey robots.txt";
m->m_xml = "useRobotstxt";
m->m_desc = "If this is true Gigablast will respect "
"the robots.txt convention.";
"the robots.txt convention and rel no follow meta tags.";
m->m_cgi = "obeyRobots";
m->m_off = (char *)&cr.m_useRobotsTxt - x;
m->m_type = TYPE_BOOL;
@ -16572,6 +16573,18 @@ void Parms::init ( ) {
m->m_flags = PF_CLONE;
m++;
m->m_title = "obey rel no follow links";
m->m_desc = "If this is true Gigablast will respect "
"the rel no follow link attribute.";
m->m_cgi = "obeyRelNoFollow";
m->m_off = (char *)&cr.m_obeyRelNoFollowLinks - x;
m->m_type = TYPE_BOOL;
m->m_def = "1";
m->m_page = PAGE_SPIDER;
m->m_obj = OBJ_COLL;
m->m_flags = PF_CLONE;
m++;
m->m_title = "max robots.txt cache age";
m->m_desc = "How many seconds to cache a robots.txt file for. "
"86400 is 1 day. 0 means Gigablast will not read from the "

View File

@ -860,6 +860,18 @@ void doneCmdWrapper ( void *state ) {
void hdtempWrapper ( int fd , void *state ) {
// current local time
int32_t now = getTime();
// from SpiderProxy.h
static int32_t s_lastTime = 0;
if ( ! s_lastTime ) s_lastTime = now;
// reset spider proxy stats every hour to alleviate false positives
if ( now - s_lastTime >= 3600 ) {
s_lastTime = now;
resetProxyStats();
}
// also download test urls from spider proxies to ensure they
// are up and running properly
downloadTestUrlFromProxies();
@ -870,8 +882,6 @@ void hdtempWrapper ( int fd , void *state ) {
if ( g_process.m_threadOut ) return;
// skip if exiting
if ( g_process.m_mode == EXIT_MODE ) return;
// current local time
int32_t now = getTime();
// or if haven't waited long enough
if ( now < s_nextTime ) return;
@ -1542,7 +1552,7 @@ bool Process::shutdown2 ( ) {
// make sure they are in a saveable state. we need to make sure
// they have dumped out the latest merged list and updated the
// appropriate RdbMap so we can save it below
// appropriate RdbMap so we can save it below.
bool wait = false;
if ( g_merge.m_isMerging && ! g_merge.m_isReadyToSave ) wait = true;
if ( g_merge2.m_isMerging && ! g_merge2.m_isReadyToSave ) wait = true;
@ -1550,7 +1560,9 @@ bool Process::shutdown2 ( ) {
if ( isRdbDumping() ) wait = true;
// . wait for the merge or dump to complete
// . but NOT if urgent...
if ( wait && ! m_urgent ) return false;
// . this stuff holds everything up too long, take out, we already
// wait for write threads to complete, that should be good enough
//if ( wait && ! m_urgent ) return false;
// . disable adds/deletes on all rdb trees
// . Msg1 requests will get ECLOSING error msgs
@ -1642,7 +1654,7 @@ bool Process::shutdown2 ( ) {
log("gb: Dumping core after saving.");
// at least destroy the page caches that have shared memory
// because they seem to not clean it up
resetPageCaches();
//resetPageCaches();
// let's ensure our core file can dump
struct rlimit lim;

View File

@ -1821,12 +1821,16 @@ Profiler::printRealTimeInfo(SafeBuf *sb,
//,coll,
// rtall, showMessage);
);
sb->safePrintf("<a href=\"/admin/profiler?c=%s&rtstop=1\">"
"(Stop)</a> [Click refresh to get latest profile "
"stats][Don't forget to click STOP when done so you "
"don't leave the profiler running which can slow "
"things down.]</b></td></tr>\n",
coll);
sb->safePrintf(
// "<a href=\"/admin/profiler?c=%s&rtstop=1\">"
// "(Stop)</a> [Click refresh to get latest profile "
// "stats][Don't forget to click STOP when done so you "
// "don't leave the profiler running which can slow "
//"things down.]"
"</b>"
"</td></tr>\n"
//,coll
);
/*
rtall = !rtall;
@ -2138,7 +2142,9 @@ Profiler::printRealTimeInfo(SafeBuf *sb,
*/
// just leave it off if we printed something. but if we just
// turn the profiler on then m_ipBuf will be empty so start it
if ( m_ipBuf.length() == 0 )
g_profiler.startRealTimeProfiler();
return true;

View File

@ -1858,7 +1858,6 @@ void attemptMergeAll2 ( ) {
if ( base && base->attemptMerge(niceness,force,true) )
return;
// try next collection
s_lastCollnum++;

View File

@ -1433,10 +1433,14 @@ bool RdbMap::generateMap ( BigFile *f ) {
log("db: Generating map for %s/%s",f->getDir(),f->getFilename());
// we don't support headless datafiles right now
if ( ! f->doesPartExist(0) ) {
bool allowHeadless = true;
if ( m_fixedDataSize != 0 ) allowHeadless = false;
if ( m_ks != 18 ) allowHeadless = false;
// allow posdb to go through
if ( ! f->doesPartExist(0) && ! allowHeadless ) {
g_errno = EBADENGINEER;
return log("db: Cannot generate map for "
"headless data files yet.");
"this headless data file yet");
}
// scan through all the recs in f
int64_t offset = 0;
@ -1445,6 +1449,19 @@ bool RdbMap::generateMap ( BigFile *f ) {
if ( fileSize == 0 ) return true;
// g_errno should be set on error
if ( fileSize < 0 ) return false;
// find first existing part file
bool firstRead = true;
int32_t fp = 0;
for ( ; ; fp++ )
// stop when the part file exists
if ( f->doesPartExist(fp) ) break;
if ( fp > 0 ) {
//m_fileStartOffset = MAX_PART_SIZE * fp;
offset = MAX_PART_SIZE * fp;
}
// don't read in more than 10 megs at a time initially
int64_t bufSize = fileSize;
if ( bufSize > 10*1024*1024 ) bufSize = 10*1024*1024;
@ -1501,8 +1518,53 @@ bool RdbMap::generateMap ( BigFile *f ) {
"offset=%"INT64". Map generation failed.",
bufSize,f->getFilename(),offset);
}
// set the list
RdbList list;
// if we were headless then first key on that page could be cut
if ( fp > 0 && firstRead ) {
firstRead = false;
// scan the buffer to find the right key.
int32_t fullKeyOff = findNextFullPosdbKeyOffset (buf,readSize);
// if none found, bail
if ( fullKeyOff < 0 )
return log("rdbmap: could not get a full key in the "
"first %"INT64" bytes read of headless "
"file",readSize);
// for each page before add a -1 entry i guess
int32_t p = 0;
int32_t pageNum = 0;
for ( ; p + m_pageSize < fullKeyOff ; p += m_pageSize ) {
// add a dummy entry indicating a continuation of
// a previous thing. we never had the full posdb key
// so we don't know what the top 6 bytes were so
// just stick -1 in there
setOffset (pageNum, -1 );
setKey (pageNum , key );
pageNum++;
}
// tell rdbmap where "list" occurs in the big file
m_offset = offset + fullKeyOff;
// now the offset on this page
//int32_t pageOffset = p - off;
// must be less than key size
//if ( pageOffset > m_ks ) { char *xx=NULL;*xx=0; }
// set the list special here
list.set ( buf + fullKeyOff ,
readSize - fullKeyOff ,
buf ,
readSize ,
startKey ,
endKey ,
m_fixedDataSize ,
false , // own data?
//m_useHalfKeys );
m_useHalfKeys ,
m_ks );
}
else {
// set the list
list.set ( buf ,
readSize ,
buf ,
@ -1514,6 +1576,8 @@ bool RdbMap::generateMap ( BigFile *f ) {
//m_useHalfKeys );
m_useHalfKeys ,
m_ks );
}
// . HACK to fix useHalfKeys compression thing from one read to the nxt
// . "key" should still be set to the last record we read last read
//if ( offset > 0 ) list.m_listPtrHi = ((char *)&key)+6;
@ -1732,3 +1796,150 @@ bool RdbMap::truncateFile ( BigFile *f ) {
// success
return true;
}
int64_t RdbMap::findNextFullPosdbKeyOffset ( char *buf, int32_t bufSize ) {
char *p = buf;
char *lastKeyLo;
char *lastKeyMe;
char *lastKeyHi;
int32_t keyCount;
char *bufEnd = buf + bufSize;
int32_t numWinners = 0;
int64_t winnerOffset = -1;
// try an offset of 0
int64_t tryOffset = -2;
bool printed;
int64_t firstFullKeyOff;
offsetLoop:
printed = false;
firstFullKeyOff = -1;
tryOffset += 2;
// only need to try 18 of them
if ( tryOffset >= 18 )
goto done;
keyCount = 0;
lastKeyLo = NULL;
lastKeyMe = NULL;
lastKeyHi = NULL;
keyLoop:
// posdbkey
//key144_t *kp = (key144_t *)p;
if ( p + 18 >= bufEnd )
goto bufExhausted;
char lastKey[18];
if ( lastKeyHi ) {
memcpy ( lastKey , lastKeyLo , 6 );
memcpy ( lastKey + 6 , lastKeyMe , 6 );
memcpy ( lastKey + 12 , lastKeyHi , 6 );
}
char thisKey[18];
// get lower compression bits
if ( (p[0] & 0x04) ) {
// make the full key to compare
if ( lastKeyHi ) {
memcpy ( thisKey , p , 6 );
memcpy ( thisKey + 6 , lastKeyMe , 6 );
memcpy ( thisKey + 12 , lastKeyHi , 6 );
if ( KEYCMP ( lastKey , thisKey , m_ks ) >= 0 ) {
log("rdbmap: key out of order 1 tryoff of %i",
(int)tryOffset);
goto offsetLoop;
}
keyCount++;
//log("rdbmap: good key6 tryoff %i",(int)tryOffset);
}
lastKeyLo = p;
p += 6;
goto keyLoop;
}
// a 12 byte key?
if ( (p[0] & 0x02) ) {
// make the full key to compare
if ( lastKeyHi ) {
memcpy ( thisKey , p , 12 );
memcpy ( thisKey + 12 , lastKeyHi , 6 );
if ( KEYCMP ( lastKey , thisKey , m_ks ) >= 0 ) {
log("rdbmap: key out of order 2 @ %i "
"tryoff of %i",
(int)(p-buf),(int)tryOffset);
goto offsetLoop;
}
keyCount++;
//log("rdbmap: good key12 tryoff %i",(int)tryOffset);
}
lastKeyLo = p;
lastKeyMe = p + 6;
p += 12;
goto keyLoop;
}
// did we have a key before us?
if ( lastKeyHi && KEYCMP ( lastKey , p , 18 ) >= 0 ) {
log("rdbmap: tryoffset of %i is bogus",(int)tryOffset);
// keys out of order must not be a good 'tryOffset'
goto offsetLoop;
}
// ensure it is valid with alignment bits
keyCount++;
if ( ! printed ) {
log("rdbmap: good key18 @ %i tryoff %i",
(int)(p-buf),(int)tryOffset);
printed = true;
firstFullKeyOff = p-buf;
}
lastKeyLo = p;
lastKeyMe = p + 6;
lastKeyHi = p + 12;
p += 18;
// if ( keyCount >= 1000 ) {
// log("rdbmap: got good tryoffset of %i",(int)tryOffset);
// goodTry = tryOffset;
// goodOnes++;
// goto offsetLoop;
// }
goto keyLoop;
bufExhausted:
// only one key compared successfully, forget it then, not a winner
if ( keyCount > 1 ) {
// got a winner?
numWinners++;
winnerOffset = firstFullKeyOff;
log("rdbmap: got winner @ %i at tryoff %i (keycount=%i)",
(int)winnerOffset,(int)tryOffset,(int)keyCount);
}
goto offsetLoop;
done:
if ( numWinners != 1 ) {
log("rdbmap: could not figure out offset of first full key "
"in headless posdb file (numWinners=%i)",
(int)numWinners);
return -1;
}
return winnerOffset;
}
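The scan above works because posdb stores three record sizes and flags them in the low bits of a record's first byte: when bit 0x04 is set the record is a 6-byte key that reuses the middle and high 6-byte portions of the previous full key, when bit 0x02 is set it is a 12-byte key that reuses only the high 6 bytes, and otherwise it is a full 18-byte key. A tiny helper that mirrors those checks, shown here only as an illustrative sketch (the name is hypothetical, not part of this commit):

// Sketch: classify a posdb record by the compression bits in its
// first byte, exactly as the keyLoop above does.
static int32_t getPosdbRecSize ( const char *p ) {
	if ( p[0] & 0x04 ) return 6;   // low 6 bytes only
	if ( p[0] & 0x02 ) return 12;  // low 12 bytes
	return 18;                     // full key144_t
}

findNextFullPosdbKeyOffset() tries the even byte offsets 0 through 16 (tryOffset), and an offset only counts as a winner when more than one key decodes in sorted order before the buffer is exhausted; if anything other than exactly one winning offset is found, it logs the failure and returns -1.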

View File

@ -136,6 +136,8 @@ class RdbMap {
// get the size of the file we are mapping
int64_t getFileSize () { return m_offset; };
int64_t findNextFullPosdbKeyOffset ( char *buf, int32_t bufSize ) ;
// . gets total size of all recs in this page range
// . if subtract is true we subtract the sizes of pages that begin
// with a delete key (low bit is clear)

View File

@ -635,6 +635,13 @@ void RdbMerge::doneMerging ( ) {
// then the rdbbase should be NULL i guess.
if ( saved == ENOCOLLREC ) return;
// if we are exiting then dont bother renaming the files around now.
// this prevents a core in RdbBase::incorporateMerge()
if ( g_process.m_mode == EXIT_MODE ) {
log("merge: exiting. not ending merge.");
return;
}
// get base, returns NULL and sets g_errno to ENOCOLLREC on error
RdbBase *base; if (!(base=getRdbBase(m_rdbId,m_collnum))) return;
// pass g_errno on to incorporate merge so merged file can be unlinked

View File

@ -198,6 +198,15 @@ bool SafeBuf::safeMemcpy ( Words *w , int32_t a , int32_t b ) {
return safeMemcpy ( p , pend - p );
}
char* SafeBuf::pushStr (char* str, uint32_t len) {
int32_t initLen = m_length;
bool status = safeMemcpy ( str , len );
status &= nullTerm();
m_length++; //count the null so it isn't overwritten
if(!status) return NULL;
return m_buf + initLen;
}
bool SafeBuf::pushPtr ( void *ptr ) {
if ( m_length + (int32_t)sizeof(char *) > m_capacity )
if(!reserve(sizeof(char *)))//2*m_capacity + 1))

View File

@ -348,6 +348,7 @@ public:
// hack off trailing 0's
bool printFloatPretty ( float f ) ;
char* pushStr (char* str, uint32_t len);
bool pushPtr ( void *ptr );
bool pushLong (int32_t i);
bool pushLongLong (int64_t i);

View File

@ -53,7 +53,12 @@ void testWinnerTreeKey ( ) ;
#define SPIDER_DONE_TIMER 20
// seems like timecity.com has gigabytes of spiderdb data so up from 40 to 400
#define MAX_WINNER_NODES 400
//#define MAX_WINNER_NODES 400
// up it to 2000 because shard #15 has slow disk reads and some collections
// are taking forever to spider because the spiderdb scan is so slow.
// we reduce this below if the spiderdb is smaller.
#define MAX_WINNER_NODES 2000
Doledb g_doledb;
@ -2204,10 +2209,10 @@ bool SpiderColl::addSpiderRequest ( SpiderRequest *sreq ,
int64_t nowGlobalMS ) {
// don't add negative keys or data less thangs
if ( sreq->m_dataSize <= 0 ) {
if ( g_conf.m_logDebugSpider )
//if ( g_conf.m_logDebugSpider )
log("spider: add spider request is dataless for "
"uh48=%"UINT64"",sreq->getUrlHash48());
char *xx=NULL;*xx=0;
//char *xx=NULL;*xx=0;
return true;
}
@ -3579,7 +3584,11 @@ bool SpiderColl::evalIpLoop ( ) {
&doleBuf,
&doleBufSize ,
false, // doCopy?
600, // maxAge, 600 seconds
// we raised MAX_WINNER_NODES so
// grow from 600 to 1200
// (10 mins to 20 mins) to make
// some crawls faster
1200, // maxAge, 600 seconds
true ,// incCounts
&cachedTimestamp , // rec timestamp
true ); // promote rec?
@ -5262,14 +5271,14 @@ bool SpiderColl::addWinnersIntoDoledb ( ) {
// i am seeing dup uh48's in the m_winnerTree
int32_t firstIp = m_waitingTreeKey.n0 & 0xffffffff;
char dbuf[147456];//3*MAX_WINNER_NODES*(8+1)];
//char dbuf[147456];//3*MAX_WINNER_NODES*(8+1)];
HashTableX dedup;
int32_t ntn = m_winnerTree.getNumNodes();
dedup.set ( 8,
0,
(int32_t)2*ntn, // # slots to initialize to
dbuf,
147456,//(int32_t)(3*MAX_WINNER_NODES*(8+1)),
NULL,//dbuf,
0,//147456,//(int32_t)(3*MAX_WINNER_NODES*(8+1)),
false,
MAX_NICENESS,
"windt");
@ -6365,6 +6374,33 @@ void doneSleepingWrapperSL ( int fd , void *state ) {
// set initial priority to the highest to start spidering there
//g_spiderLoop.m_pri = MAX_SPIDER_PRIORITIES - 1;
// if recently called, do not call again from the sleep wrapper
int64_t nowms = gettimeofdayInMillisecondsLocal();
if ( nowms - g_spiderLoop.m_lastCallTime < 50 )
return;
// if we have a ton of collections, reduce cpu load from calling
// spiderDoledUrls()
static uint64_t s_skipCount = 0;
s_skipCount++;
// so instead of every 50ms make it every 200ms if we got
// 100+ collections in use.
//CollectionRec *tmp = g_spiderLoop.getActiveList();
g_spiderLoop.getActiveList();
int32_t activeListCount = g_spiderLoop.m_activeListCount;
if ( ! g_spiderLoop.m_activeListValid )
activeListCount = 0;
int32_t skip = 1;
if ( activeListCount >= 50 )
skip = 2;
if ( activeListCount >= 100 )
skip = 4;
if ( activeListCount >= 200 )
skip = 8;
if ( ( s_skipCount % skip ) != 0 )
return;
// spider some urls that were doled to us
g_spiderLoop.spiderDoledUrls( );
}
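For reference, since the sleep wrapper above already refuses to call more often than every 50 ms (the m_lastCallTime guard), the skip factors work out to roughly the following cadence for spiderDoledUrls(); treat these as approximate, since other code paths can also trigger it:

	under 50 active collections:  skip = 1, every ~50 ms
	50 or more:                   skip = 2, every ~100 ms
	100 or more:                  skip = 4, every ~200 ms (the "every 200ms" case the comment mentions)
	200 or more:                  skip = 8, every ~400 ms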
@ -6626,6 +6662,8 @@ void SpiderLoop::spiderDoledUrls ( ) {
// log("spider: trying to get a doledb rec to spider. "
// "currentnumout=%"INT32"",m_numSpidersOut);
m_lastCallTime = gettimeofdayInMillisecondsLocal();
// when getting a lock we keep a ptr to the SpiderRequest in the
// doledb list, so do not try to read more just yet until we know
// if we got the lock or not
@ -7548,6 +7586,10 @@ bool SpiderLoop::gotDoledbList2 ( ) {
XmlDoc *xd = m_docs[i];
if ( ! xd ) continue;
if ( ! xd->m_sreqValid ) continue;
// to prevent one collection from hogging all the urls for
// particular IP and starving other collections, let's make
// this a per collection count.
// then allow msg13.cpp to handle the throttling on its end.
// also do a global count over all collections now
if ( xd->m_sreq.m_firstIp == sreq->m_firstIp ) globalOut++;
// only count for our same collection otherwise another
@ -14786,6 +14828,8 @@ void SpiderLoop::buildActiveList ( ) {
m_activeListValid = true;
m_activeListCount = 0;
// reset the linked list of active collections
m_activeList = NULL;
bool found = false;
@ -14832,6 +14876,8 @@ void SpiderLoop::buildActiveList ( ) {
cr->m_isActive = true;
m_activeListCount++;
if ( cr == m_crx ) found = true;

View File

@ -1615,9 +1615,12 @@ class SpiderLoop {
CollectionRec *m_bookmark;
bool m_activeListValid;
bool m_activeListModified;
int32_t m_activeListCount;
uint32_t m_recalcTime;
bool m_recalcTimeValid;
int64_t m_lastCallTime;
int64_t m_doleStart;
int32_t m_processed;

View File

@ -880,9 +880,6 @@ void handleRequest54 ( UdpSlot *udpSlot , int32_t niceness ) {
int64_t nowms = gettimeofdayInMillisecondsLocal();
// winner count update
winnersp->m_timesUsed++;
// add a new load bucket then!
LoadBucket bb;
bb.m_urlIp = urlIp;
@ -897,9 +894,13 @@ void handleRequest54 ( UdpSlot *udpSlot , int32_t niceness ) {
bb.m_proxyPort = winnersp->m_port;
// a new id. we use this to update the downloadEndTime when done
static int32_t s_lbid = 0;
bb.m_id = s_lbid++;
// add it now
// add it now, iff not for passing to diffbot backend
if ( preq->m_opCode != OP_GETPROXYFORDIFFBOT ) {
s_loadTable.addKey ( &urlIp , &bb );
bb.m_id = s_lbid++;
// winner count update
winnersp->m_timesUsed++;
}
// sanity
if ( (int32_t)sizeof(ProxyReply) > TMPBUFSIZE ){char *xx=NULL;*xx=0;}
@ -934,8 +935,8 @@ void handleRequest54 ( UdpSlot *udpSlot , int32_t niceness ) {
// top:
// now remove old entries from the load table. entries that
// have completed and have a download end time more than 10 mins ago
for ( int32_t i = 0 ; i < s_loadTable.getNumSlots() ; i++ ) {
// have completed and have a download end time more than 10 mins ago.
for ( int32_t i = s_loadTable.getNumSlots() - 1 ; i >= 0 ; i-- ) {
// skip if empty
if ( ! s_loadTable.m_flags[i] ) continue;
// get the bucket
@ -956,7 +957,7 @@ void handleRequest54 ( UdpSlot *udpSlot , int32_t niceness ) {
// miss out on analyzing any keys if we just keep looping here
// should we? TODO: figure it out. if we miss a few it's not
// a big deal.
i--;
//i--;
//goto top;
}

View File

@ -71,6 +71,9 @@ class SpiderProxy *getSpiderProxyByIpPort ( int32_t ip , uint16_t port ) ;
// value for m_opCode. tell host #0 we are done using a proxy:
#define OP_RETPROXY 2
// do not do load balancing for this request:
#define OP_GETPROXYFORDIFFBOT 3
// ask host #0 for a proxy to use:
// we now just use Msg13Request for this...
//class ProxyRequest {
@ -91,7 +94,7 @@ public:
// id of the transaction
int32_t m_lbId;
// proxy port to use
int16_t m_proxyPort;
int32_t m_proxyPort;
// if this proxy fails us are there more proxies to try?
bool m_hasMoreProxiesToTry;
// how many proxies do we have that are banned by the urlip?

View File

@ -2803,24 +2803,10 @@ bool Msg8a::launchGetRequests ( ) {
//uint32_t gid = g_hostdb.getGroupId ( m_rdbId , &startKey , true );
//Host *group = g_hostdb.getGroup ( gid );
int32_t shardNum = getShardNum ( m_rdbId , &startKey );//, true );
Host *group = g_hostdb.getShard ( shardNum );
//int32_t numTwins = g_hostdb.getNumHostsPerShard();
// use top byte!
uint8_t *sks = (uint8_t *)&startKey;
uint8_t top = sks[sizeof(TAGDB_KEY)-1];
//int32_t hostNum = 0;
//if ( numTwins == 2 && (top & 0x80) ) hostNum = 1;
// TODO: fix this!
//if ( numTwins >= 3 ) { char *xx=NULL;*xx=0; }
// support more than 2 stripes now...
int32_t hostNum = top % g_hostdb.getNumHostsPerShard();
int32_t hostId = group[hostNum].m_hostId;
int32_t firstHostId = g_hostdb.getLeastLoadedInShard ( shardNum )->m_hostId;
// . launch this request, even if to ourselves
// . TODO: just use msg0!!
bool status = m->getList ( hostId , // hostId
bool status = m->getList ( firstHostId , // hostId
0 , // ip
0 , // port
0 , // maxCacheAge
@ -2837,7 +2823,7 @@ bool Msg8a::launchGetRequests ( ) {
true , // error correction?
true , // include tree?
true , // doMerge?
-1 , // firstHostId
firstHostId , // firstHostId
0 , // startFileNum
-1 , // numFiles
3600*24*365 );// timeout

View File

@ -3316,4 +3316,3 @@ void Threads::printState() {
}
}
}

File diff suppressed because it is too large.

View File

@ -485,7 +485,9 @@ class XmlDoc {
// for container docs, what is the separator of subdocs?
char *contentDelim = NULL,
char *metadata = NULL,
uint32_t metadataLen = 0) ;
uint32_t metadataLen = 0,
// for injected docs we have the recv buffer size; don't exceed that
int32_t payloadLen = -1) ;
// we now call this right away rather than at download time!
int32_t getSpideredTime();
@ -513,7 +515,9 @@ class XmlDoc {
bool indexDoc2 ( );
bool isContainerDoc ( );
bool indexContainerDoc ( );
bool indexWarcOrArc ( char ct ) ;
bool readMoreWarc();
bool indexWarcOrArc ( ) ;
key_t *getTitleRecKey() ;
//char *getSkipIndexing ( );
char *prepareToMakeTitleRec ( ) ;
@ -706,7 +710,7 @@ class XmlDoc {
char **getExpandedUtf8Content ( ) ;
char **getUtf8Content ( ) ;
// we download large files to a file on disk, like warcs and arcs
BigFile *getUtf8ContentInFile ( int64_t *fileSizeArg );
FILE *getUtf8ContentInFile ( );
int32_t *getContentHash32 ( ) ;
int32_t *getContentHashJson32 ( ) ;
//int32_t *getTagHash32 ( ) ;
@ -1088,12 +1092,16 @@ class XmlDoc {
int32_t m_warcError ;
int32_t m_arcError ;
bool m_doneInjectingWarc ;
bool m_doneInjectingArc ;
int64_t m_fileOff ;
int64_t m_bytesStreamed;
char *m_fileBuf ;
int32_t m_fileBufAllocSize;
bool m_registeredWgetReadCallback;
char *m_fptr ;
char *m_fptrEnd ;
FILE* m_pipe;
BigFile m_file;
int64_t m_fileSize;
FileState m_fileState;
@ -1495,6 +1503,7 @@ class XmlDoc {
bool m_imageUrl2Valid;
bool m_matchOffsetsValid;
bool m_queryValid;
bool m_diffbotProxyReplyValid;
bool m_matchesValid;
bool m_dbufValid;
bool m_titleValid;
@ -1696,6 +1705,8 @@ class XmlDoc {
bool m_isChildDoc;
Msg13 m_msg13;
Msg13Request m_msg13Request;
Msg13Request m_diffbotProxyRequest;
ProxyReply *m_diffbotProxyReply;
bool m_isSpiderProxy;
// for limiting # of iframe tag expansions
int32_t m_numExpansions;
@ -2470,7 +2481,8 @@ class XmlDoc {
// for container docs consisting of subdocs to inject
char *contentDelim = NULL,
char* metadata = NULL,
uint32_t metadataLen = 0);
uint32_t metadataLen = 0,
int32_t payloadLen = -1);
bool injectLinks ( HashTableX *linkDedupTable ,

View File

@ -11,15 +11,14 @@ import sqlite3
import datetime
import sys
import time
import flask
# import flask
import signal, os
import random
from itertools import repeat
staleTime = datetime.timedelta(90,0,0) # three month for now
app = flask.Flask(__name__)
app.secret_key = 'oaisj84alwsdkjhf9238u'
staleTime = datetime.timedelta(7,0,0) # one week for now
# app = flask.Flask(__name__)
# app.secret_key = 'oaisj84alwsdkjhf9238u'
def getDb(makeDates=True):
if makeDates:
@ -35,6 +34,9 @@ def handler(signum, frame):
#Generate environment with:
#pex -r requests -r multiprocessing -e inject:main -o warc-inject -s '.' --no-wheel
#pex -r requests -r multiprocessing -o warc-inject
# see the Makefile
# TODO: add argument parser
# import argparse
# parser = argparse.ArgumentParser()
# parser.add_argument('--foo', help='foo help')
@ -65,13 +67,16 @@ def reallyExecuteMany(c, query, qargs):
def injectItem(item, db, mode):
itemStart = time.time()
c = db.cursor()
res = reallyExecute(c, 'select * from items where item = ?', (item,)).fetchone()
db.commit()
itemId = None
if res:
if res[1] > (datetime.datetime.now() - staleTime):
print 'skipping %s because we checked recently' % item
return 0 # We checked recently
return time.time() - itemStart # We checked recently
itemId = res[0]
@ -93,11 +98,12 @@ def injectItem(item, db, mode):
db.commit()
if 'files' not in md:
return
time.time() - itemStart
res = None
res = reallyExecute(c, "select fileName, updated, status, took from files where itemId = ?",
(itemId,)).fetchall()
db.commit()
lastUpdate = {}
for fileName, updated, status, took in res:
@ -107,13 +113,18 @@ def injectItem(item, db, mode):
dbUpdates = []
skipped = 0
for ff in md['files']:
if not ff['name'].endswith('arc.gz'): continue
warcs = filter(lambda x: 'name' in x and x['name'].endswith and x['name'].endswith('arc.gz'), md['files'])
collectionName = md['metadata'].get('archiveit-collection-name', '')
for ii, ff in enumerate(warcs):
#if not ff['name'].endswith('arc.gz'): continue
itemMetadata = {'mtime':ff['mtime']}
updateTime = datetime.datetime.fromtimestamp(float(ff['mtime']))
if ff['name'] in lastUpdate and updateTime <= lastUpdate[ff['name']]:
if mode != 'force' and ff['name'] in lastUpdate and updateTime <= lastUpdate[ff['name']]:
print "skip {0} because it is up to date".format(ff['name'])
skipped += 1
requests.post('http://localhost:10008/progress',
json={'item':item, 'total':len(warcs), 'done':ii+1,
'collection-name':collectionName})
continue
itemMetadata.update(md['metadata'])
@ -123,7 +134,10 @@ def injectItem(item, db, mode):
'c':'ait',
'spiderlinks':0}
start = time.time()
if mode == 'production':
if mode == 'testing':
time.sleep(random.randint(1,4))
statusCode = 999
else:
try:
rp = requests.post("http://localhost:8000/admin/inject", postVars)
statusCode = rp.status_code
@ -132,27 +146,33 @@ def injectItem(item, db, mode):
print 'error: gb inject', postVars['url'], e
statusCode = -1
#print postVars['url'], rp.status_code
else:
time.sleep(random.randint(1,4))
statusCode = 999
took = time.time() - start
print "sent", ff['name'],'to gb, took', took
sys.stdout.flush()
dbUpdates.append((itemId, ff['name'], updateTime, statusCode, took))
requests.post('http://localhost:10008/progress',
json={'item':item, 'total':len(warcs), 'done':ii+1,
'collection-name':collectionName})
if len(dbUpdates):
reallyExecuteMany(c, "DELETE FROM files where fileName = ? ", zip(lastUpdate.iterkeys()))
reallyExecuteMany(c, "INSERT INTO files VALUES (?,?,?,?,?)",
dbUpdates)
db.commit()
print 'completed %s with %s items injected and %s skipped' % (item, len(dbUpdates), skipped)
return time.time() - itemStart
def getPage(zippedArgs):
page, mode = zippedArgs
page, mode, resultsPerPage, extraQuery = zippedArgs
query = 'collection%3Aarchiveitdigitalcollection+' + extraQuery
#r = requests.get('https://archive.org/advancedsearch.php?q=collection%3Aarchiveitdigitalcollection&fl%5B%5D=identifier&rows=1&page={0}&output=json&save=yes'.format(page))
r = requests.get('https://archive.org/advancedsearch.php?q=collection%3Aarchiveitdigitalcollection&fl%5B%5D=identifier&sort[]=date+desc&rows=100&page={0}&output=json&save=yes'.format(page))
url = 'https://archive.org/advancedsearch.php?q={1}&fl%5B%5D=identifier&sort[]=date+asc&rows={2}&page={0}&output=json'.format(page, query, resultsPerPage)
try:
r = requests.get(url)
if r.status_code != 200:
return 0
@ -162,19 +182,24 @@ def getPage(zippedArgs):
numFound = jsonContents['response']['numFound']
if len(items) == 0:
requests.post('http://localhost:10008/progress', json={'total':numFound, 'completed':'', 'query':extraQuery})
print 'got 0 items for search page', page
return 0
print 'loading %s items, %s - %s of %s' % (len(items), items[0], items[-1], numFound)
db = getDb()
for item in items:
injectItem(item, db, mode)
db = getDb()
took = injectItem(item, db, mode)
db.close()
requests.post('http://localhost:10008/progress', json={'total':numFound,
'completed':item,
'query':extraQuery,
'took':took})
return len(items)
except Exception, e:
print 'Caught', e, 'sleep and retry', url
time.sleep(60)
return getPage(zippedArgs)
def dumpDb():
@ -200,6 +225,10 @@ def showItems():
def nuke(lastPid, fromOrbit=False):
try:
requests.post('http://localhost:10008/shutdown', {})
except:
pass
sig = signal.SIGTERM
if fromOrbit:
sig = signal.SIGKILL
@ -212,7 +241,7 @@ def nuke(lastPid, fromOrbit=False):
except:
pass
killed = subprocess.Popen("""kill `ps auxx |grep warc-inject|awk -e '{print $2}'`""" % sys.argv[0],
killed = subprocess.Popen("""kill `ps auxx |grep warc-inject|grep -v grep|awk -e '{print $2}'`""",
shell=True,stdout=subprocess.PIPE).communicate()[0]
if killed == 'Terminated':
@ -222,13 +251,47 @@ def nuke(lastPid, fromOrbit=False):
def main():
global staleTime
print 'arguments were', sys.argv, 'pid is', os.getpid()
if sys.argv[1] != 'monitor':
try:
lastPid = open('running.pid', 'r').read()
except:
lastPid = None
print 'arguments were', sys.argv, 'pid is', os.getpid()
open('running.pid', 'w').write(str(os.getpid()))
# p = multiprocessing.Process(target=serveForever)
# p.start()
if sys.argv[1] == 'test':
query = ''
if len(sys.argv) == 3:
query = sys.argv[2]
#subprocess.Popen(['python','inject', 'monitor'])
mode = 'testing'
runInjects(10, 'testing', query)
if sys.argv[1] == 'run':
query = ''
if len(sys.argv) == 4:
query = sys.argv[3]
#subprocess.Popen(['./warc-inject','monitor'])
threads = int(sys.argv[2])
runInjects(threads, 'production', query)
print "done running"
if len(sys.argv) == 2:
if sys.argv[1] == 'monitor':
import monitor
monitor.main()
if sys.argv[1] == 'init':
init()
print 'initialized'
@ -250,6 +313,8 @@ def main():
nuke(lastPid, fromOrbit=True)
if sys.argv[1] == 'test':
subprocess.Popen(['./warc-inject','monitor'])
mode = 'testing'
runInjects(10, 'testing')
@ -311,22 +376,20 @@ def main():
signal.alarm(0) # Disable the alarm
if sys.argv[1] == 'serve':
serveForever()
# if sys.argv[1] == 'serve':
# serveForever()
if len(sys.argv) == 3:
if sys.argv[1] == 'force':
itemName = sys.argv[2]
db = getDb()
injectItem(itemName, db, 'production')
sys.exit(0)
if len(sys.argv) == 4:
if sys.argv[1] == 'forcefile':
global staleTime
if sys.argv[1] == 'injectfile':
staleTime = datetime.timedelta(0,0,0)
from multiprocessing.pool import ThreadPool
fileName = sys.argv[2]
@ -341,6 +404,24 @@ def main():
return ret
answer = pool.map(injectItemTupleWrapper, items)
print 'finished: ', answer
sys.exit(0)
if sys.argv[1] == 'forcefile':
staleTime = datetime.timedelta(0,0,0)
from multiprocessing.pool import ThreadPool
fileName = sys.argv[2]
items = filter(lambda x: x, open(fileName, 'r').read().split('\n'))
threads = int(sys.argv[3])
pool = ThreadPool(processes=threads)
#print zip(files, repeat(getDb(), len(files)), repeat('production', len(files)))
def injectItemTupleWrapper(itemName):
db = getDb()
ret = injectItem(itemName, db, 'force')
db.close()
return ret
answer = pool.map(injectItemTupleWrapper, items)
print 'finished: ', answer
sys.exit(0)
if sys.argv[1] == 'injectitems':
@ -360,21 +441,41 @@ def main():
sys.exit(0)
if sys.argv[1] == 'run':
threads = int(sys.argv[2])
runInjects(threads)
def getNumResults(query):
query = 'collection%3Aarchiveitdigitalcollection+' + query
r = requests.get('https://archive.org/advancedsearch.php?q={0}&fl%5B%5D=identifier&sort[]=date+asc&rows=1&page=0&output=json'.format(query))
if r.status_code != 200:
return 0
contents = r.content
jsonContents = json.loads(contents)
numFound = jsonContents['response']['numFound']
return numFound
def runInjects(threads, mode='production'):
def runInjects(threads, mode='production', query=''):
from multiprocessing.pool import ThreadPool
import math
pool = ThreadPool(processes=threads)
try:
maxPages = 1300
answer = pool.map(getPage, zip(xrange(1,maxPages), repeat(mode, maxPages)))
totalResults = getNumResults(query)
resultsPerPage = 100
maxPages = int(math.ceil(totalResults / float(resultsPerPage)))
if maxPages < threads:
maxPages = threads
resultsPerPage = int(math.ceil(totalResults / float(maxPages)))
print threads, ' threads,', totalResults, 'total,', maxPages, 'pages', resultsPerPage, 'results per page'
answer = pool.map(getPage, zip(xrange(1,maxPages),
repeat(mode, maxPages),
repeat(resultsPerPage, maxPages),
repeat(query, maxPages)))
print "finished item pass", answer
except (KeyboardInterrupt, SystemExit):
print 'ok, caught'
raise
requests.post('http://localhost:10008/shutdown', {})
sys.exit(0)
#raise
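To make the paging arithmetic in runInjects() above concrete, a worked example with hypothetical numbers: with totalResults = 250 and threads = 10, the first pass gives maxPages = ceil(250 / 100) = 3, which is less than the thread count, so maxPages is bumped to 10 and resultsPerPage becomes ceil(250 / 10) = 25. Every thread then gets its own search page of roughly equal size instead of a few threads fetching 100-result pages while the rest sit idle.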
def init():
@ -391,73 +492,67 @@ def init():
db.close()
def serveForever():
@app.route('/',
methods=['GET', 'POST'], endpoint='home')
def home():
db = getDb(makeDates=False)
res = db.execute('select * from items limit 10')
for item, checked in res.fetchall():
print item
try:
metadata = subprocess.Popen(['./ia','metadata', item],
stdout=subprocess.PIPE).communicate()[0]
# def serveForever():
# @app.route('/',
# methods=['GET', 'POST'], endpoint='home')
# def home():
# db = getDb(makeDates=False)
# res = db.execute('select * from items limit 10')
# for item, checked in res.fetchall():
# print item
# try:
# metadata = subprocess.Popen(['./ia','metadata', item],
# stdout=subprocess.PIPE).communicate()[0]
break
except Exception, e:
pass
db.close()
# break
# except Exception, e:
# pass
# db.close()
# return flask.make_response(metadata)
# @app.route('/progress',
# methods=['GET', 'POST'], endpoint='progress')
# def progress():
# r = requests.get('https://archive.org/advancedsearch.php?q=collection%3Aarchiveitdigitalcollection&fl%5B%5D=identifier&sort[]=date+desc&rows=1&page=1&output=json')
# if r.status_code != 200:
# return flask.make_response(json.dumps({error:'ia search feed is down'}),
# 'application/json')
# contents = r.content
# jsonContents = json.loads(contents)
# numFound = jsonContents['response']['numFound']
# db = getDb()
# examinedItems = db.execute('select count(*) from items').fetchone()
# itemsWithWarc = db.execute('select count(*) from items where ROWID in (select itemId from files where files.status = 200)').fetchone()
# return flask.make_response(json.dumps({'totalItems':numFound,
# 'examinedItems':examinedItems,
# 'itemsWithWarc':itemsWithWarc
# }, indent=4), 'application/json')
# @app.route('/items',
# methods=['GET', 'POST'], endpoint='items')
# def items():
# db = getDb(makeDates=False)
# c = db.cursor()
# res = c.execute("select item, checked from items")
# out = []
# for item, checked in res.fetchall():
# out.append({'item':item, 'checked':checked})
# db.close()
return flask.make_response('hihih' + metadata)
# return flask.make_response(json.dumps(out), 'application/json')
@app.route('/progress',
methods=['GET', 'POST'], endpoint='progress')
def progress():
r = requests.get('https://archive.org/advancedsearch.php?q=collection%3Aarchiveitdigitalcollection&fl%5B%5D=identifier&sort[]=date+desc&rows=1&page=1&output=json')
if r.status_code != 200:
return flask.make_response(json.dumps({error:'ia search feed is down'}),
'application/json')
contents = r.content
jsonContents = json.loads(contents)
numFound = jsonContents['response']['numFound']
db = getDb()
examinedItems = db.execute('select count(*) from items').fetchone()
itemsWithWarc = db.execute('select count(*) from items where ROWID in (select itemId from files where files.status = 200)').fetchone()
return flask.make_response(json.dumps({'totalItems':numFound,
'examinedItems':examinedItems,
'itemsWithWarc':itemsWithWarc
}, indent=4), 'application/json')
@app.route('/items',
methods=['GET', 'POST'], endpoint='items')
def items():
db = getDb(makeDates=False)
c = db.cursor()
res = c.execute("select item, checked from items")
out = []
for item, checked in res.fetchall():
out.append({'item':item, 'checked':checked})
db.close()
return flask.make_response(json.dumps(out), 'application/json')
app.run('0.0.0.0',
port=7999,
debug=True,
use_reloader=True,
use_debugger=True)
# app.run('0.0.0.0',
# port=7999,
# debug=False,
# use_reloader=False,
# use_debugger=False)
if __name__ == '__main__':

script/inject/monitor.py (new file, 4109 lines)

File diff suppressed because one or more lines are too long

Binary file not shown.