many more fixes for streaming mode

This commit is contained in:
Matt Wells 2014-02-06 18:21:22 -08:00
parent 874311ae52
commit 8d534b8ed8
6 changed files with 146 additions and 53 deletions

View File

@ -1585,7 +1585,7 @@ Msg20.o: Msg20.cpp gb-include.h types.h fctypes.h Unicode.h \
Posdb.h TopTree.h Clusterdb.h IndexTable2.h Msg51.h Msg17.h \
IndexReadInfo.h Msg3a.h Stats.h PostQueryRerank.h Sanity.h Msg1.h \
Datedb.h SiteGetter.h Title.h Address.h zlib.h zconf.h Spider.h \
HttpMime.h
HttpMime.h Process.h
Msg22.o: Msg22.cpp gb-include.h types.h fctypes.h Unicode.h \
UnicodeProperties.h UCPropTable.h iconv.h hash.h Errno.h Log.h Msg22.h \
Url.h ip.h Multicast.h Hostdb.h Xml.h XmlNode.h Lang.h Iso8859.h \
@ -1883,7 +1883,8 @@ Msg40.o: Msg40.cpp gb-include.h types.h fctypes.h Unicode.h \
HashTable.h Catdb.h Datedb.h LanguageIdentifier.h sort.h XmlDoc.h \
Phrases.h LangList.h Images.h Msg13.h Msge0.h Msge1.h MsgC.h Dns.h \
DnsProtocol.h Msg4.h Msg8b.h SiteGetter.h Title.h Address.h zlib.h \
zconf.h Spider.h HttpMime.h Speller.h Language.h Wiki.h
zconf.h Spider.h HttpMime.h Speller.h Language.h Wiki.h HttpServer.h \
TcpServer.h openssl/err.h PageResults.h
Msg42.o: Msg42.cpp gb-include.h types.h fctypes.h Unicode.h \
UnicodeProperties.h UCPropTable.h iconv.h hash.h Errno.h Log.h Msg42.h \
UdpServer.h Mem.h Conf.h Xml.h XmlNode.h Lang.h Iso8859.h iana_charset.h \
@ -2260,7 +2261,14 @@ PageDirectory.o: PageDirectory.cpp gb-include.h types.h fctypes.h \
UdpProtocol.h Dns.h DnsProtocol.h RdbCache.h RdbList.h Multicast.h \
Threads.h Rdb.h RdbBase.h RdbScan.h BigFile.h RdbMap.h RdbDump.h \
RdbTree.h RdbMem.h RdbBuckets.h Msg5.h Msg3.h RdbMerge.h Dir.h \
HttpMime.h PageCrawlBot.h Categories.h HashTable.h PageResults.h
HttpMime.h PageCrawlBot.h Categories.h HashTable.h PageResults.h \
Language.h HashTableT.h Query.h Titledb.h DiskPageCache.h IndexList.h \
Indexdb.h Msg20.h Summary.h matches2.h Words.h StopWords.h Bits.h Pos.h \
Matches.h Domains.h CountryCode.h Tagdb.h Msg0.h Events.h Sections.h \
Dates.h Msg37.h Msg36.h Msg40.h SearchInput.h Msg39.h Msg2.h Posdb.h \
TopTree.h Clusterdb.h IndexTable2.h Msg51.h Msg17.h IndexReadInfo.h \
Msg3a.h Stats.h PostQueryRerank.h Sanity.h Msg1.h Linkdb.h Msg22.h \
CatRec.h Catdb.h Datedb.h
PageEvents.o: PageEvents.cpp gb-include.h types.h fctypes.h Unicode.h \
UnicodeProperties.h UCPropTable.h iconv.h hash.h Errno.h Log.h \
Collectiondb.h SafeBuf.h Url.h ip.h HashTableX.h PingServer.h Hostdb.h \
@ -2318,7 +2326,7 @@ PageGet.o: PageGet.cpp gb-include.h types.h fctypes.h Unicode.h \
Msg40.h Msg39.h Msg37.h Posdb.h TopTree.h Clusterdb.h IndexTable2.h \
Msg51.h Msg17.h IndexReadInfo.h Msg3a.h Stats.h PostQueryRerank.h \
Sanity.h Msg1.h Datedb.h SiteGetter.h Title.h Address.h zlib.h zconf.h \
Spider.h PageResults.h
Spider.h PageResults.h Language.h
PageHosts.o: PageHosts.cpp gb-include.h types.h fctypes.h Unicode.h \
UnicodeProperties.h UCPropTable.h iconv.h hash.h Errno.h Log.h \
TcpSocket.h openssl/ssl.h openssl/e_os2.h openssl/opensslconf.h \
@ -3616,7 +3624,10 @@ SearchInput.o: SearchInput.cpp gb-include.h types.h fctypes.h Unicode.h \
Indexdb.h Msg20.h Summary.h matches2.h Words.h StopWords.h Bits.h Pos.h \
Matches.h HashTableT.h Domains.h CountryCode.h Tagdb.h Events.h \
Sections.h IndexList.h Dates.h Msg22.h CatRec.h Categories.h HashTable.h \
Catdb.h geo_ip_table.h Users.h Address.h Timedb.h PageResults.h
Catdb.h geo_ip_table.h Users.h Address.h Timedb.h PageResults.h \
Language.h Msg37.h Msg36.h Msg40.h Msg39.h Posdb.h TopTree.h Clusterdb.h \
IndexTable2.h Msg51.h Msg17.h IndexReadInfo.h Msg3a.h Stats.h \
PostQueryRerank.h Sanity.h Msg1.h Datedb.h
Sections.o: Sections.cpp Sections.h HashTableX.h SafeBuf.h gb-include.h \
types.h fctypes.h Unicode.h UnicodeProperties.h UCPropTable.h iconv.h \
hash.h Errno.h Log.h Msg0.h UdpServer.h Mem.h Conf.h Xml.h XmlNode.h \

