Merged PR 17430: Refactors MPI interfaces and adds different types of gradient exchanges

* Refactors MPI-related code
* Adds node-local updates with occasional inter-node updates
* Decouples batch-reading across nodes
Martin Junczys-Dowmunt 2021-02-08 05:27:49 +00:00
parent 24e2e01027
commit 5aeea4e066
49 changed files with 889 additions and 399 deletions

View File

@ -8,6 +8,24 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]
### Added
- Local/global sharding with MPI training via `--sharding local`
- fp16 support for factors.
- Correct training with fp16 via `--fp16`.
- Dynamic cost-scaling with `--cost-scaling`.
- Dynamic gradient-scaling with `--dynamic-gradient-scaling`.
### Fixed
- Support for CUDA 11.
- General improvements and fixes for MPI handling, which was essentially non-functional before (syncing, random seeds, deadlocks during saving, validation, etc.)
- Allow compiling with -DUSE_MPI=on and -DUSE_STATIC_LIBS=on, although MPI is still linked dynamically since it has so many dependencies.
### Changed
- Change compile options such as -DCOMPILE_CUDA_SM35 to -DCOMPILE_KEPLER, -DCOMPILE_MAXWELL,
-DCOMPILE_PASCAL, -DCOMPILE_VOLTA, -DCOMPILE_TURING and -DCOMPILE_AMPERE
- Disable -DCOMPILE_KEPLER and -DCOMPILE_MAXWELL by default.
- Dropped support for legacy graph groups.
## [1.10.0] - 2021-02-06
### Added
@ -26,7 +44,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Decoding multi-source models in marian-server with --tsv
- GitHub workflows on Ubuntu, Windows, and MacOS
- LSH indexing to replace short list
- ONNX support for transformer models
- ONNX support for transformer models (very experimental)
- Add topk operator like PyTorch's topk
- Use *cblas_sgemm_batch* instead of a for loop of *cblas_sgemm* on CPU as the batched_gemm implementation
- Supporting relative paths in shortlist and sqlite options

View File

@ -273,53 +273,61 @@ if(CUDA_FOUND)
# We want to compile as many targets as possible but different CUDA versions support different targets.
# Let's instead enable options based on what CUDA version we have.
if((CUDA_VERSION VERSION_EQUAL "9.0" OR CUDA_VERSION VERSION_GREATER "9.0") AND CUDA_VERSION VERSION_LESS "11.0")
option(COMPILE_CUDA_SM35 "Compile GPU version with SM35 support" ON)
option(COMPILE_CUDA_SM50 "Compile GPU version with SM50 support" ON)
option(COMPILE_CUDA_SM60 "Compile GPU version with SM60 support" ON)
option(COMPILE_CUDA_SM70 "Compile GPU version with SM70 support" ON)
option(COMPILE_KEPLER "Compile GPU version with SM35 support" OFF)
option(COMPILE_MAXWELL "Compile GPU version with SM50 support" OFF)
option(COMPILE_PASCAL "Compile GPU version with SM60 support" ON)
option(COMPILE_VOLTA "Compile GPU version with SM70 support" ON)
endif()
if((CUDA_VERSION VERSION_EQUAL "10.0" OR CUDA_VERSION VERSION_GREATER "10.0") AND CUDA_VERSION VERSION_LESS "11.0")
option(COMPILE_CUDA_SM35 "Compile GPU version with SM35 support" ON)
option(COMPILE_CUDA_SM50 "Compile GPU version with SM50 support" ON)
option(COMPILE_CUDA_SM60 "Compile GPU version with SM60 support" ON)
option(COMPILE_CUDA_SM70 "Compile GPU version with SM70 support" ON)
option(COMPILE_CUDA_SM75 "Compile GPU version with SM75 support" ON)
option(COMPILE_KEPLER "Compile GPU version with SM35 support" OFF)
option(COMPILE_MAXWELL "Compile GPU version with SM50 support" OFF)
option(COMPILE_PASCAL "Compile GPU version with SM60 support" ON)
option(COMPILE_VOLTA "Compile GPU version with SM70 support" ON)
option(COMPILE_TURING "Compile GPU version with SM75 support" ON)
endif()
if(CUDA_VERSION VERSION_EQUAL "11.0" OR CUDA_VERSION VERSION_GREATER "11.0")
option(COMPILE_CUDA_SM35 "Compile GPU version with SM35 support" OFF) # deprecated for CUDA 11
option(COMPILE_CUDA_SM50 "Compile GPU version with SM50 support" OFF) # deprecated for CUDA 11
option(COMPILE_CUDA_SM60 "Compile GPU version with SM60 support" ON)
option(COMPILE_CUDA_SM70 "Compile GPU version with SM70 support" ON)
option(COMPILE_CUDA_SM75 "Compile GPU version with SM75 support" ON)
option(COMPILE_CUDA_SM80 "Compile GPU version with SM80 support" ON)
option(COMPILE_KEPLER "Compile GPU version with SM35 support" OFF) # deprecated for CUDA 11
option(COMPILE_MAXWELL "Compile GPU version with SM50 support" OFF) # deprecated for CUDA 11
option(COMPILE_PASCAL "Compile GPU version with SM60 support" ON)
option(COMPILE_VOLTA "Compile GPU version with SM70 support" ON)
option(COMPILE_TURING "Compile GPU version with SM75 support" ON)
option(COMPILE_AMPERE "Compile GPU version with SM80 support" ON)
LIST(APPEND COMPUTE -Wno-deprecated-gpu-targets)
endif()
if(COMPILE_CUDA_SM35)
LIST(APPEND COMPUTE -arch=sm_35; -gencode=arch=compute_35,code=sm_35;) # Tesla K40 and above
endif(COMPILE_CUDA_SM35)
if(COMPILE_CUDA_SM50)
if(COMPILE_KEPLER)
message(STATUS "Compiling code for Kepler GPUs")
LIST(APPEND COMPUTE -gencode=arch=compute_35,code=sm_35;) # Tesla K40 and above
endif(COMPILE_KEPLER)
if(COMPILE_MAXWELL)
message(STATUS "Compiling code for Maxwell GPUs")
LIST(APPEND COMPUTE -gencode=arch=compute_50,code=sm_50; -gencode=arch=compute_52,code=sm_52;) # Maxwell GPUs
endif(COMPILE_CUDA_SM50)
if(COMPILE_CUDA_SM60)
endif(COMPILE_MAXWELL)
if(COMPILE_PASCAL)
message(STATUS "Compiling code for Pascal GPUs")
LIST(APPEND COMPUTE -gencode=arch=compute_60,code=sm_60; -gencode=arch=compute_61,code=sm_61;) # Pascal GPUs
endif(COMPILE_CUDA_SM60)
if(COMPILE_CUDA_SM70)
LIST(APPEND COMPUTE -gencode=arch=compute_70,code=sm_70; -gencode=arch=compute_70,code=compute_70) # Volta GPUs
endif(COMPILE_CUDA_SM70)
endif(COMPILE_PASCAL)
if(COMPILE_VOLTA)
message(STATUS "Compiling code for Volta GPUs")
LIST(APPEND COMPUTE -arch=sm_70; -gencode=arch=compute_70,code=sm_70; -gencode=arch=compute_70,code=compute_70) # Volta GPUs
endif(COMPILE_VOLTA)
if(CUDA_VERSION VERSION_EQUAL "10.0" OR CUDA_VERSION VERSION_GREATER "10.0")
if(COMPILE_CUDA_SM75)
if(COMPILE_TURING)
message(STATUS "Compiling code for Turing GPUs")
LIST(APPEND COMPUTE -gencode=arch=compute_75,code=sm_75; -gencode=arch=compute_75,code=compute_75) # Turing GPUs
endif(COMPILE_CUDA_SM75)
endif(COMPILE_TURING)
endif()
if(CUDA_VERSION VERSION_EQUAL "11.0" OR CUDA_VERSION VERSION_GREATER "11.0")
if(COMPILE_CUDA_SM80)
if(COMPILE_AMPERE)
message(STATUS "Compiling code for Ampere GPUs")
LIST(APPEND COMPUTE -gencode=arch=compute_80,code=sm_80; -gencode=arch=compute_80,code=compute_80) # Ampere GPUs
endif(COMPILE_CUDA_SM80)
endif(COMPILE_AMPERE)
endif()
if(USE_STATIC_LIBS)
set(EXT_LIBS ${EXT_LIBS} ${CUDA_curand_LIBRARY} ${CUDA_cusparse_LIBRARY} ${CUDA_CUBLAS_LIBRARIES})
set(CUDA_LIBS ${CUDA_curand_LIBRARY} ${CUDA_cusparse_LIBRARY} ${CUDA_CUBLAS_LIBRARIES})
set(EXT_LIBS ${EXT_LIBS} ${CUDA_curand_LIBRARY} ${CUDA_CUBLAS_LIBRARIES} ${CUDA_cusparse_LIBRARY})
set(CUDA_LIBS ${CUDA_curand_LIBRARY} ${CUDA_CUBLAS_LIBRARIES} ${CUDA_cusparse_LIBRARY})
find_library(CUDA_culibos_LIBRARY NAMES culibos PATHS ${CUDA_TOOLKIT_ROOT_DIR}/lib64 ${CUDA_TOOLKIT_ROOT_DIR}/lib/x64)
# The cuLIBOS library does not seem to exist in Windows CUDA toolkit installs
if(CUDA_culibos_LIBRARY)
@ -424,17 +432,6 @@ if(NOT WIN32)
endif(Tcmalloc_FOUND)
endif()
###############################################################################
# Find MPI
if(USE_MPI)
find_package(MPI 2.0)
if(MPI_FOUND)
include_directories(${MPI_INCLUDE_PATH})
set(EXT_LIBS ${EXT_LIBS} ${MPI_LIBRARIES})
add_definitions(-DMPI_FOUND=1)
endif(MPI_FOUND)
endif(USE_MPI)
###############################################################################
# Find BLAS library
if(COMPILE_CPU)
@ -513,7 +510,7 @@ if(USE_MPI)
find_package(MPI 2.0 REQUIRED)
if(MPI_FOUND)
include_directories(${MPI_INCLUDE_PATH})
set(EXT_LIBS "${EXT_LIBS} ${MPI_LIBRARIES}")
set(EXT_LIBS ${EXT_LIBS} ${MPI_LIBRARIES})
if(USE_STATIC_LIBS) # alternatively this could install OpenMPI like NCCL and link against that statically with greater control
message(WARNING "MPI implementations are notoriously difficult to link statically, linking ${MPI_LIBRARIES} dynamically despite -DUSE_STATIC_LIBS=on")
endif(USE_STATIC_LIBS)

View File

@ -153,32 +153,37 @@ if(CUDA_FOUND)
# Disables compilation for sm_30 to avoid a ptxas warning; sm_30 is general Kepler support, but K80s, for instance, are still supported by sm_35
set(GENCODE "")
if(COMPILE_CUDA_SM35)
if(CUDA_VERSION VERSION_EQUAL "11.0" OR CUDA_VERSION VERSION_GREATER "11.0")
set(GENCODE "${GENCODE} -Wno-deprecated-gpu-targets")
endif()
if(COMPILE_KEPLER)
set(GENCODE "${GENCODE} -gencode=arch=compute_35,code=sm_35")
endif(COMPILE_CUDA_SM35)
if(COMPILE_CUDA_SM50)
endif(COMPILE_KEPLER)
if(COMPILE_MAXWELL)
set(GENCODE "${GENCODE} -gencode=arch=compute_50,code=sm_50")
endif(COMPILE_CUDA_SM50)
if(COMPILE_CUDA_SM60)
endif(COMPILE_MAXWELL)
if(COMPILE_PASCAL)
set(GENCODE "${GENCODE} -gencode=arch=compute_60,code=sm_60 -gencode=arch=compute_61,code=sm_61")
endif(COMPILE_CUDA_SM60)
if(COMPILE_CUDA_SM70)
set(GENCODE "${GENCODE} -gencode=arch=compute_70,code=sm_70")
endif(COMPILE_CUDA_SM70)
if(COMPILE_CUDA_SM75)
endif(COMPILE_PASCAL)
if(COMPILE_VOLTA)
set(GENCODE "${GENCODE} -arch=sm_70 -gencode=arch=compute_70,code=sm_70")
endif(COMPILE_VOLTA)
if(COMPILE_TURING)
set(GENCODE "${GENCODE} -gencode=arch=compute_75,code=sm_75; -gencode=arch=compute_75,code=compute_75")
endif(COMPILE_CUDA_SM75)
if(COMPILE_CUDA_SM80)
endif(COMPILE_TURING)
if(COMPILE_AMPERE)
set(GENCODE "${GENCODE} -gencode=arch=compute_80,code=sm_80; -gencode=arch=compute_80,code=compute_80")
endif(COMPILE_CUDA_SM80)
endif(COMPILE_AMPERE)
# install nccl in ${CMAKE_BINARY_DIR}/local, similar to a /usr/local Linux installation
# Using $(MAKE) instead of $CMAKE_MAKE_PROGRAM in order to make parallelization in NCCL compilation work with make -j16.
# Apparently this does not get properly propagated otherwise and builds with only a single thread/process.
ExternalProject_Add(nccl_install
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/nccl
BINARY_DIR ${CMAKE_CURRENT_SOURCE_DIR}/nccl
CONFIGURE_COMMAND ""
BUILD_COMMAND
${CMAKE_MAKE_PROGRAM} -f ${CMAKE_CURRENT_SOURCE_DIR}/nccl/Makefile src.build
"\$(MAKE)" -f ${CMAKE_CURRENT_SOURCE_DIR}/nccl/Makefile src.build
BUILDDIR=${CMAKE_BINARY_DIR}/local CUDA_HOME=${CUDA_TOOLKIT_ROOT_DIR}
CUDA8_GENCODE=${GENCODE} CXX=${CMAKE_CXX_COMPILER} CXX_FLAGS=${NCCL_FLAGS}
INSTALL_COMMAND "")

View File

@ -11,7 +11,6 @@
int main(int argc, char** argv) {
using namespace marian;
auto options = parseOptions(argc, argv, cli::mode::training);
// --sync-sgd always selects SyncGraphGroup

View File

@ -787,9 +787,17 @@ void ConfigParser::addSuboptionsDevices(cli::CLIWrapper& cli) {
cli.add<size_t>("--num-devices",
"Number of GPUs to use for this process. Defaults to length(devices) or 1");
#ifdef USE_NCCL
if(mode_ == cli::mode::training)
if(mode_ == cli::mode::training) {
cli.add<bool>("--no-nccl",
"Disable inter-GPU communication via NCCL");
cli.add<std::string>("--sharding",
"When using NCCL and MPI for multi-process training use 'global' (default, less memory usage) "
"or 'local' (more memory usage but faster) sharding",
{"global"});
cli.add<std::string/*SchedulerPeriod*/>("--sync-freq",
"When sharding is local sync all shards across processes once every n steps (possible units u=updates, t=target labels, e=epochs)",
"200u");
}
#endif
#ifdef CUDA_FOUND
cli.add<size_t>("--cpu-threads",

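For reference, a minimal standalone sketch of parsing a "--sync-freq"-style period such as "200u" (u=updates, t=target labels, e=epochs). The actual SchedulerPeriod parser in marian is not shown in this diff and may differ; this only illustrates the option format described above.

#include <cstdlib>
#include <iostream>
#include <string>

struct Period { size_t n; char unit; };  // e.g. {200, 'u'} for "200u"

Period parsePeriod(const std::string& s) {
  char unit = s.back();                                                      // trailing unit letter
  size_t n = std::strtoull(s.substr(0, s.size() - 1).c_str(), nullptr, 10);  // leading count
  return {n, unit};
}

int main() {
  Period p = parsePeriod("200u");
  std::cout << "sync every " << p.n << " steps of unit '" << p.unit << "'\n";
}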
View File

@ -10,11 +10,19 @@ template <class T> using hash = std::hash<T>;
// This combinator is based on boost::hash_combine, but uses
// std::hash as the hash implementation. Used as a drop-in
// replacement for boost::hash_combine.
template <class T>
inline void hash_combine(std::size_t& seed, T const& v) {
hash<T> hasher;
seed ^= hasher(v) + 0x9e3779b9 + (seed<<6) + (seed>>2);
hash<T> hasher;
seed ^= hasher(v) + 0x9e3779b9 + (seed<<6) + (seed>>2);
}
// Hash a whole chunk of memory, mostly used for diagnostics
template <class T>
inline size_t hashMem(const T* beg, size_t len) {
size_t seed = 0;
for(auto it = beg; it < beg + len; ++it)
hash_combine(seed, *it);
return seed;
}
}
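For reference, a standalone sketch of how the hashing helpers above can be used, e.g. to hash a gradient buffer on each worker and compare the values across MPI ranks for diagnostics. std::hash is used directly here; the marian version lives in its own util namespace.

#include <cstddef>
#include <functional>
#include <iostream>
#include <vector>

template <class T>
void hash_combine(std::size_t& seed, T const& v) {
  seed ^= std::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}

template <class T>
std::size_t hashMem(const T* beg, std::size_t len) {
  std::size_t seed = 0;
  for(auto it = beg; it < beg + len; ++it)
    hash_combine(seed, *it);
  return seed;
}

int main() {
  std::vector<float> grads = {0.1f, -0.2f, 0.3f};
  // e.g. log this on each rank to check that gradients stay in sync
  std::cout << "gradient hash: " << hashMem(grads.data(), grads.size()) << "\n";
}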

View File

@ -24,6 +24,12 @@
#include <cuda.h> // required to see CUDA_VERSION
#if (CUDA_VERSION > 9000 && (__CUDA_ARCH__ >= 600 || !defined(__CUDA_ARCH__)))
#define COMPILE_FP16 1 // we are in GPU code and we know what to do with FP16 code
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable:4505) // "unreferenced local function has been removed" in cuda_fp16.hpp
#endif
#include <cuda_fp16.h>
#include "functional/defs.h"
#else
#define COMPILE_FP16 0 // we are in GPU code, but compute capability is too low to use FP16
#endif
@ -31,6 +37,12 @@
#include <cuda.h> // required to see CUDA_VERSION
#if (CUDA_VERSION > 9000)
#define COMPILE_FP16 1
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable:4505) // "unreferenced local function has been removed" in cuda_fp16.hpp
#endif
#include <cuda_fp16.h>
#include "functional/defs.h"
#else
#define COMPILE_FP16 0
#endif
@ -219,6 +231,37 @@ struct float32x8 {
#endif
#endif
#if COMPILE_FP16
// @TODO: check what intrinsics are actually available.
struct halfx2 {
private:
__half2 h2_;
public:
DEVICE halfx2() {}
DEVICE halfx2(const __half2& h2) : h2_(h2) {}
DEVICE halfx2(const __half& h) : h2_(h, h) {}
DEVICE halfx2(const __half& h1, const __half& h2) : h2_(h1, h2) {}
DEVICE_INLINE operator const __half2&() const { return h2_; }
DEVICE_INLINE operator __half2&() { return h2_; }
DEVICE_INLINE __half operator[] (size_t i) const {
return *(((__half*)&h2_) + i); // potentially undefined, but efficient. In practice __half2 is an array of two __half values
}
friend std::ostream& operator<<(std::ostream& out, halfx2 h2) {
__half* a = (__half*)&h2;
out << "[" << (float)a[0];
for(int i = 1; i < 2; i++)
out << " " << (float)a[i];
out << "]";
return out;
}
};
#endif
// Internal to types.h, don't use. Use test functions below.
enum class TypeClass : size_t { // size_type has 8 bytes, so we can have 16 fields here, currently using 5. Extend to the left for back-compat.
// built-in type classes

View File

@ -179,7 +179,7 @@ private:
maxBatchSize = stats_->findBatchSize(lengths, cachedStatsIter);
// this optimization makes no difference indeed
#if 1 // sanity check: would we find the same entry if searching from the start?
#if 0 // sanity check: would we find the same entry if searching from the start?
auto it = stats_->lower_bound(lengths);
auto maxBatchSize1 = stats_->findBatchSize(lengths, it);
ABORT_IF(maxBatchSize != maxBatchSize1, "findBatchSize iter caching logic is borked");

View File

@ -11,16 +11,17 @@
namespace marian {
namespace data {
Corpus::Corpus(Ptr<Options> options, bool translate /*= false*/)
: CorpusBase(options, translate),
Corpus::Corpus(Ptr<Options> options, bool translate /*= false*/, size_t seed /*= Config::seed*/)
: CorpusBase(options, translate, seed),
shuffleInRAM_(options_->get<bool>("shuffle-in-ram", false)),
allCapsEvery_(options_->get<size_t>("all-caps-every", 0)),
titleCaseEvery_(options_->get<size_t>("english-title-case-every", 0)) {}
Corpus::Corpus(std::vector<std::string> paths,
std::vector<Ptr<Vocab>> vocabs,
Ptr<Options> options)
: CorpusBase(paths, vocabs, options),
Ptr<Options> options,
size_t seed /*= Config::seed*/)
: CorpusBase(paths, vocabs, options, seed),
shuffleInRAM_(options_->get<bool>("shuffle-in-ram", false)),
allCapsEvery_(options_->get<size_t>("all-caps-every", 0)),
titleCaseEvery_(options_->get<size_t>("english-title-case-every", 0)) {}

View File

@ -34,11 +34,14 @@ private:
public:
// @TODO: check if translate can be replaced by an option in options
Corpus(Ptr<Options> options, bool translate = false);
Corpus(Ptr<Options> options,
bool translate = false,
size_t seed = Config::seed);
Corpus(std::vector<std::string> paths,
std::vector<Ptr<Vocab>> vocabs,
Ptr<Options> options);
Ptr<Options> options,
size_t seed = Config::seed);
/**
* @brief Iterates sentence tuples in the corpus.

View File

@ -35,8 +35,9 @@ const SentenceTuple& CorpusIterator::dereference() const {
// weighting.
CorpusBase::CorpusBase(const std::vector<std::string>& paths,
const std::vector<Ptr<Vocab>>& vocabs,
Ptr<Options> options)
: DatasetBase(paths, options),
Ptr<Options> options,
size_t seed)
: DatasetBase(paths, options), RNGEngine(seed),
vocabs_(vocabs),
maxLength_(options_->get<size_t>("max-length")),
maxLengthCrop_(options_->get<bool>("max-length-crop")),
@ -61,8 +62,8 @@ CorpusBase::CorpusBase(const std::vector<std::string>& paths,
initEOS(/*training=*/true);
}
CorpusBase::CorpusBase(Ptr<Options> options, bool translate)
: DatasetBase(options),
CorpusBase::CorpusBase(Ptr<Options> options, bool translate, size_t seed)
: DatasetBase(options), RNGEngine(seed),
maxLength_(options_->get<size_t>("max-length")),
maxLengthCrop_(options_->get<bool>("max-length-crop")),
rightLeft_(options_->get<bool>("right-left")),

View File

@ -514,11 +514,14 @@ class CorpusBase : public DatasetBase<SentenceTuple, CorpusIterator, CorpusBatch
public:
typedef SentenceTuple Sample;
CorpusBase(Ptr<Options> options, bool translate = false);
CorpusBase(Ptr<Options> options,
bool translate = false,
size_t seed = Config::seed);
CorpusBase(const std::vector<std::string>& paths,
const std::vector<Ptr<Vocab>>& vocabs,
Ptr<Options> options);
const std::vector<Ptr<Vocab>>& vocabs,
Ptr<Options> options,
size_t seed = Config::seed);
virtual ~CorpusBase() {}
virtual std::vector<Ptr<Vocab>>& getVocabs() = 0;

View File

@ -5,15 +5,15 @@
namespace marian {
namespace data {
CorpusSQLite::CorpusSQLite(Ptr<Options> options, bool translate /*= false*/)
: CorpusBase(options, translate), seed_(Config::seed) {
CorpusSQLite::CorpusSQLite(Ptr<Options> options, bool translate /*= false*/, size_t seed /*= Config::seed*/)
: CorpusBase(options, translate, seed), seed_(seed) {
fillSQLite();
}
CorpusSQLite::CorpusSQLite(const std::vector<std::string>& paths,
const std::vector<Ptr<Vocab>>& vocabs,
Ptr<Options> options)
: CorpusBase(paths, vocabs, options), seed_(Config::seed) {
Ptr<Options> options, size_t seed)
: CorpusBase(paths, vocabs, options, seed), seed_(seed) {
fillSQLite();
}

View File

@ -44,11 +44,12 @@ private:
public:
// @TODO: check if translate can be replaced by an option in options
CorpusSQLite(Ptr<Options> options, bool translate = false);
CorpusSQLite(Ptr<Options> options, bool translate = false, size_t seed = Config::seed);
CorpusSQLite(const std::vector<std::string>& paths,
const std::vector<Ptr<Vocab>>& vocabs,
Ptr<Options> options);
Ptr<Options> options,
size_t seed = Config::seed);
Sample next() override;

View File

@ -17,7 +17,8 @@ protected:
public:
RNGEngine() : eng_((unsigned int)Config::seed) {}
RNGEngine(size_t seed) : eng_((unsigned int)seed) {}
std::string getRNGState() {
std::ostringstream oss;
oss << eng_;

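A hypothetical sketch of why the corpus classes above now accept a seed: giving each MPI process its own RNG decouples batch shuffling and reading across nodes. The actual call site is not part of this excerpt, and the rank offset and std::mt19937 engine below are assumptions for illustration only.

#include <cstdio>
#include <random>

int main() {
  size_t configSeed = 1111;                                   // stands in for Config::seed
  for(size_t mpiRank = 0; mpiRank < 2; ++mpiRank) {
    std::mt19937 eng((unsigned int)(configSeed + mpiRank));   // per-process engine (assumed offset)
    std::printf("rank %zu first draw: %u\n", mpiRank, (unsigned)eng());
  }
}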
View File

@ -27,7 +27,7 @@ public:
// Prepare scheduler with validators
auto trainState = New<TrainingState>(options_->get<float>("learn-rate"));
auto scheduler = New<Scheduler>(options_, trainState);
auto scheduler = New<Scheduler>(options_, trainState, nullptr);
scheduler->addValidator(New<MNISTAccuracyValidator>(options_));
// Multi-node training

View File

@ -354,7 +354,6 @@ template <>
struct Ops<float32x8> {
typedef float Single;
static inline float32x8 loop8(const std::function<float(const float&)>& f, const float32x8& x) {
float32x8 out;
for(int i = 0; i < 8; i++)
@ -471,8 +470,6 @@ struct Ops<float32x8> {
#ifdef __CUDACC__
#if COMPILE_FP16
// only compile with fp16 support for compute_70, i.e. VOLTA 100 and above.
#include <cuda_fp16.h>
namespace marian {
namespace functional {
@ -490,7 +487,12 @@ struct Ops<half> {
static DEVICE_INLINE half sqrt(const half& x) { return hsqrt(x); }
static DEVICE_INLINE half neg(const half& x) { return -x; }
static DEVICE_INLINE half abs(const half& x) { return fabs((float)x); }// @TODO half has this information somewhere in the struct, right?
#if CUDA_VERSION < 11000
static DEVICE_INLINE half abs(const half& x) { return fabs((float)x); }
#else
static DEVICE_INLINE half abs(const half& x) { return __habs(x); }
#endif
static DEVICE_INLINE half sgn(const half& x) { half zero = 0.f; return (zero < x) - (x < zero); } // @TODO half has this information somewhere in the struct, right?
static DEVICE_INLINE half round(const half& x) { return hrint(x); }
@ -549,12 +551,11 @@ struct Ops<half> {
}
static DEVICE_INLINE half relu(const half& x) {
const half zero = 0.f;
return x > zero ? x : zero;
return max(x, zero);
}
static DEVICE_INLINE half reluBack(const half& x) {
const half zero = 0.f;
const half one = 1.f;
return x > zero ? one : zero;
return geq(x, zero);
}
static DEVICE_INLINE half prelu(const half& x, const half& y) {
@ -576,6 +577,118 @@ struct Ops<half> {
};
// Specialization for halfx2
template <>
struct Ops<halfx2> {
static DEVICE_INLINE halfx2 sin(const halfx2& x) { return h2sin(x); }
static DEVICE_INLINE halfx2 cos(const halfx2& x) { return h2cos(x); }
static DEVICE_INLINE halfx2 tan(const halfx2& x) { return h2sin(x) / h2cos(x); }
static DEVICE_INLINE halfx2 log(const halfx2& x) { return h2log(x); }
static DEVICE_INLINE halfx2 exp(const halfx2& x) { return h2exp(x); }
static DEVICE_INLINE halfx2 sqr(const halfx2& x) { return __hmul2(x, x); }
static DEVICE_INLINE halfx2 sqrt(const halfx2& x) { return h2sqrt(x); }
static DEVICE_INLINE halfx2 neg(const halfx2& x) { return __hneg2(x); }
#if CUDA_VERSION < 11000
static DEVICE_INLINE halfx2 abs(const halfx2& x) { return {Ops<half>::abs(x[0]), Ops<half>::abs(x[1])}; }
#else
static DEVICE_INLINE halfx2 abs(const halfx2& x) { return __habs2(x); }
#endif
static DEVICE_INLINE halfx2 sgn(const halfx2& x) { halfx2 zero(0.f, 0.f); return __hsub2(__hlt2(zero, x), __hlt2(x, zero)); }
static DEVICE_INLINE halfx2 round(const halfx2& x) { return h2rint(x); }
static DEVICE_INLINE halfx2 floor(const halfx2& x) { return h2floor(x); }
static DEVICE_INLINE halfx2 ceil(const halfx2& x) { return h2ceil(x); }
static DEVICE_INLINE halfx2 add(const halfx2& x, const halfx2& y) { return __hadd2(x, y); }
static DEVICE_INLINE halfx2 sub(const halfx2& x, const halfx2& y) { return __hsub2(x, y); }
static DEVICE_INLINE halfx2 mul(const halfx2& x, const halfx2& y) { return __hmul2(x, y); }
static DEVICE_INLINE halfx2 div(const halfx2& x, const halfx2& y) { return __h2div(x, y); }
static DEVICE_INLINE halfx2 pow(const halfx2& x, const halfx2& y) { return exp(mul(y, log(x))); }
// Neural Networks specific functions
// @TODO: this is unsafe
static DEVICE_INLINE halfx2 sigmoid(const halfx2& x) {
halfx2 e = exp(x);
halfx2 one(1.f, 1.f);
return div(e, add(one, e));
}
static DEVICE_INLINE halfx2 tanh(const halfx2& x) {
// tanh(x) = 2 * sigmoid(2 * x) - 1
const halfx2 one(1.f);
const halfx2 two(2.f);
return sub(mul(two, sigmoid(mul(two, x))), one);
}
static DEVICE_INLINE halfx2 max(const halfx2& x, const halfx2& y) {
return { Ops<half>::max(x[0], y[0]), Ops<half>::max(x[1], y[1]) };
}
static DEVICE_INLINE halfx2 min(const halfx2& x, const halfx2& y) {
return { Ops<half>::min(x[0], y[0]), Ops<half>::min(x[1], y[1]) };
}
static DEVICE_INLINE halfx2 negate(const halfx2& x) { return { Ops<half>::negate(x[0]), Ops<half>::negate(x[1]) }; }
static DEVICE_INLINE halfx2 eq(const halfx2& x, const halfx2& y) { return __heq2(x, y); }
static DEVICE_INLINE halfx2 neq(const halfx2& x, const halfx2& y) { return __hne2(x, y); }
static DEVICE_INLINE halfx2 gt(const halfx2& x, const halfx2& y) { return __hgt2(x, y); }
static DEVICE_INLINE halfx2 lt(const halfx2& x, const halfx2& y) { return __hlt2(x, y); }
static DEVICE_INLINE halfx2 geq(const halfx2& x, const halfx2& y) { return __hge2(x, y); }
static DEVICE_INLINE halfx2 leq(const halfx2& x, const halfx2& y) { return __hle2(x, y); }
static DEVICE_INLINE halfx2 and_(const halfx2& x, const halfx2& y) { return {x[0] && y[0], x[1] && y[1]}; } // 'and' is used by gcc
static DEVICE_INLINE halfx2 or_(const halfx2& x, const halfx2& y) { return {x[0] || y[0], x[1] || y[1]}; } // 'or' is used by gcc
static DEVICE_INLINE halfx2 log1p(const halfx2& x) {
return log(add(x, halfx2(1.f))); // probably acceptable loss of precision, it's half anyway
}
static DEVICE_INLINE halfx2 if_then_else(const halfx2& x, const halfx2& y, const halfx2& z) {
return {x[0] ? y[0] : z[0], x[1] ? y[1] : z[1]};
}
static DEVICE_INLINE halfx2 logaddexp(const halfx2& x, const halfx2& y) {
// Note: This may not be ideal for CUDA; cf. CNTK implementation
auto a = add(y, log1p(exp(sub(x, y))));
auto b = add(x, log1p(exp(sub(y, x))));
return if_then_else(lt(x, y), a, b);
}
static DEVICE_INLINE halfx2 clip(const halfx2& x, const halfx2& y) {
return if_then_else(geq(abs(x), y), mul(sgn(x), y), x);
}
// derivative of Clip, cut-off function
static DEVICE_INLINE halfx2 bump(const halfx2& x, const halfx2& y) {
const halfx2 zero(0.f);
const halfx2 one(1.f);
return if_then_else(geq(abs(x), y), zero, one);
}
static DEVICE_INLINE halfx2 relu(const halfx2& x) {
const halfx2 zero(0.f);
return max(x, zero);
}
static DEVICE_INLINE halfx2 reluBack(const halfx2& x) {
const halfx2 zero(0.f);
return geq(x, zero);
}
static DEVICE_INLINE halfx2 prelu(const halfx2& x, const halfx2& y) {
const halfx2 zero(0.f);
return if_then_else(gt(x, zero), x , mul(x, y));
}
static DEVICE_INLINE halfx2 preluBack(const halfx2& x, const halfx2& y) {
const halfx2 zero(0.f);
const halfx2 one(1.f);
return if_then_else(gt(x, zero), one, y);
}
};
} // end namespace functional
} // end namespace marian
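As a quick host-side check of the identity used by the vectorized tanh above (tanh(x) = 2*sigmoid(2x) - 1), a small float-based sketch; the halfx2 version applies the same identity to two packed halves at once.

#include <cmath>
#include <cstdio>

int main() {
  auto sigmoid = [](float x) { float e = std::exp(x); return e / (1.f + e); };
  for(float x : {-2.f, -0.5f, 0.f, 1.f, 3.f}) {
    float viaSigmoid = 2.f * sigmoid(2.f * x) - 1.f;   // identity from the halfx2 tanh
    std::printf("x=% .2f  tanh=% .6f  2*sigmoid(2x)-1=% .6f\n", x, std::tanh(x), viaSigmoid);
  }
}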

View File

@ -60,7 +60,7 @@ struct BinaryFunctor {
struct name { \
template <typename ElementType> \
HOST_DEVICE_INLINE static ElementType apply(const ElementType& x, \
const ElementType& y) \
const ElementType& y) \
{ return func; } \
static std::string n() { return #name; } \
}; \
@ -89,9 +89,9 @@ struct TernaryFunctor {
template <class Arg1, class Arg2, class Arg3>
TernaryFunctor(Arg1 arg1, Arg2 arg2, Arg3 arg3) : x(arg1), y(arg2), z(arg3) {}
template <typename... Args>
HOST_DEVICE_INLINE float operator()(Args&&... args) {
return Function::apply(x(args...), y(args...), z(args...));
template <typename T, typename... Args>
HOST_DEVICE_INLINE T operator()(T arg, Args&&... args) {
return Function::apply(x(arg, args...), y(arg, args...), z(arg, args...));
}
};
@ -100,8 +100,8 @@ struct TernaryFunctor {
struct name { \
template <typename ElementType> \
HOST_DEVICE_INLINE static ElementType apply(ElementType x, \
ElementType y, \
ElementType z) \
ElementType y, \
ElementType z) \
{ return func; } \
}; \
} \

View File

@ -16,11 +16,10 @@ inline marian::Shape adapt(const marian::Shape& shape) {
return shape;
}
// modify last shape dimension to automatically map to a larger stride. We are moving now by 4 floats
// at once and need to stop earlier. This is a shallow typecast to basically an array of 4 floats.
#ifndef __CUDACC__ // vectorized types not available from .cu files
// modify last shape dimension to automatically map to a larger stride. We are moving now by 4 floats
// at once and need to stop earlier. This is a shallow typecast to basically an array of 4 floats.
template <>
inline marian::Shape adapt<float32x4>(const marian::Shape& shape) {
ABORT_IF(shape[-1] % 4 != 0,
@ -31,7 +30,9 @@ inline marian::Shape adapt<float32x4>(const marian::Shape& shape) {
x4Shape.set(-1, shape[-1] / 4);
return x4Shape;
}
#ifdef __AVX__
// as above, but for a stride of 8, since we are processing 8 floats at once
template <>
inline marian::Shape adapt<float32x8>(const marian::Shape& shape) {
ABORT_IF(shape[-1] % 8 != 0,
@ -45,6 +46,20 @@ inline marian::Shape adapt<float32x8>(const marian::Shape& shape) {
#endif
#endif
#if COMPILE_FP16
// as above, but for a stride of 2, since we are processing 2 half floats at once. Works on GPU.
template <>
inline marian::Shape adapt<halfx2>(const marian::Shape& shape) {
ABORT_IF(shape[-1] % 2 != 0,
"Last dim ({}) is not a multiple of 2 while converting to Tensor<halfx2>",
shape[-1]);
marian::Shape x2Shape = shape;
x2Shape.set(-1, shape[-1] / 2);
return x2Shape;
}
#endif
template <typename T, const int D>
struct View {
T* data_;

View File

@ -213,24 +213,7 @@ Ptr<NodeInitializer> fromTensor(Tensor externalTensor) {
// Computes Google's sinusoidal position embeddings
Ptr<NodeInitializer> sinusoidalPositionEmbeddings(int start) {
return fromLambda([start](Tensor t) {
int dimEmb = t->shape()[-1];
int dimWords = (int)t->size() / dimEmb;
float numTimescales = (float)dimEmb / 2;
float logTimescaleIncrement = std::log(10000.f) / (numTimescales - 1.f);
std::vector<float> vPos(dimEmb * dimWords, 0);
for(int p = start; p < dimWords + start; ++p) {
for(int i = 0; i < numTimescales; ++i) {
float v = p * std::exp(i * -logTimescaleIncrement);
vPos[(p - start) * dimEmb + i ] = std::sin(v);
vPos[(p - start) * dimEmb + (int)numTimescales + i] = std::cos(v); // @TODO: is int vs. float correct for num_timescales?
}
}
t->set(vPos);
}, Type::float32);
return fromLambda([start](Tensor t) { SinusoidalPositionEmbeddings(t, start); });
}
// computes the equivalent of Python's range()

View File

@ -52,7 +52,9 @@ namespace marian {
auto factorMask = constant(maskedFactoredLabels.masks); // [B... flattened] loss values get multiplied with 0 for labels that don't have this factor
auto factorLogits = logits_[g]; // [B... * Ug] label-wise loss values (not aggregated yet)
// For each location in [B...] select [indices[B...]]. If not using factor, select [0] and mask it out next.
auto factorLoss = cast(lossFn(factorLogits->loss(), factorIndices), loss ? loss->value_type() : Type::float32); // [B... x 1]
auto factorLoss = lossFn(factorLogits->loss(), factorIndices); // [B... x 1]
if(loss)
factorLoss = cast(factorLoss, loss->value_type());
factorLoss = factorLoss * cast(reshape(factorMask, factorLoss->shape()), factorLoss->value_type()); // mask out factor for words that do not have that factor
loss = loss ? (loss + factorLoss) : factorLoss; // [B... x 1]
}
@ -71,9 +73,7 @@ namespace marian {
Expr Logits::getFactoredLogits(size_t groupIndex, Ptr<data::Shortlist> shortlist /*= nullptr*/, const std::vector<IndexType>& hypIndices /*= {}*/, size_t beamSize /*= 0*/) const {
ABORT_IF(empty(), "Attempted to read out logits on empty Logits object");
auto computeType = graph()->getDefaultElementType(); // make sure to use the right compute type here
// loss might be float32, cast to whatever type we are using for computation
auto sel = cast(logits_[groupIndex]->loss(), computeType); // [localBeamSize, 1, dimBatch, dimFactorVocab]
auto sel = logits_[groupIndex]->loss(); // [localBeamSize, 1, dimBatch, dimFactorVocab]
// normalize for decoding:
// - all secondary factors: subtract their max
@ -84,15 +84,16 @@ namespace marian {
else {
auto numGroups = getNumFactorGroups();
for (size_t g = 1; g < numGroups; g++) {
auto factorMaxima = max(cast(logits_[g]->loss(), computeType), -1); // we cast since loss is likely ce-loss which has type float32
auto factorMaxima = max(logits_[g]->loss(), -1); // we cast since loss is likely ce-loss which has type float32
auto factorMasks = constant(getFactorMasks(g, shortlist ? shortlist->indices() : std::vector<WordIndex>()));
sel = sel + factorMaxima * factorMasks; // those lemmas that don't have a factor get multiplied with 0
sel = sel + cast(factorMaxima, sel->value_type()) * cast(factorMasks, sel->value_type()); // those lemmas that don't have a factor get multiplied with 0
}
}
// if selIdx are given, then we must reshuffle accordingly
if (!hypIndices.empty()) // use the same function that shuffles decoder state
sel = rnn::State::select(sel, hypIndices, (int)beamSize, /*isBatchMajor=*/false);
return sel;
}

View File

@ -11,9 +11,10 @@ void ExponentialSmoothing::updateAvgParams(Tensor paramsAvg, Tensor params, size
double beta = 1. - mvDecayBy_;
// correction term if batch size is different from what mvDecayBy_ was specified for
if (refBatchTrgWords_) {
if (refBatchTrgWords_ > 0 && actualBatchTrgWords > refBatchTrgWords_) {
LOG_ONCE(info, "Exponential smoothing gets automatically adjusted as if update size was {} target words", refBatchTrgWords_);
beta = pow(beta, (double)actualBatchTrgWords / (double)refBatchTrgWords_);
batches = (size_t)((double)batches * (double)actualBatchTrgWords / (double)refBatchTrgWords_);
}
// reduce effect of decay parameter in early training stages

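A numeric sketch of the batch-size correction above: when the actual update covers more target words than the reference size the decay constant was specified for, the effective decay factor is raised to the power actual/reference. All values below are illustrative, not from the source.

#include <cmath>
#include <cstdio>

int main() {
  double mvDecayBy = 1e-4;            // smoothing decay constant (illustrative)
  double refBatchTrgWords = 2e6;      // reference update size in target words (illustrative)
  double actualBatchTrgWords = 8e6;   // larger effective update, e.g. more GPUs/nodes
  double beta = 1.0 - mvDecayBy;
  if(refBatchTrgWords > 0 && actualBatchTrgWords > refBatchTrgWords)
    beta = std::pow(beta, actualBatchTrgWords / refBatchTrgWords);
  std::printf("adjusted beta = %.8f (unadjusted %.8f)\n", beta, 1.0 - mvDecayBy);
}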
View File

@ -188,7 +188,7 @@ void OptimizerBase::load(std::vector<io::Item>& items,
if(!opt->baseAlloc_) {
LOG_ONCE(info, "Allocating memory for general optimizer shards");
opt->baseAlloc_ = New<TensorAllocator>(backends[localDeviceIndex]);
opt->baseAlloc_->reserveExact(numShards * size);
opt->baseAlloc_->reserveExact(std::vector<size_t>(numShards, size));
}
int elements = (int)size / (int)sizeOf(iAvg.type);
opt->baseAlloc_->allocate(opt->avg_, {1, elements}, iAvg.type);
@ -371,8 +371,11 @@ void Adam::updateImpl(Tensor params, Tensor grads, size_t actualMBSize) {
double T = 1, Tref = 1;
if(OptimizerBase::refMBWordsParam_ > 0) {
T = (double)actualMBSize;
Tref = (double)refMBWordsParam_;
T = (double)actualMBSize;
if(actualMBSize > refBatchTrgWords_)
Tref = (double)refMBWordsParam_;
else
Tref = T;
}
// adjust for minibatch-size changes if Adam parameters are given a reference size (else do nothing)
@ -396,8 +399,18 @@ void Adam::updateImpl(Tensor params, Tensor grads, size_t actualMBSize) {
// numerators. Divide by T to convert ce-sum gradient to avg gradient.
using namespace functional;
#if 0 // why the division by T or T^2 here? It's T=1 without mb-ref anyway and we have the adjustment above, also converges a lot(!) slower with T != 1
Element(_1 = ((float)beta1 * _1) + float((1 - beta1) / T ) * _2, mt_, grads); // momentum smoothing. At steady state: =smoothed avg gradient
Element(_1 = ((float)beta2 * _1) + float((1 - beta2) / T / T) * (_2 * _2), vt_, grads); // RMS normalization. At steady state: =mean square of the avg gradients
#else
Element(_1 = ((float)beta1 * _1) + float((1 - beta1)) * _2, mt_, grads); // momentum smoothing. At steady state: =smoothed avg gradient
Element(_1 = ((float)beta2 * _1) + float((1 - beta2)) * (_2 * _2), vt_, grads); // RMS normalization. At steady state: =mean square of the avg gradients
#endif
// make sure eps_ does not drop below minimum value, this is important
// when training with mixed precision. Otherwise we divide by 0.
// We multiply the minimum by 2 in order to step away from the abyss.
eps_ = std::max(NumericLimits<float>(params->type()).min * 2.f, eps_);
// make sure eps_ does not drop below minimum value, this is important
// when training with mixed precision. Otherwise we divide by 0.
@ -409,7 +422,7 @@ void Adam::updateImpl(Tensor params, Tensor grads, size_t actualMBSize) {
Element(_1 -= etaf // learning-rate: x_t = x_{t-1} - \eta * (...)
* (( ( _2 / denom1f) // momentum-smoothed per-sample gradient: m_{t-1}
/ (sqrt(_3 / denom2f) + eps_)) // normalize by RMS: \sqrt(v_{t-1})
+ decayf * _1), // weight-decay: w * x_{t-1}
+ (decayf * _1)), // weight-decay: w * x_{t-1}
params, // =_1
mt_, // =_2
vt_ // =_3
@ -476,8 +489,8 @@ void Adam::load(std::vector<io::Item>& items,
});
scatterFn(iVt,
[&](size_t id, const char* begin, const char* end) {
auto opt = std::dynamic_pointer_cast<Adam>(opts[id]);
[&](size_t localDeviceIndex, const char* begin, const char* end) {
auto opt = std::dynamic_pointer_cast<Adam>(opts[localDeviceIndex]);
opt->vt_->set(begin, end, iVt.type);
});
}
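For intuition, a scalar sketch of the Adam step expressed by the Element(...) call above, assuming denom1f/denom2f are the usual bias-correction terms and with the eps floor applied as in the fp16 safeguard; values are illustrative only.

#include <algorithm>
#include <cmath>
#include <cstdio>
#include <limits>

int main() {
  double beta1 = 0.9, beta2 = 0.999, eta = 3e-4, decay = 0.0, eps = 1e-8;
  double x = 0.5, m = 0.0, v = 0.0, g = 0.1;       // parameter, moments, gradient
  // keep eps away from zero; important for fp16, where the type's minimum is much larger
  eps = std::max((double)std::numeric_limits<float>::min() * 2.0, eps);
  for(int t = 1; t <= 3; ++t) {
    m = beta1 * m + (1 - beta1) * g;               // momentum smoothing
    v = beta2 * v + (1 - beta2) * g * g;           // RMS normalization
    double denom1 = 1 - std::pow(beta1, t);        // bias correction (assumed meaning of denom1f)
    double denom2 = 1 - std::pow(beta2, t);        // bias correction (assumed meaning of denom2f)
    x -= eta * ((m / denom1) / (std::sqrt(v / denom2) + eps) + decay * x);
    std::printf("t=%d x=%.6f\n", t, x);
  }
}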

View File

@ -115,6 +115,14 @@ public:
// Usually we will call this twice, to swap in and to swap out.
void swapWithSmoothed(Tensor params);
// return stateful optimizer shards; for the base class that is only the averaged parameters
virtual std::vector<Tensor> getShards() {
if(avg_)
return { avg_ };
else
return { };
}
protected:
virtual void updateImpl(Tensor params, Tensor grads, size_t actualMBSize) = 0;
virtual void resetStats() = 0;
@ -190,6 +198,12 @@ public:
eps_ = params[0];
}
std::vector<Tensor> getShards() override {
auto shards = OptimizerBase::getShards();
shards.push_back(gt_);
return shards;
}
private:
void updateImpl(Tensor params, Tensor grads, size_t actualMBSize) override;
void resetStats() override;
@ -221,6 +235,13 @@ public:
const GatherStateFunc& gatherFn,
bool isMainProcess) override;
std::vector<Tensor> getShards() override {
auto shards = OptimizerBase::getShards();
shards.push_back(mt_);
shards.push_back(vt_);
return shards;
}
private:
void updateImpl(Tensor params, Tensor grads, size_t actualMBSize) override;
void resetStats() override;

View File

@ -31,7 +31,7 @@ struct GRUFastNodeOp : public NaryNodeOp {
outputs.push_back(nullptr);
}
return {NodeOp(GRUFastBackward(outputs, inputs, adj_, final_))};
return {NodeOp(GRUFastBackward(graph()->allocator(), outputs, inputs, adj_, final_))};
}
// do not check if node is trainable

View File

@ -2,7 +2,6 @@
#ifdef CUDA_FOUND
#include "tensors/gpu/backend.h"
#pragma warning(disable:4505) // "unreferenced local function has been removed" in cuda\v9.2\include\cuda_fp16.hpp
#endif
#include "tensors/cpu/backend.h"

View File

@ -82,19 +82,17 @@ void element(const Functor& functor, marian::Tensor out, Tensors... tensors) {
template <class Functor, class... Tensors>
void elementFloat(const Functor& functor, marian::Tensor out, Tensors... tensors) {
#ifndef __CUDACC__
std::vector<marian::Tensor> ts({tensors...});
std::vector<marian::Tensor> ts({out, tensors...});
bool div8 = true;
bool div4 = true;
if(out->shape()[-1] % 8 != 0)
div8 = false;
if(out->shape()[-1] % 4 != 0)
div4 = false;
for(auto t : ts) {
if(t->shape()[-1] % 8 != 0)
div8 = false;
if(t->shape()[-1] % 4 != 0)
if(t->shape()[-1] % 4 != 0) {
div4 = false;
break;
}
}
if(div8) {

View File

@ -69,7 +69,6 @@ void Prod(marian::Tensor C,
#endif
}
// dummy implementation, computeType doesn't do anything on CPU
void Prod(marian::Tensor C,
const marian::Tensor& A,

View File

@ -20,7 +20,7 @@ namespace marian {
namespace cpu {
void IsNaN(const Tensor /*in*/, Ptr<Allocator> /*allocator*/, bool& /*isNaN*/, bool& /*isInf*/) {
void IsNaN(const Tensor /*in*/, Ptr<Allocator> /*allocator*/, bool& /*isNaN*/, bool& /*isInf*/) {
ABORT("Not implemented");
}
@ -802,7 +802,8 @@ void GRUFastForward(Tensor out_, std::vector<Tensor> inputs, bool final) {
}
}
void GRUFastBackward(std::vector<Tensor> outputs,
void GRUFastBackward(Ptr<Allocator> /*allocator*/,
std::vector<Tensor> outputs,
std::vector<Tensor> inputs,
Tensor adj_,
bool final) {
@ -1552,6 +1553,21 @@ void LSTMOutputBackward(std::vector<Tensor> outputs,
}
}
void SinusoidalPositionEmbeddings(marian::Tensor t, int start) {
int dimEmb = t->shape()[-1];
int dimWords = (int)t->size() / dimEmb;
float numTimescales = (float)dimEmb / 2;
float logTimescaleIncrement = std::log(10000.f) / (numTimescales - 1.f);
for(int j = 0; j < dimWords; ++j) {
for(int i = 0; i < dimEmb; ++i) {
float v = (j + start) * std::exp((i % (int)numTimescales) * -logTimescaleIncrement);
t->data()[j * dimEmb + i] = i < (int)numTimescales ? std::sin(v) : std::cos(v);
}
}
}
void HighwayForward(Tensor out,
const Tensor in1,
const Tensor in2,

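For reference, a standalone version of the CPU sinusoidal-embedding loop above, filling and printing a tiny dimWords=2, dimEmb=4 example with start=0.

#include <cmath>
#include <cstdio>
#include <vector>

int main() {
  int dimWords = 2, dimEmb = 4, start = 0;
  float numTimescales = (float)dimEmb / 2;
  float logTimescaleIncrement = std::log(10000.f) / (numTimescales - 1.f);
  std::vector<float> t(dimWords * dimEmb);
  for(int j = 0; j < dimWords; ++j)
    for(int i = 0; i < dimEmb; ++i) {
      float v = (j + start) * std::exp((i % (int)numTimescales) * -logTimescaleIncrement);
      t[j * dimEmb + i] = i < (int)numTimescales ? std::sin(v) : std::cos(v);  // sin half, cos half
    }
  for(int j = 0; j < dimWords; ++j) {
    for(int i = 0; i < dimEmb; ++i)
      std::printf("% .4f ", t[j * dimEmb + i]);
    std::printf("\n");
  }
}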
View File

@ -4,17 +4,6 @@
#include <cuda_runtime.h>
#if COMPILE_FP16
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4505) // unreferenced local function has been removed
#endif
#include <cuda_fp16.h>
#ifdef _MSC_VER
#pragma warning(pop)
#endif
#endif
// template <> inline bool matchType<__half>(Type type) { return type == Type::float16; }
// template <> inline std::string request<__half>() { return "float16"; }

View File

@ -65,7 +65,13 @@ void Element(Functor functor, Tensor out, Tensors... tensors) {
ElementTyped<float>(functor, out, tensors...);
} else if(out->type() == Type::float16) {
#if COMPILE_FP16
ElementTyped<__half>(functor, out, tensors...);
std::vector<marian::Tensor> ts({out, tensors...});
bool div2 = std::all_of(ts.cbegin(), ts.cend(), [](marian::Tensor t){ return t->shape()[-1] % 2 == 0; });
if(div2) {
ElementTyped<halfx2>(functor, out, tensors...);
} else {
ElementTyped<half>(functor, out, tensors...);
}
#else
ABORT("FP16 not supported with chosen current hardware or CUDA version");
#endif

View File

@ -700,6 +700,54 @@ void Softmax(Tensor out, Tensor in) {
}
}
template <typename T>
__global__ void gSinusoidalPositionEmbeddings(T* out,
functional::Shape outShape,
int start) {
using namespace functional;
int rows = outShape.elements() / outShape.back();
int cols = outShape.back();
float numTimescales = (float)cols / 2.f;
float logTimescaleIncrement = Ops<float>::log(10000.f) / (numTimescales - 1.f);
for(int bid = 0; bid < rows; bid += gridDim.x) { // loop over blocks of rows
int j = bid + blockIdx.x; // blockIdx.x - row index (within block of rows)
if(j < rows) { // compute embeddings for one position (row), row elements distributed over threads
T* outRow = out + j * cols; // pointer to row data
for(int tid = 0; tid < cols; tid += blockDim.x) {
int i = tid + threadIdx.x;
if(i < cols) {
float v = (float)(j + start) * Ops<float>::exp((float)(i % (int)numTimescales) * -logTimescaleIncrement);
outRow[i] = (T)(i < (int)numTimescales ? Ops<float>::sin(v) : Ops<float>::cos(v));
}
}
}
}
}
void SinusoidalPositionEmbeddings(Tensor out, int start) {
cudaSetDevice(out->getDeviceId().no);
size_t rows = out->shape().elements() / out->shape().back();
size_t cols = out->shape().back();
int blocks = std::min(MAX_BLOCKS, (int)rows);
int threads = std::min(MAX_THREADS, (int)cols);
if(out->type() == Type::float32) {
gSinusoidalPositionEmbeddings<float><<<blocks, threads>>>(out->data<float>(), out->shape(), start);
#if COMPILE_FP16
} else if (out->type() == Type::float16) {
gSinusoidalPositionEmbeddings<half><<<blocks, threads>>>(out->data<half>(), out->shape(), start);
#endif
} else {
ABORT("SinusoidalPositionEmbeddings not implemented for type {}", out->type());
}
}
// @TODO: refactor to reuse code from softmax, add comments
template <typename T, typename AccType = float>
__global__ void gLogSoftmax(T* out,
@ -1398,11 +1446,12 @@ __global__ void gGRUFastBackward(T* outState,
T* rowOutState = outState + j * cols;
T* rowOutXW = outXW + j * cols * 3;
T* rowOutSU = outSU + j * cols * 3;
T* rowOutB = outB ? outB + j * cols * 3 : nullptr;
const T* rowState = state + j * cols;
const T* rowXW = xW + j * cols * 3;
const T* rowSU = sU + j * cols * 3;
const T* rowAdj = adj + j * cols;
const T* rowXW = xW + j * cols * 3;
const T* rowSU = sU + j * cols * 3;
const T* rowAdj = adj + j * cols;
for(int tid = 0; tid < cols; tid += blockDim.x) {
int i = tid + threadIdx.x;
@ -1438,7 +1487,7 @@ __global__ void gGRUFastBackward(T* outState,
if(outSU)
rowOutSU[i] += dfdxW_r;
if(outB)
atomics::atomicAdd(outB + i, dfdxW_r); // @TODO: get rid of atomicAdd everywhere
rowOutB[i] += dfdxW_r;
// df/d(xW_z) ...
T dfdxW_z = m * ((T)1.f - z) * z * (rowState[i] - h) * adj;
@ -1447,7 +1496,7 @@ __global__ void gGRUFastBackward(T* outState,
if(outSU)
rowOutSU[k] += dfdxW_z;
if(outB)
atomics::atomicAdd(outB + k, dfdxW_z);
rowOutB[k] += dfdxW_z;
// df/d(xW_x) ...
T dfdxW_x = m * t * adj;
@ -1457,16 +1506,17 @@ __global__ void gGRUFastBackward(T* outState,
rowOutSU[l] += dfdxW_x * r;
if(outB)
if(final)
atomics::atomicAdd(outB + l, dfdxW_x * r);
rowOutB[l] += dfdxW_x * r;
else
atomics::atomicAdd(outB + l, dfdxW_x);
rowOutB[l] += dfdxW_x;
}
}
}
}
}
void GRUFastBackward(std::vector<Tensor> outputs,
void GRUFastBackward(Ptr<Allocator> allocator,
std::vector<Tensor> outputs,
std::vector<Tensor> inputs,
Tensor adj,
bool final) {
@ -1478,12 +1528,26 @@ void GRUFastBackward(std::vector<Tensor> outputs,
int blocks = std::min(MAX_BLOCKS, rows);
int threads = std::min(MAX_THREADS, cols);
Tensor tempGradBias, tempOnes;
MemoryPiece::PtrType tempGradBiasMemory, tempOnesMemory;
if(outputs[3]) {
Shape memShape = {rows, outputs[3]->shape()[-1]};
tempGradBiasMemory = allocator->alloc(memShape.elements() * sizeOf(outputs[3]->type()));
tempGradBias = TensorBase::New(tempGradBiasMemory, memShape, outputs[3]->type(), outputs[3]->getBackend());
tempGradBias->set(0.f);
tempOnesMemory = allocator->alloc(rows * sizeOf(outputs[3]->type()));
tempOnes = TensorBase::New(tempOnesMemory, Shape({1, rows}), outputs[3]->type(), outputs[3]->getBackend());
tempOnes->set(1.f);
}
if(adj->type() == Type::float32) {
gGRUFastBackward<<<blocks, threads>>>(
outputs[0] ? outputs[0]->data<float>() : 0, // state - adj
outputs[1] ? outputs[1]->data<float>() : 0, // xW - adj
outputs[2] ? outputs[2]->data<float>() : 0, // sU - adj
outputs[3] ? outputs[3]->data<float>() : 0, // b - adj
outputs[3] ? tempGradBias->data<float>() : 0, // b - adj
inputs[0]->data<float>(), // state
inputs[1]->data<float>(), // xW
inputs[2]->data<float>(), // sU
@ -1499,7 +1563,7 @@ void GRUFastBackward(std::vector<Tensor> outputs,
outputs[0] ? outputs[0]->data<half>() : 0, // state - adj
outputs[1] ? outputs[1]->data<half>() : 0, // xW - adj
outputs[2] ? outputs[2]->data<half>() : 0, // sU - adj
outputs[3] ? outputs[3]->data<half>() : 0, // b - adj
outputs[3] ? tempGradBias->data<half>() : 0, // b - adj
inputs[0]->data<half>(), // state
inputs[1]->data<half>(), // xW
inputs[2]->data<half>(), // sU
@ -1513,6 +1577,17 @@ void GRUFastBackward(std::vector<Tensor> outputs,
} else {
ABORT("gGRUFastBackward not implemented for type {}", adj->type());
}
// We use this to get rid of the atomicAdd and perform a reduction of the gradients afterwards.
// This is much faster for fp16 which seems to have a broken atomicAdd implementation.
// We reduce bias gradients with a matrix multiply, but use a 32-bit compute type.
// This preserves precision with larger batches where all batch entries reduce into a single vector.
// See also AffineNodeOp where we do the same for biases
if(outputs[3]) {
gpu::Prod(outputs[3], tempOnes, tempGradBias, false, false, 1, 1, Type::float32); // beta set to one to add
allocator->free(tempGradBiasMemory);
allocator->free(tempOnesMemory);
}
}
template <typename T, typename AccType = float>

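A minimal CPU sketch of the reduction strategy described in the comment above: each row keeps its own bias-gradient slice, and a final ones-vector product (written as a plain loop here instead of gpu::Prod) sums the rows in 32-bit precision. Shapes and values are illustrative.

#include <cstdio>
#include <vector>

int main() {
  int rows = 3, cols = 4;
  std::vector<float> tempGradBias(rows * cols, 0.1f);  // per-row bias gradients (tempGradBias above)
  std::vector<float> ones(rows, 1.f);                  // the [1 x rows] ones vector (tempOnes above)
  std::vector<float> biasGrad(cols, 0.f);              // final bias gradient (outputs[3] above)
  // [1 x rows] * [rows x cols] = [1 x cols], i.e. sum over rows, accumulated in fp32
  for(int j = 0; j < rows; ++j)
    for(int i = 0; i < cols; ++i)
      biasGrad[i] += ones[j] * tempGradBias[j * cols + i];
  for(float b : biasGrad)
    std::printf("%.2f ", b);
  std::printf("\n");
}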
View File

@ -138,5 +138,14 @@ void TensorBase::set(const io::Item& item) {
memory_->data<char>());
}
size_t TensorBase::hash() {
io::Item temp;
size_t seed = 0;
get(temp, "temp");
for(auto c : temp.bytes)
util::hash_combine(seed, c);
return seed;
}
} // namespace marian

View File

@ -320,6 +320,8 @@ public:
DISPATCH_BY_TYPE2(type_, debug, precision, dispCols);
}
size_t hash();
};
typedef TensorBase::PtrType Tensor;

View File

@ -140,6 +140,8 @@ static inline void Dropout(Tensor tensor, float dropProb) {
Bernoulli(tensor, keepProb, scale, /*shift=*/0.f);
}
DISPATCH2(SinusoidalPositionEmbeddings, marian::Tensor, int);
#ifdef CUDA_FOUND
namespace gpu {
void Deconcatenate(std::vector<marian::Tensor>& outputs,
@ -286,7 +288,8 @@ DISPATCH3(GRUFastForward, marian::Tensor, std::vector<marian::Tensor>, bool)
#ifdef CUDA_FOUND
namespace gpu {
void GRUFastBackward(std::vector<marian::Tensor> outputs,
void GRUFastBackward(Ptr<Allocator> allocator,
std::vector<marian::Tensor> outputs,
std::vector<marian::Tensor> inputs,
marian::Tensor adj,
bool final);
@ -294,22 +297,24 @@ void GRUFastBackward(std::vector<marian::Tensor> outputs,
#endif
namespace cpu {
void GRUFastBackward(std::vector<marian::Tensor> outputs,
void GRUFastBackward(Ptr<Allocator> allocator,
std::vector<marian::Tensor> outputs,
std::vector<marian::Tensor> inputs,
marian::Tensor adj,
bool final);
}
static inline void GRUFastBackward(std::vector<marian::Tensor> outputs,
static inline void GRUFastBackward(Ptr<Allocator> allocator,
std::vector<marian::Tensor> outputs,
std::vector<marian::Tensor> inputs,
marian::Tensor adj,
bool final = false) {
#ifdef CUDA_FOUND
if(adj->getBackend()->getDeviceId().type == DeviceType::gpu)
gpu::GRUFastBackward(outputs, inputs, adj, final);
gpu::GRUFastBackward(allocator, outputs, inputs, adj, final);
else
#endif
cpu::GRUFastBackward(outputs, inputs, adj, final);
cpu::GRUFastBackward(allocator, outputs, inputs, adj, final);
}
// clang-format off

View File

@ -43,10 +43,10 @@ void tests(DeviceType device, Type floatType = Type::float32) {
values.clear();
std::vector<T> vA({1, -2, 3, -4});
auto a = graph->constant({2, 2, 1}, inits::fromVector(vA));
auto a = graph->constant({2, 2}, inits::fromVector(vA));
auto compare = [&](Expr res, std::function<float(float)> f) -> bool {
if (res->shape() != Shape({ 2, 2, 1 }))
if (res->shape() != Shape({ 2, 2 }))
return false;
res->val()->get(values);
std::vector<float> ref{f(vA[0]), f(vA[1]), f(vA[2]), f(vA[3])};
@ -129,14 +129,14 @@ void tests(DeviceType device, Type floatType = Type::float32) {
std::vector<T> vA({1, -2, 3, -4});
std::vector<T> vB({0.5, 1.5});
auto a = graph->constant({2, 2, 1}, inits::fromVector(vA));
auto b = graph->constant({2, 1}, inits::fromVector(vB));
auto a = graph->constant({2, 2}, inits::fromVector(vA));
auto b = graph->constant({2}, inits::fromVector(vB));
// Two lambdas below differ in the use of floatEqual or floatApprox and
// are not merged because MSVC compiler returns C2446: no conversion from
// lambda_x to lambda_y
auto compare = [&](Expr res, std::function<float(float,float)> f) -> bool {
if (res->shape() != Shape({ 2, 2, 1 }))
if (res->shape() != Shape({ 2, 2 }))
return false;
res->val()->get(values);
std::vector<float> ref{f(vA[0], vB[0]), f(vA[1], vB[1]), f(vA[2], vB[0]), f(vA[3], vB[1])};
@ -144,7 +144,7 @@ void tests(DeviceType device, Type floatType = Type::float32) {
};
auto compareApprox = [&](Expr res, std::function<float(float, float)> f) -> bool {
if(res->shape() != Shape({2, 2, 1}))
if(res->shape() != Shape({2, 2}))
return false;
res->val()->get(values);
std::vector<float> ref{f(vA[0], vB[0]), f(vA[1], vB[1]), f(vA[2], vB[0]), f(vA[3], vB[1])};

View File

@ -11,9 +11,19 @@
namespace marian {
ShardingMode getShardingMode(Ptr<Options> options, Ptr<IMPIWrapper> mpi) {
auto shardOpt = options->get<std::string>("sharding", "global");
if(shardOpt == "global" || !mpi || (mpi && mpi->numMPIProcesses() == 1))
return ShardingMode::global;
else if(shardOpt == "local")
return ShardingMode::local;
else
ABORT("Unknown sharding mode {}", shardOpt);
}
Ptr<ICommunicator> createCommunicator(
const std::vector<Ptr<ExpressionGraph>>& graphs,
bool noNccl, Ptr<IMPIWrapper> mpi) {
bool noNccl, ShardingMode shardingMode, Ptr<IMPIWrapper> mpi) {
mpi;
#if defined(CUDA_FOUND) && defined(USE_NCCL)
if(noNccl) {
@ -38,9 +48,9 @@ Ptr<ICommunicator> createCommunicator(
}
// the actual implementation is inside communicator.cu
return New<NCCLCommunicator>(graphs, mpi);
return New<NCCLCommunicator>(graphs, shardingMode, mpi);
#else // no CUDA or no NCCL
noNccl; // (unused)
noNccl; shardingMode; // (unused)
return New<DefaultCommunicator>(graphs, mpi);
#endif
}
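To illustrate the two modes selected above: 'global' sharding splits the concatenated parameter/gradient vector across all (MPI processes x local GPUs) NCCL ranks, while 'local' sharding splits only across the GPUs of one process and syncs the shards across processes every --sync-freq steps. Below is a standalone sketch of the resulting shard sizes, assuming the contiguous, equal-sized shards described in the communicator code; sizes are illustrative.

#include <cstddef>
#include <cstdio>

int main() {
  size_t numProcs = 2, numLocalGPUs = 4, dataSize = 800;  // illustrative sizes
  auto shardSize = [&](size_t numShards) { return (dataSize + numShards - 1) / numShards; };
  size_t globalShard = shardSize(numProcs * numLocalGPUs); // 8 shards of 100
  size_t localShard  = shardSize(numLocalGPUs);            // 4 shards of 200 per process
  std::printf("global sharding: %zu shards of %zu elements\n", numProcs * numLocalGPUs, globalShard);
  std::printf("local  sharding: %zu shards of %zu elements (per process)\n", numLocalGPUs, localShard);
}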

View File

@ -24,8 +24,12 @@
namespace marian {
enum struct ShardingMode : size_t { global, local };
struct/*interface*/ IMPIWrapper; // @TODO: Should we use a separate header, or move this declaration up here?
ShardingMode getShardingMode(Ptr<Options> options, Ptr<IMPIWrapper> mpi);
// This interface implements the cross-GPU operations for distributed training within a single box.
class ICommunicator {
protected:
@ -50,10 +54,8 @@ public:
virtual void scatterReduceAndResetGrads() const = 0; // reduce param gradients and scatter into gradient shards
virtual void allGatherParams() const = 0; // redistribute value shards into param values
#if 0 // disabled for now, not being called anywhere in the code
virtual void swapParams(const std::vector<Tensor>& paramShards) const = 0;
#endif
virtual void broadcastParams(bool average = false) const = 0; // average corresponding parameters across all workers
virtual void broadcastShards(const std::vector<Ptr<OptimizerBase>>& opts, bool average = false) const = 0;
virtual void scatterState(const io::Item& data, const OptimizerBase::ScatterStateSetFunc& setFn) const = 0;
virtual io::Item gatherState(const OptimizerBase::GatherStateGetFunc& getFn) const = 0;
@ -72,10 +74,11 @@ struct MPI_Status { int MPI_SOURCE; };
#define MPI_ANY_SOURCE ((size_t)-2)
#define MPI_STATUS_IGNORE ((MPI_Status*)nullptr)
#endif
struct/*interface*/ IMPIWrapper
{
struct/*interface*/ IMPIWrapper {
virtual size_t myMPIRank() const = 0;
virtual size_t numMPIProcesses() const = 0;
virtual bool isMainProcess() const { return myMPIRank() == 0; }
virtual void barrier(MPI_Comm comm = MPI_COMM_WORLD) const = 0;
virtual void bCast(void* buf, size_t count, MPI_Datatype datatype, size_t rootRank = 0, MPI_Comm comm = MPI_COMM_WORLD) const = 0;
virtual void sSend(void* buf, size_t count, MPI_Datatype datatype, size_t destRank, int tag, MPI_Comm comm = MPI_COMM_WORLD) const = 0;
@ -163,7 +166,7 @@ public:
// determine the (max) shard size
// All shards except the last one have this size.
// Presently, even all shards must have identical size, due to a limitation in NCCL we have not yet worked around.
// Presently, all shards must have identical size, due to a limitation in NCCL we have not yet worked around.
size_t shardSize() const {
size_t numShards = graphs_.size();
size_t size = (dataSize() + numShards - 1) / numShards;
@ -272,6 +275,24 @@ public:
foreach(gather);
}
void broadcastParams(bool average = false) const override {
ABORT_IF(average, "Parameter averaging not implemented in DefaultCommunicator::broadcastParams");
// Copy parameters from first graph
auto copyFromFirst = [this](size_t idx, size_t /*begin*/, size_t /*end*/) {
if(idx != 0)
graphs_[idx]->params()->vals()->copyFrom(graphs_[0]->params()->vals());
return true; // dummy success
};
foreach(copyFromFirst);
}
virtual void broadcastShards(const std::vector<Ptr<OptimizerBase>>& opts, bool average = false) const override {
opts; average;
ABORT("DefaultCommunicator::broadcastShards not implemented");
}
void scatterState(const io::Item& data, const OptimizerBase::ScatterStateSetFunc& setFn) const override {
size_t dataSize = data.size();
size_t numLocalDevices = graphs_.size();
@ -296,6 +317,6 @@ public:
Ptr<ICommunicator> createCommunicator(
const std::vector<Ptr<ExpressionGraph>>& graphs,
bool noNccl, Ptr<IMPIWrapper> mpi);
bool noNccl, ShardingMode shardingMode, Ptr<IMPIWrapper> mpi);
} // namespace marian

View File

@ -1,12 +1,18 @@
// Note: This must only be included if defined(CUDA_FOUND) && defined(USE_NCCL)
#include "training/communicator.h"
#include "3rd_party/threadpool.h"
#include "tensors/tensor_operators.h"
#include "tensors/gpu/cuda_helpers.h"
#include "common/timer.h"
// Generated by NCCL make files in build/nccl/include;
// The include dir has been set in the CMake files. NCCL adds version numbers etc.
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable:4505) // "unreferenced local function has been removed" in cuda_fp16.hpp
#endif
#include "nccl.h"
#include <cuda_runtime.h>
@ -23,7 +29,11 @@ namespace marian {
class NCCLCommunicator : public ICommunicator {
private:
std::vector<ncclComm_t> comms_; // [device index]
ShardingMode shardingMode_{ShardingMode::global};
std::vector<ncclComm_t> globalComms_; // [device index]
std::vector<ncclComm_t> localComms_; // [device index]
std::vector<cudaStream_t> streams_; // [device index]
std::vector<int> devices_; // [device index]
Ptr<IMPIWrapper> mpi_; // (may be null)
@ -55,18 +65,26 @@ private:
return mpi_ ? mpi_->idStr() : "";
}
size_t myNcclRank(size_t localDeviceIndex) const { // map local device index to a global rank
if (mpi_)
return mpi_->myMPIRank() * devices_.size() + localDeviceIndex;
else
return localDeviceIndex;
size_t numLocalRanks() const {
return devices_.size();
}
size_t myLocalRank(size_t localDeviceIndex) const { // map local device index to a global rank
return localDeviceIndex; // do nothing
}
size_t numNcclRanks() const { // total number of devices across all MPI processes
if (mpi_)
return mpi_->numMPIProcesses() * devices_.size();
return mpi_->numMPIProcesses() * numLocalRanks();
else
return devices_.size();
return numLocalRanks();
}
size_t myNcclRank(size_t localDeviceIndex) const { // map local device index to a global rank
if (mpi_)
return mpi_->myMPIRank() * numLocalRanks() + myLocalRank(localDeviceIndex);
else
return myLocalRank(localDeviceIndex);
}
size_t dataSize() const { // total number of floats that comprise the concatenated parameter and gradient vector
@ -77,7 +95,7 @@ private:
// All shards except the last one have this size.
// Presently, even all shards must have identical size, due to a limitation in NCCL we have not yet worked around.
size_t shardSize() const {
size_t numShards = numNcclRanks();
size_t numShards = shardingMode_ == ShardingMode::global ? numNcclRanks() : numLocalRanks();
size_t size = (dataSize() + numShards - 1) / numShards;
#if 1 // for now, all shards must have the same size, since NCCL does not allow a sub-slice for the last shard
ABORT_IF(size * numShards != dataSize(), "presently, all shards must have the same size");
@ -96,7 +114,7 @@ private:
// determine the index range (begin, end) of a shard
std::pair<size_t, size_t> localShardRange(size_t localDeviceIndex) const {
return ncclRankShardRange(myNcclRank(localDeviceIndex));
return ncclRankShardRange(shardingMode_ == ShardingMode::global ? myNcclRank(localDeviceIndex) : myLocalRank(localDeviceIndex));
}
static std::string ncclVersionString() {
@ -110,31 +128,16 @@ private:
mpi_->barrier();
}
// helper class to temporarily block a UNIX signal
class BlockSignal {
typedef std::function<void(int, const sigset_t*, sigset_t*)> SigMaskFn;
SigMaskFn sigMaskFn_; // function to set the mask, thread or proc
sigset_t oldSigSet_; // old set to restore the signal
public:
BlockSignal(int signal, const SigMaskFn& sigMaskFn) : sigMaskFn_(sigMaskFn) {
sigset_t newSigSet;
sigemptyset(&newSigSet);
sigaddset(&newSigSet, signal); // block signal by setting it in the blocked-signal mask
sigMaskFn_(SIG_BLOCK, &newSigSet, &oldSigSet_);
}
~BlockSignal() {
sigMaskFn_(SIG_BLOCK, &oldSigSet_, nullptr); // restore old signal mask
}
};
public:
// a NCCLCommunicator is bound to a set of graphs, one per GPU device
// If MPI is used, then each MPI process has an instance of this class for its specific
// set of GPU devices, which are communicating with each other. The total number of GPUs
// involved in the NCCL communication setup is (#MPI processes) x (#GPUs per process).
NCCLCommunicator(const std::vector<Ptr<ExpressionGraph>>& graphs, Ptr<IMPIWrapper> mpi)
NCCLCommunicator(const std::vector<Ptr<ExpressionGraph>>& graphs, ShardingMode shardingMode, Ptr<IMPIWrapper> mpi)
: ICommunicator(graphs),
comms_(graphs.size()),
shardingMode_(shardingMode),
globalComms_(graphs.size()),
localComms_(graphs.size()),
streams_(graphs.size()),
devices_(graphs.size()),
mpi_(mpi),
@ -157,41 +160,63 @@ public:
CUDA_CHECK(cudaStreamCreate(&streams_[i]));
}
#if 0 // Using NCCL 2.8.3 now
// Note: due to a bug in NCCL 2.3.5, NCCL's allocation of shared memory intermittently fails with
// Failed, NCCL error 2 'unhandled system error' - ncclGroupEnd()
// include/shm.h:26 NCCL WARN Unable to allocate shared memory (4263936 bytes) : Interrupted system call
// This is caused by SIGPROF signals being raised, causing EINTR, which NCCL does not handle.
// Reported as Issue #137 on the NCCL Github, and supposedly fixed for 2.3.7 (to be verified).
// To work around, we disable the SIGPROF signal during NCCL initialization.
#define SIG_BAD 27 // SIGPROF
BlockSignal blockThread(SIG_BAD, pthread_sigmask); // Note: I don't know yet which of these two makes the difference.
BlockSignal blockProc(SIG_BAD, sigprocmask); // So for now just block both.
#endif
// set up NCCL
// Since we want to use MPI, we cannot use NCCL's handy convenience function. Instead, we must go the laborious route.
// cf. https://docs.nvidia.com/deeplearning/sdk/nccl-developer-guide/index.html#multidevprothrd
// generate NCCL unique ID at one process and broadcast to all
ncclUniqueId uniqueId = { 0 };
if (!mpi_ || mpi->myMPIRank() == 0)
NCCL_CHECK(ncclGetUniqueId(&uniqueId));
if(!mpi_) // without MPI, local and global are the same, so only handle global
shardingMode_ = ShardingMode::global;
if(mpi_) {
static_assert(sizeof(uniqueId) == NCCL_UNIQUE_ID_BYTES, "wrong NCCL_UNIQUE_ID_BYTES??"); // (this value is used in NVidia examples)
mpi_->bCast(&uniqueId, sizeof(uniqueId), MPI_BYTE, 0);
}
mpiBarrier();
LOG(info, "[comm] Using {} sharding", shardingMode_ == ShardingMode::global ? "global" : "local");
mpiBarrier();
groupStart();
for (int localDeviceIndex = 0; localDeviceIndex < devices_.size(); localDeviceIndex++) {
CUDA_CHECK(cudaSetDevice(devices_[localDeviceIndex]));
NCCL_CHECK(ncclCommInitRank(&comms_[localDeviceIndex], numNcclRanks(), uniqueId, myNcclRank(localDeviceIndex)));
// Create unique ids for NCCL as well as communicators. If global, we only need one unique id and broadcast it to all processes
if(shardingMode_ == ShardingMode::global) {
ncclUniqueId uniqueId = { 0 };
if (!mpi_ || mpi_->myMPIRank() == 0) {
NCCL_CHECK(ncclGetUniqueId(&uniqueId));
}
if(mpi_) {
static_assert(sizeof(uniqueId) == NCCL_UNIQUE_ID_BYTES, "wrong NCCL_UNIQUE_ID_BYTES??"); // (this value is used in NVidia examples)
mpi_->bCast(&uniqueId, sizeof(uniqueId), MPI_BYTE, 0);
}
groupStart();
for(int localDeviceIndex = 0; localDeviceIndex < numLocalRanks(); localDeviceIndex++) {
CUDA_CHECK(cudaSetDevice(devices_[localDeviceIndex]));
NCCL_CHECK(ncclCommInitRank(&globalComms_[localDeviceIndex], numNcclRanks(), uniqueId, myNcclRank(localDeviceIndex)));
}
groupEnd();
} else {
ABORT_IF(shardingMode_ == ShardingMode::local && !mpi_, "Local/global sharding only implemented for MPI, global is same as local with no MPI");
std::vector<ncclUniqueId> globalUniqueIds(numLocalRanks(), {0}); // one per local device, binds together the numProcesses shards at the same device position across processes
std::vector<ncclUniqueId> localUniqueIds(mpi_->numMPIProcesses(), {0}); // one per process, binds all shards within one process, stays local to process
if(mpi->myMPIRank() == 0) {
for(auto& id : globalUniqueIds)
NCCL_CHECK(ncclGetUniqueId(&id));
for(auto& id : localUniqueIds)
NCCL_CHECK(ncclGetUniqueId(&id));
}
for(auto& id : globalUniqueIds)
mpi_->bCast(&id, sizeof(id), MPI_BYTE, 0);
for(auto& id : localUniqueIds)
mpi_->bCast(&id, sizeof(id), MPI_BYTE, 0);
groupStart();
for(int localDeviceIndex = 0; localDeviceIndex < numLocalRanks(); localDeviceIndex++) {
CUDA_CHECK(cudaSetDevice(devices_[localDeviceIndex]));
NCCL_CHECK(ncclCommInitRank(&globalComms_[localDeviceIndex], mpi_->numMPIProcesses(), globalUniqueIds[localDeviceIndex], mpi_->myMPIRank()));
NCCL_CHECK(ncclCommInitRank( &localComms_[localDeviceIndex], numLocalRanks(), localUniqueIds[mpi_->myMPIRank()], localDeviceIndex));
}
groupEnd();
}
groupEnd();
mpiBarrier(); // (synchronize the log messages)
LOG(info, "[comm] NCCLCommunicator constructed successfully");
LOG(info, "[comm] NCCLCommunicators constructed successfully");
mpiBarrier(); // (synchronize the log messages)
}
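To make the local-sharding topology above concrete, a small sketch (hypothetical 2 processes x 4 GPUs, not part of the commit) that enumerates which ranks each communicator binds together:

#include <cstdio>

int main() {
  const int numProcesses = 2, numLocalRanks = 4;
  // local sharding builds two families of NCCL communicators:
  //   globalComms_[d]: numMPIProcesses ranks, one per process, all holding local device index d
  //   localComms_[d] : numLocalRanks ranks, all devices inside one process (rank == device index)
  for(int d = 0; d < numLocalRanks; d++) {
    std::printf("globalComms_[%d] binds:", d);
    for(int p = 0; p < numProcesses; p++)
      std::printf(" (proc %d, dev %d)", p, d); // rank inside this communicator == MPI rank
    std::printf("\n");
  }
  for(int p = 0; p < numProcesses; p++)
    std::printf("localComms_ of proc %d bind devices 0..%d of that process\n", p, numLocalRanks - 1);
  return 0;
}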
@ -199,7 +224,9 @@ public:
for(int i = 0; i < devices_.size(); ++i) {
cudaSetDevice(devices_[i]);
cudaStreamDestroy(streams_[i]);
ncclCommDestroy(comms_[i]);
ncclCommDestroy(globalComms_[i]);
if(shardingMode_ == ShardingMode::local)
ncclCommDestroy(localComms_[i]);
}
}
@ -251,7 +278,13 @@ public:
if(grads->type() == Type::float16)
ncclFloatType = ncclFloat16;
NCCL_CHECK(ncclReduceScatter(sendbuf, recvbuf, bufsize, ncclFloatType, ncclSum, comms_[i], streams_[i]));
if(shardingMode_ == ShardingMode::global) {
NCCL_CHECK(ncclAllReduce(grads->data(), grads->data(), grads->size(), ncclFloatType, ncclSum, globalComms_[i], streams_[i])); // apparently this is somehow faster??
// NCCL_CHECK(ncclReduceScatter(sendbuf, recvbuf, bufsize, ncclFloatType, ncclSum, globalComms_[i], streams_[i]));
} else {
NCCL_CHECK(ncclReduceScatter(sendbuf, recvbuf, bufsize, ncclFloatType, ncclSum, localComms_[i], streams_[i])); // reduceScatter locally
NCCL_CHECK( ncclAllReduce(recvbuf, recvbuf, bufsize, ncclFloatType, ncclSum, globalComms_[i], streams_[i])); // then do tuple-wise allReduce across processes
}
}
groupEnd();
synchronizeAll();
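A CPU-only simulation of the two-phase gradient exchange in local mode (reduce-scatter inside each process, then an element-wise all-reduce of the matching shard across processes). Shapes and values are invented; this only mirrors the communication pattern, not the NCCL calls.

#include <cstdio>
#include <vector>

int main() {
  const int P = 2, D = 2, N = 8, S = N / D; // processes, local devices, gradient size, shard size
  // grads[p][d][k]: gradient element k on device d of process p; every replica contributes 1
  std::vector<std::vector<std::vector<float>>> grads(P, std::vector<std::vector<float>>(D, std::vector<float>(N, 1.0f)));

  // phase 1: reduce-scatter inside each process -> device d owns shard d, summed over local devices
  std::vector<std::vector<std::vector<float>>> shard(P, std::vector<std::vector<float>>(D, std::vector<float>(S, 0.f)));
  for(int p = 0; p < P; p++)
    for(int d = 0; d < D; d++)
      for(int k = 0; k < S; k++)
        for(int src = 0; src < D; src++)
          shard[p][d][k] += grads[p][src][d * S + k];

  // phase 2: all-reduce the matching shard across processes (same device index d everywhere)
  for(int d = 0; d < D; d++)
    for(int k = 0; k < S; k++) {
      float sum = 0.f;
      for(int p = 0; p < P; p++) sum += shard[p][d][k];
      for(int p = 0; p < P; p++) shard[p][d][k] = sum;
    }

  std::printf("shard[0][0][0] = %g (expected P*D = %d)\n", shard[0][0][0], P * D);
  return 0;
}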
@ -293,7 +326,80 @@ public:
if(vals->type() == Type::float16)
ncclFloatType = ncclFloat16;
NCCL_CHECK(ncclAllGather(sendbuf, recvbuf, bufsize, ncclFloatType, comms_[i], streams_[i]));
auto& comms = shardingMode_ == ShardingMode::global ? globalComms_ : localComms_;
NCCL_CHECK(ncclAllGather(sendbuf, recvbuf, bufsize, ncclFloatType, comms[i], streams_[i]));
}
groupEnd();
synchronizeAll();
}
void broadcastParams(bool average = false) const override {
synchronizeAllOnNullStream();
groupStart();
for(int i = 0; i < graphs_.size(); ++i) {
auto vals = graphs_[i]->params()->vals();
ncclDataType_t ncclFloatType = ncclFloat32;
if(vals->type() == Type::float16)
ncclFloatType = ncclFloat16;
if(average)
NCCL_CHECK(ncclAllReduce(vals->data(), vals->data(), vals->size(), ncclFloatType, ncclSum, globalComms_[i], streams_[i]));
else
NCCL_CHECK(ncclBroadcast(vals->data(), vals->data(), vals->size(), ncclFloatType, 0, globalComms_[i], streams_[i]));
}
groupEnd();
synchronizeAll();
if(average) {
auto avg = [&](size_t i, size_t /*begin*/, size_t /*end*/) {
auto vals = graphs_[i]->params()->vals();
using namespace functional;
Element(_1 = _1 / (float)mpi_->numMPIProcesses(), vals);
return true; // dummy success
};
foreach(avg);
}
}
void broadcastShards(const std::vector<Ptr<OptimizerBase>>& opts, bool average = false) const override {
if(shardingMode_ == ShardingMode::global)
return; // nothing to do, shards are independent
auto floatType = [](Tensor tensor) {
ncclDataType_t ncclFloatType = ncclFloat32;
if(tensor->type() == Type::float16)
ncclFloatType = ncclFloat16;
return ncclFloatType;
};
// if we are here we use local mode and shards are process-wise copies
synchronizeAllOnNullStream();
groupStart();
for(int i = 0; i < opts.size(); ++i) {
for(auto shard : opts[i]->getShards()) {
if(shard) {
if(average) {
NCCL_CHECK(ncclAllReduce(shard->data(),
shard->data(),
shard->size(),
floatType(shard),
ncclSum,
globalComms_[i],
streams_[i]));
using namespace functional;
Element(_1 = _1 / (float)mpi_->numMPIProcesses(), shard);
} else {
NCCL_CHECK(ncclBroadcast(shard->data(),
shard->data(),
shard->size(),
floatType(shard),
0,
globalComms_[i],
streams_[i]));
}
}
}
}
groupEnd();
synchronizeAll();
@ -304,11 +410,11 @@ public:
// It is assumed that all MPI processes get the same data() passed. Hence, no MPI transfers are needed here.
void scatterState(const io::Item& data, const OptimizerBase::ScatterStateSetFunc& setFn) const override {
size_t dataSize = data.size();
size_t numShards = numNcclRanks();
size_t numShards = shardingMode_ == ShardingMode::global ? numNcclRanks() : numLocalRanks(); // @TODO: numShards() function
size_t shardSize = (dataSize + numShards - 1) / numShards;
for(size_t localDeviceIndex = 0; localDeviceIndex < graphs_.size(); localDeviceIndex++) {
// We only slice out data that is kept in our MPI process. Remember that all MPI processes receive the same, complete data.
auto ncclRank = myNcclRank(localDeviceIndex);
auto ncclRank = shardingMode_ == ShardingMode::global ? myNcclRank(localDeviceIndex) : myLocalRank(localDeviceIndex);
size_t begin = ncclRank * shardSize;
size_t end = std::min(begin + shardSize, dataSize);
setFn(localDeviceIndex, data.bytes.data() + begin, data.bytes.data() + end);
@ -316,19 +422,19 @@ public:
}
// Collect shards across multiple devices and MPI processes in the NCCL configuration into a single CPU-side io::Item.
// This is used when persisting optimizer state, which is sharded.
// This is used when persisting optimizer state which is sharded.
io::Item gatherState(const OptimizerBase::GatherStateGetFunc& getFn) const override {
// first, concatenate over all local devices
io::Item localData = getFn(0);
for(size_t localDeviceIndex = 1; localDeviceIndex < graphs_.size(); localDeviceIndex++) {
localData.append(getFn(localDeviceIndex));
}
// local data now contains a concatentation of all local data
// localData now contains a concatenation of all local data
// second, concatenate across MPI processes
// Note that all local devices occupy consecutive ncclRanks in order.
io::Item data;
if (mpi_) {
if (mpi_ && shardingMode_ == ShardingMode::global) {
io::Item tmp = localData; // temp buffer used multiple times; assign localData for initialization
// push one rank's data at a time using broadcast
for(size_t mpiRank = 0; mpiRank < mpi_->numMPIProcesses(); mpiRank++) {

View File

@ -6,6 +6,7 @@ GraphGroup::GraphGroup(Ptr<Options> options, Ptr<IMPIWrapper> mpi)
: options_(options),
mpi_(mpi),
devices_(Config::getDevices(options, mpi->myMPIRank(), mpi->numMPIProcesses())),
shardingMode_(getShardingMode(options_, mpi)),
mbRoundUp_(options_->get<bool>("mini-batch-round-up", true)) {
if(options_->hasAndNotEmpty("cost-scaling")) {
auto vcs = options_->get<std::vector<std::string>>("cost-scaling");
@ -53,7 +54,11 @@ GraphGroup::GraphGroup(Ptr<Options> options, Ptr<IMPIWrapper> mpi)
// Note: We may well end up with only one MPI process or only one graph per worker.
// This part of the code will not special-case any of this here.
// Rather, it is assumed that the communicator knows to reduce unnecessary transfers to no-ops.
comm_ = createCommunicator(graphs_, /*noNccl=*/options_->get<bool>("no-nccl", false), /*mpi=*/mpi_);
// @TODO: createCommunicator(options, ...)
comm_ = createCommunicator(graphs_,
/*noNccl=*/options_->get<bool>("no-nccl", false),
shardingMode_,
/*mpi=*/mpi_);
auto formattedDeviceType = utils::utf8ToUpper(devices_.front().typeAsString()) + "s";
if (mpi_->numMPIProcesses() > 1)
@ -76,9 +81,10 @@ void GraphGroup::initGraphsAndOpts() {
if(options_->get<bool>("check-nan")) // @TODO: add to other places
graph->setThrowNaN(true);
graph->setDevice(device);
graph->setDevice(device);
graph->reserveWorkspaceMB(options_->get<size_t>("workspace"));
graphs_.push_back(graph);
optimizerShards_.push_back(Optimizer(options_));
@ -172,15 +178,19 @@ float GraphGroup::executeAndCollectNorm(const std::function<float(size_t, size_t
if(mpi_) { // accumulate gradientNorm from subprocesses
auto gradNormSquared = gradNorm * gradNorm; // undo sqrt
mpi_->allReduce(&gradNormSquared, &gradNormSquared, 1, MPI_FLOAT, MPI_SUM); // sum all
if(shardingMode_ == ShardingMode::local) // we already have the correct norm on one device, but we also need to check for NaN
gradNormSquared /= (float)mpi_->numMPIProcesses();
gradNorm = std::sqrt(gradNormSquared); // redo sqrt
}
return gradNorm;
}
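A small numeric check of the correction above, under the assumption stated in the comment: with local sharding every process already holds the fully reduced gradient, hence the same norm, so the MPI sum of squared norms has to be divided by the number of processes.

#include <cmath>
#include <cstdio>

int main() {
  const int numProcesses = 4;
  const float trueNorm = 3.0f;                // every process already holds this norm
  float squared    = trueNorm * trueNorm;
  float allReduced = squared * numProcesses;  // MPI_SUM adds four identical values
  allReduced /= numProcesses;                 // the local-sharding correction undoes that
  std::printf("recovered norm: %g\n", std::sqrt(allReduced)); // 3
  return 0;
}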
/**
* This function computes a normalization factor that is applied to the gradient before an update.
* This function computes a normalization factor that is applied to the gradient before an update.
* Depending on various settings this will return a normalizer that can perform a combination of:
* - apply a cost-scaling factor if cost-scaling is enabled
* - apply a cost scaling factor if cost scaling is enabled
* - normalize the gradient by the number of words in a batch if requested (turning ce-sum into ce-mean). @TODO: once fp16 stability issues are proven to not be caused by this, remove.
* - re-scale the gradient based on a dynamic running average of gradient norms
*/
@ -253,23 +263,21 @@ void GraphGroup::load(const OptimizerBase::ScatterStateFunc& scatterFn) {
/*
if not no-reload (=> i.e. do reload):
restore scheduler
if checkpoint is available:
if checkpoint is available or not no-reload-checkpoint:
reload from checkpoint
else if model is available:
reload from model, but warn that no checkpoint was used and the model could be smoothed
else if pretrained-model path given:
initialize matching weights from pretrained model
else if pretrained-model path given:
initialize matching weights from pretrained model
else:
(implicitly) don't do anything => initialize randomly later
*/
if(!options_->get<bool>("no-reload")) {
std::string modelFileName = options_->get<std::string>("model");
if(filesystem::exists(modelFileName)) {
if(scheduler_)
scheduler_->load(modelFileName);
// we just load it N times from disk (it'll be in disk cache after the first)
// this also allocates memory correctly when calling forward() inside restoreFromCheckPoint
size_t i = 0;
@ -307,6 +315,13 @@ bool GraphGroup::restoreFromCheckpoint(const std::string& modelFileName,
}
auto items = io::loadItems(checkpointName);
// make sure all nodes see the same checkpoint data; this may not be the case with distributed file systems
// when there is a delay in updating the caches across nodes. So here node 0 sends its data to all.
// We still load them all from disk, but that serves more as a trick to allocate the correct memory.
if(mpi_)
for(auto& item : items)
mpi_->bCast(item);
// @TODO: probably we want to have the list of DeviceIds as an attribute
std::vector<Ptr<Backend>> backends;
@ -384,11 +399,8 @@ void GraphGroup::save(bool isFinal,
swapWithSmoothed();
if(isMainProcess()) { // only save from main MPI process
// do final validation
if(isFinal && scheduler_)
scheduler_->validate(graphs_, isFinal);
}
if(isFinal && scheduler_)
scheduler_->validate(graphs_, isFinal);
barrier(); // (for better grouping of log messages)
@ -429,7 +441,10 @@ void GraphGroup::swapWithSmoothed() {
};
comm_->foreach(swap);
comm_->allGatherParams();
if(shardingMode_ == ShardingMode::local)
comm_->broadcastParams();
barrier();
}

View File

@ -47,6 +47,7 @@ protected:
Ptr<IMPIWrapper> mpi_; // [not null] all MPI-like communication goes through this (this is a dummy implementation if no MPI run)
std::vector<DeviceId> devices_; // [deviceIndex]
ShardingMode shardingMode_{ShardingMode::global}; // If local and multi-node training, shard only on local devices and do full sync (faster). If global, shard across the entire set of GPUs (more RAM).
// common for all graph groups, individual graph groups decide how to fill them
std::vector<Ptr<ExpressionGraph>> graphs_; // [deviceIndex]
@ -114,7 +115,7 @@ private:
public:
void swapWithSmoothed();
bool isMainProcess() const { return mpi_->myMPIRank() == 0; } // (we need this test a few times)
bool isMainProcess() const { return mpi_->isMainProcess(); } // (we need this test a few times)
void barrier() const { mpi_->barrier(); } // (we need this several times)
void validate();

View File

@ -29,26 +29,24 @@ void SyncGraphGroup::initialize(const Ptr<data::Batch>& exampleBatch) {
return true; // dummy success
});
// Copy weights from 0-th graph to all other graphs
// to have equal weights across devices
comm_->foreach([&](size_t i, size_t /*begin*/, size_t /*end*/) {
if (i > 0)
graphs_[i]->params()->vals()->copyFrom(graphs_[0]->params()->vals());
return true; // dummy success
});
// Copy weights from 0-th graph to all other graphs to have equal weights across devices.
// This is used after weight initialization and after checkpoint restoration.
comm_->broadcastParams();
// initialize model quantization
if (options_->get<size_t>("quantize-bits") > 0) {
for (int idx = 0; idx < graphs_.size(); idx++)
quantizers_.push_back(New<ModelQuantizer>(options_));
comm_->foreach([&](size_t idx, size_t /*begin*/, size_t /*end*/) { quantizers_[idx]->quantize(graphs_[idx]); return true; });
comm_->foreach([&](size_t idx, size_t /*begin*/, size_t /*end*/) {
quantizers_[idx]->quantize(graphs_[idx]); return true;
});
}
// We compute the readerMultiplier in collectStats(...) and the updateMultiplier_ here
// as collectStats maybe called for a different instance of this object and fields would not
// survive destruction.
double multiplier = devices_.size() * mpi_->numMPIProcesses() * delay_;
double multiplier = devices_.size() /** mpi_->numMPIProcesses()*/ * delay_; // @TODO: make this optional? Comment what is going on.
bool isDynamic = scheduler_->isDynamicMBSizeScaling();
updateMultiplier_ = isDynamic ? multiplier : 1.; // multiplier applied later in update()
}
@ -60,7 +58,7 @@ Ptr<data::BatchStats> SyncGraphGroup::collectStats(const std::vector<Ptr<Vocab>>
// If dynamic MB scaling, then we want fine-grained minibatches of the size of one GPU.
// If not, we prefer a single large batch that can be split into equal-size parts over GPUs,
// so that we have perfect load balancing and read precisely as much as we need (no waste).
double multiplier = devices_.size() * mpi_->numMPIProcesses() * delay_;
double multiplier = devices_.size() /** mpi_->numMPIProcesses()*/ * delay_;
bool isDynamic = scheduler_->isDynamicMBSizeScaling();
double readerMultiplier = isDynamic ? 1. : multiplier; // multiplier applied already by reader
return GraphGroup::collectStats(graphs_[0], models_[0], vocabs, readerMultiplier);
@ -112,7 +110,7 @@ bool SyncGraphGroup::tryGetSubBatches(Ptr<data::Batch> newBatch,
// - reference batch size specified: (reference batch size / typical aggregate reader batch size)
// - no ref size specified: 1
size_t warpSize = devices_.size() * mpi_->numMPIProcesses(); // warp := set of batches processed concurrently across GPUs and workers
size_t warpSize = devices_.size() /** mpi_->numMPIProcesses()*/; // warp := set of batches processed concurrently across GPUs and workers
// if not dynamic then return the big batch, but first split it over GPUs as it may be too large
if (!scheduler_->isDynamicMBSizeScaling()) {
@ -125,6 +123,9 @@ bool SyncGraphGroup::tryGetSubBatches(Ptr<data::Batch> newBatch,
numReadBatches = 1;
return true;
}
// we are collecting multiple batches and potentially cut them to size here
LOG_ONCE(info, "[training] Dynamic mini-batch scaling enabled");
// if dynamic and mini-batch-fit, then we get batches in the size of what fits into one GPU
@ -140,12 +141,12 @@ bool SyncGraphGroup::tryGetSubBatches(Ptr<data::Batch> newBatch,
// If a reference is given, then at progress == mbWarmup.n (ratio=1), we would like to have refBatchLabels instead of whichever
// the actual batch size is. Since we cannot know the future actual batch sizes that will be delivered
// by the reader, we approximate them with (typicalTrgBatchWords * updateMultiplier), and scale ratio accordingly.
auto refBatchLabels = options_->get<size_t>("mini-batch-words");
auto refBatchLabels = options_->get<size_t>("mini-batch-words") / mpi_->numMPIProcesses();
if (refBatchLabels != 0) {
LOG_ONCE(info, "[scheduler] Scaling to {} reference labels, using actual-batch-word estimate of {}", refBatchLabels, GraphGroup::getTypicalTrgBatchWords());
ABORT_IF(GraphGroup::getTypicalTrgBatchWords() == 0, "Dynamic scaling with words target requires MB size to be known in words"); // happens if MB size is specified in sentences
GraphGroup::updateAverageTrgBatchWords(newBatch->wordsTrg());
GraphGroup::updateAverageTrgBatchWords(newBatch->wordsTrg()); // @TODO: should this be synchronized with MPI?
ratio *= (double)refBatchLabels / (GraphGroup::getTypicalTrgBatchWords() * updateMultiplier_); // cancellation of updateMultiplier_
}
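A worked example of the per-process reference scaling above; all numbers are invented and only illustrate why mini-batch-words is divided by the number of MPI processes once batch reading is decoupled per node.

#include <cstdio>

int main() {
  const size_t miniBatchWords = 40000, numProcesses = 2;      // --mini-batch-words and MPI world size (assumed)
  const double typicalTrgBatchWords = 2500.0, updateMultiplier = 8.0;
  size_t refBatchLabels = miniBatchWords / numProcesses;      // 20000 labels targeted per process
  // multiplicative correction applied to the dynamic-MB ratio in the hunk above
  double correction = (double)refBatchLabels / (typicalTrgBatchWords * updateMultiplier); // 20000 / 20000 = 1
  std::printf("refBatchLabels=%zu, ratio correction=%g\n", refBatchLabels, correction);
  return 0;
}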
@ -153,8 +154,8 @@ bool SyncGraphGroup::tryGetSubBatches(Ptr<data::Batch> newBatch,
if(GraphGroup::mbRoundUp_) // true by default
ratio = roundUpRatio(ratio);
if (pendingBatches_.size() < ratio)
return false; // not enough data yet
if(pendingBatches_.size() < ratio || pendingBatches_.size() % (size_t)updateMultiplier_ != 0)
return false; // not enough data yet or not a multiple of the multiplier
// now we have enough to fill at least 'ratio' batches
// @BUGBUG: We do not handle the case that fixed MB size * ratio exceeds GPU memory (we'd need to split that).
@ -176,42 +177,11 @@ bool SyncGraphGroup::tryGetSubBatches(Ptr<data::Batch> newBatch,
batch = batch->split(/*numSubBatches=*/1, reducedBatchSize).front();
}
// load-balance: distribute the last numWarps-group's batches over GPUs
// This is tricky since batches do not have the same length, therefore we can only split, but not merge.
auto numWarps = (pendingBatches_.size() - 1) / warpSize + 1; // = ceil(#buffers / (#GPUs * #workers))
auto availableDevices = numWarps * warpSize; // we will run this many GPUs: better use them all
if (pendingBatches_.size() < availableDevices) {
// last warp does not use all available GPUs: try to re-balance
auto fullWarpsBatches = (numWarps - 1) * warpSize; // number of batches in all but the last warp. Those warps that are fully used.
auto lastWarpSize = pendingBatches_.size() - fullWarpsBatches; // the last warp is possibly not fully used
//LOG(info, "attempting to redistribute last {} batches over {} devices", lastWarpSize, warpSize);
auto splitInto = warpSize / lastWarpSize;
if (splitInto > 1) { // unfortunately we can only split in integer ratios
// split each of last numWarps's batches into 'splitInto' batches
// pop them first
std::vector<Ptr<data::Batch>> batchesToSplit;
while (pendingBatches_.size() > fullWarpsBatches) {
batchesToSplit.push_back(pendingBatches_.back());
pendingBatches_.pop_back();
}
// now split them and push them back
for (auto& batchToSplit : batchesToSplit) {
//LOG(info, "{}-way splitting batchToSplit with size {}", splitInto, batchToSplit->size());
auto splitBatches = batchToSplit->split(splitInto);
for (auto& splitBatch : splitBatches) {
//LOG(info, " -> getting batchToSplit with size {}", splitBatch->size());
pendingBatches_.push_back(splitBatch);
}
}
}
ABORT_IF(pendingBatches_.size() > availableDevices, "somehow split into too many batches??");
}
subBatches = std::move(pendingBatches_);
// Order by decreasing batch width to better pack computation in case of delayed updates
if(subBatches.size() > 1)
std::sort(subBatches.begin(), subBatches.end(),
[](Ptr<data::Batch> a, Ptr<data::Batch> b) { return a->widthTrg() > b->widthTrg(); });
std::sort(subBatches.begin(),
subBatches.end(),
[](Ptr<data::Batch> a, Ptr<data::Batch> b){ return a->widthTrg() > b->widthTrg(); });
return true;
}
@ -222,31 +192,35 @@ void SyncGraphGroup::update(Ptr<data::Batch> newBatch) /*override*/ {
std::vector<Ptr<data::Batch>> subBatches;
size_t numReadBatches; // actual #batches delivered by reader, for restoring from checkpoint --@TODO: reader should checkpoint itself; should not go via the scheduler
bool gotSubBatches = tryGetSubBatches(newBatch, subBatches, numReadBatches);
// not enough data yet: return right away
if (!gotSubBatches)
return;
// when decoupled, put barrier here?
barrier();
update(subBatches, numReadBatches);
}
void SyncGraphGroup::update(std::vector<Ptr<data::Batch>> subBatches, size_t numReadBatches) {
size_t batchSize = 0;
size_t batchTrgWords = 0;
size_t updateBatchSize = 0;
size_t updateTargetWords = 0;
for (const auto& batch : subBatches) {
batchSize += batch->size();
batchTrgWords += batch->wordsTrg();
updateBatchSize += batch->size();
updateTargetWords += batch->wordsTrg();
}
mpi_->allReduce(&updateTargetWords, &updateTargetWords, 1, IMPIWrapper::getDataType(&updateTargetWords), MPI_SUM);
mpi_->allReduce(&updateBatchSize, &updateBatchSize, 1, IMPIWrapper::getDataType(&updateBatchSize), MPI_SUM);
std::sort(subBatches.begin(), subBatches.end(),
[](Ptr<data::Batch> a, Ptr<data::Batch> b) { return a->wordsTrg() > b->wordsTrg(); });
// Helper to access the subBatches array
auto getSubBatch = [&](size_t warp, size_t localDeviceIndex, size_t rank) -> Ptr<data::Batch> {
auto getSubBatch = [&](size_t warp, size_t localDeviceIndex, size_t /*rank*/) -> Ptr<data::Batch> {
// Warp should be slowest changing dimension. If subBatches are sorted by
// length, then grouping sentences of similar length into the same delay step can
// reduce unnecessary time spent in padding.
auto index = (warp * mpi_->numMPIProcesses() + rank) * devices_.size() + localDeviceIndex;
auto index = (warp /** mpi_->numMPIProcesses() + rank*/) * devices_.size() + localDeviceIndex;
if (index < subBatches.size())
return subBatches[index];
else
@ -257,7 +231,7 @@ void SyncGraphGroup::update(std::vector<Ptr<data::Batch>> subBatches, size_t num
if(first_) {
LOG(info, "[training] Batches are processed as {} process(es) x {} devices/process",
mpi_->numMPIProcesses(), devices_.size());
initialize(subBatches.front());
initialize(subBatches.front()); // @TODO: rename to lazyInitialization, move to GraphGroup
first_ = false;
}
@ -267,8 +241,8 @@ void SyncGraphGroup::update(std::vector<Ptr<data::Batch>> subBatches, size_t num
comm_->foreach([&](size_t localDeviceIndex, size_t /*begin*/, size_t /*end*/) { // parallel across devices. Aggregate for warp > 1.
auto graph = graphs_[localDeviceIndex];
// reset gradient --presently done outside
//graph->params()->allocateBackward();
//graph->params()->set_zero_adjoint();
// graph->params()->allocateBackward();
// graph->params()->set_zero_adjoint();
// This happens in multiple steps if there are more subbatches than devices.
for (size_t warp = 0; ; warp++) {
// Execute single forward/backward step
@ -288,7 +262,8 @@ void SyncGraphGroup::update(std::vector<Ptr<data::Batch>> subBatches, size_t num
graph->backward(/*zero=*/false); // (gradients are reset before we get here)
}
#if 1 // experimental and should eventually be somewhere else
#if 1
// experimental and should eventually be somewhere else
// Handle local gradient explosion but only clip to largest possible value
// given number of GPUs and type. Should clip rarely. Also clips inf
// We do another clipping/rescaling after summation.
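One plausible reading of the comment above, as a rough sketch; the actual clipping code is not part of this hunk, so the bound below is an assumption rather than the commit's formula.

#include <cstdio>

int main() {
  const float fp16Max   = 65504.0f;      // largest finite IEEE half-precision value
  const int   totalGPUs = 8;             // #processes x #devices per process (assumed)
  float clipValue = fp16Max / totalGPUs; // ~8188: even if all replicas hit the bound, their sum stays finite
  std::printf("clip each local gradient to at most %g\n", clipValue);
  return 0;
}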
@ -318,18 +293,17 @@ void SyncGraphGroup::update(std::vector<Ptr<data::Batch>> subBatches, size_t num
}
bool saneGradient = isFinite(gradNorm);
if(saneGradient) {
// actual model update
auto updateTrgWords = batchTrgWords; // total number of labels across all GPUs and nodes
float gradientNormalizer = GraphGroup::computeNormalizationFactor(gradNorm, updateTrgWords);
float gradientNormalizer = GraphGroup::computeNormalizationFactor(gradNorm, updateTargetWords);
// Update parameter shard with gradient shard
auto update = [&](size_t i, size_t begin, size_t end) -> float {
auto curGrad = graphs_[i]->params()->grads()->subtensor(begin, end-begin);
auto curParam = graphs_[i]->params()->vals()->subtensor(begin, end-begin);
float l2norm = optimizerShards_[i]->update(curParam, curGrad, updateTargetWords, gradientNormalizer);
float l2norm = optimizerShards_[i]->update(curParam, curGrad, updateTrgWords, gradientNormalizer);
// resets remaining gradient to zero
curGrad->set(0.f); // @TODO: all the different places where gradients get reset are confusing
return l2norm; // return partial norm
@ -338,14 +312,16 @@ void SyncGraphGroup::update(std::vector<Ptr<data::Batch>> subBatches, size_t num
// Overwrite gradNorm with new value from normalized gradient
gradNorm = executeAndCollectNorm(update);
if(!options_->get<bool>("normalize-gradient"))
gradNorm /= updateTrgWords; // normalize for logging
gradNorm /= updateTargetWords; // normalize for logging
comm_->allGatherParams(); // distribute param value shards back
// Re-add the error residual from previous quantization,
// then re-quantize the model back and update the error residual
if (options_->get<size_t>("quantize-bits") > 0)
comm_->foreach([&](size_t idx, size_t /*begin*/, size_t /*end*/) { quantizers_[idx]->quantize(graphs_[idx]); return true; });
if(options_->get<size_t>("quantize-bits") > 0)
comm_->foreach([&](size_t idx, size_t /*begin*/, size_t /*end*/) {
quantizers_[idx]->quantize(graphs_[idx]); return true;
});
} else {
LOG(debug, "Seen NaN in gradient, skipping update, resetting gradient");
@ -367,23 +343,26 @@ void SyncGraphGroup::update(std::vector<Ptr<data::Batch>> subBatches, size_t num
if(scheduler_) {
// track and log localLoss
scheduler_->update(localLoss, numReadBatches, batchSize, batchTrgWords, gradNorm, mpi_);
scheduler_->update(localLoss, numReadBatches, updateBatchSize, updateTargetWords, gradNorm);
if(scheduler_->syncing()) {
if(shardingMode_ == ShardingMode::local) {
LOG(debug, "Syncing all parameters and optimizer shards across {} MPI processes", mpi_->numMPIProcesses());
comm_->broadcastParams();
comm_->broadcastShards(optimizerShards_);
}
}
// save intermediate model (and optimizer state) to file
if(scheduler_->saving())
if(scheduler_->saving()) {
save();
}
// process valid data set
// This may save a model as well.
if(scheduler_->validating()) {
swapWithSmoothed();
// We run validation only in the main process, but this is risky with MPI.
// Validators might modify random state etc., maybe we should run validators
// everywhere, but not report and not safe on the other processes.
if(isMainProcess())
scheduler_->validate(graphs_);
scheduler_->validate(graphs_);
swapWithSmoothed();
}
}

View File

@ -14,6 +14,7 @@ private:
Ptr<Options> options_;
Ptr<TrainingState> state_;
std::vector<Ptr<ValidatorBase>> validators_;
Ptr<IMPIWrapper> mpi_;
bool first_{true}; // true if this is the first update after renewing the training
size_t gradientNormAvgWindow_{100}; // window size for recording the exponential average of gradient norms, after this many updates about 90% of the mass comes from this many last updates
@ -27,7 +28,8 @@ private:
// (regardless if it's the 1st or nth epoch and if it's a new or continued training),
// which indicates the end of the training data stream from STDIN
bool endOfStdin_{false}; // true at the end of the epoch if training from STDIN;
// @TODO: figure out how to compute this with regard to updates as well, although maybe harder since no final value
// determine scheduled LR decay factor (--lr-decay-inv-sqrt option)
float getScheduledLRDecayFactor(const TrainingState& state) const {
auto args = options_->get<std::vector<std::string>>("lr-decay-inv-sqrt");
@ -49,12 +51,6 @@ private:
return 1.f;
}
// update current learning rate in state.eta
// This considers
// - base LR (--learn-rate)
// - LR warm-up (--lr-warmup, --lr=warmup-start-rate)
// - scheduled LR decay (--lr-decay-inv-sqrt)
// - state-based LR decay (--lr-decay, --lr-decay-strategy)
void updateLearningRate(TrainingState& state) const {
float baselr = options_->get<float>("learn-rate");
@ -134,8 +130,8 @@ private:
}
public:
Scheduler(Ptr<Options> options, Ptr<TrainingState> state)
: options_(options), state_(state),
Scheduler(Ptr<Options> options, Ptr<TrainingState> state, Ptr<IMPIWrapper> mpi = nullptr)
: options_(options), state_(state), mpi_(mpi),
gradientNormAvgWindow_(options_->get<size_t>("gradient-norm-average-window", 100)) {
// parse logical-epoch parameters
@ -178,7 +174,7 @@ public:
size_t progress = state_->getProgressIn(mbWarmup.unit); // number of updates/labels processed
auto progressRatio = (double)progress / (double)mbWarmup.n; // where are we relatively within target warm-up period
// if unit is labels, then account for the fact that our increment itself is not constant
#if 0 // this seems to hurt convergence quite a bit compared to when updates is used
#if 1 // this seems to hurt convergence quite a bit compared to when updates is used
if (mbWarmup.unit == SchedulingUnit::trgLabels)
progressRatio = std::sqrt(progressRatio);
#endif
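For illustration only, what the square-root ramp enabled above does to label-based mini-batch warm-up (assumed numbers); note the author's own comment questions whether this helps convergence.

#include <cmath>
#include <cstdio>

int main() {
  const double warmupLabels = 10e9;   // mbWarmup.n in target labels (assumed)
  const double processed    = 2.5e9;  // labels seen so far (assumed)
  double progressRatio = processed / warmupLabels;  // 0.25
  progressRatio = std::sqrt(progressRatio);         // 0.5 -> the mini-batch size ramps up faster early on
  std::printf("effective warm-up progress: %g\n", progressRatio);
  return 0;
}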
@ -190,6 +186,7 @@ public:
// As LR goes down, MB gets ramped up by the same ratio, which has been found to be safe.
auto mbTracking = options_->get<bool>("mini-batch-track-lr");
if (mbTracking) {
ABORT("Please review this code");
auto lrFactor = getScheduledLRDecayFactor(*state_) * state_->factor; // (don't include lr-warmup)
if (lrFactor != 1)
LOG_ONCE(info, "[scheduler] Dynamic mini-batch size adjustment enabled and kicking in");
@ -286,6 +283,10 @@ public:
return state_->enteredNewPeriodOf(options_->get<std::string>("save-freq"));
}
bool syncing() {
return state_->enteredNewPeriodOf(options_->get<std::string>("sync-freq", "0"));
}
void validate(const std::vector<Ptr<ExpressionGraph>>& graphs,
bool isFinal = false) {
// Do not validate if already validated (for instance, after the model is loaded)
@ -302,27 +303,42 @@ public:
continue;
size_t stalledPrev = validator->stalled();
float value = validator->validate(graphs, state_);
if(validator->stalled() > 0) {
LOG_VALID(info,
"Ep. {} : Up. {} : {} : {} : stalled {} times (last best: {})",
formatLogicalEpoch(),
state_->batches,
validator->type(),
value,
validator->stalled(), validator->lastBest());
} else {
LOG_VALID(info,
"Ep. {} : Up. {} : {} : {} : new best",
formatLogicalEpoch(),
state_->batches,
validator->type(),
value);
if(firstValidator)
state_->validBest = value;
float value = 0;
if(!mpi_ || mpi_->isMainProcess()) {
// We run validation only in the main process, but this is risky with MPI.
// Validators might modify random state etc., maybe we should run validators
// everywhere, but not report and not save on the other processes.
value = validator->validate(graphs, state_);
if(validator->stalled() > 0) {
LOG_VALID(info,
"Ep. {} : Up. {} : {} : {} : stalled {} times (last best: {})",
formatLogicalEpoch(),
state_->batches,
validator->type(),
value,
validator->stalled(), validator->lastBest());
} else {
LOG_VALID(info,
"Ep. {} : Up. {} : {} : {} : new best",
formatLogicalEpoch(),
state_->batches,
validator->type(),
value);
}
}
if(mpi_) {
// collect and broadcast validation result to all processes and bring validator up-to-date
mpi_->bCast(&value, 1, IMPIWrapper::getDataType(&value));
// @TODO: add function to validator?
mpi_->bCast(&validator->stalled(), 1, IMPIWrapper::getDataType(&validator->stalled()));
mpi_->bCast(&validator->lastBest(), 1, IMPIWrapper::getDataType(&validator->lastBest()));
}
if(firstValidator)
state_->validBest = value;
state_->validators[validator->type()]["last-best"] = validator->lastBest();
state_->validators[validator->type()]["stalled"] = validator->stalled();
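For reference, the same keep-everyone-in-sync pattern expressed in plain MPI terms; this is a sketch against the raw MPI API, not the IMPIWrapper used by the code above.

#include <mpi.h>
#include <cstdio>

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  float value = 0.f;
  if(rank == 0)
    value = 27.3f;                                    // only the main process actually runs the validator
  MPI_Bcast(&value, 1, MPI_FLOAT, 0, MPI_COMM_WORLD); // every other process receives the score
  std::printf("rank %d sees validation score %g\n", rank, value);
  MPI_Finalize();
  return 0;
}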
@ -353,8 +369,7 @@ public:
size_t numReadBatches, // number of batches read by the reader (for seeking in case of restart)
size_t batchSize, // total number of sentences in batch
size_t batchLabels, // total number of target words in batch
float gradientNorm, // gradientNorm of update
Ptr<IMPIWrapper> mpi = nullptr) {
float gradientNorm) { // gradientNorm of update
state_->rememberPreviousProgress(); // note: epoch increases happen at the wrong place, hence
// -freq parameters do not support epoch units
state_->validated = false;
@ -362,9 +377,9 @@ public:
// Since batchLabels is counted across all MPI processes, we also should temporarily
// extrapolate cost across MPI processes, to have numbers in the right range.
// When doing the actual log, we then aggregate across MPI processes to get the accurate number.
if(mpi) {
rationalLoss.loss *= mpi->numMPIProcesses();
rationalLoss.count *= mpi->numMPIProcesses();
if(mpi_) {
rationalLoss.loss *= mpi_->numMPIProcesses();
rationalLoss.count *= mpi_->numMPIProcesses();
}
// @BUGBUG: rationalLoss.count is float, not a count. Possible solution: make (costSum, costCount) a StaticLoss object as well
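A numeric sketch of the extrapolate-then-aggregate cost bookkeeping described in the comments above and finished in the display hunk below; the per-process losses are invented.

#include <cstdio>

int main() {
  const int numProcesses = 2;
  float localLoss[2] = {10.0f, 14.0f};  // per-process loss accumulated since the last display
  // while accumulating, each process scales its own loss up by numProcesses
  float costSum[2] = {localLoss[0] * numProcesses, localLoss[1] * numProcesses};
  // at display time the scaling is undone and the values are summed across processes (the allReduce)
  float aggregated = costSum[0] / numProcesses + costSum[1] / numProcesses;
  std::printf("aggregated cost: %g (== 10 + 14)\n", aggregated);
  return 0;
}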
@ -399,14 +414,14 @@ public:
if(state_->enteredNewPeriodOf(options_->get<std::string>("disp-freq")) || state_->batches <= options_->get<size_t>("disp-first")) {
// if MPI then aggregate precise cost across workers
if(mpi) {
state_->costSum /= mpi->numMPIProcesses(); // undo the extra scaling
state_->costCount /= mpi->numMPIProcesses(); // undo the extra scaling
mpi->allReduce(&state_->costSum, &state_->costSum, 1, MPI_FLOAT, MPI_SUM);
mpi->allReduce(&state_->costCount, &state_->costCount, 1, MPI_FLOAT, MPI_SUM);
if(mpi_) {
state_->costSum /= mpi_->numMPIProcesses(); // undo the extra scaling
state_->costCount /= mpi_->numMPIProcesses(); // undo the extra scaling
mpi_->allReduce(&state_->costSum, &state_->costSum, 1, MPI_FLOAT, MPI_SUM);
mpi_->allReduce(&state_->costCount, &state_->costCount, 1, MPI_FLOAT, MPI_SUM);
}
if(mpi && mpi->myMPIRank() != 0) {
if(mpi_ && mpi_->myMPIRank() != 0) {
// skip the report on alternate worker processes
} else if(options_->get<bool>("lr-report")) {
LOG(info,
@ -443,7 +458,7 @@ public:
// progress heartbeat for MS-internal Philly compute cluster
// This environment variable exists when running on the cluster.
using namespace std::chrono;
if((!mpi || mpi->myMPIRank() == 0) && getenv("PHILLY_JOB_ID")
if((!mpi_ || mpi_->myMPIRank() == 0) && getenv("PHILLY_JOB_ID")
&& heartBeatTimer_.elapsed<std::chrono::minutes>() >= 30) {
fprintf(stderr, "PROGRESS: %.2f%%\nEVALERR: %.7f%%\n",
(double)calculateLogicalEpoch(),

View File

@ -33,14 +33,15 @@ public:
}
Ptr<CorpusBase> dataset;
auto corpusSeed = Config::seed + (mpi ? mpi->myMPIRank() : 0); // @BUGBUG: no correct resume right now
if(!options_->get<std::string>("sqlite").empty())
#ifndef _MSC_VER // @TODO: include SqLite in Visual Studio project
dataset = New<CorpusSQLite>(options_);
dataset = New<CorpusSQLite>(options_, /*translate=*/false, corpusSeed);
#else
ABORT("SqLite presently not supported on Windows");
#endif
else
dataset = New<Corpus>(options_);
dataset = New<Corpus>(options_, /*translate=*/false, corpusSeed);
dataset->prepare();
@ -56,7 +57,7 @@ public:
// use temporary scheduler to make sure everything gets destroyed properly
// otherwise the scheduler believes that registered objects still exist
auto tempTrainState = New<TrainingState>(options_->get<float>("learn-rate"));
auto tempScheduler = New<Scheduler>(options_, tempTrainState);
auto tempScheduler = New<Scheduler>(options_, tempTrainState, mpi);
model->setScheduler(tempScheduler); // collectStats() needs to know about dynamic MB scaling
stats = model->collectStats(dataset->getVocabs());
@ -64,7 +65,7 @@ public:
}
auto trainState = New<TrainingState>(options_->get<float>("learn-rate"));
auto scheduler = New<Scheduler>(options_, trainState);
auto scheduler = New<Scheduler>(options_, trainState, mpi);
if((options_->hasAndNotEmpty("valid-sets") || options_->hasAndNotEmpty("valid-script-path"))
&& SchedulingParameter::parse(options_->get<std::string>("valid-freq"))) {
@ -113,7 +114,8 @@ public:
model->save(true);
// Signal success to a potential MPI runner
model = nullptr; // release any reference to MPI that model may hold
model = nullptr; // release any reference to MPI that model may hold
scheduler = nullptr; // as above
finalizeMPI(std::move(mpi));
}
};

View File

@ -49,7 +49,7 @@ struct SchedulingParameter {
}
double number = utils::parseNumber(param);
res.n = (size_t)number;
ABORT_IF(number != (double)res.n, "Scheduling parameters must be whole numbers");
ABORT_IF(number != (double)res.n, "Scheduling parameters must be whole numbers"); // @TODO: do they?
return res;
}
@ -100,6 +100,7 @@ public:
}
// State-based multiplication factor for learning rate
float factor{1.f};
// @TODO: should also have warmup period here?
SchedulingParameter warmupStart; // has same unit as lr-warmup
// Sum of costs since last display
@ -265,6 +266,7 @@ public:
eta = config["eta"].as<float>();
factor = config["eta-factor"].as<float>();
warmupStart = SchedulingParameter::parse(config["warmup-start"].as<std::string>());
costSum = config["cost-sum"].as<float>();

View File

@ -610,9 +610,9 @@ void SacreBleuValidator::updateStats(std::vector<float>& stats,
ref.push_back(w);
}
LOG_VALID_ONCE(info, "[valid] First sentence's tokens as scored:");
LOG_VALID_ONCE(info, "[valid] Hyp: {}", utils::join(decode(cand, /*addEOS=*/false)));
LOG_VALID_ONCE(info, "[valid] Ref: {}", utils::join(decode(ref, /*addEOS=*/false)));
LOG_VALID_ONCE(info, "First sentence's tokens as scored:");
LOG_VALID_ONCE(info, " Hyp: {}", utils::join(decode(cand, /*addEOS=*/false)));
LOG_VALID_ONCE(info, " Ref: {}", utils::join(decode(ref, /*addEOS=*/false)));
if(useWordIds_)
updateStats(stats, cand, ref);

View File

@ -42,8 +42,8 @@ public:
Ptr<const TrainingState> state) = 0;
virtual std::string type() = 0;
float lastBest() { return lastBest_; }
size_t stalled() { return stalled_; }
float& lastBest() { return lastBest_; }
size_t& stalled() { return stalled_; }
virtual float initScore();
virtual void actAfterLoaded(TrainingState& state) override;

View File

@ -419,10 +419,9 @@ Histories BeamSearch::search(Ptr<ExpressionGraph> graph, Ptr<data::CorpusBatch>
// factoredVocab ? factoredVocab->word2string(prevWords[kk]) : (*batch->back()->vocab())[prevWords[kk]],
// prevScores[kk]);
states[i] = scorers_[i]->step(graph, states[i], hypIndices, prevWords, batchIndices, (int)maxBeamSize);
if (numFactorGroups == 1) // @TODO: this branch can go away
if (numFactorGroups == 1) { // @TODO: this branch can go away
logProbs = states[i]->getLogProbs().getLogits(); // [maxBeamSize, 1, currentDimBatch, dimVocab]
else
{
} else {
auto shortlist = scorers_[i]->getShortlist();
logProbs = states[i]->getLogProbs().getFactoredLogits(factorGroup, shortlist); // [maxBeamSize, 1, currentDimBatch, dimVocab]
}

View File

@ -13,12 +13,19 @@ private:
size_t beamSize_;
Ptr<const Vocab> trgVocab_;
const float INVALID_PATH_SCORE = std::numeric_limits<float>::lowest(); // @TODO: observe this closely
const float INVALID_PATH_SCORE;
const bool PURGE_BATCH = true; // @TODO: diagnostic, to-be-removed once confirmed there are no issues.
static float chooseInvalidPathScore(Ptr<Options> options) {
auto prec = options->get<std::vector<std::string>>("precision", {"float32"});
auto computeType = typeFromString(prec[0]);
return NumericLimits<float>(computeType).lowest;
}
public:
BeamSearch(Ptr<Options> options, const std::vector<Ptr<Scorer>>& scorers, const Ptr<const Vocab> trgVocab)
: options_(options), scorers_(scorers), beamSize_(options_->get<size_t>("beam-size")), trgVocab_(trgVocab)
: options_(options), scorers_(scorers), beamSize_(options_->get<size_t>("beam-size")), trgVocab_(trgVocab),
INVALID_PATH_SCORE{chooseInvalidPathScore(options)}
{}
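Why the invalid-path score now depends on the compute precision, in a minimal sketch; the fp16 value below is the IEEE half-precision limit and is presumably what NumericLimits<float>(float16).lowest yields, avoiding the float32 lowest value, which overflows to -inf under fp16 decoding.

#include <cstdio>
#include <limits>

int main() {
  float fp32Lowest = std::numeric_limits<float>::lowest(); // ~ -3.4e38, becomes -inf when cast to fp16
  float fp16Lowest = -65504.0f;                            // lowest finite IEEE half-precision value
  std::printf("float32 sentinel: %g, float16 sentinel: %g\n", fp32Lowest, fp16Lowest);
  return 0;
}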
// combine new expandedPathScores and previous beams into new set of beams