retry if too many docids deduped when &stream=1

Matt Wells 2014-05-01 17:07:31 -07:00
parent 060f7da967
commit 1d766826ae
8 changed files with 164 additions and 114 deletions

Conf.cpp

@@ -297,8 +297,8 @@ bool Conf::init ( char *dir ) { // , long hostId ) {
log(LOG_INFO,"db: Split is FULL");
*/
// sanity check
if ( g_hostdb.m_indexSplits > MAX_INDEXDB_SPLIT ) {
log("db: Increase MAX_INDEXDB_SPLIT");
if ( g_hostdb.m_indexSplits > MAX_SHARDS ) {
log("db: Increase MAX_SHARDS");
char *xx = NULL; *xx = 0;
}
// and always keep a decent site quality cache of at least 3M

Indexdb.h

@@ -33,7 +33,7 @@
//#define INDEXDB_SPLIT 8
//#define DOCID_OFFSET_MASK (INDEXDB_SPLIT-1)
#define DOCID_OFFSET_MASK (g_conf.m_indexdbSplit-1)
#define MAX_INDEXDB_SPLIT 128
#define MAX_SHARDS 128
class Indexdb {
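
DOCID_OFFSET_MASK only works because the split/shard count is a power of two: masking with (n-1) then keeps the low bits, which is the same as taking the docid modulo n. A minimal illustration of that arithmetic, assuming a power-of-two shard count; the real Indexdb key code derives the shard from key bits, so this shows the idea, not the exact mapping:

#include <cstdio>

int main ( ) {
    // shard count must be a power of two and no bigger than MAX_SHARDS
    long      numShards = 8;
    long      mask      = numShards - 1;    // same idea as DOCID_OFFSET_MASK
    long long docId     = 123456789012LL;
    // with a power-of-two count, masking keeps the low bits: docId % numShards
    long shard = (long)( docId & mask );
    printf ( "docid %lli maps to shard %li of %li\n" , docId , shard , numShards );
    return 0;
}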

Msg0.cpp

@@ -31,7 +31,7 @@ void Msg0::constructor ( ) {
m_msg5b = NULL;
//#ifdef SPLIT_INDEXDB
//for ( long i = 0; i < INDEXDB_SPLIT; i++ )
//for ( long i = 0; i < MAX_INDEXDB_SPLIT; i++ )
//for ( long i = 0; i < MAX_SHARDS; i++ )
// m_mcast[i].constructor();
m_mcast.constructor();
m_mcasts = NULL;
@@ -726,8 +726,8 @@ void Msg0::gotSplitReply ( ) {
char *xx=NULL;*xx=0;
// get all the split lists
long totalSize = 0;
RdbList lists[MAX_INDEXDB_SPLIT];
RdbList *listPtrs[MAX_INDEXDB_SPLIT];
RdbList lists[MAX_SHARDS];
RdbList *listPtrs[MAX_SHARDS];
for ( long i = 0; i < m_numSplit; i++ ) {
listPtrs[i] = &lists[i];
long replySize;
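
The hunk above is the gather half of a split read: one RdbList per shard, collected behind an array of pointers so a single merge pass can walk them, with totalSize tracking how much came back. A minimal sketch of that shape; getReply() is a hypothetical accessor standing in for the real multicast reply handling:

#define MAX_SHARDS 128

struct RdbList { char *m_list; long m_listSize; };

// collect one list per shard behind listPtrs[] so one merge can consume them
long gatherShardLists ( long numShards ,
                        RdbList  lists    [MAX_SHARDS] ,
                        RdbList *listPtrs [MAX_SHARDS] ,
                        char *(*getReply) ( long i , long *replySize ) ) {
    long totalSize = 0;
    for ( long i = 0 ; i < numShards ; i++ ) {
        listPtrs[i] = &lists[i];
        long replySize = 0;
        lists[i].m_list     = getReply ( i , &replySize ); // shard i's raw reply
        lists[i].m_listSize = replySize;
        totalSize += replySize;
    }
    return totalSize; // caller merges listPtrs[0..numShards-1] into one list
}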

Msg0.h (2 changed lines)

@@ -216,7 +216,7 @@ class Msg0 {
// used for multicasting the request
//#ifdef SPLIT_INDEXDB
//Multicast m_mcast[INDEXDB_SPLIT];
//Multicast m_mcast[MAX_INDEXDB_SPLIT];
//Multicast m_mcast[MAX_SHARDS];
// casting to multiple splits is obsolete, but for PageIndexdb.cpp
// we still need to do it, but we alloc for it
Multicast m_mcast;

Msg36.h

@@ -65,12 +65,12 @@ class Msg36 {
//#else
// char m_reply[8];
//#endif
char m_reply[8*MAX_INDEXDB_SPLIT];
char m_reply[8*MAX_SHARDS];
// for sending the request
//#ifdef SPLIT_INDEXDB
//Multicast m_mcast[INDEXDB_SPLIT];
Multicast m_mcast[1];//MAX_INDEXDB_SPLIT];
Multicast m_mcast[1];//MAX_SHARDS];
long m_numRequests;
long m_numReplies;
long m_errno;

Msg3a.cpp (202 changed lines)

@@ -26,17 +26,17 @@ void Msg3a::constructor ( ) {
m_rbuf2.constructor();
// NULLify all the reply buffer ptrs
for ( long j = 0; j < MAX_INDEXDB_SPLIT; j++ )
for ( long j = 0; j < MAX_SHARDS; j++ )
m_reply[j] = NULL;
m_rbufPtr = NULL;
for ( long j = 0; j < MAX_INDEXDB_SPLIT; j++ )
for ( long j = 0; j < MAX_SHARDS; j++ )
m_mcast[j].constructor();
m_seoCacheList.constructor();
}
Msg3a::~Msg3a ( ) {
reset();
for ( long j = 0; j < MAX_INDEXDB_SPLIT; j++ )
for ( long j = 0; j < MAX_SHARDS; j++ )
m_mcast[j].destructor();
m_seoCacheList.freeList();
}
@@ -48,12 +48,12 @@ void Msg3a::reset ( ) {
m_siteHashes26 = NULL;
// . NULLify all the reply buffer ptrs
// . have to count DOWN with "i" because of the m_reply[i-1][j] check
for ( long j = 0; j < MAX_INDEXDB_SPLIT; j++ ) {
for ( long j = 0; j < MAX_SHARDS; j++ ) {
if ( ! m_reply[j] ) continue;
mfree(m_reply[j],m_replyMaxSize[j], "Msg3aR");
m_reply[j] = NULL;
}
for ( long j = 0; j < MAX_INDEXDB_SPLIT; j++ )
for ( long j = 0; j < MAX_SHARDS; j++ )
m_mcast[j].reset();
// and the buffer that holds the final docids, etc.
if ( m_finalBuf )
@@ -89,7 +89,7 @@ static void gotCacheReplyWrapper ( void *state ) {
// . sets g_errno on error
// . "query/coll" should NOT be on the stack in case we block
// . uses Msg36 to retrieve term frequencies for each termId in query
// . sends Msg39 request to get docids from each indexdb split
// . sends Msg39 request to get docids from each indexdb shard
// . merges replies together
// . we print out debug info if debug is true
// . "maxAge"/"addToCache" is talking about the clusterdb cache as well
@@ -337,7 +337,7 @@ bool Msg3a::gotCacheReply ( ) {
}
}
// time how long to get each split's docids
// time how long to get each shard's docids
if ( m_debug )
m_startTime = gettimeofdayInMilliseconds();
@@ -483,7 +483,7 @@ bool Msg3a::gotCacheReply ( ) {
Multicast *m = &m_mcast[i];
// clear it for transmit
m->reset();
// . send out a msg39 request to each split
// . send out a msg39 request to each shard
// . multicasts to a host in group "groupId"
// . we always block waiting for the reply with a multicast
// . returns false and sets g_errno on error
@@ -532,10 +532,10 @@ bool Msg3a::gotCacheReply ( ) {
if ( m_numReplies < m_numHosts ) return false;//indexdbSplit )
// . otherwise, we did not block... error?
// . it must have been an error or just no new lists available!!
// . if we call gotAllSplitReplies() here, and we were called by
// . if we call gotAllShardReplies() here, and we were called by
// mergeLists() we end up calling mergeLists() again... bad. so
// just return true in that case.
//return gotAllSplitReplies();
//return gotAllShardReplies();
return true;
}
@@ -553,7 +553,7 @@ void gotReplyWrapper3a ( void *state , void *state2 ) {
" err=%s", (long)THIS, THIS->m_numReplies ,
mstrerror(g_errno) );
// if one split times out, ignore it!
// if one shard times out, ignore it!
if ( g_errno == EQUERYTRUNCATED ||
g_errno == EUDPTIMEDOUT )
g_errno = 0;
@@ -576,7 +576,7 @@ void gotReplyWrapper3a ( void *state , void *state2 ) {
// . sanity check
// . ntpd can screw with our local time and make this negative
if ( delta >= 0 ) {
// count the split
// count the shards
h->m_splitsDone++;
// accumulate the times so we can do an average display
// in PageHosts.cpp.
@@ -587,8 +587,8 @@ void gotReplyWrapper3a ( void *state , void *state2 ) {
THIS->m_numReplies++;
// bail if still awaiting more replies
if ( THIS->m_numReplies < THIS->m_numHosts ) return;
// return if gotAllSplitReplies() blocked
if ( ! THIS->gotAllSplitReplies( ) ) return;
// return if gotAllShardReplies() blocked
if ( ! THIS->gotAllShardReplies( ) ) return;
// set g_errno i guess so parent knows
if ( THIS->m_errno ) g_errno = THIS->m_errno;
// call callback if we did not block, since we're here. all done.
@@ -603,9 +603,9 @@ static void gotSerpdbReplyWrapper ( void *state ) {
THIS->m_callback ( THIS->m_state );
}
bool Msg3a::gotAllSplitReplies ( ) {
bool Msg3a::gotAllShardReplies ( ) {
// if any of the split requests had an error, give up and set m_errno
// if any of the shard requests had an error, give up and set m_errno
// but don't set if for non critical errors like query truncation
if ( m_errno ) {
g_errno = m_errno;
@@ -705,23 +705,23 @@ bool Msg3a::gotAllSplitReplies ( ) {
if ( mr->m_nqt != m_q->getNumTerms() ) {
g_errno = EBADREPLY;
m_errno = EBADREPLY;
log("query: msg3a: Split reply qterms=%li != %li.",
log("query: msg3a: Shard reply qterms=%li != %li.",
(long)mr->m_nqt,(long)m_q->getNumTerms() );
return true;
}
// return if split had an error, but not for a non-critical
// return if shard had an error, but not for a non-critical
// error like query truncation
if ( mr->m_errno && mr->m_errno != EQUERYTRUNCATED ) {
g_errno = mr->m_errno;
m_errno = mr->m_errno;
log("query: msg3a: Split had error: %s",
log("query: msg3a: Shard had error: %s",
mstrerror(g_errno));
return true;
}
// skip down here if reply was already set
//skip:
// add of the total hits from each split, this is how many
// total results the lastest split is estimated to be able to
// add of the total hits from each shard, this is how many
// total results the lastest shard is estimated to be able to
// return
// . THIS should now be exact since we read all termlists
// of posdb...
@@ -732,12 +732,12 @@ bool Msg3a::gotAllSplitReplies ( ) {
// cast these for printing out
long long *docIds = (long long *)mr->ptr_docIds;
double *scores = (double *)mr->ptr_scores;
// print out every docid in this split reply
// print out every docid in this shard reply
for ( long j = 0; j < mr->m_numDocIds ; j++ ) {
// print out score_t
logf( LOG_DEBUG,
"query: msg3a: [%lu] %03li) "
"split=%li docId=%012llu domHash=0x%02lx "
"shard=%li docId=%012llu domHash=0x%02lx "
"score=%f" ,
(unsigned long)this ,
j ,
@@ -849,13 +849,13 @@ bool Msg3a::mergeLists ( ) {
// shortcut
//long numSplits = m_numHosts;//indexdbSplit;
// . point to the various docids, etc. in each split reply
// . point to the various docids, etc. in each shard reply
// . tcPtr = term count. how many required query terms does the doc
// have? formerly called topExplicits in IndexTable2.cpp
long long *diPtr [MAX_INDEXDB_SPLIT];
double *rsPtr [MAX_INDEXDB_SPLIT];
key_t *ksPtr [MAX_INDEXDB_SPLIT];
long long *diEnd [MAX_INDEXDB_SPLIT];
long long *diPtr [MAX_SHARDS];
double *rsPtr [MAX_SHARDS];
key_t *ksPtr [MAX_SHARDS];
long long *diEnd [MAX_SHARDS];
for ( long j = 0; j < m_numHosts ; j++ ) {
Msg39Reply *mr =m_reply[j];
// if we have gbdocid:| in query this could be NULL
@@ -953,7 +953,7 @@ bool Msg3a::mergeLists ( ) {
return true;
//
// ***MERGE ALL SPLITS INTO m_docIds[], etc.***
// ***MERGE ALL SHARDS INTO m_docIds[], etc.***
//
// . merge all lists in m_replyDocIds[splitNum]
// . we may be re-called later after m_docsToGet is increased
@@ -966,7 +966,7 @@ bool Msg3a::mergeLists ( ) {
//Msg39Reply *mr;
long hslot;
// get the next highest-scoring docids from all split lists
// get the next highest-scoring docids from all shard termlists
for ( long j = 0; j < m_numHosts; j++ ) {
// . skip exhausted lists
// . these both should be NULL if reply was skipped because
@@ -1026,82 +1026,84 @@ bool Msg3a::mergeLists ( ) {
// . only add it to the final list if the docid is "unique"
// . BUT since different event ids share the same docid, exception!
if ( hslot < 0 ) {
// always inc this
//m_totalDocCount++;
// only do this if we need more
if ( m_numDocIds < m_docsToGet ) {
// get DocIdScore class for this docid
Msg39Reply *mr = m_reply[maxj];
// point to the array of DocIdScores
DocIdScore *ds = (DocIdScore *)mr->ptr_scoreInfo;
long nds = mr->size_scoreInfo/sizeof(DocIdScore);
DocIdScore *dp = NULL;
for ( long i = 0 ; i < nds ; i++ ) {
if ( ds[i].m_docId != *diPtr[maxj] ) continue;
dp = &ds[i];
break;
}
// add the max to the final merged lists
m_docIds [m_numDocIds] = *diPtr[maxj];
if ( hslot >= 0 ) goto skip; // < 0 ) {
// wtf?
if ( ! dp ) {
// this is empty if no scoring info
// supplied!
if ( m_r->m_getDocIdScoringInfo )
log("msg3a: CRAP! got empty score "
"info for "
"d=%lli",
m_docIds[m_numDocIds]);
//char *xx=NULL; *xx=0; 261561804684
// qry = www.yahoo
}
// point to the single DocIdScore for this docid
m_scoreInfos[m_numDocIds] = dp;
// always inc this
//m_totalDocCount++;
// only do this if we need more
if ( m_numDocIds < m_docsToGet ) {
// get DocIdScore class for this docid
Msg39Reply *mr = m_reply[maxj];
// point to the array of DocIdScores
DocIdScore *ds = (DocIdScore *)mr->ptr_scoreInfo;
long nds = mr->size_scoreInfo/sizeof(DocIdScore);
DocIdScore *dp = NULL;
for ( long i = 0 ; i < nds ; i++ ) {
if ( ds[i].m_docId != *diPtr[maxj] ) continue;
dp = &ds[i];
break;
}
// add the max to the final merged lists
m_docIds [m_numDocIds] = *diPtr[maxj];
// reset this just in case
if ( dp ) {
dp->m_singleScores = NULL;
dp->m_pairScores = NULL;
}
// wtf?
if ( ! dp ) {
// this is empty if no scoring info
// supplied!
if ( m_r->m_getDocIdScoringInfo )
log("msg3a: CRAP! got empty score "
"info for "
"d=%lli",
m_docIds[m_numDocIds]);
//char *xx=NULL; *xx=0; 261561804684
// qry = www.yahoo
}
// point to the single DocIdScore for this docid
m_scoreInfos[m_numDocIds] = dp;
// now fix DocIdScore::m_pairScores and m_singleScores
// ptrs so they reference into the
// Msg39Reply::ptr_pairScoreBuf and ptr_singleSingleBuf
// like they should. it seems we do not free the
// Msg39Replies so we should be ok referencing them.
if ( dp && dp->m_singlesOffset >= 0 )
dp->m_singleScores =
(SingleScore *)(mr->ptr_singleScoreBuf+
dp->m_singlesOffset) ;
if ( dp && dp->m_pairsOffset >= 0 )
dp->m_pairScores =
(PairScore *)(mr->ptr_pairScoreBuf +
dp->m_pairsOffset );
// reset this just in case
if ( dp ) {
dp->m_singleScores = NULL;
dp->m_pairScores = NULL;
}
// now fix DocIdScore::m_pairScores and m_singleScores
// ptrs so they reference into the
// Msg39Reply::ptr_pairScoreBuf and ptr_singleSingleBuf
// like they should. it seems we do not free the
// Msg39Replies so we should be ok referencing them.
if ( dp && dp->m_singlesOffset >= 0 )
dp->m_singleScores =
(SingleScore *)(mr->ptr_singleScoreBuf+
dp->m_singlesOffset) ;
if ( dp && dp->m_pairsOffset >= 0 )
dp->m_pairScores =
(PairScore *)(mr->ptr_pairScoreBuf +
dp->m_pairsOffset );
// turn it into a float, that is what rscore_t is.
// we do this to make it easier for PostQueryRerank.cpp
m_scores [m_numDocIds]=(double)*rsPtr[maxj];
if ( m_r->m_doSiteClustering )
m_clusterRecs[m_numDocIds]= *ksPtr[maxj];
// clear this out
//m_eventIdBits[m_numDocIds].clear();
// set this for use below
hslot = m_numDocIds;
// point to next available slot to add to
m_numDocIds++;
}
// if it has ALL the required query terms, count it
//if ( *bsPtr[maxj] & 0x60 ) m_numAbove++;
// . add it, this should be pre-allocated!
// . returns false and sets g_errno on error
if ( ! htable.addKey(*diPtr[maxj],1) ) return true;
// turn it into a float, that is what rscore_t is.
// we do this to make it easier for PostQueryRerank.cpp
m_scores [m_numDocIds]=(double)*rsPtr[maxj];
if ( m_r->m_doSiteClustering )
m_clusterRecs[m_numDocIds]= *ksPtr[maxj];
// clear this out
//m_eventIdBits[m_numDocIds].clear();
// set this for use below
hslot = m_numDocIds;
// point to next available slot to add to
m_numDocIds++;
}
// if it has ALL the required query terms, count it
//if ( *bsPtr[maxj] & 0x60 ) m_numAbove++;
// . add it, this should be pre-allocated!
// . returns false and sets g_errno on error
if ( ! htable.addKey(*diPtr[maxj],1) ) return true;
skip:
// increment the split pointers from which we took the max
// increment the shard pointers from which we took the max
rsPtr[maxj]++;
diPtr[maxj]++;
ksPtr[maxj]++;
@@ -1113,7 +1115,7 @@ bool Msg3a::mergeLists ( ) {
if ( m_debug ) {
// show how long it took
logf( LOG_DEBUG,"query: msg3a: [%lu] merged %li docs from %li "
"splits in %llu ms. "
"shards in %llu ms. "
,
(unsigned long)this,
m_numDocIds, (long)m_numHosts,
@@ -1138,7 +1140,7 @@ bool Msg3a::mergeLists ( ) {
}
// if we had a full split, we should have gotten the cluster recs
// from each split already
// from each shard already
memset ( m_clusterLevels , CR_OK , m_numDocIds );
return true;
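
The heart of mergeLists() is a k-way merge over the per-shard docid/score arrays (the diPtr/rsPtr/diEnd cursors above), deduping by docid so a page returned more than once is only kept once. A minimal sketch of that loop, assuming std::unordered_set in place of the real hash table and dropping the cluster-rec and scoring-info bookkeeping:

#include <unordered_set>

// per-shard cursors into a Msg39 reply (cf. diPtr/rsPtr/diEnd above)
struct ShardCursor {
    long long *di;    // next docid from this shard
    long long *diEnd; // end of this shard's docids
    double    *rs;    // score matching *di
};

// merge until "docsToGet" unique docids are kept or every shard is exhausted;
// returns how many docids were written to docIds[]/scores[]
long mergeShards ( ShardCursor *c , long numShards ,
                   long long *docIds , double *scores , long docsToGet ) {
    std::unordered_set<long long> seen; // stands in for the real htable.addKey()
    long numDocIds = 0;
    while ( numDocIds < docsToGet ) {
        // pick the shard whose next docid has the highest score
        long maxj = -1;
        for ( long j = 0 ; j < numShards ; j++ ) {
            if ( c[j].di >= c[j].diEnd ) continue; // exhausted
            if ( maxj == -1 || *c[j].rs > *c[maxj].rs ) maxj = j;
        }
        if ( maxj == -1 ) break; // all shards exhausted
        // only keep the docid if it was not already taken (dedup)
        if ( seen.insert ( *c[maxj].di ).second ) {
            docIds[numDocIds] = *c[maxj].di;
            scores[numDocIds] = *c[maxj].rs;
            numDocIds++;
        }
        // advance the cursor we took the max from
        c[maxj].di++;
        c[maxj].rs++;
    }
    return numDocIds;
}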

Msg3a.h (10 changed lines)

@@ -80,7 +80,7 @@ public:
return m_numTotalEstimatedHits; };
// called when we got a reply of docIds
bool gotAllSplitReplies ( );
bool gotAllShardReplies ( );
bool gotCacheReply ( );
@@ -135,13 +135,13 @@ public:
float m_termFreqWeights[MAX_QUERY_TERMS];
// a multicast class to send the request, one for each split
Multicast m_mcast[MAX_INDEXDB_SPLIT];
Multicast m_mcast[MAX_SHARDS];
// for timing how long things take
long long m_startTime;
// this buffer should be big enough to hold all requests
//char m_request [MAX_MSG39_REQUEST_SIZE * MAX_INDEXDB_SPLIT];
//char m_request [MAX_MSG39_REQUEST_SIZE * MAX_SHARDS];
long m_numReplies;
// . # estimated total hits
@@ -157,8 +157,8 @@ public:
SafeBuf m_rbuf2;
// each split gives us a reply
class Msg39Reply *m_reply [MAX_INDEXDB_SPLIT];
long m_replyMaxSize[MAX_INDEXDB_SPLIT];
class Msg39Reply *m_reply [MAX_SHARDS];
long m_replyMaxSize[MAX_SHARDS];
char m_debug;
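
Msg3a keeps one Multicast and one reply slot per shard: the Msg39 request fans out to every shard, replies are counted as they arrive, and the merge only starts once m_numReplies reaches m_numHosts, which is what gotReplyWrapper3a() checks above. A minimal sketch of that scatter/gather accounting; send() is a hypothetical stand-in for the real Multicast::send():

#define MAX_SHARDS 128

// one reply slot per shard plus the counters the reply wrapper checks
struct ShardGather {
    long  m_numHosts;           // how many shards were asked
    long  m_numReplies;         // how many have answered so far
    char *m_reply [MAX_SHARDS]; // per-shard reply buffers
};

// fan the request out to every shard; returns false on a launch error
bool launchToAllShards ( ShardGather *g ,
                         bool (*send) ( long shard , ShardGather *g ) ) {
    g->m_numReplies = 0;
    for ( long i = 0 ; i < g->m_numHosts ; i++ )
        if ( ! send ( i , g ) ) return false;
    return true;
}

// called once per shard reply; returns true only when the last reply is in,
// the point where the real code calls gotAllShardReplies() and merges
bool gotShardReply ( ShardGather *g , long shard , char *reply ) {
    g->m_reply[shard] = reply;
    g->m_numReplies++;
    if ( g->m_numReplies < g->m_numHosts ) return false; // still waiting
    return true;
}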

Msg40.cpp

@@ -1789,6 +1789,54 @@ bool Msg40::gotSummary ( ) {
// delete everything! no, doneSendingWrapper9 does...
//mdelete(st, sizeof(State0), "msg40st0");
//delete st;
// if we did not ask for enough docids and they were mostly
// dups so they got deduped, then ask for more.
// m_numDisplayed includes results before the &s=X parm.
// and so does m_docsToGetVisible, so we can compare them.
if ( m_numDisplayed < m_docsToGetVisible &&
// this is true if we can get more docids from merging
// more of the termlists from the shards together.
// otherwise, we will have to ask each shard for a
// higher number of docids.
m_msg3a.m_moreDocIdsAvail ) { //&&
// doesn't work on multi-coll just yet, it cores.
// MAKE it.
//m_numCollsToSearch == 1 ) {
// can it cover us?
long need = m_msg3a.m_docsToGet + 20;
// note it
log("msg40: too many summaries deduped. "
"getting more "
"docids from msg3a merge and getting summaries. "
"%li are visible, need %li. "
"changing docsToGet from %li to %li. "
"numReplies=%li numRequests=%li",
m_numDisplayed,
m_docsToGetVisible,
m_msg3a.m_docsToGet,
need,
m_numReplies,
m_numRequests);
// merge more docids from the shards' termlists
m_msg3a.m_docsToGet = need;
m_msg3a.mergeLists();
// . realloc the msg20 array
// . i don't think we need to do this when streaming...
//if ( ! reallocMsg20Buf() ) return true;
// . reset this before launch
// . not for streaming!
//m_numReplies = 0;
//m_numRequests = 0;
// reprocess all!
//m_lastProcessedi = -1;
// now launch!
if ( ! launchMsg20s ( true ) ) return false;
// all done, call callback
return true;
}
// otherwise, all done!
return true;
}
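
The new block in gotSummary() is the point of the commit: with &stream=1, deduping can leave fewer visible summaries than were asked for, so instead of returning short the code widens the merged docid window and fetches more summaries. A minimal stand-alone sketch of that retry shape; remerge() and launchSummaries() are hypothetical stand-ins for Msg3a::mergeLists() and launchMsg20s(true):

struct RetryState {
    long m_numDisplayed;     // visible results left after deduping
    long m_docsToGetVisible; // how many results the caller asked to see
    long m_docsToGet;        // how many docids have been merged so far
    bool m_moreDocIdsAvail;  // can the shard termlists still yield more?
};

// returns false if we blocked waiting on new summaries, true if done
bool maybeGetMore ( RetryState *s ,
                    void (*remerge)         ( long docsToGet ) ,
                    bool (*launchSummaries) ( ) ) {
    // enough visible results, or nothing left to merge: we are done
    if ( s->m_numDisplayed >= s->m_docsToGetVisible ) return true;
    if ( ! s->m_moreDocIdsAvail ) return true;
    // widen the merged docid window a little, as the commit does (+20)
    s->m_docsToGet += 20;
    remerge ( s->m_docsToGet );
    // get summaries for the newly merged docids; dedup happens downstream
    if ( ! launchSummaries ( ) ) return false; // blocked; callback resumes
    return true;
}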