Reinstated pstats caching.

This commit is contained in:
Ulrich Germann 2014-03-24 13:27:49 +00:00
parent 2063da274d
commit 7fd87943ea
2 changed files with 63 additions and 27 deletions

View File

@ -49,16 +49,13 @@ namespace Moses
uint32_t fwd_o, uint32_t fwd_o,
uint32_t bwd_o) uint32_t bwd_o)
{ {
this->lock.lock(); boost::lock_guard<boost::mutex> guard(this->lock);
jstats& entry = this->trg[pid]; jstats& entry = this->trg[pid];
this->lock.unlock();
entry.add(w,a,cnt2,fwd_o,bwd_o); entry.add(w,a,cnt2,fwd_o,bwd_o);
if (this->good < entry.rcnt()) if (this->good < entry.rcnt())
{ {
this->lock.lock(); UTIL_THROW(util::Exception, "more joint counts than good counts:"
return false; << entry.rcnt() << "/" << this->good << "!");
// UTIL_THROW(util::Exception, "more joint counts than good counts!"
// << entry.rcnt() << "/" << this->good);
} }
return true; return true;
} }
@ -402,6 +399,8 @@ namespace Moses
} }
} }
#endif #endif
cache1.clear();
cache2.clear();
return ret; return ret;
} }

View File

