From afabe4e7e9c2e97d7d234d39cd76a37bf5a3af11 Mon Sep 17 00:00:00 2001
From: Marcin Junczys-Dowmunt
Date: Sun, 24 Jun 2018 12:47:33 -0700
Subject: [PATCH] try to test nccl

---
 CMakeLists.txt                    |   2 +-
 src/training/graph_group_sync.cpp | 291 ++++++++++++++++++++----------
 src/training/graph_group_sync.h   |   4 +-
 3 files changed, 201 insertions(+), 96 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0c5f0a5f..ecf8210b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -38,7 +38,7 @@ set(EXT_LIBS ${EXT_LIBS} ${CMAKE_DL_LIBS})
 if(COMPILE_CUDA)
 find_package(CUDA "8.0" REQUIRED)
 if(CUDA_FOUND)
-  set(EXT_LIBS ${EXT_LIBS} ${CUDA_curand_LIBRARY} ${CUDA_cusparse_LIBRARY})
+  set(EXT_LIBS ${EXT_LIBS} ${CUDA_curand_LIBRARY} ${CUDA_cusparse_LIBRARY} /usr/local/cuda/lib64/libnccl_static.a)
 
   if(USE_CUDNN)
     find_package(CUDNN "7.0")
diff --git a/src/training/graph_group_sync.cpp b/src/training/graph_group_sync.cpp
index 34784196..459fd876 100644
--- a/src/training/graph_group_sync.cpp
+++ b/src/training/graph_group_sync.cpp
@@ -2,6 +2,9 @@
 #include "functional/functional.h"
 #include "tensors/tensor_operators.h"
 
+#include "cuda_runtime.h"
+#include "nccl.h"
+
 namespace marian {
 
 void SyncGraphGroup::setScheduler(Ptr<Scheduler> scheduler) {
@@ -52,129 +55,226 @@ void SyncGraphGroup::foreachDevice(const std::function<void(size_t, int)>& task)
     t.join();
 }
 
+void SyncGraphGroup::initialize(const std::vector<Ptr<data::Batch>>& batches) {
+  // Initialize 0th graph with random weights in one forward step
+  {
+    THREAD_GUARD(builders_[0]->build(graphs_[0], batches[0]);
+                 graphs_[0]->forward(););
+
+    // Copy weights from 0th graph to all other graphs
+    // to have equal weights across devices
+    ThreadPool pool(graphs_.size() - 1, graphs_.size() - 1);
+    for(size_t i = 1; i < graphs_.size(); ++i) {
+      auto init = [&](size_t i) {
+        // initialize i-th graph and weights
+        builders_[i]->build(graphs_[i], batches[0]);
+        graphs_[i]->forward();
+        // overwrite weights of i-th graph with weights from 0th graph
+        graphs_[i]->params()->vals()->copyFrom(graphs_[0]->params()->vals());
+      };
+      pool.enqueue(init, i);
+    }
+  }
+
+  // Initialize sharded parameter storage. For n devices
+  // each device stores 1/n-th of parameters.
+  // We also create sharded gradients and temporary storage.
+  if(params_.size() == 0) {
+    int totalSize = graphs_[0]->params()->vals()->size();
+    shardSize_ = ceil(totalSize / (float)devices_.size());
+
+    int pos = 0;
+    for(auto graph : graphs_) {
+      int __size__ = std::min(shardSize_, totalSize);
+
+      auto paramsAlloc = New<TensorAllocator>(graph->getBackend());
+      paramsAllocs_.push_back(paramsAlloc);
+
+      size_t chunks = movingAvg_ ? 3 : 2;
+      paramsAlloc->reserveExact(chunks * __size__ * sizeof(float));
+
+      Tensor param, tmp, paramAvg;
+
+      // set parameters to actual value from 0th graph
+      paramsAlloc->allocate(param, {1, __size__});
+      params_.push_back(param);
+      param->copyFrom(graphs_[0]->params()->vals()->subtensor(pos, __size__));
+
+      paramsAlloc->allocate(tmp, {1, __size__});
+      tmpTensors_.push_back(tmp);
+
+      if(movingAvg_) {
+        paramsAlloc->allocate(paramAvg, {1, __size__});
+        paramAvg->copyFrom(param);
+        paramsAvg_.push_back(paramAvg);
+      }
+
+      // move to next shard
+      pos += __size__;
+      totalSize -= __size__;
+    }
+  }
+}
+
+class NCCL {
+private:
+  ncclComm_t* comms_;
+  cudaStream_t* streams_;
+  int* devs_;
+
+public:
+  void init(size_t devNum) {
+    comms_ = new ncclComm_t[devNum];
+    streams_ = new cudaStream_t[devNum];
+    devs_ = new int[devNum];
+
+    for(int i = 0; i < devNum; ++i) {
+      devs_[i] = i;
+      cudaSetDevice(i);
+      cudaStreamCreate(&streams_[i]);
+    }
+    ncclCommInitAll(comms_, devNum, devs_);
+  }
+
+  void scatterReduce(const std::vector<Ptr<ExpressionGraph>>& graphs,
+                     const std::vector<int>& pos,
+                     const std::vector<int>& sizes) {
+    ncclGroupStart();
+    for (int i = 0; i < graphs.size(); ++i) {
+
+      const void* sendbuff = (const void*)graphs[i]->params()->grads()->data();
+      auto subgrad = graphs[i]->params()->grads()->subtensor(pos[i], sizes[i]);
+      void* recvbuff = subgrad->data();
+
+      ncclReduceScatter(sendbuff,
+                        recvbuff,
+                        sizes[0],
+                        ncclFloat,
+                        ncclSum,
+                        comms_[i],
+                        streams_[i]);
+    }
+    ncclGroupEnd();
+
+    for (int i = 0; i < graphs.size(); ++i) {
+      cudaSetDevice(i);
+      cudaStreamSynchronize(streams_[i]);
+    }
+  }
+
+  void allGather(const std::vector<Ptr<ExpressionGraph>>& graphs,
+                 const std::vector<Tensor>& params) {
+
+    ncclGroupStart();
+    for (int i = 0; i < graphs.size(); ++i) {
+
+      const void* sendbuff = (const void*)params[i]->data();
+      size_t sendcount = params[0]->size();
+
+      void* recvbuff = (void*)graphs[i]->params()->grads()->data();
+
+      ncclAllGather(sendbuff,
+                    recvbuff,
+                    sendcount,
+                    ncclFloat,
+                    comms_[i],
+                    streams_[i]);
+    }
+    ncclGroupEnd();
+
+    for (int i = 0; i < graphs.size(); ++i) {
+      cudaSetDevice(i);
+      cudaStreamSynchronize(streams_[i]);
+    }
+  }
+};
+
 void SyncGraphGroup::execute(const std::vector<Ptr<data::Batch>>& batches) {
   // if there are fewer batches than we need, split last batch into right number
   // of pieces and replace last batch with the splits.
-  std::vector<Ptr<data::Batch>> newBatches = batches;
-  if(newBatches.size() < numBatches()) {
-    size_t splitFill = numBatches() - newBatches.size() + 1;
-    auto fillerBatches = newBatches.back()->split(splitFill);
-    newBatches.back() = fillerBatches[0];
-    for(int i = 1; i < splitFill; ++i)
-      newBatches.push_back(fillerBatches[i]);
-  }
+  float div = batches.size(); // no. of batches
+  // do not average gradients if cost type is sum.
+  if(options_->get<std::string>("cost-type") == "ce-sum")
+    div = 1;
 
   std::vector<std::vector<Ptr<data::Batch>>> delayedBatches;
+  size_t devs = devices_.size();
+
   for(int i = 0; i < delay_; ++i) {
-    delayedBatches.emplace_back();
-    size_t devs = devices_.size();
-    for(int j = 0; j < devs; ++j) {
-      delayedBatches.back().push_back(newBatches[i * devs + j]);
+    if(i * devs < batches.size()) {
+      delayedBatches.emplace_back();
+      for(int j = 0; j < devs; ++j) {
+        size_t index = i * devs + j;
+        if(index < batches.size())
+          delayedBatches.back().push_back(batches[i * devs + j]);
+        else
+          delayedBatches.back().push_back(nullptr);
+      }
     }
   }
 
+  NCCL nccl;
+  nccl.init(devices_.size());
+
+  std::vector<int> positions;
+  std::vector<int> sizes;
+
   std::vector<float> costs(devices_.size(), 0.f);
   size_t t = 1;
 
-  for(const auto& batches : delayedBatches) {
+  for(const auto& curBatches : delayedBatches) {
     if(first_) {
-      {
-        // Initialize 0th graph with random weights in one forward step
-        THREAD_GUARD(builders_[0]->build(graphs_[0], batches[0]);
-                     graphs_[0]->forward(););
+      initialize(curBatches);
 
-        // Copy weights from 0th graph to all other graphs
-        // to have equal weights across devices
-        ThreadPool pool(graphs_.size() - 1, graphs_.size() - 1);
-        for(size_t i = 1; i < graphs_.size(); ++i) {
-          auto init = [&](size_t i) {
-            // initialize i-th graph and weights
-            builders_[i]->build(graphs_[i], batches[0]);
-            graphs_[i]->forward();
-            // overwrite weights of i-th graph with weights from 0th graph
-            graphs_[i]->params()->vals()->copyFrom(graphs_[0]->params()->vals());
-          };
-          pool.enqueue(init, i);
-        }
+      int pos = 0;
+      for(int idx = 0; idx < devices_.size(); ++idx) {
+        positions.push_back(pos);
+        sizes.push_back(params_[idx]->size());
       }
 
-      // Initialize sharded parameter storage. For n devices
-      // each device stores 1/n-th of parameters.
-      // We also create sharded gradients and temporary storage.
-      if(params_.size() == 0) {
-        int totalSize = graphs_[0]->params()->vals()->size();
-        shardSize_ = ceil(totalSize / (float)devices_.size());
-
-        int pos = 0;
-        for(auto graph : graphs_) {
-          int __size__ = std::min(shardSize_, totalSize);
-
-          auto paramsAlloc = New<TensorAllocator>(graph->getBackend());
-          paramsAllocs_.push_back(paramsAlloc);
-
-          size_t chunks = movingAvg_ ? 3 : 2;
-          paramsAlloc->reserveExact(chunks * __size__ * sizeof(float));
-
-          Tensor param, tmp, paramAvg;
-
-          // set parameters to actual value from 0th graph
-          paramsAlloc->allocate(param, {1, __size__});
-          params_.push_back(param);
-          param->copyFrom(graphs_[0]->params()->vals()->subtensor(pos, __size__));
-
-          paramsAlloc->allocate(tmp, {1, __size__});
-          tmpTensors_.push_back(tmp);
-
-          if(movingAvg_) {
-            paramsAlloc->allocate(paramAvg, {1, __size__});
-            paramAvg->copyFrom(param);
-            paramsAvg_.push_back(paramAvg);
-          }
-
-          // move to next shard
-          pos += __size__;
-          totalSize -= __size__;
-        }
-      }
       first_ = false;
     }
 
-    // execute single forward/backward step
-    auto taskForwardBackward = [this, &costs, batches, t](size_t idx, int pos) {
+    // Execute single forward/backward step
+    auto taskForwardBackward = [this, &costs, curBatches, t](size_t idx, int pos) {
       auto graph = graphs_[idx];
-      auto batch = batches[idx];
+      auto batch = curBatches[idx];
 
-      auto costNode = builders_[idx]->build(graph, batch);
-      graph->forward();
-      costs[idx] += costNode->scalar();
-      graph->backward(t == 1);
+      if(batch) {
+        auto costNode = builders_[idx]->build(graph, batch);
+        graph->forward();
+        costs[idx] += costNode->scalar();
+
+        // only reset gradients to 0 if t == 1
+        graph->backward(t == 1);
+      }
     };
 
-    // device index corresponds to shard index
-    auto taskGather = [this, batches](size_t idx, int pos) {
+    // Gather gradients from different devices into current gradient shards
+    auto taskGather = [this, curBatches, div](size_t idx, int pos) {
       int shardSize = params_[idx]->size();
 
-      float div = devices_.size(); // no. of GPUs
-      // do not average gradients if cost type is sum.
-      if(options_->get<std::string>("cost-type") == "ce-sum")
-        div = 1;
+      auto batch = curBatches[idx];
+      if(batch) {
+        auto curGrad = graphs_[idx]->params()->grads()->subtensor(pos, shardSize);
 
-      auto curGrad = graphs_[idx]->params()->grads()->subtensor(pos, shardSize);
+        // collect and sum gradients
+        // to be replaced with ncclScatterReduce
+        for(auto graph : graphs_) {
+          if(graph != graphs_[idx]) {
+            auto subGrad = graph->params()->grads()->subtensor(pos, shardSize);
+            tmpTensors_[idx]->copyFrom(subGrad);
 
-      // collect and sum gradients
-      // to be replaced with ncclScatterReduce
-      for(auto graph : graphs_) {
-        if(graph != graphs_[idx]) {
-          auto subGrad = graph->params()->grads()->subtensor(pos, shardSize);
-          tmpTensors_[idx]->copyFrom(subGrad);
-
-          using namespace functional;
-          Element(_1 = _1 + (_2 / div), curGrad, tmpTensors_[idx]);
+            using namespace functional;
+            Element(_1 = _1 + (_2 / div), curGrad, tmpTensors_[idx]);
+          }
         }
       }
     };
 
+    // Update parameter shard with gradient shard
     auto taskUpdate = [this](size_t idx, int pos) {
       int shardSize = params_[idx]->size();
       auto curGrad = graphs_[idx]->params()->grads()->subtensor(pos, shardSize);
@@ -185,6 +285,7 @@ void SyncGraphGroup::execute(const std::vector<Ptr<data::Batch>>& batches) {
                              paramsAvg_[idx], params_[idx], scheduler_->numberOfBatches());
     };
 
+    // Update all graphs with parameter shard
    auto taskBroadcast = [this](size_t idx, int pos) {
       int shardSize = params_[idx]->size();
       auto curGrad = graphs_[idx]->params()->grads()->subtensor(pos, shardSize);
@@ -198,10 +299,12 @@ void SyncGraphGroup::execute(const std::vector<Ptr<data::Batch>>& batches) {
     foreachDevice(taskForwardBackward);
 
-    if(t == delay_) {
-      foreachDevice(taskGather);
+    if(t == delayedBatches.size()) {
+      nccl.scatterReduce(graphs_, positions, sizes);
+      //foreachDevice(taskGather);
       foreachDevice(taskUpdate);
-      foreachDevice(taskBroadcast);
+      nccl.allGather(graphs_, params_);
+      //foreachDevice(taskBroadcast);
     }
 
     t++;
diff --git a/src/training/graph_group_sync.h b/src/training/graph_group_sync.h
index 049db269..09a15f88 100644
--- a/src/training/graph_group_sync.h
+++ b/src/training/graph_group_sync.h
@@ -32,11 +32,13 @@ private:
   float mvDecay_{1e-4};
   size_t delay_{1};
 
+  void initialize(const std::vector<Ptr<data::Batch>>& batches);
+
   void updateMovingAverage(Tensor paramsAvg, Tensor params, size_t batches);
 
   void fetchParams(Tensor oldParams, const std::vector<Tensor>& params);
 
-  void execute(const std::vector<Ptr<data::Batch>>& batch);
+  void execute(const std::vector<Ptr<data::Batch>>& batches);
 
   void foreachDevice(const std::function<void(size_t, int)>&);
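A minimal standalone sketch (not part of the patch) of the NCCL collective pattern that the new NCCL helper class wraps: each device reduce-scatters the full gradient buffer so it keeps the summed values for its own shard, then all-gathers the updated shards back into a full buffer. The device count, shard size, and in-place shard layout below are illustrative assumptions, not values taken from Marian.

// nccl_sketch.cpp -- illustrative only; shard size and in-place layout are assumptions.
#include <cuda_runtime.h>
#include <nccl.h>
#include <vector>

int main() {
  int devNum = 0;
  cudaGetDeviceCount(&devNum);

  std::vector<int> devs(devNum);
  std::vector<ncclComm_t> comms(devNum);
  std::vector<cudaStream_t> streams(devNum);
  std::vector<float*> grads(devNum);

  const size_t shard = 1024;            // elements per device shard (illustrative)
  const size_t total = shard * devNum;  // full gradient size

  for(int i = 0; i < devNum; ++i) {
    devs[i] = i;
    cudaSetDevice(i);
    cudaStreamCreate(&streams[i]);
    cudaMalloc((void**)&grads[i], total * sizeof(float));
  }
  // one communicator per device, as in NCCL::init above
  ncclCommInitAll(comms.data(), devNum, devs.data());

  // Reduce-scatter: each device sends its full gradient and receives the
  // summed values of its own shard, written in place at offset i * shard.
  ncclGroupStart();
  for(int i = 0; i < devNum; ++i)
    ncclReduceScatter(grads[i], grads[i] + i * shard, shard,
                      ncclFloat, ncclSum, comms[i], streams[i]);
  ncclGroupEnd();

  // ... an optimizer would update the local shard here ...

  // All-gather: each device broadcasts its shard so that every device
  // ends up with the complete, updated buffer again.
  ncclGroupStart();
  for(int i = 0; i < devNum; ++i)
    ncclAllGather(grads[i] + i * shard, grads[i], shard,
                  ncclFloat, comms[i], streams[i]);
  ncclGroupEnd();

  for(int i = 0; i < devNum; ++i) {
    cudaSetDevice(i);
    cudaStreamSynchronize(streams[i]);
    ncclCommDestroy(comms[i]);
    cudaFree(grads[i]);
  }
  return 0;
}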