replace parameter --mix-by-averaging with --sparse-average, fix bug in sharding, fix bug in --dump-mixed-weights

Eva 2012-03-13 16:34:48 +00:00
parent 104bb20882
commit e1601ef154
5 changed files with 137 additions and 68 deletions
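
For context, the idea behind the new --sparse-average option: instead of dividing every summed weight by the total number of processes (or updates), each sparse feature is divided only by the number of summands in which it was actually non-zero, so rarely-firing features are not dragged toward zero. A minimal standalone sketch of that arithmetic (the std::map-based types here are illustrative, not the Moses FVector API):

#include <map>
#include <string>
#include <vector>

typedef std::map<std::string, float> Weights;

// Sparse-average a set of weight vectors: each feature is divided by the
// number of vectors in which it occurred with a non-zero value, rather
// than by summands.size().
Weights SparseAverage(const std::vector<Weights>& summands) {
  Weights sum, counts;
  for (size_t i = 0; i < summands.size(); ++i) {
    for (Weights::const_iterator it = summands[i].begin();
         it != summands[i].end(); ++it) {
      if (it->second != 0) {
        sum[it->first] += it->second; // accumulate the weight
        counts[it->first] += 1;       // count its non-zero occurrences
      }
    }
  }
  for (Weights::iterator it = sum.begin(); it != sum.end(); ++it)
    it->second /= counts[it->first];  // divide each feature by its own count
  return sum;
}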

View File

@@ -123,7 +123,7 @@ int main(int argc, char** argv) {
string decode_filename;
size_t update_scheme;
bool separateUpdates, batchEqualsShard;
bool mixByAveraging;
bool sparseAverage;
bool dumpMixedWeights;
po::options_description desc("Allowed options");
desc.add_options()
@@ -175,8 +175,7 @@ int main(int argc, char** argv) {
("min-oracle-bleu", po::value<float>(&min_oracle_bleu)->default_value(0), "Set a minimum oracle BLEU score")
("min-weight-change", po::value<float>(&min_weight_change)->default_value(0.01), "Set minimum weight change for stopping criterion")
("mira-learning-rate", po::value<float>(&mira_learning_rate)->default_value(1), "Learning rate for MIRA (fixed or flexible)")
("mixing-frequency", po::value<size_t>(&mixingFrequency)->default_value(5), "How often per epoch to mix weights, when using mpi")
("mix-by-averaging", po::value<bool>(&mixByAveraging)->default_value(true), "Average weights by the number of processes")
("mixing-frequency", po::value<size_t>(&mixingFrequency)->default_value(1), "How often per epoch to mix weights, when using mpi")
("model-hope-fear", po::value<bool>(&model_hope_fear)->default_value(false), "Use model, hope and fear translations for optimisation")
("nbest,n", po::value<size_t>(&n)->default_value(1), "Number of translations in n-best list")
("normalise", po::value<bool>(&normaliseWeights)->default_value(false), "Whether to normalise the updated weights before passing them to the decoder")
@@ -200,6 +199,7 @@ int main(int argc, char** argv) {
("shuffle", po::value<bool>(&shuffle)->default_value(false), "Shuffle input sentences before processing")
("slack-min", po::value<float>(&slack_min)->default_value(0.01), "Minimum slack used")
("slack-step", po::value<float>(&slack_step)->default_value(0), "Increase slack from epoch to epoch by the value provided")
("sparse-average", po::value<bool>(&sparseAverage)->default_value(false), "Average weights by the number of processes")
("stop-weights", po::value<bool>(&weightConvergence)->default_value(true), "Stop when weights converge")
("threads", po::value<int>(&threadcount)->default_value(1), "Number of threads used")
("update-scheme", po::value<size_t>(&update_scheme)->default_value(1), "Update scheme, default: 1")
@@ -245,6 +245,11 @@ int main(int argc, char** argv) {
exit(1);
}
if (sparseAverage && averageWeights) {
cerr << "Parameters --sparse-average 1 and --average-weights 1 are incompatible (not implemented)" << endl;
exit(1);
}
if (mosesConfigFile.empty()) {
cerr << "Error: No moses ini file specified" << endl;
return 1;
@@ -410,11 +415,13 @@ int main(int argc, char** argv) {
// Create shards according to the number of processes used
vector<size_t> shard;
float shardSize = (float) (order.size()) / size;
VERBOSE(1, "Shard size: " << shardSize << endl);
size_t shardStart = (size_t) (shardSize * rank);
size_t shardEnd = (size_t) (shardSize * (rank + 1));
if (rank == size - 1)
if (rank == size - 1) {
shardEnd = order.size();
shardSize = shardEnd - shardStart;
}
VERBOSE(1, "Shard size: " << shardSize << endl);
VERBOSE(1, "Rank: " << rank << " Shard start: " << shardStart << " Shard end: " << shardEnd << endl);
shard.resize(shardSize);
copy(order.begin() + shardStart, order.begin() + shardEnd, shard.begin());
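
The sharding fix above matters because shardSize stays fractional: the last rank extends shardEnd to cover the remainder, but without recomputing shardSize the subsequent shard.resize() truncates the float, and the copy() then writes past the vector's end. A self-contained illustration with made-up numbers (10 sentences, 3 processes):

#include <cassert>
#include <cstddef>

// Made-up numbers: 10 sentences, 3 processes. shardSize = 3.33, so the
// last rank owns 4 sentences, not 3; without the fix, shard.resize()
// would truncate to 3 and the copy of 4 elements would overflow it.
int main() {
  const size_t total = 10, nProcs = 3, rank = 2;       // last rank
  float shardSize = (float) total / nProcs;            // 3.33
  size_t shardStart = (size_t) (shardSize * rank);     // 6
  size_t shardEnd = (size_t) (shardSize * (rank + 1)); // 9 (float truncation)
  if (rank == nProcs - 1) {
    shardEnd = total;                   // take the remainder: 10
    shardSize = shardEnd - shardStart;  // the fix: 4, not 3.33
  }
  assert(shardStart == 6 && shardEnd == 10 && (size_t) shardSize == 4);
  return 0;
}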
@@ -447,15 +454,11 @@ int main(int argc, char** argv) {
}
decoder->setWeights(initialWeights);
if (!mixByAveraging && coreWeightMap.size() == 0) {
cerr << "Trying to mix without averaging, but cannot because of core features.." << endl;
exit(1);
}
//Main loop:
// print initial weights
cerr << "Rank " << rank << ", initial weights: " << initialWeights << endl;
ScoreComponentCollection cumulativeWeights; // collect weights per epoch to produce an average
ScoreComponentCollection cumulativeWeightsBinary;
size_t numberOfUpdates = 0;
size_t numberOfUpdatesThisEpoch = 0;
@@ -467,23 +470,25 @@ int main(int argc, char** argv) {
float sumOfInputs = 0;
size_t numberOfInputs = 0;
ScoreComponentCollection mixedWeights;
ScoreComponentCollection mixedWeightsPrevious;
ScoreComponentCollection mixedWeightsBeforePrevious;
ScoreComponentCollection mixedAverageWeights;
ScoreComponentCollection mixedAverageWeightsPrevious;
ScoreComponentCollection mixedAverageWeightsBeforePrevious;
bool stop = false;
// int sumStillViolatedConstraints;
float *sendbuf, *recvbuf;
sendbuf = (float *) malloc(sizeof(float));
recvbuf = (float *) malloc(sizeof(float));
for (size_t epoch = 0; epoch < epochs && !stop; ++epoch) {
// sum of violated constraints in an epoch
// sumStillViolatedConstraints = 0;
numberOfUpdatesThisEpoch = 0;
// Sum up weights over one epoch; the final average uses the weights from the last epoch
if (!accumulateWeights)
if (!accumulateWeights) {
cumulativeWeights.ZeroAll();
cumulativeWeightsBinary.ZeroAll();
}
// number of weight dumps this epoch
size_t weightEpochDump = 0;
@@ -885,6 +890,12 @@ int main(int argc, char** argv) {
mosesWeights.L1Normalise();
cumulativeWeights.PlusEquals(mosesWeights);
if (sparseAverage) {
ScoreComponentCollection binary;
binary.SetToBinaryOf(mosesWeights);
cumulativeWeightsBinary.PlusEquals(binary);
}
++numberOfUpdates;
++numberOfUpdatesThisEpoch;
if (averageWeights) {
@@ -921,23 +932,26 @@ int main(int argc, char** argv) {
size_t mixing_base = mixingFrequency == 0 ? 0 : shard.size() / mixingFrequency;
size_t dumping_base = weightDumpFrequency ==0 ? 0 : shard.size() / weightDumpFrequency;
bool mix = evaluateModulo(shardPosition, mixing_base, actualBatchSize);
// mix weights?
ScoreComponentCollection mixedWeights;
if (evaluateModulo(shardPosition, mixing_base, actualBatchSize)) {
if (mix) {
#ifdef MPI_ENABLE
// collect all weights in mixedWeights and divide by number of processes
mpi::reduce(world, mosesWeights, mixedWeights, SCCPlus(), 0);
ScoreComponentCollection totalBinary;
if (sparseAverage) {
ScoreComponentCollection binary;
binary.SetToBinaryOf(mosesWeights);
mpi::reduce(world, binary, totalBinary, SCCPlus(), 0);
}
if (rank == 0) {
// divide by number of processes
if (mixByAveraging)
mixedWeights.DivideEquals(size);
else {
// can only skip this if we have fixed core features, reset them here
ProducerWeightMap::iterator p;
for(p = coreWeightMap.begin(); p!=coreWeightMap.end(); ++p)
mixedWeights.Assign(p->first, p->second);
if (sparseAverage) {
mixedWeights.DivideEquals(totalBinary);
}
else
mixedWeights.DivideEquals(size);
// normalise weights after averaging
if (normaliseWeights) {
@@ -958,19 +972,21 @@ int main(int argc, char** argv) {
// Dump weights?
if (dumpMixedWeights) {
// dump mixed weights instead of average weights
ostringstream filename;
if (epoch < 10)
filename << weightDumpStem << "_0" << epoch;
else
filename << weightDumpStem << "_" << epoch;
if (mix && rank == 0 && !weightDumpStem.empty()) {
// dump mixed weights instead of average weights
ostringstream filename;
if (epoch < 10)
filename << weightDumpStem << "_0" << epoch;
else
filename << weightDumpStem << "_" << epoch;
if (weightDumpFrequency > 1)
filename << "_" << weightEpochDump;
if (weightDumpFrequency > 1)
filename << "_" << weightEpochDump;
cerr << "Dumping mixed weights during epoch " << epoch << " to " << filename.str() << endl << endl;
mixedWeights.Save(filename.str());
++weightEpochDump;
cerr << "Dumping mixed weights during epoch " << epoch << " to " << filename.str() << endl << endl;
mixedWeights.Save(filename.str());
++weightEpochDump;
}
}
else {
if (evaluateModulo(shardPosition, dumping_base, actualBatchSize)) {
@@ -983,30 +999,35 @@ int main(int argc, char** argv) {
}
} else {
if (numberOfUpdatesThisEpoch > 0) {
tmpAverageWeights.DivideEquals(numberOfUpdatesThisEpoch);
proceed = true;
if (sparseAverage)
tmpAverageWeights.DivideEquals(cumulativeWeightsBinary);
else
tmpAverageWeights.DivideEquals(numberOfUpdatesThisEpoch);
proceed = true;
}
}
if (proceed) {
#ifdef MPI_ENABLE
// average across processes
mpi::reduce(world, tmpAverageWeights, mixedAverageWeights, SCCPlus(), 0);
mpi::reduce(world, tmpAverageWeights, mixedAverageWeights, SCCPlus(), 0);
ScoreComponentCollection totalBinary;
if (sparseAverage) {
ScoreComponentCollection binary;
binary.SetToBinaryOf(mosesWeights);
mpi::reduce(world, binary, totalBinary, SCCPlus(), 0);
}
#endif
#ifndef MPI_ENABLE
mixedAverageWeights = tmpAverageWeights;
#endif
if (rank == 0 && !weightDumpStem.empty()) {
// divide by number of processes
if (mixByAveraging)
mixedAverageWeights.DivideEquals(size);
else {
// can only skip this if we have fixed core features, reset them here
ProducerWeightMap::iterator p;
for(p = coreWeightMap.begin(); p!=coreWeightMap.end(); ++p)
mixedAverageWeights.Assign(p->first, p->second);
}
if (sparseAverage)
mixedAverageWeights.DivideEquals(totalBinary);
else
mixedAverageWeights.DivideEquals(size);
// normalise weights after averaging
if (normaliseWeights) {
mixedAverageWeights.L1Normalise();
@@ -1024,11 +1045,11 @@ int main(int argc, char** argv) {
filename << "_" << weightEpochDump;
}
if (accumulateWeights) {
// cerr << "\nMixed average weights (cumulative) during epoch " << epoch << ": " << mixedAverageWeights << endl;
/* if (accumulateWeights) {
cerr << "\nMixed average weights (cumulative) during epoch " << epoch << ": " << mixedAverageWeights << endl;
} else {
// cerr << "\nMixed average weights during epoch " << epoch << ": " << mixedAverageWeights << endl;
}
cerr << "\nMixed average weights during epoch " << epoch << ": " << mixedAverageWeights << endl;
}*/
cerr << "Dumping mixed average weights during epoch " << epoch << " to " << filename.str() << endl << endl;
mixedAverageWeights.Save(filename.str());
@@ -1074,14 +1095,21 @@ int main(int argc, char** argv) {
if (weightConvergence) {
bool reached = true;
if (rank == 0 && (epoch >= 2)) {
ScoreComponentCollection firstDiff(mixedAverageWeights);
firstDiff.MinusEquals(mixedAverageWeightsPrevious);
VERBOSE(1, "Average weight changes since previous epoch: " << firstDiff <<
" (max: " << firstDiff.GetLInfNorm() << ")" << endl);
ScoreComponentCollection secondDiff(mixedAverageWeights);
secondDiff.MinusEquals(mixedAverageWeightsBeforePrevious);
VERBOSE(1, "Average weight changes since before previous epoch: " << secondDiff <<
" (max: " << secondDiff.GetLInfNorm() << ")" << endl << endl);
ScoreComponentCollection firstDiff, secondDiff;
if (dumpMixedWeights) {
firstDiff = mixedWeights;
firstDiff.MinusEquals(mixedWeightsPrevious);
secondDiff = mixedWeights;
secondDiff.MinusEquals(mixedWeightsBeforePrevious);
}
else {
firstDiff = mixedAverageWeights;
firstDiff.MinusEquals(mixedAverageWeightsPrevious);
secondDiff = mixedAverageWeights;
secondDiff.MinusEquals(mixedAverageWeightsBeforePrevious);
}
VERBOSE(1, "Average weight changes since previous epoch: " << firstDiff << " (max: " << firstDiff.GetLInfNorm() << ")" << endl);
VERBOSE(1, "Average weight changes since before previous epoch: " << secondDiff << " (max: " << secondDiff.GetLInfNorm() << ")" << endl << endl);
// check whether stopping criterion has been reached
// (both difference vectors must have all weight changes smaller than min_weight_change)
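
The reason both difference vectors are checked: weights that merely oscillate between two points would show a small change against the previous epoch every other time, but not against the epoch before that. A hedged sketch of the criterion over plain vectors (the diff does the same via ScoreComponentCollection::GetLInfNorm):

#include <algorithm>
#include <cmath>
#include <vector>

// Largest absolute component, i.e. the L-infinity norm used above.
static float LInfNorm(const std::vector<float>& v) {
  float m = 0;
  for (size_t i = 0; i < v.size(); ++i)
    m = std::max(m, std::fabs(v[i]));
  return m;
}

// Converged only if the weights moved little relative to BOTH of the two
// preceding epochs; a single comparison could be fooled by oscillation.
static bool WeightsConverged(const std::vector<float>& firstDiff,
                             const std::vector<float>& secondDiff,
                             float minWeightChange) {
  return LInfNorm(firstDiff) < minWeightChange &&
         LInfNorm(secondDiff) < minWeightChange;
}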
@@ -1100,6 +1128,8 @@ int main(int argc, char** argv) {
}
}
mixedWeightsBeforePrevious = mixedWeightsPrevious;
mixedWeightsPrevious = mixedWeights;
mixedAverageWeightsBeforePrevious = mixedAverageWeightsPrevious;
mixedAverageWeightsPrevious = mixedAverageWeights;
#ifdef MPI_ENABLE
@@ -1292,16 +1322,16 @@ void decodeHopeOrFear(size_t rank, size_t size, size_t decode, string filename,
vector<size_t> shard;
float shardSize = (float) (order.size()) / size;
VERBOSE(1, "Rank " << rank << ", shard size: " << shardSize << endl);
size_t shardStart = (size_t) (shardSize * rank);
size_t shardEnd = (size_t) (shardSize * (rank + 1));
if (rank == size - 1)
if (rank == size - 1) {
shardEnd = inputSentences.size();
shardSize = shardEnd - shardStart;
}
VERBOSE(1, "Rank " << rank << ", shard start: " << shardStart << " Shard end: " << shardEnd << endl);
shardSize = shardEnd - shardStart;
VERBOSE(1, "Rank " << rank << ", shard size: " << shardSize << endl);
shard.resize(shardSize);
copy(order.begin() + shardStart, order.begin() + shardEnd, shard.begin());
VERBOSE(1, "Rank " << rank << ", actual shard size: " << shard.size() << endl);
// open files for writing
stringstream fname;
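
For readers unfamiliar with the mixing calls above: boost::mpi::reduce combines one value per process at the root using a user-supplied operation (SCCPlus in the diff). A minimal runnable analogue with a single float standing in for the weight collection, and std::plus standing in for SCCPlus:

#include <boost/mpi.hpp>
#include <functional>
#include <iostream>

// Every process contributes a value; rank 0 receives the element-wise sum
// and divides to obtain the average, mirroring the mixing step above.
int main(int argc, char** argv) {
  boost::mpi::environment env(argc, argv);
  boost::mpi::communicator world;
  float myWeight = 0.1f * (world.rank() + 1);  // stand-in for mosesWeights
  float mixed = 0;
  boost::mpi::reduce(world, myWeight, mixed, std::plus<float>(), 0);
  if (world.rank() == 0)
    std::cout << "mixed average: " << mixed / world.size() << std::endl;
  return 0;
}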

View File

@@ -271,12 +271,13 @@ namespace Moses {
if (rhs.m_coreFeatures.size() > m_coreFeatures.size())
resize(rhs.m_coreFeatures.size());
for (const_iterator i = rhs.cbegin(); i != rhs.cend(); ++i)
set(i->first, get(i->first) + i->second);
for (size_t i = 0; i < m_coreFeatures.size(); ++i) {
set(i->first, get(i->first) + i->second);
/* for (size_t i = 0; i < m_coreFeatures.size(); ++i) {
if (i < rhs.m_coreFeatures.size()) {
m_coreFeatures[i] += rhs.m_coreFeatures[i];
}
}
}*/
for (size_t i = 0; i < rhs.m_coreFeatures.size(); ++i)
m_coreFeatures[i] += rhs.m_coreFeatures[i];
return *this;
}
@@ -286,6 +287,25 @@ namespace Moses {
set(i->first, get(i->first) + i->second);
}
// count non-zero occurrences for all sparse features
void FVector::setToBinaryOf(const FVector& rhs) {
for (const_iterator i = rhs.cbegin(); i != rhs.cend(); ++i)
if (rhs.get(i->first) != 0)
set(i->first, 1);
for (size_t i = 0; i < rhs.m_coreFeatures.size(); ++i)
m_coreFeatures[i] = 1;
}
// lhs vector is a sum of vectors, rhs vector holds number of non-zero summands
FVector& FVector::divideEquals(const FVector& rhs) {
assert(m_coreFeatures.size() == rhs.m_coreFeatures.size());
for (const_iterator i = rhs.cbegin(); i != rhs.cend(); ++i)
set(i->first, get(i->first)/rhs.get(i->first)); // divide by number of summands
for (size_t i = 0; i < rhs.m_coreFeatures.size(); ++i)
m_coreFeatures[i] /= rhs.m_coreFeatures[i]; // divide by number of summands
return *this;
}
FVector& FVector::operator-= (const FVector& rhs) {
if (rhs.m_coreFeatures.size() > m_coreFeatures.size())
resize(rhs.m_coreFeatures.size());
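
Taken together, setToBinaryOf and divideEquals implement the sparse average: summing binary indicator vectors yields a per-feature count of non-zero summands, and the element-wise division then averages each feature over its own count. A worked example on plain arrays (illustrative values):

#include <cassert>
#include <cmath>
#include <vector>

// Three summands; each feature is non-zero in only two of them, so the
// sparse average divides by 2 where plain averaging would divide by 3.
int main() {
  const float w[3][2] = { {0.4f, 0.0f}, {0.2f, 0.6f}, {0.0f, 0.2f} };
  std::vector<float> sum(2, 0.0f), counts(2, 0.0f);
  for (int p = 0; p < 3; ++p)
    for (int f = 0; f < 2; ++f)
      if (w[p][f] != 0) {      // what setToBinaryOf records per summand...
        sum[f] += w[p][f];
        counts[f] += 1;        // ...and summing the binaries counts
      }
  for (int f = 0; f < 2; ++f)
    sum[f] /= counts[f];       // divideEquals: element-wise division
  assert(std::fabs(sum[0] - 0.3f) < 1e-6 && std::fabs(sum[1] - 0.4f) < 1e-6);
  return 0;
}

Note that divideEquals assumes rhs has a non-zero entry wherever the dividend does; that holds whenever rhs was built with setToBinaryOf over the same summands.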

View File

@@ -117,7 +117,7 @@ namespace Moses {
* Change the number of core features
**/
void resize(size_t newsize);
typedef boost::unordered_map<FName,FValue,FNameHash, FNameEquals> FNVmap;
/** Iterators */
typedef FNVmap::iterator iterator;
@@ -171,7 +171,7 @@ namespace Moses {
FVector& operator/= (const FValue& rhs);
FVector& max_equals(const FVector& rhs);
/** norms and sums */
FValue l1norm() const;
FValue l2norm() const;
@@ -181,6 +181,8 @@ namespace Moses {
/** pretty printing */
std::ostream& print(std::ostream& out) const;
/** additional */
void logCoreFeatures(size_t baseOfLog);
//scale so that abs. value is less than maxvalue
void thresholdScale(float maxValue );
@@ -190,6 +192,12 @@ namespace Moses {
void sparsePlusEquals(const FVector& rhs);
// build an indicator vector: 1 for each element of the original vector that is non-zero
void setToBinaryOf(const FVector& rhs);
// divide each element by the number given in the rhs vector
FVector& divideEquals(const FVector& rhs);
#ifdef MPI_ENABLE
friend class boost::serialization::access;
#endif

View File

@@ -92,6 +92,11 @@ void ScoreComponentCollection::DivideEquals(float scalar)
m_scores /= scalar;
}
void ScoreComponentCollection::DivideEquals(const ScoreComponentCollection& rhs)
{
m_scores.divideEquals(rhs.m_scores);
}
void ScoreComponentCollection::MultiplyEquals(const ScoreComponentCollection& rhs)
{
m_scores *= rhs.m_scores;

View File

@@ -121,6 +121,11 @@ public:
return FVector(s_denseVectorSize);
}
void SetToBinaryOf(const ScoreComponentCollection& rhs)
{
m_scores.setToBinaryOf(rhs.m_scores);
}
//! Set all values to 0.0
void ZeroAll()
{
@@ -129,6 +134,7 @@ public:
void MultiplyEquals(float scalar);
void DivideEquals(float scalar);
void DivideEquals(const ScoreComponentCollection& rhs);
void MultiplyEquals(const ScoreComponentCollection& rhs);
void MultiplyEquals(const ScoreProducer* sp, float scalar);
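
Finally, the call pattern that ties the new ScoreComponentCollection methods together in the trainer, condensed (a sketch assuming the Moses types from this commit; header path assumed):

#include "ScoreComponentCollection.h"  // Moses header, path assumed

// Per accepted update: accumulate the weights and, alongside them, the
// binary indicator of which features were non-zero.
void AccumulateForSparseAverage(const Moses::ScoreComponentCollection& update,
                                Moses::ScoreComponentCollection& cumulative,
                                Moses::ScoreComponentCollection& counts) {
  Moses::ScoreComponentCollection binary;
  binary.SetToBinaryOf(update);     // 1 wherever update is non-zero
  cumulative.PlusEquals(update);
  counts.PlusEquals(binary);
}

// At mixing/dumping time: cumulative.DivideEquals(counts);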