@ -29,6 +29,7 @@
#include "moses/TranslationModel/UG/generic/sorting/VectorIndexSorter.h" #include "moses/TranslationModel/UG/generic/sorting/VectorIndexSorter.h"
#include "moses/TranslationModel/UG/generic/sampling/Sampling.h" #include "moses/TranslationModel/UG/generic/sampling/Sampling.h"
#include "moses/TranslationModel/UG/generic/file_io/ug_stream.h" #include "moses/TranslationModel/UG/generic/file_io/ug_stream.h"
#include "moses/TranslationModel/UG/generic/threading/ug_thread_safe_counter.h"
#include "moses/Util.h" #include "moses/Util.h"
#include "util/exception.hh" #include "util/exception.hh"
@ -47,7 +48,7 @@
using namespace ugdiss; using namespace ugdiss;
using namespace std; using namespace std;
namespace Moses { namespace Moses {
class Mmsapt;
namespace bitext namespace bitext
{ {
using namespace ugdiss; using namespace ugdiss;
@ -133,7 +134,9 @@ namespace Moses {
uint32_t ofwd[po_other+1], obwd[po_other+1]; uint32_t ofwd[po_other+1], obwd[po_other+1];
typename boost::unordered_map<uint64_t, jstats> trg; // typedef typename boost::unordered_map<uint64_t, jstats> trg_map_t;
typedef typename std::map<uint64_t, jstats> trg_map_t;
trg_map_t trg;
pstats(); pstats();
void release(); void release();
void register_worker(); void register_worker();
@ -385,6 +388,7 @@ namespace Moses {
template<typename TKN> template<typename TKN>
class Bitext class Bitext
{ {
friend class Moses::Mmsapt;
protected: protected:
mutable boost::mutex lock; mutable boost::mutex lock;
public: public:
@ -423,15 +427,22 @@ namespace Moses {
bitvector* full_alignment, bitvector* full_alignment,
bool const flip) const; bool const flip) const;
mutable boost::unordered_map<uint64_t,sptr<pstats> > cache1,cache2; #if 1
typedef boost::unordered_map<uint64_t,sptr<pstats> > pcache_t;
#else
typedef map<uint64_t,sptr<pstats> > pcache_t;
#endif
mutable pcache_t cache1,cache2;
protected: protected:
size_t default_sample_size; size_t default_sample_size;
size_t num_workers; size_t num_workers;
size_t m_pstats_cache_threshold;
private: private:
sptr<pstats> sptr<pstats>
prep2(iter const& phrase, size_t const max_sample) const; prep2(iter const& phrase, size_t const max_sample) const;
public: public:
Bitext(size_t const max_sample=5000, size_t const xnum_workers=4); Bitext(size_t const max_sample =1000,
size_t const xnum_workers =16);
Bitext(Ttrack<Token>* const t1, Bitext(Ttrack<Token>* const t1,
Ttrack<Token>* const t2, Ttrack<Token>* const t2,
@ -440,8 +451,8 @@ namespace Moses {
TokenIndex* const v2, TokenIndex* const v2,
TSA<Token>* const i1, TSA<Token>* const i1,
TSA<Token>* const i2, TSA<Token>* const i2,
size_t const max_sample=5000, size_t const max_sample=1000,
size_t const xnum_workers=4); size_t const xnum_workers=16);
virtual void open(string const base, string const L1, string const L2) = 0; virtual void open(string const base, string const L1, string const L2) = 0;
@ -449,10 +460,13 @@ namespace Moses {
sptr<pstats> lookup(iter const& phrase) const; sptr<pstats> lookup(iter const& phrase) const;
sptr<pstats> lookup(iter const& phrase, size_t const max_sample) const; sptr<pstats> lookup(iter const& phrase, size_t const max_sample) const;
void prep(iter const& phrase) const; void prep(iter const& phrase) const;
void setDefaultSampleSize(size_t const max_samples);
void setDefaultSampleSize(size_t const max_samples);
size_t getDefaultSampleSize() const; size_t getDefaultSampleSize() const;
string toString(uint64_t pid, int isL2) const; string toString(uint64_t pid, int isL2) const;
virtual size_t revision() const { return 0; }
}; };
template<typename Token> template<typename Token>
@ -487,6 +501,7 @@ namespace Moses {
Bitext<Token>:: Bitext<Token>::
setDefaultSampleSize(size_t const max_samples) setDefaultSampleSize(size_t const max_samples)
{ {
boost::lock_guard<boost::mutex> guard(this->lock);
if (max_samples != default_sample_size) if (max_samples != default_sample_size)
{ {
cache1.clear(); cache1.clear();
@ -500,6 +515,7 @@ namespace Moses {
Bitext(size_t const max_sample, size_t const xnum_workers) Bitext(size_t const max_sample, size_t const xnum_workers)
: default_sample_size(max_sample) : default_sample_size(max_sample)
, num_workers(xnum_workers) , num_workers(xnum_workers)
, m_pstats_cache_threshold(5)
{ } { }
template<typename Token> template<typename Token>
@ -516,6 +532,7 @@ namespace Moses {
: Tx(tx), T1(t1), T2(t2), V1(v1), V2(v2), I1(i1), I2(i2) : Tx(tx), T1(t1), T2(t2), V1(v1), V2(v2), I1(i1), I2(i2)
, default_sample_size(max_sample) , default_sample_size(max_sample)
, num_workers(xnum_workers) , num_workers(xnum_workers)
, m_pstats_cache_threshold(5)
{ } { }
// agenda is a pool of jobs // agenda is a pool of jobs
@ -688,20 +705,25 @@ namespace Moses {
// assert(b); // assert(b);
for (size_t i = e1; i <= e2; ++i) for (size_t i = e1; i <= e2; ++i)
{ {
if (!j->stats->add(b->getPid(),sample_weight,aln, if (! j->stats->add(b->getPid(),sample_weight,aln,
b->approxOccurrenceCount(), b->approxOccurrenceCount(),
po_fwd,po_bwd)) po_fwd,po_bwd))
{ {
cerr << "FATAL ERROR AT " << __FILE__
<< ":" << __LINE__ << endl;
assert(0);
ostringstream msg;
for (size_t z = 0; z < j->len; ++z) for (size_t z = 0; z < j->len; ++z)
{ {
id_type tid = ag.bt.T1->sntStart(sid)[offset+z].id(); id_type tid = ag.bt.T1->sntStart(sid)[offset+z].id();
cout << (*ag.bt.V1)[tid] << " "; cerr << (*ag.bt.V1)[tid] << " ";
} }
cout << endl; cerr << endl;
for (size_t z = s; z <= i; ++z) for (size_t z = s; z <= i; ++z)
cout << (*ag.bt.V2)[(o+z)->id()] << " "; cerr << (*ag.bt.V2)[(o+z)->id()] << " ";
cout << endl; cerr << endl;
exit(1); assert(0);
UTIL_THROW(util::Exception,"Error in sampling.");
} }
if (i < e2) if (i < e2)
{ {
@ -858,7 +880,8 @@ namespace Moses {
i2.open(base+L2+".sfa", this->T2); i2.open(base+L2+".sfa", this->T2);
assert(this->T1->size() == this->T2->size()); assert(this->T1->size() == this->T2->size());
} }
template<typename TKN> template<typename TKN>
class imBitext : public Bitext<TKN> class imBitext : public Bitext<TKN>
{ {
@ -867,7 +890,9 @@ namespace Moses {
sptr<imTtrack<TKN> > myT2; sptr<imTtrack<TKN> > myT2;
sptr<imTSA<TKN> > myI1; sptr<imTSA<TKN> > myI1;
sptr<imTSA<TKN> > myI2; sptr<imTSA<TKN> > myI2;
static ThreadSafeCounter my_revision;
public: public:
size_t revision() const { return my_revision; }
void open(string const base, string const L1, string L2); void open(string const base, string const L1, string L2);
imBitext(sptr<TokenIndex> const& V1, imBitext(sptr<TokenIndex> const& V1,
sptr<TokenIndex> const& V2, sptr<TokenIndex> const& V2,
@ -885,6 +910,10 @@ namespace Moses {
}; };
template<typename TKN>
ThreadSafeCounter
imBitext<TKN>::my_revision;
template<typename TKN> template<typename TKN>
imBitext<TKN>:: imBitext<TKN>::
imBitext(size_t max_sample) imBitext(size_t max_sample)
@ -894,6 +923,7 @@ namespace Moses {
this->V2.reset(new TokenIndex()); this->V2.reset(new TokenIndex());
this->V1->setDynamic(true); this->V1->setDynamic(true);
this->V2->setDynamic(true); this->V2->setDynamic(true);
++my_revision;
} }
template<typename TKN> template<typename TKN>
@ -907,6 +937,7 @@ namespace Moses {
this->V2 = v2; this->V2 = v2;
this->V1->setDynamic(true); this->V1->setDynamic(true);
this->V2->setDynamic(true); this->V2->setDynamic(true);
++my_revision;
} }
@ -927,6 +958,8 @@ namespace Moses {
this->V1 = other.V1; this->V1 = other.V1;
this->V2 = other.V2; this->V2 = other.V2;
this->default_sample_size = other.default_sample_size; this->default_sample_size = other.default_sample_size;
this->num_workers = other.num_workers;
++my_revision;
} }
template<typename TKN> class snt_adder; template<typename TKN> class snt_adder;
@ -1256,10 +1289,10 @@ namespace Moses {
if (this->num_workers > 1) if (this->num_workers > 1)
ag->add_workers(this->num_workers); ag->add_workers(this->num_workers);
} }
typedef boost::unordered_map<uint64_t,sptr<pstats> > pcache_t;
sptr<pstats> ret; sptr<pstats> ret;
#if 1
if (max_sample == this->default_sample_size && if (max_sample == this->default_sample_size &&
phrase.approxOccurrenceCount() > 100) phrase.approxOccurrenceCount() > m_pstats_cache_threshold)
{ {
// need to test what a good caching threshold is // need to test what a good caching threshold is
// is caching here the cause of the apparent memory leak in // is caching here the cause of the apparent memory leak in
@ -1268,11 +1301,15 @@ namespace Moses {
pcache_t & cache(phrase.root == &(*this->I1) ? cache1 : cache2); pcache_t & cache(phrase.root == &(*this->I1) ? cache1 : cache2);
pcache_t::value_type entry(pid,sptr<pstats>()); pcache_t::value_type entry(pid,sptr<pstats>());
pair<pcache_t::iterator,bool> foo; pair<pcache_t::iterator,bool> foo;
foo = cache.emplace(entry); // foo = cache.emplace(entry);
foo = cache.insert(entry);
if (foo.second) foo.first->second = ag->add_job(phrase, max_sample); if (foo.second) foo.first->second = ag->add_job(phrase, max_sample);
ret = foo.first->second; ret = foo.first->second;
} }
else ret = ag->add_job(phrase, max_sample); else
#endif
ret = ag->add_job(phrase, max_sample);
return ret; return ret;
} }