Overhaul of TargetPhraseCollectionCache.

This commit is contained in:
Ulrich Germann 2015-10-08 18:32:58 +01:00
parent 28f67fac14
commit 1acb7dd622
3 changed files with 109 additions and 177 deletions

View File

@ -4,182 +4,111 @@ namespace Moses
{
using std::vector;
#if defined(timespec)
// Strict "earlier than" ordering for two timespec values: whole seconds
// decide first; nanoseconds break the tie when the seconds are equal.
// NOTE(review): the enclosing guard tests defined(timespec), which only
// holds if timespec is a preprocessor macro -- confirm that this code is
// ever compiled.
bool operator<(timespec const& a, timespec const& b)
{
  return (a.tv_sec == b.tv_sec) ? (a.tv_nsec < b.tv_nsec)
                                : (a.tv_sec < b.tv_sec);
}
// "Not earlier than" for two timespec values: with equal seconds the
// nanosecond fields are compared with >=, otherwise the larger seconds
// field wins.
bool operator>=(timespec const& a, timespec const& b)
{
  if (a.tv_sec == b.tv_sec)
    return a.tv_nsec >= b.tv_nsec;
  return a.tv_sec > b.tv_sec;
}
#endif
// Strict "earlier than" ordering for two timeval values: whole seconds
// decide first; microseconds break the tie when the seconds are equal.
bool operator<(timeval const& a, timeval const& b)
{
  return (a.tv_sec == b.tv_sec) ? (a.tv_usec < b.tv_usec)
                                : (a.tv_sec < b.tv_sec);
}
// "Not earlier than" for two timeval values: with equal seconds the
// microsecond fields are compared with >=, otherwise the larger seconds
// field wins.
bool operator>=(timeval const& a, timeval const& b)
{
  if (a.tv_sec == b.tv_sec)
    return a.tv_usec >= b.tv_usec;
  return a.tv_sec > b.tv_sec;
}
// Move the entry at position k of the heap v towards the root for as long
// as its time stamp is smaller than its parent's, keeping every moved
// entry's idx member equal to its position in v.
//
// The heap is 0-based: bubble_down() treats 2k+1 and 2k+2 as the children
// of slot k, so the parent of slot k is (k-1)/2. (Using k/2 would compare
// slot 2 against its sibling, slot 1, instead of against the root.)
//
// Does nothing if k is not a valid position in v.
void
bubble_up(std::vector<TPCollWrapper*>& v, size_t k)
{
  if (k >= v.size()) return;
  while (k)
    {
      size_t const parent = (k - 1) / 2;
      if (!(v[k]->tstamp < v[parent]->tstamp)) break; // heap order holds
      std::swap(v[k], v[parent]);
      // re-sync the stored positions with the new slots
      v[k]->idx = k;
      v[parent]->idx = parent;
      k = parent;
    }
}
// Move the entry at position k of the heap v away from the root: as long
// as one of its children (slots 2k+1 and 2k+2) carries a smaller time
// stamp, swap it with the child that has the smaller stamp (the right one
// on ties) and continue from there. The idx member of each swapped entry
// is set to its new position.
void
bubble_down(std::vector<TPCollWrapper*>& v, size_t k)
{
  while (true)
    {
      size_t child = 2 * (k + 1);     // right child of k
      if (child > v.size()) break;    // k has no children at all
      // fall back to the left child if there is no right one, or if the
      // left one has the smaller stamp
      if (child == v.size() || (v[child-1]->tstamp < v[child]->tstamp))
        child -= 1;
      if (v[child]->tstamp >= v[k]->tstamp) break; // heap order holds
      std::swap(v[k], v[child]);
      v[k]->idx = k;
      v[child]->idx = child;
      k = child;
    }
}
// Construct an empty cache: reserve `capacity` slots in m_history, start
// with an empty "doomed" queue (no head, no tail, zero entries) and
// remember the capacity.
TPCollCache
::TPCollCache(size_t capacity)
{
  m_history.reserve(capacity);
  m_capacity     = capacity;
  m_doomed_count = 0;
  m_doomed_last  = NULL;
  m_doomed_first = NULL;
}
// NOTE(review): this span carries two function headers back to back
// (encache and remove_from_queue) followed by two bodies, as if the old and
// the new side of a diff had been rendered without +/- markers. Confirm
// which lines belong to the current revision before relying on either.
TPCollWrapper*
/// remove a TPC from the "doomed" queue
void
TPCollCache
::encache(TPCollWrapper* const& ptr)
::remove_from_queue(TPCollWrapper* x)
{
using namespace boost;
// update time stamp:
#if defined(timespec)
clock_gettime(CLOCK_MONOTONIC, &ptr->tstamp);
#else
gettimeofday(&ptr->tstamp, NULL);
#endif
// exclusive lock on m_history_lock, held until the function returns
unique_lock<shared_mutex> lock(m_history_lock);
if (m_history.capacity() > 1)
{
vector<TPCollWrapper*>& v = m_history;
if (ptr->idx >= 0) // ptr is already in history
{
assert(ptr == v[ptr->idx]);
// k is the slot bubble_down() treats as the right child of ptr's slot;
// the slot before it is the left child. The second call only sees the
// decremented k if the first condition held.
size_t k = 2 * (ptr->idx + 1);
if (k < v.size()) bubble_up(v,k--);
if (k < v.size()) bubble_up(v,k);
}
else if (v.size() < v.capacity())
{
// still room: append ptr at the end and let it bubble up
size_t k = ptr->idx = v.size();
v.push_back(ptr);
bubble_up(v,k);
}
else // someone else needs to go
{
// mark the entry at the top of the heap as no longer in the history,
// release it, and put ptr in its place
v[0]->idx = -1;
release(v[0]);
v[0] = ptr;
// NOTE(review): ptr->idx is not assigned on this path, and bubble_down()
// only writes idx when it swaps -- confirm that ptr->idx ends up valid.
bubble_down(v,0);
}
}
return ptr;
} // TPCollCache::encache(...)
// caller must lock!
// x is the head of the queue: the head moves on to x's successor ...
if (m_doomed_first == x)
m_doomed_first = x->next;
// ... otherwise x's predecessor is linked past x
else (x->prev->next) = x->next;
// x is the tail of the queue: the tail moves back to x's predecessor ...
if (m_doomed_last == x)
m_doomed_last = x->prev;
// ... otherwise x's successor is linked back past x
else x->next->prev = x->prev;
// detach x completely and account for the removal
x->next = x->prev = NULL;
--m_doomed_count;
}
/// Append x to the tail of the "doomed" queue, the doubly linked list that
/// runs from m_doomed_first to m_doomed_last, and count it in
/// m_doomed_count.
///
/// The tail pointer is advanced to x on every call. Linking x only behind
/// the current tail without moving the tail would make each later append
/// overwrite the same `next` pointer and drop earlier entries from the list.
void
TPCollCache
::add_to_queue(TPCollWrapper* x)
{
  // caller must lock!
  x->prev = m_doomed_last;
  x->next = NULL;                              // x becomes the last element
  if (!m_doomed_first) m_doomed_first = x;     // queue was empty
  if (m_doomed_last) m_doomed_last->next = x;  // link behind the old tail
  m_doomed_last = x;                           // x is the new tail
  ++m_doomed_count;
}
// Look up the wrapper stored for `key`.
// NOTE(review): the body below contains two consecutive implementations
// (the iterator m is declared twice, and both m_cache_lock and m_lock are
// used), as if the old and the new side of a diff had been rendered without
// +/- markers. Confirm which lines belong to the current revision.
TPCollWrapper*
TPCollCache
::get(uint64_t key, size_t revision)
{
using namespace boost;
cache_t::iterator m;
{
// shared (read) lock on m_cache_lock for the lookup only
shared_lock<shared_mutex> lock(m_cache_lock);
m = m_cache.find(key);
// a missing entry and an entry with a different revision are both
// reported as NULL
if (m == m_cache.end() || m->second->revision != revision)
return NULL;
++m->second->refCount;
}
encache(m->second);
// return NULL;
// upgradable lock on m_lock; upgraded to exclusive below where needed
upgrade_lock<shared_mutex> rlock(m_lock);
cache_t::iterator m = m_cache.find(key);
if (m == m_cache.end()) // new
{
// insert a NULL placeholder under the exclusive lock; the wrapper is only
// allocated if the insertion actually took place
std::pair<uint64_t,TPCollWrapper*> e(key,NULL);
upgrade_to_unique_lock<shared_mutex> wlock(rlock);
std::pair<cache_t::iterator,bool> foo = m_cache.insert(e);
// NOTE(review): the constructor visible in this file is declared as
// TPCollWrapper(size_t revision_, uint64_t key_); the arguments here are
// passed as (key, revision) -- confirm the intended order.
if (foo.second) foo.first->second = new TPCollWrapper(key, revision);
m = foo.first;
}
else
{
// an entry with reference count 0 is taken off the "doomed" queue
// (release() puts entries there when their count drops to 0)
if (m->second->refCount == 0)
{
upgrade_to_unique_lock<shared_mutex> wlock(rlock);
remove_from_queue(m->second);
}
if (m->second->revision != revision) // out of date
{
// NOTE(review): the previous pointer is overwritten without being deleted
// here -- confirm who owns it afterwards. Same (key, revision) argument
// order question as above.
upgrade_to_unique_lock<shared_mutex> wlock(rlock);
m->second = new TPCollWrapper(key, revision);
}
}
// the caller receives one reference to the returned wrapper
++m->second->refCount;
return m->second;
} // TPCollCache::get(...)
void
TPCollCache
::add(uint64_t key, TPCollWrapper* ptr)
{
{
boost::unique_lock<boost::shared_mutex> lock(m_cache_lock);
m_cache[key] = ptr;
++ptr->refCount;
// ++m_tpc_ctr;
}
encache(ptr);
} // TPCollCache::add(...)
// Give back one reference to ptr.
// NOTE(review): this span carries two function headers (reference and
// plain pointer parameter) and two bodies, as if the old and the new side
// of a diff had been rendered without +/- markers; as written the braces
// do not pair up into a single function. Confirm which lines belong to the
// current revision.
void
TPCollCache
::release(TPCollWrapper*& ptr)
::release(TPCollWrapper* ptr)
{
if (!ptr) return;
boost::upgrade_lock<boost::shared_mutex> lock(m_cache_lock);
if (--ptr->refCount || ptr->idx >= 0) // tpc is still in use
// last reference gone: ptr joins the "doomed" queue
if (--ptr->refCount == 0)
{
ptr = NULL;
return;
boost::unique_lock<boost::shared_mutex> lock(m_lock);
// queue is full: the entry at the head of the queue (x) is evicted first
if (m_doomed_count == m_capacity)
{
TPCollWrapper* x = m_doomed_first;
remove_from_queue(x);
UTIL_THROW_IF2(x->refCount || x == ptr, "TPC was doomed while still in use!");
// NOTE(review): the lookup below uses ptr->key and compares against ptr,
// but the object deleted afterwards is x -- confirm whether x->key / x
// were intended, otherwise x's cache entry may outlive x.
cache_t::iterator m = m_cache.find(ptr->key);
if (m != m_cache.end() && m->second == ptr)
{ // the cache could have been updated with a new pointer
// for the same phrase already, so we need to check
// if the pointer we found is the one we want to get rid of,
// hence the second check
// boost::upgrade_to_unique_lock<boost::shared_mutex> xlock(lock);
m_cache.erase(m);
}
delete x;
}
add_to_queue(ptr);
}
#if 0
timespec t; clock_gettime(CLOCK_MONOTONIC,&t);
timespec r; clock_getres(CLOCK_MONOTONIC,&r);
float delta = t.tv_sec - ptr->tstamp.tv_sec;
cerr << "deleting old cache entry after " << delta << " seconds."
<< " clock resolution is " << r.tv_sec << ":" << r.tv_nsec
<< " at " << __FILE__ << ":" << __LINE__ << endl;
#endif
cache_t::iterator m = m_cache.find(ptr->key);
if (m != m_cache.end() && m->second == ptr)
{ // the cache could have been updated with a new pointer
// for the same phrase already, so we need to check
// if the pointer we found is the one we want to get rid of,
// hence the second check
boost::upgrade_to_unique_lock<boost::shared_mutex> xlock(lock);
m_cache.erase(m);
}
delete ptr;
ptr = NULL;
} // TPCollCache::release(...)
// Constructor: stores the revision and the phrase key and starts with a
// reference count of 0.
// NOTE(review): two signatures and two initializer lists follow each other
// here (one initializing idx, one initializing prev/next), as if the old
// and the new side of a diff had been rendered without +/- markers.
// Confirm which pair belongs to the current revision. Both variants take
// the revision first and the key second.
TPCollWrapper::
TPCollWrapper(size_t r, uint64_t k)
: revision(r), key(k), refCount(0), idx(-1)
TPCollWrapper(size_t revision_, uint64_t key_)
: refCount(0), prev(NULL), next(NULL)
, revision(revision_), key(key_)
{ }
// Destructor: a wrapper must not be destroyed while its reference count is
// still non-zero; this is checked both via UTIL_THROW_IF2 and via assert.
TPCollWrapper::
~TPCollWrapper()
{
  UTIL_THROW_IF2(refCount, "TPCollWrapper refCount > 0!");
  assert(refCount == 0);
}

