open-source-search-engine/blaster.cpp
Matt Wells dfab4ee13d fixed bugs with advanced.html advanced search page.
made stats graph only show last 5 minutes of stats.
tends to make the graph look more continuous.
do not use ajax to fetch the search results unless
this is running in matt wells' datacenter. it is
only an anti bot scraping measure and unnecessarily
complicates things for others.
2013-11-17 14:58:47 -07:00

451 lines
12 KiB
C++

// Matt Wells, copyright Sep 2001
// the main program that brings it all together
#include "gb-include.h"
#include "Mem.h"
#include "Conf.h"
#include "Dns.h"
#include "HttpServer.h"
#include "Loop.h"
#include <sys/resource.h> // setrlimit
#include "SafeBuf.h"
static void startSpidering ( ) ;
static void gotDocWrapper ( void *state , TcpSocket *s ) ;
static void sleepWrapper ( int fd , void *state ) ;
// max number of requests allowed in flight at once (argv[2])
static long s_maxNumThreads = 1 ;
// requests currently in flight (++ before getDoc, -- in the callback)
static long s_launched = 0;
// replies received with a non-empty body
static long s_total = 0;
// url-file mode: cursor into the '\0'-separated url buffer, and its end
static char *s_p = NULL;
static char *s_pend = NULL;
static bool s_portSwitch = 0;
// minimum milliseconds between launch rounds (argv[3])
static long s_wait;
// time (ms) of the last launch round; long long so it can hold
// gettimeofdayInMilliseconds() without truncation where long is 32 bits
static long long s_lastTime = 0;
// when true, the next 20th completed url is logged as a progress line
static bool s_printIt = true;
// optional suffix appended to every url (argv[5]), always \0-terminated
static char s_append[512];
// -r mode: dictionary words, each stored \0-terminated back to back
static SafeBuf s_words;
// -r mode: byte offsets into s_words, stored as raw longs
static SafeBuf s_windices;
// -r mode: base url that random words are appended to (NULL otherwise)
static char *s_server = NULL;
// -r mode: number of random words per query
static long s_numRandWords = 0;
long getRandomWords(char *buf, char *bufend, long numWords);
bool getWords();
// Process-lifecycle hooks. This standalone tool owns nothing that needs
// flushing or closing, so every hook is a no-op that reports success.
// (Presumably they are needed only to satisfy references from gb library
// objects linked into blaster — confirm against the link line.)
bool mainShutdown ( bool urgent ) {
	(void)urgent;
	return true;
}
bool closeAll ( void *state , void (* callback)(void *state) ) {
	(void)state;
	(void)callback;
	return true;
}
bool allExit ( ) {
	return true;
}
int main ( int argc , char *argv[] ) {
// let's ensure our core file can dump
struct rlimit lim;
lim.rlim_cur = lim.rlim_max = RLIM_INFINITY;
if ( setrlimit(RLIMIT_CORE,&lim) )
log("blaster::setrlimit: %s", mstrerror(errno) );
g_conf.m_maxMem = 500000000;
// init our table for doing zobrist hashing
if ( ! hashinit() ) {
log("blaster::hashinit failed" ); return 1; }
// init the memory class after conf since it gets maxMem from Conf
//if ( ! g_mem.init ( 20000000 ) ) {
// log("blaster::Mem init failed" ); return 1; }
g_mem.m_maxMem = 200000000;
// start up log file
if ( ! g_log.init( "/tmp/blasterLog" ) ) {
log("blaster::Log open /tmp/blasterLog failed" ); return 1; }
// get dns ip from /etc/resolv.conf
g_conf.m_dnsIps[0] = 0;
FILE *fd = fopen ( "/etc/resolv.conf" , "r" );
if ( ! fd ) {
log("blaster::fopen: /etc/resolve.conf %s",
mstrerror(errno)); return 1; }
char tmp[1024];
while ( fgets ( tmp , 1024 , fd ) ) {
// tmp buf ptr
char *p = tmp;
// skip comments
if ( *p == '#' ) continue;
// skip nameserver name
if ( ! isdigit(*p) ) while ( ! isspace ( *p ) ) p++ ;
// skip spaces
while ( isspace ( *p ) ) p++;
// if this is not a digit, continue
if ( ! isdigit(*p) ) continue;
// get ip
g_conf.m_dnsIps[0] = atoip ( p , gbstrlen(p) );
// done
break;
}
fclose ( fd );
// if no dns server found, bail
if ( g_conf.m_dnsIps[0] == 0 ) {
log("blaster:: no dns ip found in /etc/resolv.conf");return 1;}
// hack # of dns servers
g_conf.m_numDns = 1;
g_conf.m_dnsPorts[0] = 53;
//g_conf.m_dnsIps [0] = atoip ( "192.168.0.1", 11 );
//g_conf.m_dnsClientPort = 9909;
g_conf.m_dnsMaxCacheMem = 1024*10;
// hack http server port to -1 (none)
//g_conf.m_httpPort = 0;
g_conf.m_httpMaxSockets = 200;
//g_conf.m_httpMaxReadBufSize = 102*1024*1024;
g_conf.m_httpMaxSendBufSize = 16*1024;
//g_conf.m_httpMaxDownloadSockets = 200;
if ( argc != 4 && argc != 5 && argc !=6 ) {
printUsage:
log("USAGE: blaster [fileOfUrls | -r<num random words><server>] [maxNumThreads] [wait in ms] "
"<lines to skip> <string to append>");
log("USAGE: examples:");
log("USAGE: ./blaster queries.fromlog 10 1");
log("USAGE: ./blaster -r3http://www.gigablast.com/index.php?q= 1 100\n");
return 1;
}
// init the loop
if ( ! g_loop.init() ) {
log("blaster::Loop init failed" ); return 1; }
// . then dns client
// . server should listen to a socket and register with g_loop
if ( ! g_dns.init(6000) ) {
log("blaster::Dns client init failed" ); return 1; }
// . then webserver
// . server should listen to a socket and register with g_loop
for(long i = 0; i < 50; i++) {
if ( ! g_httpServer.init( 8333 + i, 9334+i ) ) {
log("blaster::HttpServer init failed" );
//return 1;
}
else break;
}
// set File class
char *fname = argv[1];
long fnameLen = gbstrlen(fname);
long fileSize = 0;
long bufSize = 0;
char *buf = NULL;
long n = 0;
//should we generate random queries?
if(fnameLen > 2 && fname[0] == '-' && fname[1] == 'r') {
char *p = fname + 2;
s_numRandWords = atoi( p );
while(is_digit(*p)) p++;
getWords();
if(*p == '\0') goto printUsage;
s_server = p;
log("blaster server is %s", s_server);
// char x[1024];
// while(1) {
// long l = getRandomWords(x, x + 1024, s_numRandWords);
// *(x + l) = '\0';
// log("blaster: %s", x);
// }
// exit(1);
}
else { //it is a real file
File f;
f.set ( fname );
// open file
if ( ! f.open ( O_RDONLY ) ) {
log("blaster::open: %s %s",fname,mstrerror(g_errno));
return 1;
}
// get file size
fileSize = f.getFileSize() ;
// store a \0 at the end
bufSize = fileSize + 1;
// make buffer to hold all
buf = (char *) mmalloc ( bufSize , "blaster" );
if ( ! buf) {log("blaster::mmalloc: %s",mstrerror(errno));return 1;}
//char *bufEnd = buf + bufSize;
// set s_p
s_p = buf;
s_pend = buf + bufSize - 1;
// read em all in
if ( ! f.read ( buf , fileSize , 0 ) ) {
log("blaster::read: %s %s",fname,mstrerror(g_errno));return 1;}
// change \n to \0
//char *p = buf;
for ( long i = 0 ; i < bufSize ; i++ ) {
if ( buf[i] != '\n' ) continue;
buf[i] = '\0';
n++;
}
f.close();
}
// log a msg
log(LOG_INIT,"blaster: read %li urls into memory", n );
long linesToSkip = 0;
if ( argc >= 5 ) {
linesToSkip = atoi ( argv[4] );
log (LOG_INIT,"blaster: skipping %li urls",linesToSkip);
}
for ( long i = 0; i < linesToSkip && s_p < s_pend; i++ )
s_p += gbstrlen(s_p) + 1;
if ( argc == 6 ) {
long len = gbstrlen ( argv[5] );
if ( len > 512 )
len = 512;
strncpy ( s_append , argv[5] , gbstrlen (argv[5]) );
}
else
s_append[0] = '\0';
// get min time bewteen each spider in milliseconds
s_wait = atoi( argv[3] );
// # of threads
s_maxNumThreads = 1;
s_maxNumThreads = atoi ( argv[2] );
s_portSwitch = 0;
//if ( argc == 4 ) s_portSwitch = 1;
//else s_portSwitch = 0;
// start our spider loop
//startSpidering( );
// wakeup wrapper every X ms
g_loop.registerSleepCallback ( s_wait , NULL , sleepWrapper );
//msg10.addUrls ( uu , gbstrlen(uu)+1, NULL,0,time(0),4,true,NULL,NULL);
// . now start g_loops main interrupt handling loop
// . it should block forever
// . when it gets a signal it dispatches to a server or db to handle it
if ( ! g_loop.runLoop() ) {
log("blaster::runLoop failed" ); return 1; }
// dummy return (0-->normal exit status for the shell)
return 0;
}
// Periodic loop callback registered by main(): each tick simply attempts
// another launch round. Neither argument is consulted.
void sleepWrapper ( int fd , void *state ) {
	(void)fd;
	(void)state;
	startSpidering();
}
void startSpidering ( ) {
// url class for parsing/normalizing url
Url u;
// count total urls done
static long long s_startTime = 0;
// set startTime
if ( s_startTime == 0 ) s_startTime = gettimeofdayInMilliseconds();
// get time now
long long now = gettimeofdayInMilliseconds();
// elapsed time to do all urls
double took = (double)(now - s_startTime) / 1000.0 ;
// log this every 20 urls
if ( s_printIt && s_total > 0 && ( s_total % 20 ) == 0 ) {
logf(LOG_INFO,"did %li urls in %f seconds. %f urls per second."
" threads now = %li.",
s_total , took , ((double)s_total) / took, s_launched);
s_printIt = false;
}
// did we wait long enough?
if ( now - s_lastTime < s_wait ) return;
s_lastTime = now;
// . use HttpServer.getDoc() to fetch it
// . fetch X at a time
while ( (s_server || s_p < s_pend) && s_launched < s_maxNumThreads ) {
// clear any error
g_errno = 0;
//append s_append to the url
char url[MAX_URL_LEN];
char *p = url;
char *pend = url + MAX_URL_LEN;
char *t = NULL;
if(s_server) {
long len = gbstrlen(s_server);
memcpy ( p, s_server, len);
p += len;
p += getRandomWords(p, pend, s_numRandWords);
long appendLen = gbstrlen(s_append);
if ( p + appendLen < pend ) {
memcpy ( p, s_append, gbstrlen(s_append) );
p += gbstrlen(s_append);
}
*p++ = '\0';
u.set ( url , p - url);
t = g_mem.strdup(url, "saved url");
}
else {
memcpy ( p, s_p, gbstrlen(s_p));
p += gbstrlen ( s_p );
if ( gbstrlen(s_p) + gbstrlen(s_append) < MAX_URL_LEN )
memcpy ( p, s_append, gbstrlen(s_append) );
p += gbstrlen(s_append);
//null end
*p ='\0';
// make into a url class
u.set ( url , gbstrlen(url) );
// set port if port switch is true
//if ( s_portSwitch ) {
// long r = rand() % 32;
// u.setPort ( 8000 + r );
//}
// save s_p
t = s_p;
// skip to next url
s_p += gbstrlen ( s_p ) + 1;
}
// count it
s_launched++;
// get it
bool status = g_httpServer.getDoc ( &u , // url
0 , // offset
-1 , // size
0 , // ifModifiedSince
(void *)t , // state
gotDocWrapper, // callback
20*1000, // timeout
0, // proxy ip
0, // proxy port
30*1024*1024, //maxLen
30*1024*1024);//maxOtherLen
// continue if it blocked
if ( ! status ) continue;
// otherwise, got it right away
s_launched--;
// log msg
log("got doc1 %s: %s", u.getUrl() , mstrerror(g_errno) );
// we gotta wait
break;
}
// bail if not done yet
//if ( s_launched > 0 ) return;
if ( s_server || s_p < s_pend ) return;
// otherwise, we're all done
logf(LOG_INFO,"blaster: did %li urls in %f seconds. %f urls per "
"second.",
s_total , took , ((double)s_total) / took );
// exit now
exit ( 0 );
}
// Completion callback for g_httpServer.getDoc(). Logs status, reply size,
// latency and a hash of the body, releases the -r mode url copy, then tries
// to launch more requests. An empty reply is logged as "lost".
void gotDocWrapper ( void *state , TcpSocket *s ) {
	// no longer launched
	s_launched--;
	// in -r mode this is a g_mem.strdup() copy we own; otherwise it
	// points into the url-file buffer
	char *url = (char *)state;
	// never hand NULL to %s
	char *shown = url ? url : (char *)"";
	// bail if got cut off
	if ( ! s || s->m_readOffset == 0 ) {
		log("lost %s", shown);
		if ( s_server && url ) mfree ( url , gbstrlen(url)+1 , "saved url" );
		return;
	}
	// got one more result page
	s_total++;
	// allow printing
	s_printIt = true;
	long long now = gettimeofdayInMilliseconds();
	char *reply = s->m_readBuf ;
	long  size  = s->m_readOffset;
	HttpMime mime;
	mime.set ( reply , size , NULL );
	long  mimeLen    = mime.getMimeLen();
	char *content    = reply + mimeLen;
	long  contentLen = size - mimeLen;
	// defensive: never hash a negative length
	if ( contentLen < 0 ) contentLen = 0;
	long status = mime.getHttpStatus();
	unsigned long h = hash32 ( content , contentLen );
	if ( g_errno )
		logf(LOG_INFO,"blaster: got doc (status=%li) (%li) (%lims) %s : "
		     "%s", status,
		     s->m_readOffset ,
		     (long)(now - s->m_startTime) ,
		     shown ,
		     mstrerror(g_errno) );
	else
		logf(LOG_INFO,"blaster: got doc (status=%li) (%li) (%lims) "
		     "(hash=%lx) %s", status,
		     s->m_readOffset ,
		     (long)(now - s->m_startTime) ,
		     h ,
		     shown );
	if ( s_server && url ) mfree ( url , gbstrlen(url)+1 , "saved url" );
	// try to launch another
	startSpidering();
}
long getRandomWords(char *buf, char *bufend, long numWords) {
long totalWords = s_windices.length() / sizeof(long);
char *p = buf;
while(1) {
long wordNum = rand() % totalWords;
long windex = *(long*)(&s_windices[wordNum*sizeof(long)]);
long wlen = gbstrlen(&s_words[windex]);
if(wlen + 1 + p >= bufend) return p - buf;
memcpy(p, &s_words[windex], wlen);
p += wlen;
if(--numWords <= 0) return p - buf;
*p++ = '+';
}
return p - buf;
}
// Load /usr/share/dict/words into s_words (each word \0-terminated) and
// record each word's byte offset in s_windices. Blank lines and words
// ending in "'s" are skipped. Returns false if the file cannot be opened.
bool getWords() {
	FILE *fd = fopen ( "/usr/share/dict/words" , "r" );
	if ( ! fd ) {
		log("blaster:: failed to open /usr/share/dict/words %s",
		    mstrerror(errno));
		// this used to return 1, i.e. true, hiding the failure
		return false;
	}
	char tmp[1024];
	while ( fgets ( tmp , sizeof(tmp) , fd ) ) {
		long len = gbstrlen(tmp);
		// strip the line terminator (\n or \r\n); the last line may
		// have none, in which case nothing is removed
		while ( len > 0 && ( tmp[len-1] == '\n' || tmp[len-1] == '\r' ) )
			len--;
		if ( len == 0 ) continue;
		// skip words ending in 's (possessives such as "dog's")
		if ( len >= 2 && tmp[len-1] == 's' && tmp[len-2] == '\'' ) continue;
		s_windices += s_words.length();
		s_words.safeMemcpy ( tmp , len );
		s_words += '\0';
	}
	fclose ( fd );
	log("blaster: read %li words, %li bytes in from dictionary.",
	    s_windices.length() / sizeof(long), s_words.length());
	return true;
}