try to test nccl

Marcin Junczys-Dowmunt 2018-06-24 12:47:33 -07:00
parent ff4725063e
commit afabe4e7e9
3 changed files with 201 additions and 96 deletions

View File

@@ -38,7 +38,7 @@ set(EXT_LIBS ${EXT_LIBS} ${CMAKE_DL_LIBS})
if(COMPILE_CUDA)
find_package(CUDA "8.0" REQUIRED)
if(CUDA_FOUND)
set(EXT_LIBS ${EXT_LIBS} ${CUDA_curand_LIBRARY} ${CUDA_cusparse_LIBRARY})
set(EXT_LIBS ${EXT_LIBS} ${CUDA_curand_LIBRARY} ${CUDA_cusparse_LIBRARY} /usr/local/cuda/lib64/libnccl_static.a)
if(USE_CUDNN)
find_package(CUDNN "7.0")

View File

@@ -2,6 +2,9 @@
#include "functional/functional.h"
#include "tensors/tensor_operators.h"
#include "cuda_runtime.h"
#include "nccl.h"
namespace marian {
void SyncGraphGroup::setScheduler(Ptr<Scheduler> scheduler) {
@@ -52,129 +55,226 @@ void SyncGraphGroup::foreachDevice(const std::function<void(size_t, int)>& task)
t.join();
}
void SyncGraphGroup::initialize(const std::vector<Ptr<data::Batch>>& batches) {
// Initialize 0th graph with random weights in one forward step
{
THREAD_GUARD(builders_[0]->build(graphs_[0], batches[0]);
graphs_[0]->forward(););
// Copy weights from 0th graph to all other graphs
// to have equal weights across devices
ThreadPool pool(graphs_.size() - 1, graphs_.size() - 1);
for(size_t i = 1; i < graphs_.size(); ++i) {
auto init = [&](size_t i) {
// initialize i-th graph and weights
builders_[i]->build(graphs_[i], batches[0]);
graphs_[i]->forward();
// overwrite weights of i-th graph with weights from 0th graph
graphs_[i]->params()->vals()->copyFrom(graphs_[0]->params()->vals());
};
pool.enqueue(init, i);
}
}
// Initialize sharded parameter storage. For n devices,
// each device stores 1/n-th of the parameters.
// We also create sharded gradients and temporary storage.
if(params_.size() == 0) {
int totalSize = graphs_[0]->params()->vals()->size();
shardSize_ = ceil(totalSize / (float)devices_.size());
int pos = 0;
for(auto graph : graphs_) {
int __size__ = std::min(shardSize_, totalSize);
auto paramsAlloc = New<TensorAllocator>(graph->getBackend());
paramsAllocs_.push_back(paramsAlloc);
size_t chunks = movingAvg_ ? 3 : 2;
paramsAlloc->reserveExact(chunks * __size__ * sizeof(float));
Tensor param, tmp, paramAvg;
// set parameters to actual value from 0th graph
paramsAlloc->allocate(param, {1, __size__});
params_.push_back(param);
param->copyFrom(graphs_[0]->params()->vals()->subtensor(pos, __size__));
paramsAlloc->allocate(tmp, {1, __size__});
tmpTensors_.push_back(tmp);
if(movingAvg_) {
paramsAlloc->allocate(paramAvg, {1, __size__});
paramAvg->copyFrom(param);
paramsAvg_.push_back(paramAvg);
}
// move to next shard
pos += __size__;
totalSize -= __size__;
}
}
}
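// Worked example of the sharding arithmetic above, with hypothetical numbers:
// totalSize == 10 parameters on 4 devices gives shardSize_ == ceil(10 / 4.0) == 3,
// so the shards are the (pos, size) pairs (0,3), (3,3), (6,3), (9,1);
// the last shard is smaller because totalSize is not divisible by the number of devices.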
class NCCL {
private:
ncclComm_t* comms_;
cudaStream_t* streams_;
int* devs_;
public:
void init(size_t devNum) {
comms_ = new ncclComm_t[devNum];
streams_ = new cudaStream_t[devNum];
devs_ = new int[devNum];
for(int i = 0; i < devNum; ++i) {
devs_[i] = i;
cudaSetDevice(i);
cudaStreamCreate(&streams_[i]);
}
ncclCommInitAll(comms_, devNum, devs_);
}
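// scatterReduce below sums the full gradient buffers of all devices and leaves
// device i holding the reduced i-th block, written into its own gradient buffer
// at pos[i]. Note that ncclReduceScatter expects the same recvcount on every rank,
// which is why sizes[0] is passed for all devices; ncclGroupStart/ncclGroupEnd
// let a single thread issue the calls for several devices without deadlocking.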
void scatterReduce(const std::vector<Ptr<ExpressionGraph>>& graphs,
const std::vector<size_t>& pos,
const std::vector<size_t>& sizes) {
ncclGroupStart();
for (int i = 0; i < graphs.size(); ++i) {
const void* sendbuff = (const void*)graphs[i]->params()->grads()->data();
auto subgrad = graphs[i]->params()->grads()->subtensor(pos[i], sizes[i]);
void* recvbuff = subgrad->data();
ncclReduceScatter(sendbuff,
recvbuff,
sizes[0],
ncclFloat,
ncclSum,
comms_[i],
streams_[i]);
}
ncclGroupEnd();
for (int i = 0; i < graphs.size(); ++i) {
cudaSetDevice(i);
cudaStreamSynchronize(streams_[i]);
}
}
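// allGather below redistributes the updated parameter shards: every device
// contributes its own shard (params[i]) and receives the concatenation of all
// shards into the buffer passed as recvbuff. ncclAllGather likewise expects an
// identical sendcount on every rank, hence params[0]->size().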
void allGather(const std::vector<Ptr<ExpressionGraph>>& graphs,
const std::vector<Tensor>& params) {
ncclGroupStart();
for (int i = 0; i < graphs.size(); ++i) {
const void* sendbuff = (const void*)params[i]->data();
size_t sendcount = params[0]->size();
void* recvbuff = (void*)graphs[i]->params()->grads()->data();
ncclAllGather(sendbuff,
recvbuff,
sendcount,
ncclFloat,
comms_[i],
streams_[i]);
}
ncclGroupEnd();
for (int i = 0; i < graphs.size(); ++i) {
cudaSetDevice(i);
cudaStreamSynchronize(streams_[i]);
}
}
};
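The NCCL and CUDA calls in the wrapper above discard their ncclResult_t and cudaError_t return values. A minimal error-checking sketch (not part of this commit; the macro names are made up) could look like this:

#include <cstdio>
#include <cstdlib>
#include "cuda_runtime.h"
#include "nccl.h"

// Hypothetical helper macros: abort with a readable message if a CUDA or NCCL
// call fails instead of silently ignoring the error code.
#define CUDA_CHECK(cmd) do { \
  cudaError_t e = (cmd); \
  if(e != cudaSuccess) { \
    std::fprintf(stderr, "CUDA error %s at %s:%d\n", cudaGetErrorString(e), __FILE__, __LINE__); \
    std::abort(); \
  } \
} while(0)

#define NCCL_CHECK(cmd) do { \
  ncclResult_t r = (cmd); \
  if(r != ncclSuccess) { \
    std::fprintf(stderr, "NCCL error %s at %s:%d\n", ncclGetErrorString(r), __FILE__, __LINE__); \
    std::abort(); \
  } \
} while(0)

// Usage, e.g. inside NCCL::init():
//   CUDA_CHECK(cudaStreamCreate(&streams_[i]));
//   NCCL_CHECK(ncclCommInitAll(comms_, devNum, devs_));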
void SyncGraphGroup::execute(const std::vector<Ptr<data::Batch>>& batches) {
// If there are fewer batches than we need, split the last batch into the right
// number of pieces and replace the last batch with the splits.
std::vector<Ptr<data::Batch>> newBatches = batches;
if(newBatches.size() < numBatches()) {
size_t splitFill = numBatches() - newBatches.size() + 1;
auto fillerBatches = newBatches.back()->split(splitFill);
newBatches.back() = fillerBatches[0];
for(int i = 1; i < splitFill; ++i)
newBatches.push_back(fillerBatches[i]);
}
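// Worked example with hypothetical numbers: if numBatches() == 8 but only 5
// batches arrived, splitFill == 8 - 5 + 1 == 4; the last batch is split into
// 4 pieces, the first piece replaces the original last batch and the remaining
// 3 pieces are appended, giving 8 batches again.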
float div = batches.size(); // no. of batches
// do not average gradients if cost type is sum.
if(options_->get<std::string>("cost-type") == "ce-sum")
div = 1;
std::vector<std::vector<Ptr<data::Batch>>> delayedBatches;
size_t devs = devices_.size();
for(int i = 0; i < delay_; ++i) {
delayedBatches.emplace_back();
size_t devs = devices_.size();
for(int j = 0; j < devs; ++j) {
delayedBatches.back().push_back(newBatches[i * devs + j]);
if(i * devs < batches.size()) {
delayedBatches.emplace_back();
for(int j = 0; j < devs; ++j) {
size_t index = i * devs + j;
if(index < batches.size())
delayedBatches.back().push_back(batches[i * devs + j]);
else
delayedBatches.back().push_back(nullptr);
}
}
}
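// Example with hypothetical numbers: with 4 devices, delay_ == 2 and 6 incoming
// batches, two groups are formed, {b0, b1, b2, b3} and {b4, b5, nullptr, nullptr};
// a device that receives a nullptr skips its forward/backward step for that group.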
NCCL nccl;
nccl.init(devices_.size());
std::vector<size_t> positions;
std::vector<size_t> sizes;
std::vector<float> costs(devices_.size(), 0.f);
size_t t = 1;
for(const auto& batches : delayedBatches) {
for(const auto& curBatches : delayedBatches) {
if(first_) {
{
// Initialize 0th graph with random weights in one forward step
THREAD_GUARD(builders_[0]->build(graphs_[0], batches[0]);
graphs_[0]->forward(););
initialize(curBatches);
// Copy weights from 0th graph to all other graphs
// to have equal weights across devices
ThreadPool pool(graphs_.size() - 1, graphs_.size() - 1);
for(size_t i = 1; i < graphs_.size(); ++i) {
auto init = [&](size_t i) {
// initialize i-th graph and weights
builders_[i]->build(graphs_[i], batches[0]);
graphs_[i]->forward();
// overwrite weights of i-th graph with weights from 0th graph
graphs_[i]->params()->vals()->copyFrom(graphs_[0]->params()->vals());
};
pool.enqueue(init, i);
}
int pos = 0;
for(int idx = 0; idx < devices_.size(); ++idx) {
positions.push_back(pos);
sizes.push_back(params_[idx]->size());
pos += params_[idx]->size(); // advance to the start of the next shard
}
// Initialize sharded parameter storage. For n devices
// each device stores 1/n-th of parameters.
// We also create sharded gradients and temporary storage.
if(params_.size() == 0) {
int totalSize = graphs_[0]->params()->vals()->size();
shardSize_ = ceil(totalSize / (float)devices_.size());
int pos = 0;
for(auto graph : graphs_) {
int __size__ = std::min(shardSize_, totalSize);
auto paramsAlloc = New<TensorAllocator>(graph->getBackend());
paramsAllocs_.push_back(paramsAlloc);
size_t chunks = movingAvg_ ? 3 : 2;
paramsAlloc->reserveExact(chunks * __size__ * sizeof(float));
Tensor param, tmp, paramAvg;
// set parameters to actual value from 0th graph
paramsAlloc->allocate(param, {1, __size__});
params_.push_back(param);
param->copyFrom(graphs_[0]->params()->vals()->subtensor(pos, __size__));
paramsAlloc->allocate(tmp, {1, __size__});
tmpTensors_.push_back(tmp);
if(movingAvg_) {
paramsAlloc->allocate(paramAvg, {1, __size__});
paramAvg->copyFrom(param);
paramsAvg_.push_back(paramAvg);
}
// move to next shard
pos += __size__;
totalSize -= __size__;
}
}
first_ = false;
}
// execute single forward/backward step
auto taskForwardBackward = [this, &costs, batches, t](size_t idx, int pos) {
// Execute single forward/backward step
auto taskForwardBackward = [this, &costs, curBatches, t](size_t idx, int pos) {
auto graph = graphs_[idx];
auto batch = batches[idx];
auto batch = curBatches[idx];
auto costNode = builders_[idx]->build(graph, batch);
graph->forward();
costs[idx] += costNode->scalar();
graph->backward(t == 1);
if(batch) {
auto costNode = builders_[idx]->build(graph, batch);
graph->forward();
costs[idx] += costNode->scalar();
// only reset gradients to 0 if t == 1
graph->backward(t == 1);
}
};
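// Note: backward(t == 1) clears the gradients only on the first step of a
// delayed group, so for t > 1 the new gradients are accumulated on top of the
// previous ones before the shards are reduced below.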
// device index corresponds to shard index
auto taskGather = [this, batches](size_t idx, int pos) {
// Gather gradients from different devices into current gradient shards
auto taskGather = [this, curBatches, div](size_t idx, int pos) {
int shardSize = params_[idx]->size();
float div = devices_.size(); // no. of GPUs
// do not average gradients if cost type is sum.
if(options_->get<std::string>("cost-type") == "ce-sum")
div = 1;
auto batch = curBatches[idx];
if(batch) {
auto curGrad = graphs_[idx]->params()->grads()->subtensor(pos, shardSize);
auto curGrad = graphs_[idx]->params()->grads()->subtensor(pos, shardSize);
// collect and sum gradients
// to be replaced with ncclScatterReduce
for(auto graph : graphs_) {
if(graph != graphs_[idx]) {
auto subGrad = graph->params()->grads()->subtensor(pos, shardSize);
tmpTensors_[idx]->copyFrom(subGrad);
// collect and sum gradients
// to be replaced with ncclScatterReduce
for(auto graph : graphs_) {
if(graph != graphs_[idx]) {
auto subGrad = graph->params()->grads()->subtensor(pos, shardSize);
tmpTensors_[idx]->copyFrom(subGrad);
using namespace functional;
Element(_1 = _1 + (_2 / div), curGrad, tmpTensors_[idx]);
using namespace functional;
Element(_1 = _1 + (_2 / div), curGrad, tmpTensors_[idx]);
}
}
}
};
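// With NCCL enabled, this manual copy-and-add loop over all graphs is superseded
// by the single ncclReduceScatter call in NCCL::scatterReduce; the dispatch
// further down calls nccl.scatterReduce and leaves taskGather commented out.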
// Update parameter shard with gradient shard
auto taskUpdate = [this](size_t idx, int pos) {
int shardSize = params_[idx]->size();
auto curGrad = graphs_[idx]->params()->grads()->subtensor(pos, shardSize);
@@ -185,6 +285,7 @@ void SyncGraphGroup::execute(const std::vector<Ptr<data::Batch>>& batches) {
paramsAvg_[idx], params_[idx], scheduler_->numberOfBatches());
};
// Update all graphs with parameter shard
auto taskBroadcast = [this](size_t idx, int pos) {
int shardSize = params_[idx]->size();
auto curGrad = graphs_[idx]->params()->grads()->subtensor(pos, shardSize);
@@ -198,10 +299,12 @@ void SyncGraphGroup::execute(const std::vector<Ptr<data::Batch>>& batches) {
foreachDevice(taskForwardBackward);
if(t == delay_) {
foreachDevice(taskGather);
if(t == delayedBatches.size()) {
nccl.scatterReduce(graphs_, positions, sizes);
//foreachDevice(taskGather);
foreachDevice(taskUpdate);
foreachDevice(taskBroadcast);
nccl.allGather(graphs_, params_);
//foreachDevice(taskBroadcast);
}
t++;

View File

@@ -32,11 +32,13 @@ private:
float mvDecay_{1e-4};
size_t delay_{1};
void initialize(const std::vector<Ptr<data::Batch>>& batches);
void updateMovingAverage(Tensor paramsAvg, Tensor params, size_t batches);
void fetchParams(Tensor oldParams, const std::vector<Tensor>& params);
void execute(const std::vector<Ptr<data::Batch>>& batch);
void execute(const std::vector<Ptr<data::Batch>>& batches);
void foreachDevice(const std::function<void(size_t,int)>&);