open-source-search-engine/SpiderProxy.cpp

#include "gb-include.h"
#include "Pages.h"
#include "TcpSocket.h"
#include "HttpServer.h"
#include "SpiderProxy.h"
#define LOADPOINT_EXPIRE_MS (10*60*1000)
//
// BASIC DETAILS
//
// . host #0 is solely responsible for testing the proxies and keeping
// the results of the tests, using the user-defined test url which
// it downloads through each proxy every 31 seconds or so (see
// downloadTestUrlFromProxies() below)
// . host #0 also saves these stats to the spiderproxies.dat file
// in the working dir (just an array of SpiderProxy instances)
// . any host needing a proxy server to use should ask host #0 for one
// but if host #0 is dead then it should ask host #1, etc.
// . host #1 (etc.) will take over if it sees host #0 went dead
// . Conf::m_proxyIps (safebuf) holds space-sep'd list of proxyip:port pairs
// . since host #0 is responsible for giving you a proxy ip it can
// do proxy load balancing for the whole cluster
// . TODO: to prevent host #0 from getting too slammed we can also recruit
// other hosts to act just like host #0.
// hashtable that maps an ip:port key (64-bits) to a SpiderProxy
static HashTableX s_iptab;
// . handle a udp request for msg 0x54 to get the best proxy to download
// the url they are requesting to download. this should be ultra fast
// because we might be downloading 1000+ urls a second, although in that
// case we should have more hosts that do the proxy load balancing.
// . when a host is done using a proxy to download a url it should
// send another 0x54 msg to indicate that.
// . if a host dies it might never indicate that it is done using a
// proxy, so we should time out its requests and decrement the proxy
// load count, or, if we notice the host is dead, remove all its load
// . so each outstanding request needs to be kept in a hash table,
// which identifies the proxyip/port and the hostid using it and
// the time it made the request. the key is the proxy ip/port.
class LoadBucket {
public:
// ip address of the url being downloaded
int32_t m_urlIp;
// the time it started
int64_t m_downloadStartTimeMS;
int64_t m_downloadEndTimeMS;
// the host using the proxy
int32_t m_hostId;
// key is this for m_prTable
int32_t m_proxyIp;
int32_t m_proxyPort;
// id of this loadbucket in case the same host is using the same
// proxy to download the same urlIp
int32_t m_id;
};
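// a LoadBucket's lifecycle, as pieced together from the code below: it
// is added in handleRequest54() when a proxy is handed out (an end time
// of 0 marks the download as outstanding), its m_downloadEndTimeMS is
// stamped in returnProxy() when the requester signals OP_RETPROXY, and
// it is pruned from s_loadTable once that end time is more than
// LOADPOINT_EXPIRE_MS (10 minutes) old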
// . similar to s_iptab but maps a URL's ip to a LoadBucket
// . every download request in the last 10 minutes is represented by one
// LoadBucket
// . that way we can ensure when downloading multiple urls of the same IP
// that we splay them out evenly over all the proxies
static HashTableX s_loadTable;
// . when the Conf::m_proxyIps parm is updated we call this to rebuild
// s_iptab, our table of SpiderProxy instances, which has the proxies and
// their performance statistics.
// . we try to maintain stats of ip/ports that did NOT change when rebuilding.
bool buildProxyTable ( ) {
// scan the NEW list of proxy ip/port pairs in g_conf
char *p = g_conf.m_proxyIps.getBufStart();
HashTableX tmptab;
tmptab.set(8,0,16,NULL,0,false,0,"tmptab");
// scan the user-supplied space-separated list of ip:ports
// (optional username:password@ip:port)
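// e.g. (hypothetical values) a list like
//   "http://1.2.3.4:8080 user:pass@5.6.7.8:3128 9.10.11.12"
// yields proxies 1.2.3.4:8080, 5.6.7.8:3128 (authenticated) and
// 9.10.11.12:80, since the port defaults to 80 below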
for ( ; *p ; ) {
// skip white space
if ( is_wspace_a(*p) ) { p++; continue; }
// skip http://
if ( strncasecmp(p,"http://",7) == 0 ) { p += 7; continue; }
// scan in an ip:port
char *s = p; char *portStr = NULL;
int32_t dc = 0, pc = 0, gc = 0, bc = 0;
char *msg;
char *usernamePwd = NULL;
int32_t usernamePwdLen = 0;
char *ipStart = p;
// scan all characters until we hit \0 or another whitespace
for ( ; *s && !is_wspace_a(*s); s++) {
if ( *s == '@' ) {
// must be username:pwd
if ( pc != 1 ) {
msg = "bad username:password";
goto hadError;
}
usernamePwd = p;
usernamePwdLen = s - p;
if ( usernamePwdLen >= MAXUSERNAMEPWD-2 ) {
msg = "username:password too long";
goto hadError;
}
dc = 0;
gc = 0;
bc = 0;
pc = 0;
portStr = NULL;
ipStart = s+1;
continue;
}
if ( *s == '.' ) { dc++; continue; }
if ( *s == ':' ) { portStr=s; pc++; continue; }
if ( is_digit(*s) ) { gc++; continue; }
bc++;
continue;
}
// ensure it is a legit ip:port combo
msg = NULL;
if ( gc < 4 )
msg = "not enough digits for an ip";
if ( pc > 1 )
msg = "too many colons";
if ( dc != 3 )
msg = "need 3 dots for an ip address";
if ( bc )
msg = "got illegal char in ip:port listing";
if ( msg ) {
hadError:
char c = *s;
*s = '\0';
log("buf: %s for %s",msg,p);
*s = c;
return false;
}
// convert it
int32_t iplen = s - ipStart;
if ( portStr ) iplen = portStr - ipStart;
int32_t ip = atoip(ipStart,iplen);
// another sanity check
if ( ip == 0 || ip == -1 ) {
log("spider: got bad proxy ip for %s",p);
return false;
}
// and the port default is 80
int32_t port = 80;
if ( portStr ) port = atol2(portStr+1,s-portStr-1);
if ( port < 0 || port > 65535 ) {
log("spider: got bad proxy port for %s",p);
return false;
}
// . we got a legit ip:port
// . see if already in our table
uint64_t ipKey = (uint32_t)ip;
ipKey <<= 16;
ipKey |= (uint16_t)(port & 0xffff);
// also store into tmptable to see what we need to remove
tmptab.addKey(&ipKey);
// see if in table
int32_t islot = s_iptab.getSlot( &ipKey);
// advance p
p = s;
// if in there, keep it as is
if ( islot >= 0 ) continue;
// otherwise add new entry
SpiderProxy newThing;
memset ( &newThing , 0 , sizeof(SpiderProxy));
newThing.m_ip = ip;
newThing.m_port = port;
newThing.m_lastDownloadTookMS = -1;
newThing.m_lastSuccessfulTestMS = -1;
gbmemcpy(newThing.m_usernamePwd,usernamePwd,usernamePwdLen);
// ensure it is NULL terminated
newThing.m_usernamePwd[usernamePwdLen] = '\0';
if ( ! s_iptab.addKey ( &ipKey, &newThing ) )
return false;
}
redo:
int32_t removed = 0;
// scan all SpiderProxies in s_iptab
for ( int32_t i = 0 ; i < s_iptab.getNumSlots() ; i++ ) {
// skip empty buckets in hashtable s_iptab
if ( ! s_iptab.m_flags[i] ) continue;
// get the key
int64_t key = *(int64_t *)s_iptab.getKey(i);
// if it is also in tmptab then the user kept it, so leave it as is
if ( tmptab.isInTable ( &key ) ) continue;
// sanity check: the key we just read must be findable in s_iptab
if ( s_iptab.getSlot ( &key ) < 0 ) {
log("sproxy: iptable hashing messed up");
continue;
}
// shoot, it got removed. not in the new list of ip:ports
s_iptab.removeKey ( &key );
removed++;
// hashtable is messed up now, start over
//goto redo;
}
if ( removed ) goto redo;
return true;
}
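// a minimal sketch (a hypothetical helper, not called by this file) of
// the 64-bit ip:port key that buildProxyTable(), gotTestUrlReplyWrapper()
// and handleRequest54() each build inline for s_iptab lookups: the
// 32-bit proxy IP shifted left 16 bits, OR'd with the 16-bit proxy port
static inline uint64_t makeProxyKey ( int32_t ip , int32_t port ) {
// zero-extend the 32-bit ip
uint64_t key = (uint32_t)ip;
// make room for the port below the ip
key <<= 16;
// pack the port into the low 16 bits
key |= (uint16_t)(port & 0xffff);
return key;
}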
static bool s_init = true;
HashTableX s_proxyBannedTable;
HashTableX s_banCountTable;
bool initProxyTables ( ) {
// initialize proxy/urlip ban table?
if ( ! s_init ) return true;
s_init = false;
s_proxyBannedTable.set(8,0,0,NULL,0,false,1,"proxban");
s_banCountTable.set(4,4,0,NULL,0,false,1,"bancnt");
return true;
}
bool resetProxyStats ( ) {
// s_proxyBannedTable.reset();
// s_banCountTable.reset();
// s_iptab.reset();
s_iptab.set(8,sizeof(SpiderProxy),0,NULL,0,false,0,"siptab",true);
// skip the port part of the key and use the LSB of the IP as key magic
s_iptab.m_maskKeyOffset = 5;
s_proxyBannedTable.set(8,0,0,NULL,0,false,1,"proxban");
s_banCountTable.set(4,4,0,NULL,0,false,1,"bancnt");
return buildProxyTable();
}
// save the stats
bool saveSpiderProxyStats ( ) {
// save hashtable
s_proxyBannedTable.save(g_hostdb.m_dir,"proxybantable.dat");
s_banCountTable.save(g_hostdb.m_dir,"proxybancounttable.dat");
// save hash table
return s_iptab.save(g_hostdb.m_dir,"spiderproxystats.dat");
}
bool loadSpiderProxyStats ( ) {
initProxyTables();
// took this out for now since I was seeing dups in s_iptab for
// some reason. it was causing an infinite loop calling "goto redo"
// all the time above.
// load hashtables
//s_proxyBannedTable.load(g_hostdb.m_dir,"proxybantable.dat");
//s_banCountTable.load(g_hostdb.m_dir,"proxybancounttable.dat");
// load stats hashtable. this also returns false if it does not exist.
//if ( ! s_iptab.load(g_hostdb.m_dir,"spiderproxystats.dat") )
// return false;
// unset some flags
for ( int32_t i = 0 ; i < s_iptab.getNumSlots() ; i++ ) {
// skip empty slots
if ( ! s_iptab.m_flags[i] ) continue;
SpiderProxy *sp = (SpiderProxy *)s_iptab.getValueFromSlot(i);
sp->m_isWaiting = false;
}
return true;
}
// . sets *current to how many downloads are currently outstanding for
// this proxy
// . returns total # of load points, i.e. downloads, in the last
// LOADPOINT_EXPIRE_MS milliseconds (currently 10 minutes)
int32_t getNumLoadPoints ( SpiderProxy *sp , int32_t *current ) {
// currently outstanding downloads on proxy
*current = 0;
int32_t count = 0;
// scan all load buckets and count those belonging to this proxy
for ( int32_t i = 0 ; i < s_loadTable.m_numSlots ; i++ ) {
// skip if empty
if ( ! s_loadTable.m_flags[i] ) continue;
// get the bucket
LoadBucket *lb= (LoadBucket *)s_loadTable.getValueFromSlot(i);
// get the spider proxy this load point was for
if ( lb->m_proxyIp != sp->m_ip ) continue;
if ( lb->m_proxyPort != sp->m_port ) continue;
// currently outstanding downloads on proxy
if ( lb->m_downloadEndTimeMS == 0LL )
*current = *current + 1;
count++;
}
return count;
}
// . we call this from Parms.cpp which prints out the proxy related controls
// and this table below them...
// . allows user to see the stats of each spider proxy
bool printSpiderProxyTable ( SafeBuf *sb ) {
// only host #0 will have the stats ... so print that link
if ( g_hostdb.m_myHost->m_hostId != 0 ) {
Host *h = g_hostdb.getHost(0);
sb->safePrintf("<br>"
"<b>See table on <a href=http://%s:%"INT32"/"
"admin/proxies>"
"host #0</a></b>"
"<br>"
, iptoa(h->m_ip)
, (int32_t)(h->m_httpPort)
);
//return true;
}
// print host table
sb->safePrintf (
"<table %s>"
"<tr><td colspan=10><center>"
"<b>Spider Proxies "
"</b>"
"</center></td></tr>"
"<tr bgcolor=#%s>"
"<td>"
"<b>proxy IP</b></td>"
"<td><b>proxy port</b></td>"
"<td><b>times used</b></td>"
"<td><b># website IPs banning</b></td>"
"<td><b>load points</b></td>"
"<td><b>currently out</b></td>"
// time of last successful download. print "none"
// if never successfully used
"<td><b>test url last successful download</b></td>"
// we fetch a test url every minute or so through
// each proxy to ensure it is up. typically this should
// be your website so you do not make someone angry.
"<td><b>test url last download attempt</b></td>"
// print "FAILED" in red if it failed to download
"<td><b>test url download took</b></td>"
"<td><b>last bytes downloaded</b></td>"
"<td><b>last test url error</b></td>"
"</tr>"
, TABLE_STYLE
, DARK_BLUE
);
int32_t now = getTimeLocal();
// print it
for ( int32_t i = 0 ; i < s_iptab.getNumSlots() ; i++ ) {
// skip empty slots
if ( ! s_iptab.m_flags[i] ) continue;
SpiderProxy *sp = (SpiderProxy *)s_iptab.getValueFromSlot(i);
char *bg = LIGHT_BLUE;
// mark with light red bg if last test url attempt failed
if ( sp->m_lastDownloadTookMS == -1 &&
sp->m_lastDownloadTestAttemptMS>0 )
bg = "ffa6a6";
// or a perm denied error (as opposed to a timeout above)
if ( sp->m_lastDownloadError )
bg = "ffa6a6";
// print it
sb->safePrintf (
"<tr bgcolor=#%s>"
"<td>%s</td>" // proxy ip
"<td>%"UINT32"</td>" // port
, bg
, iptoa(sp->m_ip)
, (uint32_t)(uint16_t)sp->m_port
);
sb->safePrintf("<td>%"INT64"</td>",sp->m_timesUsed);
int32_t banCount = s_banCountTable.getScore32 ( &sp->m_ip );
if ( banCount < 0 ) banCount = 0;
sb->safePrintf("<td>%"INT32"</td>",banCount);
int32_t currentLoad;
// get # times it appears in loadtable
int32_t np = getNumLoadPoints ( sp , &currentLoad );
sb->safePrintf("<td>%"INT32"</td>",np);
// currently outstanding downloads on this proxy
sb->safePrintf("<td>%"INT32"</td>",currentLoad);
// last SUCCESSFUL download time ago. when it completed.
int32_t ago = now - sp->m_lastSuccessfulTestMS/1000;
sb->safePrintf("<td>");
// like 1 minute ago etc.
if ( sp->m_lastSuccessfulTestMS <= 0 )
sb->safePrintf("none");
else
sb->printTimeAgo ( ago , now , true );
sb->safePrintf("</td>");
// last download time ago
ago = now - sp->m_lastDownloadTestAttemptMS/1000;
sb->safePrintf("<td>");
// like 1 minute ago etc.
if ( sp->m_lastDownloadTestAttemptMS<= 0 )
sb->safePrintf("none");
else
sb->printTimeAgo ( ago , now , true );
sb->safePrintf("</td>");
// how long to download the test url?
if ( sp->m_lastDownloadTookMS != -1 )
sb->safePrintf("<td>%"INT32"ms</td>",
(int32_t)sp->m_lastDownloadTookMS);
else if ( sp->m_lastDownloadTestAttemptMS<= 0 )
sb->safePrintf("<td>unknown</td>");
else
sb->safePrintf("<td>"
"<font color=red>FAILED</font>"
"</td>");
sb->safePrintf("<td>%"INT32"</td>",sp->m_lastBytesDownloaded);
if ( sp->m_lastDownloadError )
sb->safePrintf("<td><font color=red>%s</font></td>",
mstrerror(sp->m_lastDownloadError));
else
sb->safePrintf("<td>none</td>");
sb->safePrintf("</tr>\n");
}
sb->safePrintf("</table><br>");
return true;
}
// class spip {
// public:
// int32_t m_ip;
// int32_t m_port;
// };
void gotTestUrlReplyWrapper ( void *state , TcpSocket *s ) {
//spip *ss = (spip *)state;
// free that thing
//mfree ( ss , sizeof(spip) ,"spip" );
HttpMime mime;
bool ret = mime.set ( s->m_readBuf , s->m_readOffset , NULL );
int32_t status = mime.getHttpStatus();
// note it
if ( g_conf.m_logDebugProxies || ! ret || status != 200 ) {
SafeBuf sb;
sb.safeMemcpy ( s->m_sendBuf , s->m_sendBufUsed );
sb.nullTerm();
log("sproxy: got test url reply (%s): %s *** sendbuf=%s",
mstrerror(g_errno),
s->m_readBuf,
sb.getBufStart() );
}
// we can get the spider proxy ip/port from the socket because
// we sent this url download request to that spider proxy
uint64_t key = (uint32_t)s->m_ip;
key <<= 16;
key |= (uint16_t)(s->m_port & 0xffff);
SpiderProxy *sp = (SpiderProxy *)s_iptab.getValue ( &key );
// did user remove it from the list before we could finish testing it?
if ( ! sp ) {
log("sproxy: spider proxy entry for ip %s:%"INT32" is gone! "
"wtf? proxy stats table cannot be updated.",
iptoa(s->m_ip),(int32_t)s->m_port);
return;
}
sp->m_isWaiting = false;
// ok, update how long it took to do the download
int64_t nowms = gettimeofdayInMillisecondsLocal();
int64_t took = nowms - sp->m_lastDownloadTestAttemptMS;
sp->m_lastDownloadTookMS = (int32_t)took;
// tiny proxy permission denied is 403
if ( status >= 400 ) {
// if ( g_conf.m_logDebugProxies ) {
log("sproxy: got bad http status from proxy: %"INT32"",status);
g_errno = EPERMDENIED;
}
sp->m_lastBytesDownloaded = s->m_readOffset;
// ETCPTIMEDOUT?
sp->m_lastDownloadError = g_errno;
// if we had no error, that was our last successful test
if ( ! g_errno )
sp->m_lastSuccessfulTestMS = nowms;
}
// . Process.cpp should call this from its timeout wrapper
// . updates the stats of each proxy
// . returns false and sets g_errno on error
bool downloadTestUrlFromProxies ( ) {
// only host #0 should do the testing i guess
//if ( g_hostdb.m_myHost->m_hostId != 0 ) return true;
// no need if no url
if ( g_conf.m_proxyTestUrl.length() <= 1 ) return true;
// if host #0 dies then host #1 must take its place managing the
// spider proxies
Host *h0 = g_hostdb.getFirstAliveHost();
if ( g_hostdb.m_myHost != h0 ) return true;
int64_t nowms = gettimeofdayInMillisecondsLocal();
for ( int32_t i = 0 ; i < s_iptab.getNumSlots() ; i++ ) {
// skip empty slots
if ( ! s_iptab.m_flags[i] ) continue;
SpiderProxy *sp = (SpiderProxy *)s_iptab.getValueFromSlot(i);
int64_t elapsed = nowms - sp->m_lastDownloadTestAttemptMS;
// hit test url once per 31 seconds
if ( elapsed < 31000 ) continue;
// or if never came back yet!
if ( sp->m_isWaiting ) continue;
char *tu = g_conf.m_proxyTestUrl.getBufStart();
if ( ! tu ) continue;
//spip *ss = (spip *)mmalloc(8,"sptb");
// if ( ! ss ) return false;
// ss->m_ip = sp->m_ip;
// ss->m_port = sp->m_port;
sp->m_isWaiting = true;
sp->m_lastDownloadTestAttemptMS = nowms;
char *proxyUsernamePwdAuth = sp->m_usernamePwd;
if ( proxyUsernamePwdAuth && ! proxyUsernamePwdAuth[0] )
proxyUsernamePwdAuth = NULL;
// returns false if blocked
if ( ! g_httpServer.getDoc( tu ,
0 , // ip
0 , // offset
-1 , // size
false , // useifmodsince
NULL ,// state
gotTestUrlReplyWrapper ,
30*1000, // 30 sec timeout
sp->m_ip, // proxyip
sp->m_port, // proxyport
-1, // maxtextdoclen
-1, // maxtextotherlen
NULL, // agent ,
DEFAULT_HTTP_PROTO , // "HTTP/1.0"
false , // doPost?
NULL , // cookie
NULL , // additionalHeader
NULL , // our own mime!
NULL , // postContent
// is NULL or '\0' if not there
proxyUsernamePwdAuth ) ) {
//blocked++;
continue;
}
// did not block somehow
sp->m_isWaiting = false;
// must have been an error then
sp->m_lastDownloadError = g_errno;
// free that thing
//mfree ( ss , sizeof(spip) ,"spip" );
// log it
log("sproxy: error downloading test url %s through %s:%"INT32""
,tu,iptoa(sp->m_ip),(int32_t)sp->m_port);
}
return true;
}
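// a hedged sketch of the intended call site (per the comment above the
// function; the real hook lives in Process.cpp, hypothetical code here):
//
//	// inside Process.cpp's periodic timeout wrapper:
//	if ( ! downloadTestUrlFromProxies() )
//		log("process: downloadTestUrlFromProxies: %s",
//		    mstrerror(g_errno));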
#include "Msg13.h"
// when a host is done using a proxy to download a url it sends a signal
// and we process that signal here to reduce the load count of that proxy
static void returnProxy ( Msg13Request *preq , UdpSlot *udpSlot ) ;
// a host is asking us (host #0) what proxy to use?
void handleRequest54 ( UdpSlot *udpSlot , int32_t niceness ) {
char *request = udpSlot->m_readBuf;
int32_t requestSize = udpSlot->m_readBufSize;
// we now use the top part of the Msg13Request as the ProxyRequest
Msg13Request *preq = (Msg13Request *)request;
// sanity check
if ( requestSize != preq->getProxyRequestSize() ) {
log("db: Got bad request 0x54 size of %"INT32" bytes. bad",
requestSize );
g_udpServer.sendErrorReply ( udpSlot , EBADREQUESTSIZE );
return;
}
// is the request telling us it is done downloading through a proxy?
if ( preq->m_opCode == OP_RETPROXY ) {
returnProxy ( preq , udpSlot );
return;
}
// if sender is asking for a new proxy and wants us to ban
// the previous proxy we sent for this urlIp...
if ( preq->m_banProxyIp ) {
// don't core if this sanity check fails; it seems we don't always
// NULLify these or something.
// these must match
if(preq->m_banProxyIp != preq->m_proxyIp ||
preq->m_banProxyPort != preq->m_proxyPort){
log("db: proxy: banproxyip != proxyip. mismatch!");
g_udpServer.sendErrorReply ( udpSlot , EBADENGINEER);
return;
}
// this will "return" the banned proxy
returnProxy ( preq , NULL );
// now add it to the banned table
int64_t uip = preq->m_urlIp;
int64_t pip = preq->m_banProxyIp;
int64_t h64 = hash64h ( uip , pip );
if ( ! s_proxyBannedTable.isInTable ( &h64 ) ) {
s_proxyBannedTable.addKey ( &h64 );
// for stats counting. each proxy ip maps to #
// of unique website IPs that have banned it.
s_banCountTable.addTerm32 ( (int32_t *)&pip );
}
}
// shortcut
int32_t urlIp = preq->m_urlIp;
// send to a proxy that is up and has the least amount
// of LoadBuckets with this urlIp, if tied, go to least loaded.
// clear counts for this url ip for scoring the best proxy to use
for ( int32_t i = 0 ; i < s_iptab.getNumSlots() ; i++ ) {
// skip empty slots
if ( ! s_iptab.m_flags[i] ) continue;
SpiderProxy *sp = (SpiderProxy *)s_iptab.getValueFromSlot(i);
sp->m_countForThisIp = 0;
sp->m_lastTimeUsedForThisIp = 0LL;
}
// this table maps a url's current IP to possibly MULTIPLE slots, each
// telling us which proxy is downloading a page from that IP. that way
// we can try to find a proxy that is not currently downloading a url
// from this IP, or that hasn't been for the longest time...
int32_t hslot = s_loadTable.getSlot ( &urlIp );
// scan all load buckets that have this urlIp outstanding
for ( int32_t i = hslot ; i >= 0 ; i = s_loadTable.getNextSlot(i,&urlIp)){
// get the bucket
LoadBucket *lb;
lb = (LoadBucket *)s_loadTable.getValueFromSlot(i);
// get the spider proxy this load point was for
uint64_t key = (uint32_t)lb->m_proxyIp;
key <<= 16;
key |= (uint16_t)lb->m_proxyPort;
SpiderProxy *sp = (SpiderProxy *)s_iptab.getValue(&key);
// must be there unless the user removed it from the list
if ( ! sp ) continue;
// count it up
if ( lb->m_downloadEndTimeMS == 0LL )
sp->m_countForThisIp++;
// set the last time used to the most recently downloaded time
// that this proxy has downloaded from this ip
if ( lb->m_downloadEndTimeMS &&
lb->m_downloadEndTimeMS > sp->m_lastTimeUsedForThisIp )
sp->m_lastTimeUsedForThisIp = lb->m_downloadEndTimeMS;
}
// first try to get a spider proxy that is not "dead"
bool skipDead = true;
int32_t numBannedProxies = 0;
int32_t aliveProxyCandidates = 0;
redo:
// get the min of the counts
int32_t minCount = 999999;
for ( int32_t i = 0 ; i < s_iptab.getNumSlots() ; i++ ) {
// skip empty slots
if ( ! s_iptab.m_flags[i] ) continue;
// get the spider proxy
SpiderProxy *sp = (SpiderProxy *)s_iptab.getValueFromSlot(i);
// if this proxy was banned by the url's ip... skip it. it is
// not a candidate...
if ( skipDead ) {
int64_t uip = preq->m_urlIp;
int64_t pip = sp->m_ip;
int64_t h64 = hash64h ( uip , pip );
if ( s_proxyBannedTable.isInTable ( &h64 ) ) {
numBannedProxies++;
continue;
}
}
// if it failed the last test, skip it
if ( skipDead && sp->m_lastDownloadError ) continue;
if ( skipDead ) aliveProxyCandidates++;
if ( sp->m_countForThisIp >= minCount ) continue;
minCount = sp->m_countForThisIp;
}
// all dead? then get the best dead one
if ( minCount == 999999 ) {
skipDead = false;
goto redo;
}
// . we only use one proxy if none are banned by this IP
// . when that gets banned, we will use the next 2 proxies with
// a higher backoff/crawlDelay, etc.
int32_t threshHold;
if ( numBannedProxies <= 0 ) threshHold = 1;
// if the first proxy gets banned, try the next 2 until both get banned
else if ( numBannedProxies == 1 ) threshHold = 2;
else if ( numBannedProxies < 1+2) threshHold = 3 - numBannedProxies;
// if the next two proxies got banned, try the next 4 until banned
else if ( numBannedProxies == 3 ) threshHold = 4;
else if ( numBannedProxies < 3+4) threshHold = 7 - numBannedProxies;
// if the next 4 proxies got banned, try the next 8 until they get banned
else if ( numBannedProxies == 7 ) threshHold = 8;
else if ( numBannedProxies < 7+8) threshHold = 15 - numBannedProxies;
else if ( numBannedProxies == 15) threshHold = 16;
else if ( numBannedProxies < 15+16 ) threshHold = 31-numBannedProxies;
else if ( numBannedProxies == 31 ) threshHold = 32;
else if ( numBannedProxies < 31+32)threshHold=63-numBannedProxies;
else if ( numBannedProxies == 63 ) threshHold = 64;
else if ( numBannedProxies < 63+64)threshHold=127-numBannedProxies;
else if ( numBannedProxies == 127 ) threshHold = 128;
else if ( numBannedProxies < 127+128)threshHold=255-numBannedProxies;
else if ( numBannedProxies == 255 ) threshHold = 256;
else if ( numBannedProxies < 255+256)threshHold=512-numBannedProxies;
else if ( numBannedProxies == 511 ) threshHold = 512;
else if ( numBannedProxies < 511+512)threshHold=1024-numBannedProxies;
else threshHold = 1024;
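// e.g.: with nothing banned we use only the single best proxy; once it
// is banned (1 banned) we rotate over the next tier of 2; once those
// are banned too (3 banned total) the next tier of 4; then 8, 16, ...
// capped at 1024. in between, threshHold is just the count of
// not-yet-banned proxies left in the current tier (e.g. 5 banned ->
// 7-5 = 2 still usable)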
if ( threshHold <= 0 ) {
log("proxy: spiderproxy error in threshold of %"INT32" "
"for banned=%"INT32"",threshHold,numBannedProxies);
threshHold = 1;
}
// reset minCount so we can take the min over those we check here
minCount = -1;
int64_t oldest = 0x7fffffffffffffffLL;
SpiderProxy *winnersp = NULL;
int32_t count = 0;
// start at a random slot based on url's IP so we don't
// overload the first proxy
int32_t start = ((uint32_t)urlIp) % s_iptab.getNumSlots();
int32_t slotCount = s_iptab.getNumSlots();
// . now find the best proxy with the minCount
for ( int32_t i = start ; ; i++ ) {
// scan all slots in hash table, then stop
if ( slotCount-- <= 0 ) break;
// wrap around to zero if we hit the end
if ( i == s_iptab.getNumSlots() ) i = 0;
// skip empty slots
if ( ! s_iptab.m_flags[i] ) continue;
// get the spider proxy
SpiderProxy *sp = (SpiderProxy *)s_iptab.getValueFromSlot(i);
// if it failed the last test, skip it... not here...
if ( skipDead && sp->m_lastDownloadError ) continue;
// if this proxy was banned by the url's ip... skip it. it is
// not a candidate...
if ( skipDead ) {
int64_t uip = preq->m_urlIp;
int64_t pip = sp->m_ip;
int64_t h64 = hash64h ( uip , pip );
if ( s_proxyBannedTable.isInTable ( &h64 ) ) continue;
}
// if some proxies are "alive" then only pick from
// the first half of the proxies that are alive (i.e. still
// work). that way, when one of those goes dead we will inc
// the backoff (crawldelay) and a new proxy that we haven't
// used for this url's IP will take its place. and such
// new proxies will only have the new backoff count used
// through them. that way, we don't get ALL of our proxies
// banned at about the same time since we do somewhat uniform
// load balancing over them.
if ( skipDead && count >= threshHold)//aliveProxyCandidates/2 )
continue;
// count the alive/non-banned candidates
count++;
// if all hosts were "dead" because they all had
// m_lastDownloadError set then minCount will be 999999
// and nobody should continue from this statement:
if ( sp->m_countForThisIp > minCount && minCount>=0 ) continue;
// then go by last download time for this ip
if ( sp->m_countForThisIp == minCount && minCount>=0 &&
sp->m_lastTimeUsedForThisIp >= oldest )
continue;
// pick the spider proxy used longest ago
oldest = sp->m_lastTimeUsedForThisIp;
minCount = sp->m_countForThisIp;
// got a new winner
winnersp = sp;
}
// we must have a winner
if ( ! winnersp ) { char *xx=NULL;*xx=0; }
int64_t nowms = gettimeofdayInMillisecondsLocal();
// winner count update
winnersp->m_timesUsed++;
// add a new load bucket then!
LoadBucket bb;
bb.m_urlIp = urlIp;
// the time it started
bb.m_downloadStartTimeMS = nowms;
// download has not ended yet
bb.m_downloadEndTimeMS = 0LL;
// the host using the proxy
bb.m_hostId = udpSlot->m_hostId;
// key is this for m_prTable
bb.m_proxyIp = winnersp->m_ip;
bb.m_proxyPort = winnersp->m_port;
// a new id. we use this to update the downloadEndTime when done
static int32_t s_lbid = 0;
bb.m_id = s_lbid++;
// add it now
s_loadTable.addKey ( &urlIp , &bb );
// sanity
if ( (int32_t)sizeof(ProxyReply) > TMPBUFSIZE ){char *xx=NULL;*xx=0;}
// and give proxy ip/port back to the requester so they can
// use that to download their url
ProxyReply *prep = (ProxyReply *)udpSlot->m_tmpBuf;
prep->m_proxyIp = winnersp->m_ip;
prep->m_proxyPort = winnersp->m_port;
// this is just '\0' if none
strcpy(prep->m_usernamePwd,winnersp->m_usernamePwd);
// do not count the proxy we are returning as "more"
prep->m_hasMoreProxiesToTry = ( aliveProxyCandidates > 1 );
// and the loadbucket id, so requester can tell us it is done
// downloading through the proxy and we can update the LoadBucket
// for this transaction (m_lbId)
prep->m_lbId = bb.m_id;
// requester wants to know how many proxies have been banned by the
// urlIp so it can increase a self-imposed crawl-delay to be more
// sensitive to the spider policy.
prep->m_numBannedProxies = numBannedProxies;
//char *p = udpSlot->m_tmpBuf;
//*(int32_t *)p = winnersp->m_ip ; p += 4;
//*(int16_t *)p = winnersp->m_port; p += 2;
// and the loadbucket id
//*(int32_t *)p = bb.m_id; p += 4;
// now remove old entries from the load table. entries that
// have completed and have a download end time more than 10 mins ago
for ( int32_t i = 0 ; i < s_loadTable.getNumSlots() ; i++ ) {
// skip if empty
if ( ! s_loadTable.m_flags[i] ) continue;
// get the bucket
LoadBucket *pp =(LoadBucket *)s_loadTable.getValueFromSlot(i);
// skip if still active
if ( pp->m_downloadEndTimeMS == 0LL ) continue;
// delta t
int64_t took = nowms - pp->m_downloadEndTimeMS;
// < 10 mins?
if ( took < LOADPOINT_EXPIRE_MS ) continue;
// ok, it's too old, nuke it to save memory
s_loadTable.removeSlot(i);
// removing this slot may shift later keys down on top of us, but we
// should not miss out on analyzing any keys if we just keep looping
// here, should we? TODO: figure it out. if we miss a few it's not
// a big deal.
i--;
}
// send the proxy ip/port/LBid back to user
g_udpServer.sendReply_ass ( udpSlot->m_tmpBuf , // msg
sizeof(ProxyReply) , // msgSize
udpSlot->m_tmpBuf , // alloc
sizeof(ProxyReply) ,
udpSlot ,
60 ) ; // 60s timeout
}
// . use msg 0x54 with opCode OP_RETPROXY to say you are done using the proxy
// . we now use the top part of the Msg13Request as the proxy request
void returnProxy ( Msg13Request *preq , UdpSlot *udpSlot ) {
//char *p = request;
//int32_t proxyIp = *(int32_t *)p; p += 4;
//int16_t proxyPort = *(int16_t *)p; p += 2;
//int32_t lbId = *(int32_t *)p; p += 4;
int32_t urlIp = preq->m_urlIp;
//
// update the load bucket
//
// scan over all that match to find lbid
int32_t hslot = s_loadTable.getSlot ( &urlIp );
// scan all load buckets that have this urlIp outstanding
int32_t i; for ( i = hslot ; i >= 0 ; i = s_loadTable.getNextSlot(i,&urlIp) ) {
// get the bucket
LoadBucket *lb= (LoadBucket *)s_loadTable.getValueFromSlot(i);
// is it the right id?
if ( lb->m_id != preq->m_lbId ) continue;
if ( lb->m_proxyIp != preq->m_proxyIp ) continue;
if ( lb->m_proxyPort != preq->m_proxyPort ) continue;
// that's it. set the download end time
int64_t nowms = gettimeofdayInMillisecondsLocal();
lb->m_downloadEndTimeMS = nowms;
break;
}
if ( i < 0 )
log("sproxy: could not find load bucket id #%"INT32"",preq->m_lbId);
// if no udp slot was provided, return to the caller without sending a
// reply: the caller is banning a proxy and needs to return it before
// we send back another proxy to try.
if ( ! udpSlot ) return;
// gotta send reply back
g_udpServer.sendReply_ass ( 0,
0 , // msgSize
0 , // alloc
0 ,
udpSlot ,
60 ) ; // 60s timeout
}
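// a hedged sketch (hypothetical caller, not in this file) of the
// requester side of returnProxy(): when a spider host finishes its
// download it re-sends msg 0x54 with the opcode set to OP_RETPROXY,
// echoing back the proxy and LoadBucket id it was handed:
//
//	Msg13Request req;
//	req.m_opCode    = OP_RETPROXY;      // "done with this proxy"
//	req.m_urlIp     = urlIp;            // ip of the downloaded url
//	req.m_lbId      = reply->m_lbId;    // from the ProxyReply we got
//	req.m_proxyIp   = reply->m_proxyIp;
//	req.m_proxyPort = reply->m_proxyPort;
//	// ... then send the proxy-request portion back as udp msg 0x54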
// call this at startup to register the handlers
bool initSpiderProxyStuff() {
// register this on all hosts in case host #0 goes dead; then everyone
// will, according to Msg13.cpp, send to host #1, the next in line,
// if it is alive
//if ( g_hostdb.m_myHostId != 0 ) return true;
// register the handler for proxy requests (msg 0x54)
if ( ! g_udpServer.registerHandler ( 0x54, handleRequest54 ))
return false;
// key is ip/port
s_iptab.set(8,sizeof(SpiderProxy),0,NULL,0,false,0,"siptab",true);
// skip the port part of the key and use the LSB of the IP as key magic
s_iptab.m_maskKeyOffset = 5;
loadSpiderProxyStats();
// build the s_iptab hashtable for the first time
buildProxyTable ();
// make the loadtable hashtable
static bool s_flag = false;
if ( s_flag ) return true;
s_flag = true;
return s_loadTable.set(4,
sizeof(LoadBucket),
128,
NULL,
0,
true, // allow dups?
MAX_NICENESS,
"lbtab",
true); // use key magic to mix things up
}
SpiderProxy *getSpiderProxyByIpPort ( int32_t ip , uint16_t port ) {
for ( int32_t i = 0 ; i < s_iptab.getNumSlots() ; i++ ) {
// skip empty slots
if ( ! s_iptab.m_flags[i] ) continue;
SpiderProxy *sp = (SpiderProxy *)s_iptab.getValueFromSlot(i);
if ( sp->m_ip != ip ) continue;
if ( sp->m_port != port ) continue;
return sp;
}
return NULL;
}