View File

@ -21,6 +21,8 @@
// node cluster....
#define MAX_OUTSTANDING_MSG20S 200
bool printHttpMime ( class State0 *st ) ;
//static void handleRequest40 ( UdpSlot *slot , long netnice );
//static void gotExternalReplyWrapper ( void *state , void *state2 ) ;
static void gotCacheReplyWrapper ( void *state );
@ -81,6 +83,7 @@ static bool gotSummaryWrapper ( void *state );
bool isSubDom(char *s , long len);
Msg40::Msg40() {
m_socketHadError = 0;
m_buf = NULL;
m_buf2 = NULL;
m_cachedResults = false;
@ -919,6 +922,9 @@ void didTaskWrapper ( void* state ) {
bool Msg40::launchMsg20s ( bool recalled ) {
// don't launch any more if client browser closed socket
if ( m_socketHadError ) { char *xx=NULL; *xx=0; }
// these are just like for passing to Msg39 above
long maxAge = 0 ;
//if ( m_si->m_rcache ) maxAge = g_conf.m_titledbMaxCacheAge;
@ -1221,6 +1227,14 @@ void doneSendingWrapper9 ( void *state , TcpSocket *sock ) {
Msg40 *THIS = (Msg40 *)state;
// the send completed, count it
THIS->m_sendsIn++;
// socket error? if client closes the socket midstream we get one.
if ( g_errno ) {
THIS->m_socketHadError = g_errno;
log("msg40: streaming socket had error: %s",
mstrerror(g_errno));
}
// clear it so we don't think it was a msg20 error below
g_errno = 0;
// try to send more... returns false if blocked on something
if ( ! THIS->gotSummary() ) return;
// all done!!!???
@ -1291,6 +1305,7 @@ bool Msg40::gotSummary ( ) {
if ( m_si && m_si->m_streamResults && ! m_printedHeader ) {
// only print header once
m_printedHeader = true;
printHttpMime ( st );
printSearchResultsHeader ( st );
}
@ -1332,7 +1347,7 @@ bool Msg40::gotSummary ( ) {
}
log("msg40: printing #%li",m_printi);
//log("msg40: printing #%li",m_printi);
// . ok, we got it, so print it and stream it
// . this might set m_hadPrintError to true
@ -1343,6 +1358,9 @@ bool Msg40::gotSummary ( ) {
m20->freeReply();
}
// set it to true on all but the last thing we send!
if ( m_si->m_streamResults )
st->m_socket->m_streamingMode = true;
// . wrap it up with Next 10 etc.
// . this is in PageResults.cpp
@ -1350,25 +1368,16 @@ bool Msg40::gotSummary ( ) {
m_printi >= m_msg3a.m_numDocIds ) {
m_printedTail = true;
printSearchResultsTail ( st );
}
// . if everything has been sent on the socket, then we are done!
// . we are likely being called from TcpServer::writeSOcketWrapper()
// calling makeCallback()
if ( m_si &&
m_si->m_streamResults &&
m_sendsIn >= m_sendsOut &&
sb->length() == 0 && m_printedTail ) {
// this will cause the socket to be destroyed immediately!
// and we are only here because our last write completed!
if ( m_sendsIn < m_sendsOut ) { char *xx=NULL;*xx=0; }
// this will be our final send
st->m_socket->m_streamingMode = false;
return true;
}
TcpServer *tcp = &g_httpServer.m_tcp;
//g_conf.m_logDebugTcp = 1;
// . transmit the chunk in sb if non-zero length
// . steals the allocated buffer from sb and stores in the
// TcpSocket::m_sendBuf, which it frees when socket is
@ -1379,6 +1388,8 @@ bool Msg40::gotSummary ( ) {
// . when we are truly done sending all the data, then we set lastChunk
// to true and TcpServer.cpp will destroy m_socket when done
if ( sb->length() &&
// did client browser close the socket on us midstream?
! m_socketHadError &&
! tcp->sendChunk ( st->m_socket ,
sb ,
this ,
@ -1389,8 +1400,14 @@ bool Msg40::gotSummary ( ) {
m_sendsOut++;
// writing on closed socket?
if ( g_errno ) {
m_socketHadError = g_errno;
log("msg40: got tcp error : %s",mstrerror(g_errno));
}
// do we need to launch another batch of summary requests?
if ( m_numRequests < m_msg3a.m_numDocIds ) {
if ( m_numRequests < m_msg3a.m_numDocIds && ! m_socketHadError ) {
// . if we can launch another, do it
// . say "true" here so it does not call us, gotSummary() and
// do a recursive stack explosion
@ -1419,6 +1436,9 @@ bool Msg40::gotSummary ( ) {
if ( m_si && m_si->m_streamResults ) {
// unless waiting for last transmit to complete
if ( m_sendsOut > m_sendsIn ) return false;
// delete everything! no, doneSendingWrapper9 does...
//mdelete(st, sizeof(State0), "msg40st0");
//delete st;
// otherwise, all done!
return true;
}
@ -4840,3 +4860,50 @@ bool Msg40::printSearchResult9 ( long ix ) {
return true;
}
bool printHttpMime ( State0 *st ) {
SearchInput *si = &st->m_si;
// grab the query
//Msg40 *msg40 = &(st->m_msg40);
//char *q = msg40->getQuery();
//long qlen = msg40->getQueryLen();
//char local[ 128000 ];
//SafeBuf sb(local, 128000);
SafeBuf *sb = &st->m_sb;
// reserve 1.5MB now!
if ( ! sb->reserve(1500000 ,"pgresbuf" ) ) // 128000) )
return true;
// just in case it is empty, make it null terminated
sb->nullTerm();
char *ct = "text/csv";
if ( si->m_format == FORMAT_JSON )
ct = "application/json";
if ( si->m_format == FORMAT_XML )
ct = "text/xml";
//if ( si->m_format == FORMAT_TEXT )
// ct = "text/plain";
if ( si->m_format == FORMAT_CSV )
ct = "text/csv";
// . if we haven't yet sent an http mime back to the user
// then do so here, the content-length will not be in there
// because we might have to call for more spiderdb data
HttpMime mime;
mime.makeMime ( -1, // totel content-lenght is unknown!
0 , // do not cache (cacheTime)
0 , // lastModified
0 , // offset
-1 , // bytesToSend
NULL , // ext
false, // POSTReply
ct, // "text/csv", // contenttype
"utf-8" , // charset
-1 , // httpstatus
NULL ); //cookie
sb->safeMemcpy(mime.getMime(),mime.getMimeLen() );
return true;
}

View File

@ -248,6 +248,7 @@ class Msg40 {
long m_sendsOut ;
long m_sendsIn ;
long m_printi ;
long m_socketHadError;
// use msg3a to get docIds

View File

@ -302,7 +302,8 @@ bool readAndSendLoop ( StateCD *st , bool readFirst ) {
return false;
}
// are we all done?
// are we all done? we still have to call sendList() to
// set socket's streamingMode to false to close things up
if ( readFirst && ! st->m_someoneNeedsMore ) {
log("crawlbot: done sending for download request");
mdelete ( st , sizeof(StateCD) , "stcd" );
@ -557,24 +558,20 @@ bool StateCD::sendList ( ) {
// (long)m_rdbId,(long)m_fmt,(long)m_someoneNeedsMore,
// (long)m_printedEndingBracket);
m_socket->m_streamingMode = true;
// if nobody needs to read more...
if ( m_rdbId == RDB_TITLEDB &&
m_fmt == FMT_JSON &&
! m_someoneNeedsMore &&
! m_printedEndingBracket ) {
if ( ! m_someoneNeedsMore && ! m_printedEndingBracket ) {
// use this for printing out urls.csv as well...
m_printedEndingBracket = true;
// end array of json objects. might be empty!
sb.safePrintf("\n]\n");
if ( m_rdbId == RDB_TITLEDB && m_fmt == FMT_JSON )
sb.safePrintf("\n]\n");
//log("adding ]. len=%li",sb.length());
}
if ( ! m_someoneNeedsMore && sb.length() == 0 ) {
// i guess the send has completed. we are likely being
// called by TcpServer::writeSocketWrapper() makeCallback()
// so take us out of streaming mode so socket can be
// immediately destroyed
m_socket->m_streamingMode = false;
return true;
// i'd like to exit streaming mode here. i fixed tcpserver.cpp
// so if we are called from makecallback() there it won't
// call destroysocket if we WERE in streamingMode just yet
m_socket->m_streamingMode = false;
}
TcpServer *tcp = &g_httpServer.m_tcp;

View File

@ -726,6 +726,14 @@ bool gotResults ( void *state ) {
SearchInput *si = &st->m_si;
// if already printed from Msg40.cpp, bail out now
if ( si->m_streamResults ) {
log("msg40: done streaming. nuking state.");
mdelete(st, sizeof(State0), "PageResults2");
delete st;
return true;
}
// shortcuts
char *coll = si->m_coll2;
long collLen = si->m_collLen2;
@ -878,9 +886,6 @@ bool gotResults ( void *state ) {
//
// if already printed from Msg40.cpp, bail out now
if ( si->m_streamResults ) return true;
// print logo, search box, results x-y, ... into st->m_sb
printSearchResultsHeader ( st );

View File

@ -1104,7 +1104,7 @@ bool TcpServer::closeLeastUsed ( long maxIdleTime ) {
// . g_errno will be set by Loop if there was a kinda socket reset error
void readSocketWrapper ( int sd , void *state ) {
// debug msg
//log("........... TcpServer::readSocketWrapper\n");
//log("........... TcpServer::readSocketWrapper %li\n",sd);
// extract our this ptr
TcpServer *THIS = (TcpServer *)state;
// get a TcpSocket from sd
@ -1391,7 +1391,7 @@ bool TcpServer::setTotalToRead ( TcpSocket *s ) {
// . we call this when socket is connected, too
void writeSocketWrapper ( int sd , void *state ) {
// debug msg
// log("........... TcpServer::writeSocketWrapper\n");
//log("........... TcpServer::writeSocketWrapper sd=%li\n",sd);
TcpServer *THIS = (TcpServer *)state;
// get the TcpSocket for this socket descriptor
TcpSocket *s = THIS->getSocket ( sd );
@ -1416,7 +1416,7 @@ void writeSocketWrapper ( int sd , void *state ) {
iptoa(s->m_ip),nowms-s->m_lastActionTime);
// . some http servers close socket as end of transmission
// . so it's not really an g_errno
g_errno = 0;
if ( ! s->m_streamingMode ) g_errno = 0;
THIS->makeCallback ( s );
THIS->destroySocket ( s );
return;
@ -1455,12 +1455,15 @@ void writeSocketWrapper ( int sd , void *state ) {
if ( status == 1 && ! s->m_readBuf ) return;
// good?
g_errno = 0;
// . otherwise, call callback on done writing or error
// . in m_streamingMode this may call another sendChunk()!!!
// OR it may set streamingMode to false.. it can only do one or
// the other and not both!!! because if it sets streamingMode to
// false then we destroy the socket below!!!! so it can't be
// sending anything new!!!
// in m_streamingMode this may call another sendChunk()!!!
// OR it may set streamingMode to false.. it can only do one or
// the other and not both!!! because if it sets streamingMode to
// false then we destroy the socket below!!!! so it can't be
// sending anything new!!!
bool wasStreaming = s->m_streamingMode;
// otherwise, call callback on done writing or error
THIS->makeCallback ( s );
// if callback changed socket status to ST_SEND_AGAIN
@ -1473,8 +1476,9 @@ void writeSocketWrapper ( int sd , void *state ) {
// goto sendAgain;
//}
// wait for it to exit streaming mode before destroying
if ( s->m_streamingMode ) return;
// we have to do a final call to writeSocket with m_streamingMode
// set to false, so don't destroy socket just yet...
if ( wasStreaming ) return;
// . destroy the socket on error, recycle on transaction completion
// . this will also unregister all our callbacks for the socket
@ -1500,7 +1504,7 @@ long TcpServer::writeSocket ( TcpSocket *s ) {
// send some stuff
long toSend = s->m_sendBufUsed - s->m_sendOffset;
// if nothing to send we are done!
if ( ! toSend ) return 1;
//if ( ! toSend ) return 1;
// get a ptr to the msg piece to send
char *msg = s->m_sendBuf;
if ( ! msg ) return 1;
@ -1702,7 +1706,7 @@ void TcpServer::destroySocket ( TcpSocket *s ) {
log("tcp: destroying socket in streaming mode. err=%s",
mstrerror(g_errno));
// why is it being destroyed without g_errno set?
if ( ! g_errno ) { char *xx=NULL;*xx=0; }
//if ( ! g_errno ) { char *xx=NULL;*xx=0; }
//char *xx=NULL;*xx=0; }
}
@ -2276,12 +2280,18 @@ bool TcpServer::sendChunk ( TcpSocket *s ,
s->m_totalToSend = 0;
s->m_totalSent = 0;
//
// caller must set it to true on all but the last thing they send!!
//
// let it know not to close the socket while this is set
//if ( ! lastChunk ) s->m_streamingMode = true;
//else s->m_streamingMode = false;
s->m_streamingMode = true;
//s->m_streamingMode = true;
//g_conf.m_logDebugTcp = true;
/*
g_conf.m_logDebugTcp = true;
long term = 20;
if ( sb->length() < term ) term = sb->length();
@ -2294,6 +2304,8 @@ bool TcpServer::sendChunk ( TcpSocket *s ,
long minus = 20;
if ( sb->length() < minus ) minus = sb->length() ;
log("tcp: chunkend=%s",sb->getBuf() - minus);
*/
// . start the send process
// . returns false if send did not complete