From 8d534b8ed8bc0cbbfe36ea679d813f4f144e95ee Mon Sep 17 00:00:00 2001 From: Matt Wells Date: Thu, 6 Feb 2014 18:21:22 -0800 Subject: [PATCH] many more fixes for streaming mode --- Make.depend | 21 ++++++++--- Msg40.cpp | 97 ++++++++++++++++++++++++++++++++++++++++-------- Msg40.h | 1 + PageCrawlBot.cpp | 27 ++++++-------- PageResults.cpp | 11 ++++-- TcpServer.cpp | 42 +++++++++++++-------- 6 files changed, 146 insertions(+), 53 deletions(-) diff --git a/Make.depend b/Make.depend index bd3e253d..525b892b 100644 --- a/Make.depend +++ b/Make.depend @@ -1585,7 +1585,7 @@ Msg20.o: Msg20.cpp gb-include.h types.h fctypes.h Unicode.h \ Posdb.h TopTree.h Clusterdb.h IndexTable2.h Msg51.h Msg17.h \ IndexReadInfo.h Msg3a.h Stats.h PostQueryRerank.h Sanity.h Msg1.h \ Datedb.h SiteGetter.h Title.h Address.h zlib.h zconf.h Spider.h \ - HttpMime.h + HttpMime.h Process.h Msg22.o: Msg22.cpp gb-include.h types.h fctypes.h Unicode.h \ UnicodeProperties.h UCPropTable.h iconv.h hash.h Errno.h Log.h Msg22.h \ Url.h ip.h Multicast.h Hostdb.h Xml.h XmlNode.h Lang.h Iso8859.h \ @@ -1883,7 +1883,8 @@ Msg40.o: Msg40.cpp gb-include.h types.h fctypes.h Unicode.h \ HashTable.h Catdb.h Datedb.h LanguageIdentifier.h sort.h XmlDoc.h \ Phrases.h LangList.h Images.h Msg13.h Msge0.h Msge1.h MsgC.h Dns.h \ DnsProtocol.h Msg4.h Msg8b.h SiteGetter.h Title.h Address.h zlib.h \ - zconf.h Spider.h HttpMime.h Speller.h Language.h Wiki.h + zconf.h Spider.h HttpMime.h Speller.h Language.h Wiki.h HttpServer.h \ + TcpServer.h openssl/err.h PageResults.h Msg42.o: Msg42.cpp gb-include.h types.h fctypes.h Unicode.h \ UnicodeProperties.h UCPropTable.h iconv.h hash.h Errno.h Log.h Msg42.h \ UdpServer.h Mem.h Conf.h Xml.h XmlNode.h Lang.h Iso8859.h iana_charset.h \ @@ -2260,7 +2261,14 @@ PageDirectory.o: PageDirectory.cpp gb-include.h types.h fctypes.h \ UdpProtocol.h Dns.h DnsProtocol.h RdbCache.h RdbList.h Multicast.h \ Threads.h Rdb.h RdbBase.h RdbScan.h BigFile.h RdbMap.h RdbDump.h \ RdbTree.h RdbMem.h RdbBuckets.h Msg5.h Msg3.h RdbMerge.h Dir.h \ - HttpMime.h PageCrawlBot.h Categories.h HashTable.h PageResults.h + HttpMime.h PageCrawlBot.h Categories.h HashTable.h PageResults.h \ + Language.h HashTableT.h Query.h Titledb.h DiskPageCache.h IndexList.h \ + Indexdb.h Msg20.h Summary.h matches2.h Words.h StopWords.h Bits.h Pos.h \ + Matches.h Domains.h CountryCode.h Tagdb.h Msg0.h Events.h Sections.h \ + Dates.h Msg37.h Msg36.h Msg40.h SearchInput.h Msg39.h Msg2.h Posdb.h \ + TopTree.h Clusterdb.h IndexTable2.h Msg51.h Msg17.h IndexReadInfo.h \ + Msg3a.h Stats.h PostQueryRerank.h Sanity.h Msg1.h Linkdb.h Msg22.h \ + CatRec.h Catdb.h Datedb.h PageEvents.o: PageEvents.cpp gb-include.h types.h fctypes.h Unicode.h \ UnicodeProperties.h UCPropTable.h iconv.h hash.h Errno.h Log.h \ Collectiondb.h SafeBuf.h Url.h ip.h HashTableX.h PingServer.h Hostdb.h \ @@ -2318,7 +2326,7 @@ PageGet.o: PageGet.cpp gb-include.h types.h fctypes.h Unicode.h \ Msg40.h Msg39.h Msg37.h Posdb.h TopTree.h Clusterdb.h IndexTable2.h \ Msg51.h Msg17.h IndexReadInfo.h Msg3a.h Stats.h PostQueryRerank.h \ Sanity.h Msg1.h Datedb.h SiteGetter.h Title.h Address.h zlib.h zconf.h \ - Spider.h PageResults.h + Spider.h PageResults.h Language.h PageHosts.o: PageHosts.cpp gb-include.h types.h fctypes.h Unicode.h \ UnicodeProperties.h UCPropTable.h iconv.h hash.h Errno.h Log.h \ TcpSocket.h openssl/ssl.h openssl/e_os2.h openssl/opensslconf.h \ @@ -3616,7 +3624,10 @@ SearchInput.o: SearchInput.cpp gb-include.h types.h fctypes.h Unicode.h \ Indexdb.h Msg20.h Summary.h matches2.h Words.h StopWords.h Bits.h Pos.h \ Matches.h HashTableT.h Domains.h CountryCode.h Tagdb.h Events.h \ Sections.h IndexList.h Dates.h Msg22.h CatRec.h Categories.h HashTable.h \ - Catdb.h geo_ip_table.h Users.h Address.h Timedb.h PageResults.h + Catdb.h geo_ip_table.h Users.h Address.h Timedb.h PageResults.h \ + Language.h Msg37.h Msg36.h Msg40.h Msg39.h Posdb.h TopTree.h Clusterdb.h \ + IndexTable2.h Msg51.h Msg17.h IndexReadInfo.h Msg3a.h Stats.h \ + PostQueryRerank.h Sanity.h Msg1.h Datedb.h Sections.o: Sections.cpp Sections.h HashTableX.h SafeBuf.h gb-include.h \ types.h fctypes.h Unicode.h UnicodeProperties.h UCPropTable.h iconv.h \ hash.h Errno.h Log.h Msg0.h UdpServer.h Mem.h Conf.h Xml.h XmlNode.h \ diff --git a/Msg40.cpp b/Msg40.cpp index b79e1398..bf6f56e9 100644 --- a/Msg40.cpp +++ b/Msg40.cpp @@ -21,6 +21,8 @@ // node cluster.... #define MAX_OUTSTANDING_MSG20S 200 +bool printHttpMime ( class State0 *st ) ; + //static void handleRequest40 ( UdpSlot *slot , long netnice ); //static void gotExternalReplyWrapper ( void *state , void *state2 ) ; static void gotCacheReplyWrapper ( void *state ); @@ -81,6 +83,7 @@ static bool gotSummaryWrapper ( void *state ); bool isSubDom(char *s , long len); Msg40::Msg40() { + m_socketHadError = 0; m_buf = NULL; m_buf2 = NULL; m_cachedResults = false; @@ -919,6 +922,9 @@ void didTaskWrapper ( void* state ) { bool Msg40::launchMsg20s ( bool recalled ) { + // don't launch any more if client browser closed socket + if ( m_socketHadError ) { char *xx=NULL; *xx=0; } + // these are just like for passing to Msg39 above long maxAge = 0 ; //if ( m_si->m_rcache ) maxAge = g_conf.m_titledbMaxCacheAge; @@ -1221,6 +1227,14 @@ void doneSendingWrapper9 ( void *state , TcpSocket *sock ) { Msg40 *THIS = (Msg40 *)state; // the send completed, count it THIS->m_sendsIn++; + // socket error? if client closes the socket midstream we get one. + if ( g_errno ) { + THIS->m_socketHadError = g_errno; + log("msg40: streaming socket had error: %s", + mstrerror(g_errno)); + } + // clear it so we don't think it was a msg20 error below + g_errno = 0; // try to send more... returns false if blocked on something if ( ! THIS->gotSummary() ) return; // all done!!!??? @@ -1291,6 +1305,7 @@ bool Msg40::gotSummary ( ) { if ( m_si && m_si->m_streamResults && ! m_printedHeader ) { // only print header once m_printedHeader = true; + printHttpMime ( st ); printSearchResultsHeader ( st ); } @@ -1332,7 +1347,7 @@ bool Msg40::gotSummary ( ) { } - log("msg40: printing #%li",m_printi); + //log("msg40: printing #%li",m_printi); // . ok, we got it, so print it and stream it // . this might set m_hadPrintError to true @@ -1343,6 +1358,9 @@ bool Msg40::gotSummary ( ) { m20->freeReply(); } + // set it to true on all but the last thing we send! + if ( m_si->m_streamResults ) + st->m_socket->m_streamingMode = true; // . wrap it up with Next 10 etc. // . this is in PageResults.cpp @@ -1350,25 +1368,16 @@ bool Msg40::gotSummary ( ) { m_printi >= m_msg3a.m_numDocIds ) { m_printedTail = true; printSearchResultsTail ( st ); - } - - - // . if everything has been sent on the socket, then we are done! - // . we are likely being called from TcpServer::writeSOcketWrapper() - // calling makeCallback() - if ( m_si && - m_si->m_streamResults && - m_sendsIn >= m_sendsOut && - sb->length() == 0 && m_printedTail ) { - // this will cause the socket to be destroyed immediately! - // and we are only here because our last write completed! + if ( m_sendsIn < m_sendsOut ) { char *xx=NULL;*xx=0; } + // this will be our final send st->m_socket->m_streamingMode = false; - return true; } TcpServer *tcp = &g_httpServer.m_tcp; + //g_conf.m_logDebugTcp = 1; + // . transmit the chunk in sb if non-zero length // . steals the allocated buffer from sb and stores in the // TcpSocket::m_sendBuf, which it frees when socket is @@ -1379,6 +1388,8 @@ bool Msg40::gotSummary ( ) { // . when we are truly done sending all the data, then we set lastChunk // to true and TcpServer.cpp will destroy m_socket when done if ( sb->length() && + // did client browser close the socket on us midstream? + ! m_socketHadError && ! tcp->sendChunk ( st->m_socket , sb , this , @@ -1389,8 +1400,14 @@ bool Msg40::gotSummary ( ) { m_sendsOut++; + // writing on closed socket? + if ( g_errno ) { + m_socketHadError = g_errno; + log("msg40: got tcp error : %s",mstrerror(g_errno)); + } + // do we need to launch another batch of summary requests? - if ( m_numRequests < m_msg3a.m_numDocIds ) { + if ( m_numRequests < m_msg3a.m_numDocIds && ! m_socketHadError ) { // . if we can launch another, do it // . say "true" here so it does not call us, gotSummary() and // do a recursive stack explosion @@ -1419,6 +1436,9 @@ bool Msg40::gotSummary ( ) { if ( m_si && m_si->m_streamResults ) { // unless waiting for last transmit to complete if ( m_sendsOut > m_sendsIn ) return false; + // delete everything! no, doneSendingWrapper9 does... + //mdelete(st, sizeof(State0), "msg40st0"); + //delete st; // otherwise, all done! return true; } @@ -4840,3 +4860,50 @@ bool Msg40::printSearchResult9 ( long ix ) { return true; } + +bool printHttpMime ( State0 *st ) { + + SearchInput *si = &st->m_si; + + // grab the query + //Msg40 *msg40 = &(st->m_msg40); + //char *q = msg40->getQuery(); + //long qlen = msg40->getQueryLen(); + + //char local[ 128000 ]; + //SafeBuf sb(local, 128000); + SafeBuf *sb = &st->m_sb; + // reserve 1.5MB now! + if ( ! sb->reserve(1500000 ,"pgresbuf" ) ) // 128000) ) + return true; + // just in case it is empty, make it null terminated + sb->nullTerm(); + + char *ct = "text/csv"; + if ( si->m_format == FORMAT_JSON ) + ct = "application/json"; + if ( si->m_format == FORMAT_XML ) + ct = "text/xml"; + //if ( si->m_format == FORMAT_TEXT ) + // ct = "text/plain"; + if ( si->m_format == FORMAT_CSV ) + ct = "text/csv"; + + // . if we haven't yet sent an http mime back to the user + // then do so here, the content-length will not be in there + // because we might have to call for more spiderdb data + HttpMime mime; + mime.makeMime ( -1, // totel content-lenght is unknown! + 0 , // do not cache (cacheTime) + 0 , // lastModified + 0 , // offset + -1 , // bytesToSend + NULL , // ext + false, // POSTReply + ct, // "text/csv", // contenttype + "utf-8" , // charset + -1 , // httpstatus + NULL ); //cookie + sb->safeMemcpy(mime.getMime(),mime.getMimeLen() ); + return true; +} diff --git a/Msg40.h b/Msg40.h index f6e80c10..77118560 100644 --- a/Msg40.h +++ b/Msg40.h @@ -248,6 +248,7 @@ class Msg40 { long m_sendsOut ; long m_sendsIn ; long m_printi ; + long m_socketHadError; // use msg3a to get docIds diff --git a/PageCrawlBot.cpp b/PageCrawlBot.cpp index 2176d381..1e3893c4 100644 --- a/PageCrawlBot.cpp +++ b/PageCrawlBot.cpp @@ -302,7 +302,8 @@ bool readAndSendLoop ( StateCD *st , bool readFirst ) { return false; } - // are we all done? + // are we all done? we still have to call sendList() to + // set socket's streamingMode to false to close things up if ( readFirst && ! st->m_someoneNeedsMore ) { log("crawlbot: done sending for download request"); mdelete ( st , sizeof(StateCD) , "stcd" ); @@ -557,24 +558,20 @@ bool StateCD::sendList ( ) { // (long)m_rdbId,(long)m_fmt,(long)m_someoneNeedsMore, // (long)m_printedEndingBracket); + m_socket->m_streamingMode = true; + // if nobody needs to read more... - if ( m_rdbId == RDB_TITLEDB && - m_fmt == FMT_JSON && - ! m_someoneNeedsMore && - ! m_printedEndingBracket ) { + if ( ! m_someoneNeedsMore && ! m_printedEndingBracket ) { + // use this for printing out urls.csv as well... m_printedEndingBracket = true; // end array of json objects. might be empty! - sb.safePrintf("\n]\n"); + if ( m_rdbId == RDB_TITLEDB && m_fmt == FMT_JSON ) + sb.safePrintf("\n]\n"); //log("adding ]. len=%li",sb.length()); - } - - if ( ! m_someoneNeedsMore && sb.length() == 0 ) { - // i guess the send has completed. we are likely being - // called by TcpServer::writeSocketWrapper() makeCallback() - // so take us out of streaming mode so socket can be - // immediately destroyed - m_socket->m_streamingMode = false; - return true; + // i'd like to exit streaming mode here. i fixed tcpserver.cpp + // so if we are called from makecallback() there it won't + // call destroysocket if we WERE in streamingMode just yet + m_socket->m_streamingMode = false; } TcpServer *tcp = &g_httpServer.m_tcp; diff --git a/PageResults.cpp b/PageResults.cpp index 3eb09020..44684110 100644 --- a/PageResults.cpp +++ b/PageResults.cpp @@ -726,6 +726,14 @@ bool gotResults ( void *state ) { SearchInput *si = &st->m_si; + // if already printed from Msg40.cpp, bail out now + if ( si->m_streamResults ) { + log("msg40: done streaming. nuking state."); + mdelete(st, sizeof(State0), "PageResults2"); + delete st; + return true; + } + // shortcuts char *coll = si->m_coll2; long collLen = si->m_collLen2; @@ -878,9 +886,6 @@ bool gotResults ( void *state ) { // - // if already printed from Msg40.cpp, bail out now - if ( si->m_streamResults ) return true; - // print logo, search box, results x-y, ... into st->m_sb printSearchResultsHeader ( st ); diff --git a/TcpServer.cpp b/TcpServer.cpp index 93030bd4..eea98534 100644 --- a/TcpServer.cpp +++ b/TcpServer.cpp @@ -1104,7 +1104,7 @@ bool TcpServer::closeLeastUsed ( long maxIdleTime ) { // . g_errno will be set by Loop if there was a kinda socket reset error void readSocketWrapper ( int sd , void *state ) { // debug msg - //log("........... TcpServer::readSocketWrapper\n"); + //log("........... TcpServer::readSocketWrapper %li\n",sd); // extract our this ptr TcpServer *THIS = (TcpServer *)state; // get a TcpSocket from sd @@ -1391,7 +1391,7 @@ bool TcpServer::setTotalToRead ( TcpSocket *s ) { // . we call this when socket is connected, too void writeSocketWrapper ( int sd , void *state ) { // debug msg - // log("........... TcpServer::writeSocketWrapper\n"); + //log("........... TcpServer::writeSocketWrapper sd=%li\n",sd); TcpServer *THIS = (TcpServer *)state; // get the TcpSocket for this socket descriptor TcpSocket *s = THIS->getSocket ( sd ); @@ -1416,7 +1416,7 @@ void writeSocketWrapper ( int sd , void *state ) { iptoa(s->m_ip),nowms-s->m_lastActionTime); // . some http servers close socket as end of transmission // . so it's not really an g_errno - g_errno = 0; + if ( ! s->m_streamingMode ) g_errno = 0; THIS->makeCallback ( s ); THIS->destroySocket ( s ); return; @@ -1455,12 +1455,15 @@ void writeSocketWrapper ( int sd , void *state ) { if ( status == 1 && ! s->m_readBuf ) return; // good? g_errno = 0; - // . otherwise, call callback on done writing or error - // . in m_streamingMode this may call another sendChunk()!!! - // OR it may set streamingMode to false.. it can only do one or - // the other and not both!!! because if it sets streamingMode to - // false then we destroy the socket below!!!! so it can't be - // sending anything new!!! + + // in m_streamingMode this may call another sendChunk()!!! + // OR it may set streamingMode to false.. it can only do one or + // the other and not both!!! because if it sets streamingMode to + // false then we destroy the socket below!!!! so it can't be + // sending anything new!!! + bool wasStreaming = s->m_streamingMode; + + // otherwise, call callback on done writing or error THIS->makeCallback ( s ); // if callback changed socket status to ST_SEND_AGAIN @@ -1473,8 +1476,9 @@ void writeSocketWrapper ( int sd , void *state ) { // goto sendAgain; //} - // wait for it to exit streaming mode before destroying - if ( s->m_streamingMode ) return; + // we have to do a final call to writeSocket with m_streamingMode + // set to false, so don't destroy socket just yet... + if ( wasStreaming ) return; // . destroy the socket on error, recycle on transaction completion // . this will also unregister all our callbacks for the socket @@ -1500,7 +1504,7 @@ long TcpServer::writeSocket ( TcpSocket *s ) { // send some stuff long toSend = s->m_sendBufUsed - s->m_sendOffset; // if nothing to send we are done! - if ( ! toSend ) return 1; + //if ( ! toSend ) return 1; // get a ptr to the msg piece to send char *msg = s->m_sendBuf; if ( ! msg ) return 1; @@ -1702,7 +1706,7 @@ void TcpServer::destroySocket ( TcpSocket *s ) { log("tcp: destroying socket in streaming mode. err=%s", mstrerror(g_errno)); // why is it being destroyed without g_errno set? - if ( ! g_errno ) { char *xx=NULL;*xx=0; } + //if ( ! g_errno ) { char *xx=NULL;*xx=0; } //char *xx=NULL;*xx=0; } } @@ -2276,12 +2280,18 @@ bool TcpServer::sendChunk ( TcpSocket *s , s->m_totalToSend = 0; s->m_totalSent = 0; + // + // caller must set it to true on all but the last thing they send!! + // // let it know not to close the socket while this is set //if ( ! lastChunk ) s->m_streamingMode = true; //else s->m_streamingMode = false; - s->m_streamingMode = true; + //s->m_streamingMode = true; - //g_conf.m_logDebugTcp = true; + + /* + + g_conf.m_logDebugTcp = true; long term = 20; if ( sb->length() < term ) term = sb->length(); @@ -2294,6 +2304,8 @@ bool TcpServer::sendChunk ( TcpSocket *s , long minus = 20; if ( sb->length() < minus ) minus = sb->length() ; log("tcp: chunkend=%s",sb->getBuf() - minus); + */ + // . start the send process // . returns false if send did not complete