mosesdecoder/moses/TranslationModel/CompactPT/BlockHashIndex.cpp

425 lines
12 KiB
C++
Raw Normal View History

// $Id$
// vim:tabstop=2
/***********************************************************************
Moses - factored phrase-based language decoder
Copyright (C) 2006 University of Edinburgh
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
***********************************************************************/
2012-08-06 22:07:43 +04:00
#include "ThrowingFwrite.h"
#include "BlockHashIndex.h"
2012-08-04 17:39:30 +04:00
#include "CmphStringVectorAdapter.h"
#ifdef HAVE_CMPH
#include "cmph.h"
2012-08-04 17:39:30 +04:00
#endif
namespace Moses
{
#ifdef WITH_THREADS
BlockHashIndex::BlockHashIndex(size_t orderBits, size_t fingerPrintBits,
size_t threadsNum)
: m_orderBits(orderBits), m_fingerPrintBits(fingerPrintBits),
2012-08-04 17:39:30 +04:00
m_fileHandle(0), m_fileHandleStart(0), m_size(0),
m_lastSaved(-1), m_lastDropped(-1), m_numLoadedRanges(0),
2012-08-04 17:39:30 +04:00
m_threadPool(threadsNum) {
#ifndef HAVE_CMPH
std::cerr << "minphr: CMPH support not compiled in." << std::endl;
exit(1);
#endif
}
#else
BlockHashIndex::BlockHashIndex(size_t orderBits, size_t fingerPrintBits)
: m_orderBits(orderBits), m_fingerPrintBits(fingerPrintBits),
2012-08-04 17:39:30 +04:00
m_fileHandle(0), m_fileHandleStart(0), m_size(0),
m_lastSaved(-1), m_lastDropped(-1), m_numLoadedRanges(0) {
#ifndef HAVE_CMPH
std::cerr << "minphr: CMPH support not compiled in." << std::endl;
exit(1);
#endif
}
#endif
BlockHashIndex::~BlockHashIndex()
{
2012-08-04 17:39:30 +04:00
#ifdef HAVE_CMPH
for(std::vector<void*>::iterator it = m_hashes.begin();
it != m_hashes.end(); it++)
if(*it != 0)
2012-08-04 17:39:30 +04:00
cmph_destroy((cmph_t*)*it);
for(std::vector<PairedPackedArray<>*>::iterator it = m_arrays.begin();
it != m_arrays.end(); it++)
if(*it != 0)
delete *it;
2012-08-04 17:39:30 +04:00
#endif
}
size_t BlockHashIndex::GetHash(const char* key)
{
std::string keyStr(key);
size_t i = std::distance(m_landmarks.begin(),
std::upper_bound(m_landmarks.begin(),
m_landmarks.end(), keyStr)) - 1;
if(i == 0ul-1)
return GetSize();
size_t pos = GetHash(i, key);
if(pos != GetSize())
return (1ul << m_orderBits) * i + pos;
else
return GetSize();
}
size_t BlockHashIndex::GetFprint(const char* key) const
{
size_t hash;
MurmurHash3_x86_32(key, std::strlen(key), 100000, &hash);
hash &= (1ul << m_fingerPrintBits) - 1;
return hash;
}
size_t BlockHashIndex::GetHash(size_t i, const char* key)
{
2012-08-14 13:37:21 +04:00
#ifdef WITH_THREADS
boost::mutex::scoped_lock lock(m_mutex);
#endif
if(m_hashes[i] == 0)
LoadRange(i);
2012-08-04 17:39:30 +04:00
#ifdef HAVE_CMPH
size_t idx = cmph_search((cmph_t*)m_hashes[i], key, (cmph_uint32) strlen(key));
2012-08-04 17:39:30 +04:00
#else
2012-08-14 13:37:21 +04:00
assert(0);
size_t idx = 0;
2012-08-04 17:39:30 +04:00
#endif
std::pair<size_t, size_t> orderPrint = m_arrays[i]->Get(idx, m_orderBits, m_fingerPrintBits);
m_clocks[i] = clock();
if(GetFprint(key) == orderPrint.second)
return orderPrint.first;
else
return GetSize();
}
size_t BlockHashIndex::GetHash(std::string key)
{
return GetHash(key.c_str());
}
size_t BlockHashIndex::operator[](std::string key)
{
return GetHash(key);
}
size_t BlockHashIndex::operator[](char* key)
{
return GetHash(key);
}
size_t BlockHashIndex::Save(std::string filename)
{
std::FILE* mphf = std::fopen(filename.c_str(), "w");
size_t size = Save(mphf);
std::fclose(mphf);
return size;
}
void BlockHashIndex::BeginSave(std::FILE * mphf)
{
m_fileHandle = mphf;
ThrowingFwrite(&m_orderBits, sizeof(size_t), 1, m_fileHandle);
ThrowingFwrite(&m_fingerPrintBits, sizeof(size_t), 1, m_fileHandle);
m_fileHandleStart = std::ftell(m_fileHandle);
size_t relIndexPos = 0;
ThrowingFwrite(&relIndexPos, sizeof(size_t), 1, m_fileHandle);
}
void BlockHashIndex::SaveRange(size_t i)
{
2012-08-04 17:39:30 +04:00
#ifdef HAVE_CMPH
if(m_seekIndex.size() <= i)
m_seekIndex.resize(i+1);
m_seekIndex[i] = std::ftell(m_fileHandle) - m_fileHandleStart;
2012-08-04 17:39:30 +04:00
cmph_dump((cmph_t*)m_hashes[i], m_fileHandle);
m_arrays[i]->Save(m_fileHandle);
#endif
}
void BlockHashIndex::SaveLastRange()
{
#ifdef WITH_THREADS
boost::mutex::scoped_lock lock(m_mutex);
#endif
while(!m_queue.empty() && m_lastSaved + 1 == -m_queue.top())
{
size_t current = -m_queue.top();
m_queue.pop();
SaveRange(current);
m_lastSaved = current;
}
}
void BlockHashIndex::DropRange(size_t i)
{
2012-08-04 17:39:30 +04:00
#ifdef HAVE_CMPH
if(m_hashes[i] != 0)
{
2012-08-04 17:39:30 +04:00
cmph_destroy((cmph_t*)m_hashes[i]);
m_hashes[i] = 0;
}
if(m_arrays[i] != 0)
{
delete m_arrays[i];
m_arrays[i] = 0;
m_clocks[i] = 0;
}
m_numLoadedRanges--;
2012-08-04 17:39:30 +04:00
#endif
}
void BlockHashIndex::DropLastRange()
{
#ifdef WITH_THREADS
boost::mutex::scoped_lock lock(m_mutex);
#endif
while(m_lastDropped != m_lastSaved)
DropRange(++m_lastDropped);
}
#ifdef WITH_THREADS
void BlockHashIndex::WaitAll()
{
m_threadPool.Stop(true);
}
#endif
size_t BlockHashIndex::FinalizeSave()
{
#ifdef WITH_THREADS
m_threadPool.Stop(true);
#endif
SaveLastRange();
size_t relIndexPos = std::ftell(m_fileHandle) - m_fileHandleStart;
std::fseek(m_fileHandle, m_fileHandleStart, SEEK_SET);
ThrowingFwrite(&relIndexPos, sizeof(size_t), 1, m_fileHandle);
std::fseek(m_fileHandle, m_fileHandleStart + relIndexPos, SEEK_SET);
m_landmarks.save(m_fileHandle);
size_t seekIndexSize = m_seekIndex.size();
ThrowingFwrite(&seekIndexSize, sizeof(size_t), 1, m_fileHandle);
ThrowingFwrite(&m_seekIndex[0], sizeof(size_t), seekIndexSize, m_fileHandle);
ThrowingFwrite(&m_size, sizeof(size_t), 1, m_fileHandle);
size_t fileHandleStop = std::ftell(m_fileHandle);
return fileHandleStop - m_fileHandleStart + sizeof(m_orderBits)
+ sizeof(m_fingerPrintBits);
}
size_t BlockHashIndex::Save(std::FILE * mphf)
{
m_queue = std::priority_queue<int>();
BeginSave(mphf);
for(size_t i = 0; i < m_hashes.size(); i++)
SaveRange(i);
return FinalizeSave();
}
size_t BlockHashIndex::LoadIndex(std::FILE* mphf)
{
m_fileHandle = mphf;
size_t beginning = std::ftell(mphf);
size_t read = 0;
read += std::fread(&m_orderBits, sizeof(size_t), 1, mphf);
read += std::fread(&m_fingerPrintBits, sizeof(size_t), 1, mphf);
m_fileHandleStart = std::ftell(m_fileHandle);
size_t relIndexPos;
read += std::fread(&relIndexPos, sizeof(size_t), 1, mphf);
std::fseek(m_fileHandle, m_fileHandleStart + relIndexPos, SEEK_SET);
m_landmarks.load(mphf);
size_t seekIndexSize;
read += std::fread(&seekIndexSize, sizeof(size_t), 1, m_fileHandle);
m_seekIndex.resize(seekIndexSize);
read += std::fread(&m_seekIndex[0], sizeof(size_t), seekIndexSize, m_fileHandle);
m_hashes.resize(seekIndexSize, 0);
m_clocks.resize(seekIndexSize, 0);
m_arrays.resize(seekIndexSize, 0);
read += std::fread(&m_size, sizeof(size_t), 1, m_fileHandle);
size_t end = std::ftell(mphf);
return end - beginning;
}
void BlockHashIndex::LoadRange(size_t i)
{
2012-08-04 17:39:30 +04:00
#ifdef HAVE_CMPH
std::fseek(m_fileHandle, m_fileHandleStart + m_seekIndex[i], SEEK_SET);
cmph_t* hash = cmph_load(m_fileHandle);
m_arrays[i] = new PairedPackedArray<>(0, m_orderBits,
m_fingerPrintBits);
m_arrays[i]->Load(m_fileHandle);
2012-08-04 17:39:30 +04:00
m_hashes[i] = (void*)hash;
m_clocks[i] = clock();
m_numLoadedRanges++;
2012-08-04 17:39:30 +04:00
#endif
}
size_t BlockHashIndex::Load(std::string filename)
{
std::FILE* mphf = std::fopen(filename.c_str(), "r");
size_t size = Load(mphf);
std::fclose(mphf);
return size;
}
size_t BlockHashIndex::Load(std::FILE * mphf)
{
size_t byteSize = LoadIndex(mphf);
size_t end = std::ftell(mphf);
for(size_t i = 0; i < m_seekIndex.size(); i++)
LoadRange(i);
std::fseek(m_fileHandle, end, SEEK_SET);
return byteSize;
}
size_t BlockHashIndex::GetSize() const
{
return m_size;
}
void BlockHashIndex::KeepNLastRanges(float ratio, float tolerance)
{
#ifdef WITH_THREADS
boost::mutex::scoped_lock lock(m_mutex);
#endif
size_t n = m_hashes.size() * ratio;
2012-08-14 13:37:21 +04:00
size_t max = n * (1 + tolerance);
if(m_numLoadedRanges > max)
{
typedef std::vector<std::pair<clock_t, size_t> > LastLoaded;
LastLoaded lastLoaded;
for(size_t i = 0; i < m_hashes.size(); i++)
if(m_hashes[i] != 0)
lastLoaded.push_back(std::make_pair(m_clocks[i], i));
std::sort(lastLoaded.begin(), lastLoaded.end());
for(LastLoaded::reverse_iterator it = lastLoaded.rbegin() + size_t(n * (1 - tolerance));
it != lastLoaded.rend(); it++)
DropRange(it->second);
}
}
2012-08-04 17:39:30 +04:00
void BlockHashIndex::CalcHash(size_t current, void* source_void)
{
#ifdef HAVE_CMPH
cmph_io_adapter_t* source = (cmph_io_adapter_t*) source_void;
cmph_config_t *config = cmph_config_new(source);
cmph_config_set_algo(config, CMPH_CHD);
cmph_t* hash = cmph_new(config);
PairedPackedArray<> *pv =
new PairedPackedArray<>(source->nkeys, m_orderBits, m_fingerPrintBits);
size_t i = 0;
source->rewind(source->data);
2012-08-14 16:27:41 +04:00
std::string lastKey = "";
2012-08-04 17:39:30 +04:00
while(i < source->nkeys)
{
unsigned keylen;
char* key;
source->read(source->data, &key, &keylen);
std::string temp(key, keylen);
2012-08-14 16:27:41 +04:00
source->dispose(source->data, key, keylen);
if(lastKey > temp) {
if(source->nkeys != 2 || temp != "###DUMMY_KEY###") {
std::cerr << "ERROR: Input file does not appear to be sorted with LC_ALL=C sort" << std::endl;
std::cerr << "1: " << lastKey << std::endl;
std::cerr << "2: " << temp << std::endl;
abort();
}
}
lastKey = temp;
size_t fprint = GetFprint(temp.c_str());
size_t idx = cmph_search(hash, temp.c_str(),
2012-08-04 17:39:30 +04:00
(cmph_uint32) temp.size());
2012-08-14 16:27:41 +04:00
2012-08-04 17:39:30 +04:00
pv->Set(idx, i, fprint, m_orderBits, m_fingerPrintBits);
i++;
}
cmph_config_destroy(config);
#ifdef WITH_THREADS
boost::mutex::scoped_lock lock(m_mutex);
#endif
if(m_hashes.size() <= current)
{
m_hashes.resize(current + 1, 0);
m_arrays.resize(current + 1, 0);
m_clocks.resize(current + 1, 0);
}
m_hashes[current] = (void*)hash;
m_arrays[current] = pv;
m_clocks[current] = clock();
m_queue.push(-current);
#endif
}
#ifdef HAVE_CMPH
void* BlockHashIndex::vectorAdapter(std::vector<std::string>& v)
{
return (void*)CmphVectorAdapter(v);
}
void* BlockHashIndex::vectorAdapter(StringVector<unsigned, size_t, std::allocator>& sv)
2012-08-04 17:39:30 +04:00
{
return (void*)CmphStringVectorAdapter(sv);
}
void* BlockHashIndex::vectorAdapter(StringVector<unsigned, size_t, MmapAllocator>& sv)
2012-08-04 17:39:30 +04:00
{
return (void*)CmphStringVectorAdapter(sv);
}
#endif
}