Completely rewritten. Now multi-threaded.

Ulrich Germann 2014-03-11 13:57:42 +00:00
parent 76eb3d56b9
commit c02fbf7664


@@ -1,29 +1,32 @@
// -*- c++ -*-
// Program to extract word cooccurrence counts from a memory-mapped word-aligned bitext
// stores the counts lexicon in the format for mm2dTable<uint32_t> (ug_mm_2d_table.h)
// Program to extract word cooccurrence counts from a memory-mapped
// word-aligned bitext; stores the resulting count lexicon in the format
// for mm2dTable<uint32_t> (ug_mm_2d_table.h)
//
// (c) 2010-2012 Ulrich Germann
// to do: multi-threading
#include <queue>
#include <iomanip>
#include <vector>
#include <iterator>
#include <sstream>
#include <algorithm>
#include <boost/program_options.hpp>
#include <boost/dynamic_bitset.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/foreach.hpp>
#include <boost/thread.hpp>
#include <boost/math/distributions/binomial.hpp>
#include <boost/unordered_map.hpp>
#include <boost/unordered_set.hpp>
#include "moses/TranslationModel/UG/generic/program_options/ug_get_options.h"
// #include "ug_translation_finder.h"
// #include "ug_sorters.h"
// #include "ug_corpus_sampling.h"
#include "ug_mm_2d_table.h"
#include "ug_mm_ttrack.h"
#include "ug_corpus_token.h"
#include "ug_corpus_token.h"
using namespace std;
using namespace ugdiss;
@@ -32,164 +35,297 @@ using namespace boost::math;
typedef mm2dTable<id_type,id_type,uint32_t,uint32_t> LEX_t;
typedef SimpleWordId Token;
id_type first_rare_id=500;
vector<vector<uint32_t> > JFREQ; // joint count table for frequent L1 words
vector<map<id_type,uint32_t> > JRARE; // joint count table for rare L1 words
vector<vector<uint32_t> > CFREQ; // cooc count table for frequent L1 words
vector<map<id_type,uint32_t> > CRARE; // cooc count table for rare L1 words
// DECLARATIONS
void interpret_args(int ac, char* av[]);
mmTtrack<Token> T1,T2;
mmTtrack<char> Tx;
TokenIndex V1,V2;
string bname,cfgFile,L1,L2,oname,cooc;
typedef pair<id_type,id_type> wpair;
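// Count of alignment links (a) and co-occurrences (c) for one word pair.
// 'c' accumulates, for every sentence pair in which the two words are
// linked at least once, the product of their within-sentence frequencies.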
struct Count
{
uint32_t a;
uint32_t c;
Count() : a(0), c(0) {};
Count(uint32_t ax, uint32_t cx) : a(ax), c(cx) {}
};
// DECLARATIONS
void interpret_args(int ac, char* av[]);
bool
operator<(pair<id_type,Count> const& a,
pair<id_type,Count> const& b)
{
return a.first < b.first;
}
typedef boost::unordered_map<wpair,Count> countmap_t;
typedef vector<vector<pair<id_type,Count> > > countlist_t;
vector<countlist_t> XLEX;
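// One count list per worker thread. Each Counter owns a private hash map
// (CNT) and writes its results into its own slot of XLEX, so the threads
// need no locking; the per-thread lists are merged later in writeTable().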
class Counter
{
public:
countmap_t CNT;
countlist_t & LEX;
size_t offset;
size_t skip;
Counter(countlist_t& lex, size_t o, size_t s)
: LEX(lex), offset(o), skip(s) {}
void processSentence(id_type sid);
void operator()();
};
string bname,cfgFile,L1,L2,oname,cooc;
int verbose;
size_t truncat;
size_t num_threads;
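// Worker body: count the sentences of this thread's stripe
// (offset, offset+skip, ...), then convert the hash map into
// per-L1-word lists sorted by L2 id, ready for merging.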
void
Counter::
operator()()
{
for (size_t sid = offset; sid < min(truncat,T1.size()); sid += skip)
processSentence(sid);
LEX.resize(V1.ksize());
for (countmap_t::const_iterator c = CNT.begin(); c != CNT.end(); ++c)
{
pair<id_type,Count> foo(c->first.second,c->second);
LEX.at(c->first.first).push_back(foo);
}
typedef vector<pair<id_type,Count> > v_t;
BOOST_FOREACH(v_t& v, LEX)
sort(v.begin(),v.end());
}
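// Heap comparator for merging the per-thread lists of one L1 word:
// ranks entries by the L2 id they currently point at, so that the
// smallest remaining L2 id ends up on top of the heap.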
struct lexsorter
{
vector<countlist_t> const& v;
id_type wid;
lexsorter(vector<countlist_t> const& vx, id_type widx)
: v(vx),wid(widx) {}
bool operator()(pair<uint32_t,uint32_t> const& a,
pair<uint32_t,uint32_t> const& b) const
{
return (v.at(a.first).at(wid).at(a.second).first >
v.at(b.first).at(wid).at(b.second).first);
}
};
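// File header: a placeholder for the file position of the row index
// (patched in at the end), followed by the table dimensions
// (number of L1 and L2 word types).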
void
writeTableHeader(ostream& out)
{
filepos_type idxOffset=0;
numwrite(out,idxOffset); // blank for the time being
numwrite(out,id_type(V1.ksize()));
numwrite(out,id_type(V2.ksize()));
}
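// Merge the per-thread count lists and write the final table(s).
// For each L1 word, a heap over (thread, position) pairs yields the
// cells in ascending L2-id order; entries with the same L2 id from
// different threads are summed. Cells go to aln_out (link counts) and/or
// coc_out (co-occurrence counts); marginals and the row index are
// appended at the end, and the index offset is patched into the header.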
void writeTable(ostream* aln_out, ostream* coc_out)
{
vector<uint32_t> m1a(V1.ksize(),0); // marginals L1
vector<uint32_t> m2a(V2.ksize(),0); // marginals L2
vector<uint32_t> m1c(V1.ksize(),0); // marginals L1
vector<uint32_t> m2c(V2.ksize(),0); // marginals L2
vector<id_type> idxa(V1.ksize()+1,0);
vector<id_type> idxc(V1.ksize()+1,0);
if (aln_out) writeTableHeader(*aln_out);
if (coc_out) writeTableHeader(*coc_out);
size_t CellCountA=0,CellCountC=0;
for (size_t id1 = 0; id1 < V1.ksize(); ++id1)
{
idxa[id1] = CellCountA;
idxc[id1] = CellCountC;
lexsorter sorter(XLEX,id1);
vector<pair<uint32_t,uint32_t> > H; H.reserve(num_threads);
for (size_t i = 0; i < num_threads; ++i)
{
if (id1 < XLEX.at(i).size() && XLEX[i][id1].size())
H.push_back(pair<uint32_t,uint32_t>(i,0));
}
if (!H.size()) continue;
make_heap(H.begin(),H.end(),sorter);
while (H.size())
{
id_type id2 = XLEX[H[0].first][id1][H[0].second].first;
uint32_t aln = XLEX[H[0].first][id1][H[0].second].second.a;
uint32_t coc = XLEX[H[0].first][id1][H[0].second].second.c;
pop_heap(H.begin(),H.end(),sorter);
++H.back().second;
if (H.back().second == XLEX[H.back().first][id1].size())
H.pop_back();
else
push_heap(H.begin(),H.end(),sorter);
while (H.size() &&
XLEX[H[0].first][id1].at(H[0].second).first == id2)
{
aln += XLEX[H[0].first][id1][H[0].second].second.a;
coc += XLEX[H[0].first][id1][H[0].second].second.c;
pop_heap(H.begin(),H.end(),sorter);
++H.back().second;
if (H.back().second == XLEX[H.back().first][id1].size())
H.pop_back();
else
push_heap(H.begin(),H.end(),sorter);
}
if (aln_out)
{
++CellCountA;
numwrite(*aln_out,id2);
numwrite(*aln_out,aln);
m1a[id1] += aln;
m2a[id2] += aln;
}
if (coc_out && coc)
{
++CellCountC;
numwrite(*coc_out,id2);
numwrite(*coc_out,coc);
m1c[id1] += coc;
m2c[id2] += coc;
}
}
}
idxa.back() = CellCountA;
idxc.back() = CellCountC;
if (aln_out)
{
filepos_type idxOffsetA = aln_out->tellp();
BOOST_FOREACH(id_type foo, idxa)
numwrite(*aln_out,foo);
aln_out->write(reinterpret_cast<char const*>(&m1a[0]),m1a.size()*4);
aln_out->write(reinterpret_cast<char const*>(&m2a[0]),m2a.size()*4);
aln_out->seekp(0);
numwrite(*aln_out,idxOffsetA);
}
if (coc_out)
{
filepos_type idxOffsetC = coc_out->tellp();
BOOST_FOREACH(id_type foo, idxc)
numwrite(*coc_out,foo);
coc_out->write(reinterpret_cast<char const*>(&m1c[0]),m1c.size()*4);
coc_out->write(reinterpret_cast<char const*>(&m2c[0]),m2c.size()*4);
coc_out->seekp(0);
numwrite(*coc_out,idxOffsetC);
}
}
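// Per-sentence counting: read the alignment points of sentence sid from
// the memory-mapped alignment track, count links (and, once per pair and
// sentence, co-occurrences), and finally count every word that remained
// unaligned as linked to the NULL word (id 0) on the other side.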
void
Counter::
processSentence(id_type sid)
{
Token const* s1 = T1.sntStart(sid);
Token const* e1 = T1.sntEnd(sid);
Token const* s2 = T2.sntStart(sid);
Token const* e2 = T2.sntEnd(sid);
char const* p = Tx.sntStart(sid);
char const* q = Tx.sntEnd(sid);
ushort r,c;
bitvector check1(T1.sntLen(sid)), check2(T2.sntLen(sid));
check1.set();
check2.set();
vector<ushort> cnt1(V1.ksize(),0);
vector<ushort> cnt2(V2.ksize(),0);
boost::unordered_set<pair<id_type,id_type> > mycoocs;
for (Token const* x = s1; x < e1; ++x)
++cnt1.at(x->id());
for (Token const* x = s2; x < e2; ++x)
++cnt2.at(x->id());
for (Token const* x = s1; x < e1; ++x) ++cnt1[x->id()];
for (Token const* x = s2; x < e2; ++x) ++cnt2[x->id()];
boost::unordered_set<wpair> seen;
bitvector check1(T1.sntLen(sid)); check1.set();
bitvector check2(T2.sntLen(sid)); check2.set();
// count links
char const* p = Tx.sntStart(sid);
char const* q = Tx.sntEnd(sid);
ushort r,c;
// cout << sid << " " << q-p << endl;
while (p < q)
{
p = binread(p,r);
p = binread(p,c);
// cout << sid << " " << r << "-" << c << endl;
assert(r < check1.size());
assert(c < check2.size());
assert(s1+r < e1);
assert(s2+c < e2);
check1.reset(r);
check2.reset(c);
id_type id1 = (s1+r)->id();
id_type id2 = (s2+c)->id();
if (id1 < first_rare_id)
JFREQ[id1][id2]++;
else
JRARE[id1][id2]++;
if (cooc.size())
mycoocs.insert(pair<id_type,id_type>(id1,id2));
wpair k(id1,id2);
Count& cnt = CNT[k];
cnt.a++;
if (seen.insert(k).second)
cnt.c += cnt1[id1] * cnt2[id2];
}
// count unaligned words
for (size_t i = check1.find_first(); i < check1.size(); i = check1.find_next(i))
{
id_type id1 = (s1+i)->id();
if (id1 < first_rare_id) JFREQ[id1][0]++;
else JRARE[id1][0]++;
}
for (size_t i = check2.find_first(); i < check2.size(); i = check2.find_next(i))
JFREQ[0][(s2+i)->id()]++;
if (cooc.size())
{
typedef boost::unordered_set<pair<id_type,id_type> >::iterator iter;
for (iter m = mycoocs.begin(); m != mycoocs.end(); ++m)
if (m->first < first_rare_id)
CFREQ[m->first][m->second] += cnt1[m->first] * cnt2[m->second];
else
CRARE[m->first][m->second] += cnt1[m->first] * cnt2[m->second];
}
for (size_t i = check1.find_first();
i < check1.size();
i = check1.find_next(i))
CNT[wpair((s1+i)->id(),0)].a++;
for (size_t i = check2.find_first();
i < check2.size();
i = check2.find_next(i))
CNT[wpair(0,(s2+i)->id())].a++;
}
// void
// count_coocs(id_type sid)
// writeTable(string ofname,
// vector<vector<uint32_t> >& FREQ,
// vector<map<id_type,uint32_t> >& RARE)
// {
// Token const* s1 = T1.sntStart(sid);
// Token const* e1 = T1.sntEnd(sid);
// ofstream out(ofname.c_str());
// filepos_type idxOffset=0;
// Token const* s2 = T2.sntStart(sid);
// Token const* e2 = T2.sntEnd(sid);
// vector<uint32_t> m1; // marginals L1
// vector<uint32_t> m2; // marginals L2
// m1.resize(max(first_rare_id,V1.getNumTokens()),0);
// m2.resize(V2.getNumTokens(),0);
// vector<id_type> index(V1.getNumTokens()+1,0);
// numwrite(out,idxOffset); // blank for the time being
// numwrite(out,id_type(m1.size()));
// numwrite(out,id_type(m2.size()));
// for (Token const* x = s1; x < e1; ++x)
// id_type cellCount=0;
// id_type stop = min(first_rare_id,id_type(m1.size()));
// for (id_type id1 = 0; id1 < stop; ++id1)
// {
// if (x->id() < first_rare_id)
// {
// vector<uint32_t>& v = CFREQ[x->id()];
// for (Token const* y = s2; y < e2; ++y)
// ++v[y->id()];
// }
// else
// {
// map<id_type,uint32_t>& m = CRARE[x->id()];
// for (Token const* y = s2; y < e2; ++y)
// ++m[y->id()];
// }
// index[id1] = cellCount;
// vector<uint32_t> const& v = FREQ[id1];
// for (id_type id2 = 0; id2 < id_type(v.size()); ++id2)
// {
// if (!v[id2]) continue;
// cellCount++;
// numwrite(out,id2);
// out.write(reinterpret_cast<char const*>(&v[id2]),sizeof(uint32_t));
// m1[id1] += v[id2];
// m2[id2] += v[id2];
// }
// }
// for (id_type id1 = stop; id1 < id_type(m1.size()); ++id1)
// {
// index[id1] = cellCount;
// map<id_type,uint32_t> const& M = RARE[id1];
// for (map<id_type,uint32_t>::const_iterator m = M.begin(); m != M.end(); ++m)
// {
// if (m->second == 0) continue;
// cellCount++;
// numwrite(out,m->first);
// out.write(reinterpret_cast<char const*>(&m->second),sizeof(float));
// m1[id1] += m->second;
// m2[m->first] += m->second;
// }
// }
// index[m1.size()] = cellCount;
// idxOffset = out.tellp();
// for (size_t i = 0; i < index.size(); ++i)
// numwrite(out,index[i]);
// out.write(reinterpret_cast<char const*>(&m1[0]),m1.size()*sizeof(float));
// out.write(reinterpret_cast<char const*>(&m2[0]),m2.size()*sizeof(float));
// // re-write the file header
// out.seekp(0);
// numwrite(out,idxOffset);
// out.close();
// }
void
writeTable(string ofname, vector<vector<uint32_t> >& FREQ,
vector<map<id_type,uint32_t> >& RARE)
{
ofstream out(ofname.c_str());
filepos_type idxOffset=0;
vector<uint32_t> m1; // marginals L1
vector<uint32_t> m2; // marginals L2
m1.resize(max(first_rare_id,V1.getNumTokens()),0);
m2.resize(V2.getNumTokens(),0);
vector<id_type> index(V1.getNumTokens()+1,0);
numwrite(out,idxOffset); // blank for the time being
numwrite(out,id_type(m1.size()));
numwrite(out,id_type(m2.size()));
id_type cellCount=0;
id_type stop = min(first_rare_id,id_type(m1.size()));
for (id_type id1 = 0; id1 < stop; ++id1)
{
index[id1] = cellCount;
vector<uint32_t> const& v = FREQ[id1];
for (id_type id2 = 0; id2 < id_type(v.size()); ++id2)
{
if (!v[id2]) continue;
cellCount++;
numwrite(out,id2);
out.write(reinterpret_cast<char const*>(&v[id2]),sizeof(uint32_t));
m1[id1] += v[id2];
m2[id2] += v[id2];
}
}
for (id_type id1 = stop; id1 < id_type(m1.size()); ++id1)
{
index[id1] = cellCount;
map<id_type,uint32_t> const& M = RARE[id1];
for (map<id_type,uint32_t>::const_iterator m = M.begin(); m != M.end(); ++m)
{
if (m->second == 0) continue;
cellCount++;
numwrite(out,m->first);
out.write(reinterpret_cast<char const*>(&m->second),sizeof(float));
m1[id1] += m->second;
m2[m->first] += m->second;
}
}
index[m1.size()] = cellCount;
idxOffset = out.tellp();
for (size_t i = 0; i < index.size(); ++i)
numwrite(out,index[i]);
out.write(reinterpret_cast<char const*>(&m1[0]),m1.size()*sizeof(float));
out.write(reinterpret_cast<char const*>(&m2[0]),m2.size()*sizeof(float));
// re-write the file header
out.seekp(0);
numwrite(out,idxOffset);
out.close();
}
int
main(int argc, char* argv[])
{
@@ -201,22 +337,21 @@ main(int argc, char* argv[])
Tx.open(bname+L1+"-"+L2+".mam");
V1.open(bname+L1+".tdx");
V2.open(bname+L2+".tdx");
JFREQ.resize(first_rare_id,vector<uint32_t>(V2.ksize(),0));
JRARE.resize(V1.ksize());
CFREQ.resize(first_rare_id,vector<uint32_t>(V2.ksize(),0));
CRARE.resize(V1.ksize());
for (size_t sid = 0; sid < T1.size(); ++sid)
{
if (sid%10000 == 0) cerr << sid << endl;
processSentence(sid);
}
if (oname.size()) writeTable(oname,JFREQ,JRARE);
if (cooc.size()) writeTable(cooc,CFREQ,CRARE);
exit(0);
if (!truncat) truncat = T1.size();
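// Spawn one Counter per thread; thread i handles sentences i, i+num_threads,
// i+2*num_threads, ... and fills XLEX[i]. All workers are joined before the
// merged tables are written.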
XLEX.resize(num_threads);
vector<boost::shared_ptr<boost::thread> > workers(num_threads);
for (size_t i = 0; i < num_threads; ++i)
workers[i].reset(new boost::thread(Counter(XLEX[i],i,num_threads)));
for (size_t i = 0; i < workers.size(); ++i)
workers[i]->join();
// cerr << "done counting" << endl;
ofstream aln_out,coc_out;
if (oname.size()) aln_out.open(oname.c_str());
if (cooc.size()) coc_out.open(cooc.c_str());
writeTable(oname.size() ? &aln_out : NULL,
cooc.size() ? &coc_out : NULL);
if (oname.size()) aln_out.close();
if (cooc.size()) coc_out.close();
}
void
@@ -234,6 +369,12 @@ interpret_args(int ac, char* av[])
("oname,o", po::value<string>(&oname),"output file name")
("cooc,c", po::value<string>(&cooc),
"file name for raw co-occurrence counts")
("verbose,v", po::value<int>(&verbose)->default_value(0)->implicit_value(1),
"verbosity level")
("threads,t", po::value<size_t>(&num_threads)->default_value(4),
"count in <N> parallel threads")
("truncate,n", po::value<size_t>(&truncat)->default_value(0),
"truncate corpus to <N> sentences (for debugging)")
;
h.add_options()
@@ -253,6 +394,7 @@ interpret_args(int ac, char* av[])
cout << o << endl;
exit(0);
}
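// hard cap on the number of counting threads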
num_threads = min(num_threads, size_t(24));
}