open-source-search-engine/Msg28.cpp

303 lines
9.3 KiB
C++
Raw Normal View History

2013-08-03 00:12:24 +04:00
#include "gb-include.h"
#include "Msg28.h"
#include "HttpServer.h"
#include "Parms.h"
Msg28::Msg28 ( ) {
m_buf = NULL;
}
Msg28::~Msg28 ( ) {
//if ( m_buf ) mfree ( m_buf , m_bufSize , "Msg28" );
m_buf = NULL;
}
bool Msg28::massConfig ( char *requestBuf ,
void *state ,
void (* callback) (void *state ) ) {
// must be here
if ( ! requestBuf ) { char *xx=NULL;*xx=0; }
// must have a request buf already made then
m_bufSize = gbstrlen(requestBuf);
m_bufLen = m_bufSize;
m_buf = requestBuf;
m_numRequests = 0;
m_numReplies = 0;
m_numHosts = g_hostdb.getNumHosts();
m_sendTotal = m_numHosts;
m_state = state;
m_callback = callback;
m_hostId = -1;
m_hostId2 = -1;
m_ourselvesLast = false;
m_sendToProxy = false;
m_freeBuf = false;
m_i = 0;
// this returns false if blocked
if ( ! doSendLoop() ) return false;
return true;
}
// if hostId2 >= 0 then send to the range of hostIds in the
// interval [hostId,hostId2]
bool Msg28::massConfig ( TcpSocket *s ,
HttpRequest *r ,
long hostId ,
void *state ,
void (* callback) (void *state ) ,
bool ourselvesLast ,
bool sendToProxy ,
long hostId2 ) {
m_ourselvesLast = ourselvesLast;
m_state = state;
m_callback = callback;
m_sendToProxy = sendToProxy;
m_freeBuf = true;
// bail if we're an interface machine, don't configure the main
//if ( g_conf.m_interfaceMachine ) return true;
// make our own request from the cgi parms
m_bufSize = s->m_readOffset + 250 ;
m_buf = (char *)mmalloc (m_bufSize,"Msg28");
if ( ! m_buf ) {
log("admin: Could not allocate %li bytes to forward config "
"request.",m_bufSize);
return true;
}
char *p = m_buf;
char *pend = m_buf + m_bufSize;
sprintf ( p ,
"POST %s HTTP/1.0\r\n"
"Content-Length: 0000000\r\n\r\n" ,
r->getFilename());
p += gbstrlen ( p );
char *sizep = strstr ( m_buf , "Content-Length: " );
if ( ! sizep ) { char *xx = NULL; *xx = 0; }
sizep += 16;
// do not cast
char *cc = p;
sprintf ( p , "username=msg28&cast=0&" );
p += gbstrlen ( p );
for ( long i = 0 ; i < r->getNumFields() ; i++ ) {
// skip cast
if ( strcmp ( r->getField(i) , "cast" ) == 0 ) continue;
// but keep everything else
sprintf ( p , "%s=" , r->getField(i) );
p += gbstrlen( p );
if ( p + 10 >= pend ) {
g_errno = EBADENGINEER;
log(LOG_LOGIC,"Msg28: fix this buffer breech. "
"Command failed.");
return true;
}
// just a field and no "=<value>" ? then just skip it
if ( ! r->getValue(i) ) continue;
// otherwise, propagate it into the broadcast
2013-08-03 00:12:24 +04:00
p += urlEncode ( p , pend - p ,
r->getValue(i) , gbstrlen(r->getValue(i)) );
if ( i + 1 < r->getNumFields() ) *p++ = '&';
}
*p = '\0';
m_bufLen = p - m_buf;
// store content-length
sprintf ( sizep , "%07li" , (long)(p - cc) );
// recover the lost char from the sprintf's \0
*(sizep+7) = '\r';
m_numRequests = 0;
m_numReplies = 0;
m_numHosts = g_hostdb.getNumHosts();
if (m_sendToProxy)
m_numHosts = g_hostdb.m_numProxyHosts;
m_hostId = hostId;
// in order to specify an endpoint for the range the first point must
// be a valid hostid, otherwise it is a mess up
if ( hostId2 >= 0 && hostId < 0 ) {
log("admin: Endpoint of hostid interval range is valid, but "
"first point is not.");
hostId2 = -1;
}
m_hostId2 = hostId2;
m_i = 0;
if ( hostId2 >= m_numHosts ) {
g_errno = EBADENGINEER;
log("admin: Second hostid is %li, but we only have %li hosts.",
hostId2,m_numHosts);
return true;
}
if ( hostId >= m_numHosts ) {
g_errno = EBADENGINEER;
log("admin: Hostid is %li, but we only have %li hosts.",
hostId,m_numHosts);
return true;
}
// how many requests will we be sending out?
m_sendTotal = m_numHosts;
if ( m_hostId >= 0 && m_hostId2 >= 0 ) m_sendTotal = hostId2-hostId+1;
// this returns false if blocked
if ( ! doSendLoop() ) return false;
// all done w/o blocking, free the buf here
if ( m_buf ) mfree ( m_buf , m_bufSize , "Msg28" );
m_buf = NULL;
return true;
}
static void gotReply ( void *state , TcpSocket *s );
// accomodate a big cluster
#define MAX_OUT_MSG28 256
// returns false if blocked, true otherwise
bool Msg28::doSendLoop ( ) {
// only send once if we should
//if ( m_hostId >= 0 && m_numRequests >= MAX_OUT_MSG28 ) return true;
if ( m_hostId>=0 && m_numRequests>=1 && m_hostId2 == -1) return true;
// nothing to do if we sent a request to each host
//long numHosts = g_hostdb.getNumHosts();
if ( m_numRequests >= m_sendTotal ) return true;
// send to ourselves last iff m_ourselvesLast is true
long total = m_numHosts ;
if ( m_ourselvesLast ) total++;
// . now send it to all! allow up to 16 outstanding requests.
// . only allow 1 outstanding, and do ourselves last in case of
// save & exit...
for ( long i = m_i ; i < total &&
m_numRequests - m_numReplies < MAX_OUT_MSG28 ; i++ ){
// skip it for next call, but stick to the last host, that's us
if ( i != m_numHosts ) m_i++;
// if we have a range and i is not in it, skip, but watch
// out when i==m_numHosts, because that is when we send the
// request to ourselves, provided we are in the range
if ( m_hostId >= 0 && m_hostId2 >= 0 && i < m_numHosts )
if ( i < m_hostId || i > m_hostId2 ) continue;
// do not send to ourselves until sent to others
if ( m_ourselvesLast && i == g_hostdb.m_hostId && i<m_numHosts)
continue;
// if we are now sending to ourselves, check to make sure
// all replies are back and we ourselves are in the docid range
// if one was given
if ( m_ourselvesLast && i == total-1 ) {
// and we must have gotten back all the replies...
// or at least error messages like ETIMEDOUT
if ( m_numReplies < m_sendTotal - 1 ) continue;
// and we must be in range, if one was given
if ( m_hostId >= 0 && m_hostId2 >= 0 ) {
if ( g_hostdb.m_hostId < m_hostId ) continue;
if ( g_hostdb.m_hostId > m_hostId2 ) continue;
}
}
// count it
m_numRequests++;
// get the ith host
Host *h = g_hostdb.getHost ( i );
if ( m_sendToProxy )
h = g_hostdb.getProxy(i);
// . if we are the "last i", that means us
// . we do ourselves last in case of a "save & exit" request
if ( i == m_numHosts ) {
h = g_hostdb.getHost(g_hostdb.m_hostId);
if (m_sendToProxy)
h = g_hostdb.getProxy(g_hostdb.m_hostId);
}
// did we have one explicitly given?
// ... and make sure we are not a range of hostids...
if ( m_hostId >= 0 && m_hostId2 == -1 ) {
h = g_hostdb.getHost (m_hostId);
if ( m_sendToProxy )
h = g_hostdb.getProxy ( m_hostId );
}
// debug
log(LOG_INIT,"admin: sending to hostid #%li.",h->m_hostId);
// timeout is shorter if host is dead
long timeout = 30000; // 30 seconds
// only 7 seconds if it is dead seemingly
if ( g_hostdb.isDead ( h ) ) timeout = 7000;
// continue;
// . launch it
// . returns false if blocked, true otherwise
// . sets g_errno on error
TcpServer *tcp = &g_httpServer.m_tcp;
if ( tcp->sendMsg ( h->m_ip ,
h->m_httpPort,
m_buf ,
m_bufSize ,
m_bufLen ,
m_bufLen ,
this , // state
gotReply ,
timeout , // 5000, timeout
100*1024 , // maxTextDocLen
100*1024 )){ // maxOtherDocLen
log("admin: Could not send configuration request "
"to hostId #%li (%s:%li): %s.",h->m_hostId,
iptoa(h->m_ip),(long)h->m_port,mstrerror(g_errno));
g_errno = 0;//return true;
m_numReplies++;
}
// all done if only one host... and not a range
if ( m_hostId >= 0 && m_hostId2 == -1 ) break;
// it blocked
//return false;
}
// return false if we blocked
//if ( m_numReplies < m_numRequests ) return false;
// do not finish until we got them all
if ( m_numReplies < m_sendTotal ) return false;
return true;
}
void gotReply ( void *state , TcpSocket *s ) {
// send another
Msg28 *THIS = (Msg28 *)state;
// count em
THIS->m_numReplies++;
// do not free send buffer
s->m_sendBuf = NULL;
// debug
Host *h = g_hostdb.getTcpHost ( s->m_ip , s->m_port );
//if (THIS->m_sendToProxy)
// h = g_hostdb.getProxyFromTcpPort ( s->m_ip , s->m_port );
log(LOG_INIT,"admin: got reply from hostid #%li.",h->m_hostId);
//slot->m_readBufSize,h->m_hostId);
// log errors
if ( g_errno ) {
if ( h ) log("admin: Error broadcasting config request to "
"hostid #%li (%s:%li): %s.",
h->m_hostId,iptoa(h->m_ip),(long)s->m_port,
mstrerror(g_errno));
else log("admin: Error broadcasting config request: "
"%s.",mstrerror(g_errno));
g_errno = 0;
}
// try to send more
if ( ! THIS->doSendLoop ( ) ) return;
// do we have all the replies?
//if ( THIS->m_numReplies < THIS->m_numRequests ) return;
// do not finish until we got them all
if ( THIS->m_hostId < 0 && THIS->m_numReplies < THIS->m_sendTotal )
return;
if ( THIS->m_hostId >= 0 && THIS->m_hostId2 >= 0 &&
THIS->m_numReplies < THIS->m_sendTotal )
return;
// all done, free the buf here
if ( THIS->m_freeBuf )
mfree ( THIS->m_buf , THIS->m_bufSize , "Msg28" );
THIS->m_buf = NULL;
// all done if did not block
THIS->m_callback ( THIS->m_state );
}