View File

@ -2,26 +2,31 @@
#pragma once
#include <time.h>
#include "moses/TargetPhraseCollection.h"
#include <boost/atomic.hpp>
namespace Moses
{
class TPCollCache;
// NOTE(review): the member list below declares refCount and revision twice
// and mixes queue links (prev/next) with heap bookkeeping (tstamp/idx), as
// if the old and the new side of a diff had been rendered without +/-
// markers. Confirm which declarations belong to the current revision.
class TPCollWrapper
// wrapper around TargetPhraseCollection that includes reference counts
// and a time stamp for least-recently-used caching of TargetPhraseCollection-s
// wrapper around TargetPhraseCollection with reference counting
// and additional members for caching purposes
: public TargetPhraseCollection
{
friend class TPCollCache;
boost::atomic<uint32_t> refCount; // reference count
TPCollWrapper* prev; // ... in queue of TPCollWrappers used recently
TPCollWrapper* next; // ... in queue of TPCollWrappers used recently
public:
size_t const revision;
// revision; gets changed when the underlying corpus in Mmsapt is updated
mutable boost::shared_mutex lock;
size_t const revision; // rev. No. of the underlying corpus
uint64_t const key; // phrase key
uint32_t refCount; // reference count
#if defined(timespec) // timespec is better, but not available everywhere
timespec tstamp; // last use
#else
timeval tstamp; // last use
#endif
int idx; // position in the history heap
// r: revision of the underlying corpus, k: phrase key
TPCollWrapper(size_t r, uint64_t const k);
~TPCollWrapper();
};
@ -31,31 +36,24 @@ namespace Moses
typedef boost::unordered_map<uint64_t, TPCollWrapper*> cache_t;
typedef std::vector<TPCollWrapper*> history_t;
cache_t m_cache; // maps from phrase ids to target phrase collections
mutable history_t m_history; // heap of live items, least recently used one on top
// mutable history_t m_history; // heap of live items, least recently used one on top
mutable boost::shared_mutex m_cache_lock; // locks m_cache
mutable boost::shared_mutex m_history_lock; // locks m_history
#if 0
// mutable size_t m_tpc_ctr;
// counter of all live item, for debugging. probably obsolete; was used
// to track memory leaks
#endif
TPCollWrapper* encache(TPCollWrapper* const& ptr);
// updates time stamp and position in least-recently-used heap m_history
mutable boost::shared_mutex m_lock; // locks m_cache
TPCollWrapper* m_doomed_first;
TPCollWrapper* m_doomed_last;
uint32_t m_doomed_count; // counter of doomed TPCs
uint32_t m_capacity; // capacity of cache
void add_to_queue(TPCollWrapper* x);
void remove_from_queue(TPCollWrapper* x);
public:
TPCollCache(size_t capacity=1000);
TPCollCache(size_t capacity=10000);
TPCollWrapper*
get(uint64_t key, size_t revision);
void
add(uint64_t key, TPCollWrapper* ptr);
void
release(TPCollWrapper*& tpc);
release(TPCollWrapper* tpc);
};

