From e1601ef1541c1b67db224c9d21b41eb9e454ff31 Mon Sep 17 00:00:00 2001
From: Eva
Date: Tue, 13 Mar 2012 16:34:48 +0000
Subject: [PATCH] replace parameter --mix-by-averaging with --sparse-average,
 fix bug in sharding, fix bug in --dump-mixed-weights

---
 mira/Main.cpp                          | 154 +++++++++++++++----------
 moses/src/FeatureVector.cpp            |  28 ++++-
 moses/src/FeatureVector.h              |  12 +-
 moses/src/ScoreComponentCollection.cpp |   5 +
 moses/src/ScoreComponentCollection.h   |   6 +
 5 files changed, 137 insertions(+), 68 deletions(-)

diff --git a/mira/Main.cpp b/mira/Main.cpp
index 403e11dd6..3c8317471 100644
--- a/mira/Main.cpp
+++ b/mira/Main.cpp
@@ -123,7 +123,7 @@ int main(int argc, char** argv) {
   string decode_filename;
   size_t update_scheme;
   bool separateUpdates, batchEqualsShard;
-  bool mixByAveraging;
+  bool sparseAverage;
   bool dumpMixedWeights;
   po::options_description desc("Allowed options");
   desc.add_options()
@@ -175,8 +175,7 @@ int main(int argc, char** argv) {
     ("min-oracle-bleu", po::value(&min_oracle_bleu)->default_value(0), "Set a minimum oracle BLEU score")
     ("min-weight-change", po::value(&min_weight_change)->default_value(0.01), "Set minimum weight change for stopping criterion")
     ("mira-learning-rate", po::value(&mira_learning_rate)->default_value(1), "Learning rate for MIRA (fixed or flexible)")
-    ("mixing-frequency", po::value(&mixingFrequency)->default_value(5), "How often per epoch to mix weights, when using mpi")
-    ("mix-by-averaging", po::value(&mixByAveraging)->default_value(true), "Average weights by the number of processes")
+    ("mixing-frequency", po::value(&mixingFrequency)->default_value(1), "How often per epoch to mix weights, when using mpi")
     ("model-hope-fear", po::value(&model_hope_fear)->default_value(false), "Use model, hope and fear translations for optimisation")
     ("nbest,n", po::value(&n)->default_value(1), "Number of translations in n-best list")
     ("normalise", po::value(&normaliseWeights)->default_value(false), "Whether to normalise the updated weights before passing them to the decoder")
@@ -200,6 +199,7 @@ int main(int argc, char** argv) {
     ("shuffle", po::value(&shuffle)->default_value(false), "Shuffle input sentences before processing")
     ("slack-min", po::value(&slack_min)->default_value(0.01), "Minimum slack used")
     ("slack-step", po::value(&slack_step)->default_value(0), "Increase slack from epoch to epoch by the value provided")
+    ("sparse-average", po::value(&sparseAverage)->default_value(false), "Average weights by the number of processes")
     ("stop-weights", po::value(&weightConvergence)->default_value(true), "Stop when weights converge")
     ("threads", po::value(&threadcount)->default_value(1), "Number of threads used")
     ("update-scheme", po::value(&update_scheme)->default_value(1), "Update scheme, default: 1")
@@ -245,6 +245,11 @@ int main(int argc, char** argv) {
     exit(1);
   }
 
+  if (sparseAverage && averageWeights) {
+    cerr << "Parameters --sparse-average 1 and --average-weights 1 are incompatible (not implemented)" << endl;
+    exit(1);
+  }
+
   if (mosesConfigFile.empty()) {
     cerr << "Error: No moses ini file specified" << endl;
     return 1;
@@ -410,11 +415,13 @@ int main(int argc, char** argv) {
   // Create shards according to the number of processes used
   vector<size_t> shard;
   float shardSize = (float) (order.size()) / size;
-  VERBOSE(1, "Shard size: " << shardSize << endl);
   size_t shardStart = (size_t) (shardSize * rank);
   size_t shardEnd = (size_t) (shardSize * (rank + 1));
-  if (rank == size - 1)
+  if (rank == size - 1) {
     shardEnd = order.size();
+    shardSize = shardEnd - shardStart;
+  }
VERBOSE(1, "Shard size: " << shardSize << endl); VERBOSE(1, "Rank: " << rank << " Shard start: " << shardStart << " Shard end: " << shardEnd << endl); shard.resize(shardSize); copy(order.begin() + shardStart, order.begin() + shardEnd, shard.begin()); @@ -447,15 +454,11 @@ int main(int argc, char** argv) { } decoder->setWeights(initialWeights); - if (!mixByAveraging && coreWeightMap.size() == 0) { - cerr << "Trying to mix without averaging, but cannot because of core features.." << endl; - exit(1); - } - //Main loop: // print initial weights cerr << "Rank " << rank << ", initial weights: " << initialWeights << endl; ScoreComponentCollection cumulativeWeights; // collect weights per epoch to produce an average + ScoreComponentCollection cumulativeWeightsBinary; size_t numberOfUpdates = 0; size_t numberOfUpdatesThisEpoch = 0; @@ -467,23 +470,25 @@ int main(int argc, char** argv) { float sumOfInputs = 0; size_t numberOfInputs = 0; + ScoreComponentCollection mixedWeights; + ScoreComponentCollection mixedWeightsPrevious; + ScoreComponentCollection mixedWeightsBeforePrevious; ScoreComponentCollection mixedAverageWeights; ScoreComponentCollection mixedAverageWeightsPrevious; ScoreComponentCollection mixedAverageWeightsBeforePrevious; bool stop = false; // int sumStillViolatedConstraints; - float *sendbuf, *recvbuf; - sendbuf = (float *) malloc(sizeof(float)); - recvbuf = (float *) malloc(sizeof(float)); for (size_t epoch = 0; epoch < epochs && !stop; ++epoch) { // sum of violated constraints in an epoch // sumStillViolatedConstraints = 0; numberOfUpdatesThisEpoch = 0; // Sum up weights over one epoch, final average uses weights from last epoch - if (!accumulateWeights) + if (!accumulateWeights) { cumulativeWeights.ZeroAll(); + cumulativeWeightsBinary.ZeroAll(); + } // number of weight dumps this epoch size_t weightEpochDump = 0; @@ -885,6 +890,12 @@ int main(int argc, char** argv) { mosesWeights.L1Normalise(); cumulativeWeights.PlusEquals(mosesWeights); + if (sparseAverage) { + ScoreComponentCollection binary; + binary.SetToBinaryOf(mosesWeights); + cumulativeWeightsBinary.PlusEquals(binary); + } + ++numberOfUpdates; ++numberOfUpdatesThisEpoch; if (averageWeights) { @@ -921,23 +932,26 @@ int main(int argc, char** argv) { size_t mixing_base = mixingFrequency == 0 ? 0 : shard.size() / mixingFrequency; size_t dumping_base = weightDumpFrequency ==0 ? 0 : shard.size() / weightDumpFrequency; + bool mix = evaluateModulo(shardPosition, mixing_base, actualBatchSize); // mix weights? - ScoreComponentCollection mixedWeights; - if (evaluateModulo(shardPosition, mixing_base, actualBatchSize)) { + if (mix) { #ifdef MPI_ENABLE // collect all weights in mixedWeights and divide by number of processes mpi::reduce(world, mosesWeights, mixedWeights, SCCPlus(), 0); + ScoreComponentCollection totalBinary; + if (sparseAverage) { + ScoreComponentCollection binary; + binary.SetToBinaryOf(mosesWeights); + mpi::reduce(world, binary, totalBinary, SCCPlus(), 0); + } if (rank == 0) { // divide by number of processes - if (mixByAveraging) - mixedWeights.DivideEquals(size); - else { - // can only skip this if we have fixed core features, reset them here - ProducerWeightMap::iterator p; - for(p = coreWeightMap.begin(); p!=coreWeightMap.end(); ++p) - mixedWeights.Assign(p->first, p->second); + if (sparseAverage) { + mixedWeights.DivideEquals(totalBinary); } + else + mixedWeights.DivideEquals(size); // normalise weights after averaging if (normaliseWeights) { @@ -958,19 +972,21 @@ int main(int argc, char** argv) { // Dump weights? 
       if (dumpMixedWeights) {
-        // dump mixed weights instead of average weights
-        ostringstream filename;
-        if (epoch < 10)
-          filename << weightDumpStem << "_0" << epoch;
-        else
-          filename << weightDumpStem << "_" << epoch;
+        if (mix && rank == 0 && !weightDumpStem.empty()) {
+          // dump mixed weights instead of average weights
+          ostringstream filename;
+          if (epoch < 10)
+            filename << weightDumpStem << "_0" << epoch;
+          else
+            filename << weightDumpStem << "_" << epoch;
 
-        if (weightDumpFrequency > 1)
-          filename << "_" << weightEpochDump;
+          if (weightDumpFrequency > 1)
+            filename << "_" << weightEpochDump;
 
-        cerr << "Dumping mixed weights during epoch " << epoch << " to " << filename.str() << endl << endl;
-        mixedWeights.Save(filename.str());
-        ++weightEpochDump;
+          cerr << "Dumping mixed weights during epoch " << epoch << " to " << filename.str() << endl << endl;
+          mixedWeights.Save(filename.str());
+          ++weightEpochDump;
+        }
       }
       else {
         if (evaluateModulo(shardPosition, dumping_base, actualBatchSize)) {
@@ -983,30 +999,35 @@ int main(int argc, char** argv) {
           }
           else {
             if (numberOfUpdatesThisEpoch > 0) {
-              tmpAverageWeights.DivideEquals(numberOfUpdatesThisEpoch);
-              proceed = true;
+              if (sparseAverage)
+                tmpAverageWeights.DivideEquals(cumulativeWeightsBinary);
+              else
+                tmpAverageWeights.DivideEquals(numberOfUpdatesThisEpoch);
+              proceed = true;
             }
           }
 
           if (proceed) {
 #ifdef MPI_ENABLE
             // average across processes
-            mpi::reduce(world, tmpAverageWeights, mixedAverageWeights, SCCPlus(), 0);
+            mpi::reduce(world, tmpAverageWeights, mixedAverageWeights, SCCPlus(), 0);
+            ScoreComponentCollection totalBinary;
+            if (sparseAverage) {
+              ScoreComponentCollection binary;
+              binary.SetToBinaryOf(mosesWeights);
+              mpi::reduce(world, binary, totalBinary, SCCPlus(), 0);
+            }
 #endif
 #ifndef MPI_ENABLE
             mixedAverageWeights = tmpAverageWeights;
 #endif
 
             if (rank == 0 && !weightDumpStem.empty()) {
               // divide by number of processes
-              if (mixByAveraging)
-                mixedAverageWeights.DivideEquals(size);
-              else {
-                // can only skip this if we have fixed core features, reset them here
-                ProducerWeightMap::iterator p;
-                for(p = coreWeightMap.begin(); p!=coreWeightMap.end(); ++p)
-                  mixedAverageWeights.Assign(p->first, p->second);
-              }
-
+              if (sparseAverage)
+                mixedAverageWeights.DivideEquals(totalBinary);
+              else
+                mixedAverageWeights.DivideEquals(size);
+
               // normalise weights after averaging
               if (normaliseWeights) {
                 mixedAverageWeights.L1Normalise();
@@ -1024,11 +1045,11 @@ int main(int argc, char** argv) {
                 filename << "_" << weightEpochDump;
               }
 
-              if (accumulateWeights) {
-                // cerr << "\nMixed average weights (cumulative) during epoch " << epoch << ": " << mixedAverageWeights << endl;
+/*              if (accumulateWeights) {
+                cerr << "\nMixed average weights (cumulative) during epoch " << epoch << ": " << mixedAverageWeights << endl;
               } else {
-                // cerr << "\nMixed average weights during epoch " << epoch << ": " << mixedAverageWeights << endl;
-              }
+                cerr << "\nMixed average weights during epoch " << epoch << ": " << mixedAverageWeights << endl;
+              }*/
 
               cerr << "Dumping mixed average weights during epoch " << epoch << " to " << filename.str() << endl << endl;
               mixedAverageWeights.Save(filename.str());
@@ -1074,14 +1095,21 @@ int main(int argc, char** argv) {
     if (weightConvergence) {
       bool reached = true;
       if (rank == 0 && (epoch >= 2)) {
-        ScoreComponentCollection firstDiff(mixedAverageWeights);
-        firstDiff.MinusEquals(mixedAverageWeightsPrevious);
-        VERBOSE(1, "Average weight changes since previous epoch: " << firstDiff <<
-            " (max: " << firstDiff.GetLInfNorm() << ")" << endl);
-        ScoreComponentCollection secondDiff(mixedAverageWeights);
-        secondDiff.MinusEquals(mixedAverageWeightsBeforePrevious);
-        VERBOSE(1, "Average weight changes since before previous epoch: " << secondDiff <<
-            " (max: " << secondDiff.GetLInfNorm() << ")" << endl << endl);
+        ScoreComponentCollection firstDiff, secondDiff;
+        if (dumpMixedWeights) {
+          firstDiff = mixedWeights;
+          firstDiff.MinusEquals(mixedWeightsPrevious);
+          secondDiff = mixedWeights;
+          secondDiff.MinusEquals(mixedWeightsBeforePrevious);
+        }
+        else {
+          firstDiff = mixedAverageWeights;
+          firstDiff.MinusEquals(mixedAverageWeightsPrevious);
+          secondDiff = mixedAverageWeights;
+          secondDiff.MinusEquals(mixedAverageWeightsBeforePrevious);
+        }
+        VERBOSE(1, "Average weight changes since previous epoch: " << firstDiff << " (max: " << firstDiff.GetLInfNorm() << ")" << endl);
+        VERBOSE(1, "Average weight changes since before previous epoch: " << secondDiff << " (max: " << secondDiff.GetLInfNorm() << ")" << endl << endl);
 
         // check whether stopping criterion has been reached
         // (both difference vectors must have all weight changes smaller than min_weight_change)
@@ -1100,6 +1128,8 @@ int main(int argc, char** argv) {
       }
     }
 
+    mixedWeightsBeforePrevious = mixedWeightsPrevious;
+    mixedWeightsPrevious = mixedWeights;
     mixedAverageWeightsBeforePrevious = mixedAverageWeightsPrevious;
     mixedAverageWeightsPrevious = mixedAverageWeights;
 #ifdef MPI_ENABLE
@@ -1292,16 +1322,16 @@ void decodeHopeOrFear(size_t rank, size_t size, size_t decode, string filename,
   vector<size_t> shard;
   float shardSize = (float) (order.size()) / size;
-  VERBOSE(1, "Rank " << rank << ", shard size: " << shardSize << endl);
   size_t shardStart = (size_t) (shardSize * rank);
   size_t shardEnd = (size_t) (shardSize * (rank + 1));
-  if (rank == size - 1)
+  if (rank == size - 1) {
     shardEnd = inputSentences.size();
+    shardSize = shardEnd - shardStart;
+  }
   VERBOSE(1, "Rank " << rank << ", shard start: " << shardStart << " Shard end: " << shardEnd << endl);
-  shardSize = shardEnd - shardStart;
+  VERBOSE(1, "Rank " << rank << ", shard size: " << shardSize << endl);
   shard.resize(shardSize);
   copy(order.begin() + shardStart, order.begin() + shardEnd, shard.begin());
-  VERBOSE(1, "Rank " << rank << ", actual shard size: " << shard.size() << endl);
 
   // open files for writing
   stringstream fname;
diff --git a/moses/src/FeatureVector.cpp b/moses/src/FeatureVector.cpp
index 40b11b0fd..aef7551f9 100644
--- a/moses/src/FeatureVector.cpp
+++ b/moses/src/FeatureVector.cpp
@@ -271,12 +271,13 @@ namespace Moses {
     if (rhs.m_coreFeatures.size() > m_coreFeatures.size())
       resize(rhs.m_coreFeatures.size());
     for (const_iterator i = rhs.cbegin(); i != rhs.cend(); ++i)
-      set(i->first, get(i->first) + i->second);
-    for (size_t i = 0; i < m_coreFeatures.size(); ++i) {
+      set(i->first, get(i->first) + i->second);
+/*    for (size_t i = 0; i < m_coreFeatures.size(); ++i) {
       if (i < rhs.m_coreFeatures.size()) {
         m_coreFeatures[i] += rhs.m_coreFeatures[i];
-      }
-    }
+      }*/
+    for (size_t i = 0; i < rhs.m_coreFeatures.size(); ++i)
+      m_coreFeatures[i] += rhs.m_coreFeatures[i];
     return *this;
   }
 
@@ -286,6 +287,25 @@ namespace Moses {
       set(i->first, get(i->first) + i->second);
   }
 
+  // count non-zero occurrences for all sparse features
+  void FVector::setToBinaryOf(const FVector& rhs) {
+    for (const_iterator i = rhs.cbegin(); i != rhs.cend(); ++i)
+      if (rhs.get(i->first) != 0)
+        set(i->first, 1);
+    for (size_t i = 0; i < rhs.m_coreFeatures.size(); ++i)
+      m_coreFeatures[i] = 1;
+  }
+
+  // lhs vector is a sum of vectors, rhs vector holds number of non-zero summands
+  FVector& FVector::divideEquals(const FVector& rhs) {
+    assert(m_coreFeatures.size() == rhs.m_coreFeatures.size());
+    for (const_iterator i = rhs.cbegin(); i != rhs.cend(); ++i)
+      set(i->first, get(i->first)/rhs.get(i->first)); // divide by number of summands
+    for (size_t i = 0; i < rhs.m_coreFeatures.size(); ++i)
+      m_coreFeatures[i] /= rhs.m_coreFeatures[i]; // divide by number of summands
+    return *this;
+  }
+
   FVector& FVector::operator-= (const FVector& rhs) {
     if (rhs.m_coreFeatures.size() > m_coreFeatures.size())
       resize(rhs.m_coreFeatures.size());
diff --git a/moses/src/FeatureVector.h b/moses/src/FeatureVector.h
index ce116691c..c96283a20 100644
--- a/moses/src/FeatureVector.h
+++ b/moses/src/FeatureVector.h
@@ -117,7 +117,7 @@ namespace Moses {
      * Change the number of core features
     **/
     void resize(size_t newsize);
-    
+
     typedef boost::unordered_map<FName,FValue> FNVmap;
     /** Iterators */
     typedef FNVmap::iterator iterator;
@@ -171,7 +171,7 @@ namespace Moses {
     FVector& operator/= (const FValue& rhs);
 
     FVector& max_equals(const FVector& rhs);
-    
+
     /** norms and sums */
     FValue l1norm() const;
     FValue l2norm() const;
@@ -181,6 +181,8 @@ namespace Moses {
 
     /** pretty printing */
     std::ostream& print(std::ostream& out) const;
+    /** additional */
+    void logCoreFeatures(size_t baseOfLog);
 
     //scale so that abs. value is less than maxvalue
     void thresholdScale(float maxValue );
@@ -190,6 +192,12 @@ namespace Moses {
 
     void sparsePlusEquals(const FVector& rhs);
 
+    // vector which, for each element of the original vector, reflects whether an element is zero or non-zero
+    void setToBinaryOf(const FVector& rhs);
+
+    // divide each element by the number given in the rhs vector
+    FVector& divideEquals(const FVector& rhs);
+
 #ifdef MPI_ENABLE
     friend class boost::serialization::access;
 #endif
diff --git a/moses/src/ScoreComponentCollection.cpp b/moses/src/ScoreComponentCollection.cpp
index 7a6ce2235..cd9957129 100644
--- a/moses/src/ScoreComponentCollection.cpp
+++ b/moses/src/ScoreComponentCollection.cpp
@@ -92,6 +92,11 @@ void ScoreComponentCollection::DivideEquals(float scalar)
 {
   m_scores /= scalar;
 }
 
+void ScoreComponentCollection::DivideEquals(const ScoreComponentCollection& rhs)
+{
+  m_scores.divideEquals(rhs.m_scores);
+}
+
 void ScoreComponentCollection::MultiplyEquals(const ScoreComponentCollection& rhs)
 {
   m_scores *= rhs.m_scores;
diff --git a/moses/src/ScoreComponentCollection.h b/moses/src/ScoreComponentCollection.h
index 855595e66..9ba3e4948 100644
--- a/moses/src/ScoreComponentCollection.h
+++ b/moses/src/ScoreComponentCollection.h
@@ -121,6 +121,11 @@ public:
     return FVector(s_denseVectorSize);
   }
 
+  void SetToBinaryOf(const ScoreComponentCollection& rhs)
+  {
+    m_scores.setToBinaryOf(rhs.m_scores);
+  }
+
   //! Set all values to 0.0
   void ZeroAll()
   {
@@ -129,6 +134,7 @@ public:
 
   void MultiplyEquals(float scalar);
   void DivideEquals(float scalar);
+  void DivideEquals(const ScoreComponentCollection& rhs);
   void MultiplyEquals(const ScoreComponentCollection& rhs);
   void MultiplyEquals(const ScoreProducer* sp, float scalar);
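
What --sparse-average changes, in isolation: with plain averaging every summed weight is divided by the same count (the number of processes, or the number of updates this epoch), so a sparse feature that fired in only a few updates is dragged towards zero by all the updates in which it did not fire; this is why the patch makes --sparse-average 1 and --average-weights 1 mutually exclusive. With --sparse-average 1 each feature is instead divided only by the number of summands in which it was non-zero, which is what the setToBinaryOf/divideEquals pair above implements. The sketch below is not part of the commit: it reproduces the same arithmetic with a plain std::map standing in for FVector/ScoreComponentCollection, and the free functions and main() driver are made up for illustration.

    #include <cassert>
    #include <iostream>
    #include <map>
    #include <string>

    typedef std::map<std::string, float> SparseVector;

    // Mirrors FVector::setToBinaryOf for sparse features: 1 wherever rhs is non-zero.
    SparseVector setToBinaryOf(const SparseVector& rhs) {
      SparseVector binary;
      for (SparseVector::const_iterator i = rhs.begin(); i != rhs.end(); ++i)
        if (i->second != 0)
          binary[i->first] = 1;
      return binary;
    }

    // Mirrors FVector::divideEquals: divide each summed weight by the number of
    // summands in which the feature was active. (The patch iterates over rhs and
    // we iterate over the sum; the result is the same whenever every summed
    // feature was also counted, which the assert checks.)
    void divideEquals(SparseVector& sum, const SparseVector& counts) {
      for (SparseVector::iterator i = sum.begin(); i != sum.end(); ++i) {
        SparseVector::const_iterator c = counts.find(i->first);
        assert(c != counts.end() && c->second != 0);
        i->second /= c->second;
      }
    }

    int main() {
      // Two weight updates; "tm_0" fires in both, "lex_foo" only in the first.
      SparseVector w1, w2;
      w1["tm_0"] = 0.4f; w1["lex_foo"] = 0.2f;
      w2["tm_0"] = 0.6f;

      SparseVector cumulative, cumulativeBinary;
      const SparseVector* updates[] = { &w1, &w2 };
      for (int u = 0; u < 2; ++u) {
        for (SparseVector::const_iterator i = updates[u]->begin(); i != updates[u]->end(); ++i)
          cumulative[i->first] += i->second;        // cumulativeWeights.PlusEquals(...)
        SparseVector binary = setToBinaryOf(*updates[u]);
        for (SparseVector::const_iterator i = binary.begin(); i != binary.end(); ++i)
          cumulativeBinary[i->first] += i->second;  // cumulativeWeightsBinary.PlusEquals(...)
      }

      divideEquals(cumulative, cumulativeBinary);
      // tm_0: (0.4 + 0.6) / 2 = 0.5; lex_foo: 0.2 / 1 = 0.2, where a dense
      // average over both updates would have given 0.1.
      std::cout << "tm_0 = " << cumulative["tm_0"]
                << ", lex_foo = " << cumulative["lex_foo"] << std::endl;
      return 0;
    }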
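Why the sharding fix matters: shardSize is computed as a float, and the last rank's shardEnd is clamped to the full corpus size, but before this patch shardSize itself was not recomputed, so shard.resize(shardSize) truncated the float while the subsequent copy wrote shardEnd - shardStart elements into the shard. The standalone sketch below (made-up sizes, not part of the commit) shows the arithmetic for 10 sentences on 3 processes, where the last rank owns 4 sentences but the truncated float size says 3.

    #include <cstddef>
    #include <iostream>

    int main() {
      const size_t numSentences = 10;
      const size_t size = 3; // number of MPI processes
      for (size_t rank = 0; rank < size; ++rank) {
        float shardSize = (float) numSentences / size;      // 3.33...
        size_t shardStart = (size_t) (shardSize * rank);
        size_t shardEnd = (size_t) (shardSize * (rank + 1));
        if (rank == size - 1) {
          shardEnd = numSentences;
          shardSize = shardEnd - shardStart;                // the fix: 10 - 6 = 4
        }
        // Before the fix, rank 2 resized its shard to (size_t) 3.33 == 3
        // elements and then copied 4 sentences into it.
        std::cout << "rank " << rank << ": [" << shardStart << "," << shardEnd
                  << ") size " << (size_t) shardSize << std::endl;
      }
      return 0;
    }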
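The stopping criterion itself is unchanged by the patch; what changes is its input: with --dump-mixed-weights 1 the test now tracks mixedWeights (via the two new mixedWeights*Previous copies) instead of mixedAverageWeights, so the dumped and monitored weights agree. The test requires both difference vectors, current minus the previous epoch and current minus the epoch before that, to have every component below min-weight-change, i.e. an L-infinity norm below that threshold. A compact standalone sketch of the test follows (not part of the commit; the vector type and linfNorm helper are stand-ins for ScoreComponentCollection and GetLInfNorm):

    #include <algorithm>
    #include <cmath>
    #include <iostream>
    #include <vector>

    // Largest absolute component, i.e. the "(max: ...)" value in the VERBOSE output.
    float linfNorm(const std::vector<float>& v) {
      float m = 0;
      for (size_t i = 0; i < v.size(); ++i)
        m = std::max(m, std::fabs(v[i]));
      return m;
    }

    bool weightsConverged(const std::vector<float>& current,
                          const std::vector<float>& previous,
                          const std::vector<float>& beforePrevious,
                          float minWeightChange) {
      std::vector<float> firstDiff(current), secondDiff(current);
      for (size_t i = 0; i < current.size(); ++i) {
        firstDiff[i] -= previous[i];         // change since the previous epoch
        secondDiff[i] -= beforePrevious[i];  // change since the epoch before that
      }
      // both difference vectors must be small in every component
      return linfNorm(firstDiff) < minWeightChange &&
             linfNorm(secondDiff) < minWeightChange;
    }

    int main() {
      std::vector<float> e1(2), e2(2), e3(2);
      e1[0] = 0.50f;  e1[1] = -0.20f;   // mixed weights, epoch n-2
      e2[0] = 0.505f; e2[1] = -0.195f;  // epoch n-1
      e3[0] = 0.506f; e3[1] = -0.196f;  // epoch n
      // 0.01 is the min-weight-change default from the options above.
      std::cout << (weightsConverged(e3, e2, e1, 0.01f) ? "stop" : "continue")
                << std::endl;  // prints "stop"
      return 0;
    }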