View File

@ -674,7 +674,13 @@ namespace Moses
// newer than the timestamp of the phrase itself we must update
// the entry.
if (ret) return ret; // yes, was cached => DONE
boost::upgrade_lock<boost::shared_mutex> rlock(ret->lock);
if (ret->GetSize()) return ret;
// new TPC (not found or old one was not up to date)
boost::upgrade_to_unique_lock<boost::shared_mutex> wlock(rlock);
if (ret->GetSize()) return ret;
// check again, another thread may have done the work already
// OK: pt entry NOT found or NOT up to date
// lookup and expansion could be done in parallel threads,
@ -712,7 +718,6 @@ namespace Moses
sort(ppdyn.begin(), ppdyn.end(),sort_by_tgt_id);
}
// now we have two lists of Phrase Pairs, let's merge them
ret = new TPCollWrapper(dyn->revision(), phrasekey);
PhrasePair<Token>::SortByTargetIdSeq sorter;
size_t i = 0; size_t k = 0;
while (i < ppfix.size() && k < ppdyn.size())
@ -724,6 +729,8 @@ namespace Moses
}
while (i < ppfix.size()) ret->Add(mkTPhrase(ttask,src,&ppfix[i++],NULL,dyn));
while (k < ppdyn.size()) ret->Add(mkTPhrase(ttask,src,NULL,&ppdyn[k++],dyn));
// Pruning should not be done here but outside!
if (m_tableLimit) ret->Prune(true, m_tableLimit);
else ret->Prune(true,ret->GetSize());
@ -739,7 +746,6 @@ namespace Moses
}
}
#endif
cache->add(phrasekey, ret);
return ret;
}
@ -918,8 +924,7 @@ namespace Moses
::Release(ttasksptr const& ttask, TargetPhraseCollection*& tpc) const
{
SPTR<TPCollCache> cache = ttask->GetScope()->get<TPCollCache>(cache_key);
TPCollWrapper* foo = static_cast<TPCollWrapper*>(tpc);
if (cache) cache->release(foo);
if (cache) cache->release(static_cast<TPCollWrapper*>(tpc));
tpc = NULL;
}