Auto-enable distributed minibatch reading and parallel training
  * the default value for parallelTrain must automatically be true
    when running under MPI with more than one node, else false
  * likewise for distributedMBReading when running with a new (v2) reader
Alexey Reznichenko committed Oct 2, 2016
1 parent d1ad5fc commit e0fea5b
Showing 9 changed files with 105 additions and 8 deletions.
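
Condensed from the diffs below, the two new defaulting rules amount to the following standalone sketch. The helper functions here are illustrative stand-ins, not CNTK code; the real implementations read MPIWrapper and the IDataReader interface, as shown in the changed files.

#include <cstdio>

// Illustrative stand-ins for the real checks in MPIWrapper / IDataReader.
static bool DefaultParallelTrain(int totalMpiNodes)          // node count from PMI_SIZE / OMPI_COMM_WORLD_SIZE
{
    return totalMpiNodes > 1;                                // >1 rank under mpiexec -> parallel training on
}

static bool DefaultDistributedMBReading(bool mpiActive, bool legacyReader)
{
    return mpiActive && !legacyReader;                       // parallel run with a v2 reader -> distributed reading on
}

int main()
{
    // mpiexec -n 4 with a new (v2) reader: both defaults flip to true.
    std::printf("4 ranks, v2 reader: parallelTrain=%d distributedMBReading=%d\n",
                DefaultParallelTrain(4), DefaultDistributedMBReading(true, false));
    // Single-process run with a legacy reader: both defaults stay false.
    std::printf("1 process, legacy reader: parallelTrain=%d distributedMBReading=%d\n",
                DefaultParallelTrain(0), DefaultDistributedMBReading(false, true));
    return 0;
}

An explicit parallelTrain or distributedMBReading setting in the config still takes precedence; the new defaults only apply when the keys are absent.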
14 changes: 10 additions & 4 deletions Source/ActionsLib/EvalActions.cpp
@@ -39,6 +39,12 @@ using namespace std;
using namespace Microsoft::MSR;
using namespace Microsoft::MSR::CNTK;

bool GetDistributedMBReadingDefaultValue(const ConfigParameters& config, const IDataReader& reader)
{
// Return 'true' if we're running parallel training with a v2 reader, 'false' otherwise.
return (MPIWrapper::GetInstance() != nullptr && !reader.IsLegacyReader());
}

// ===========================================================================
// DoEvalBase() - implements CNTK "eval" command
// ===========================================================================
@@ -62,7 +68,7 @@ static void DoEvalBase(const ConfigParameters& config, IDataReader& reader)
size_t maxSamplesInRAM = config(L"maxSamplesInRAM", (size_t)SIZE_MAX);
size_t numSubminiBatches = config(L"numSubminibatches", (size_t)1);

bool enableDistributedMBReading = config(L"distributedMBReading", false);
bool enableDistributedMBReading = config(L"distributedMBReading", GetDistributedMBReadingDefaultValue(config, reader));

vector<wstring> evalNodeNamesVector;

@@ -104,7 +110,7 @@ static void DoEvalBNBase(const ConfigParameters& config, IDataReader& reader)
size_t maxSamplesInRAM = config(L"maxSamplesInRAM", (size_t)SIZE_MAX);
size_t numSubminiBatches = config(L"numSubminibatches", (size_t)1);

bool enableDistributedMBReading = config(L"distributedMBReading", false);
bool enableDistributedMBReading = config(L"distributedMBReading", GetDistributedMBReadingDefaultValue(config, reader));

vector<wstring> evalNodeNamesVector;

@@ -189,8 +195,6 @@ void DoCrossValidate(const ConfigParameters& config)
size_t maxSamplesInRAM = config(L"maxSamplesInRAM", (size_t)SIZE_MAX);
size_t numSubminiBatches = config(L"numSubminibatches", (size_t)1);

bool enableDistributedMBReading = config(L"distributedMBReading", false);

ConfigArray evalNodeNames = config(L"evalNodeNames", "");
vector<wstring> evalNodeNamesVector;
for (int i = 0; i < evalNodeNames.size(); ++i)
@@ -203,6 +207,8 @@ void DoCrossValidate(const ConfigParameters& config)

DataReader cvDataReader(readerConfig);

bool enableDistributedMBReading = config(L"distributedMBReading", GetDistributedMBReadingDefaultValue(config, cvDataReader));

bool finalModelEvaluated = false;
for (size_t i = cvInterval[0]; i <= cvInterval[2]; i += cvInterval[1])
{
17 changes: 14 additions & 3 deletions Source/CNTK/CNTK.cpp
@@ -550,9 +550,14 @@ int wmainWithBS(int argc, wchar_t* argv[]) // called from wmain which is a wrapper
// parallel training
shared_ptr<Microsoft::MSR::CNTK::MPIWrapper> mpi;
auto ensureMPIWrapperCleanup = MakeScopeExit(&MPIWrapper::DeleteInstance);
bool paralleltrain = config(L"parallelTrain", false);
// when running under MPI with more than one node, use 'true' as the default value for parallelTrain,
// 'false' otherwise.
bool paralleltrain = config(L"parallelTrain", (MPIWrapper::GetTotalNumberOfMPINodes() > 1));

if (paralleltrain)
{
mpi = MPIWrapper::GetInstance(true /*create*/);
}

g_shareNodeValueMatrices = config(L"shareNodeValueMatrices", false);

@@ -684,9 +689,15 @@ int wmainOldCNTKConfig(int argc, wchar_t* argv[])
// The top-level 'parallelTrain' is a bool, not to be confused with the parallelTrain block inside SGD.
shared_ptr<Microsoft::MSR::CNTK::MPIWrapper> mpi;
auto ensureMPIWrapperCleanup = MakeScopeExit(&MPIWrapper::DeleteInstance);
bool paralleltrain = config(L"parallelTrain", "false");

// when running under MPI with more than one node, use 'true' as the default value for parallelTrain,
// 'false' otherwise.
bool paralleltrain = config(L"parallelTrain", (MPIWrapper::GetTotalNumberOfMPINodes() > 1));

if (paralleltrain)
mpi = MPIWrapper::GetInstance(true /*create*/);
{
mpi = MPIWrapper::GetInstance(true /*create*/);
}

g_shareNodeValueMatrices = config(L"shareNodeValueMatrices", false);

17 changes: 17 additions & 0 deletions Source/Common/DataReader.cpp
@@ -184,6 +184,23 @@ bool DataReader::SupportsDistributedMBRead() const
return supportsDistributedMBRead;
}

// IsLegacyReader - Returns true if any of the underlying readers is a legacy reader, false otherwise.
bool DataReader::IsLegacyReader() const
{
for (size_t i = 0; i < m_ioNames.size(); i++)
{
auto currReaderIter = m_dataReaders.find(m_ioNames[i]);
assert(currReaderIter != m_dataReaders.end());

if (currReaderIter->second->IsLegacyReader())
{
return true;
}
}

return false;
}

//StartDistributedMinibatchLoop - Startup a distributed minibatch loop for parallel training
// mbSize - [in] size of the minibatch (number of frames, etc.)
// epoch - [in] epoch number for this loop
7 changes: 7 additions & 0 deletions Source/Common/Include/DataReader.h
@@ -239,6 +239,12 @@ class DATAREADER_API IDataReader
return false;
};

// old DataReader architecture
virtual bool IsLegacyReader() const
{
return true;
};

virtual void StartDistributedMinibatchLoop(size_t mbSize, size_t epoch, size_t subsetNum, size_t numSubsets, size_t requestedEpochSamples = requestDataSize)
{
if (SupportsDistributedMBRead() || (numSubsets != 1) || (subsetNum != 0))
@@ -417,6 +423,7 @@ class DataReader : public IDataReader, protected Plugin, public ScriptableObject
virtual void StartMinibatchLoop(size_t mbSize, size_t epoch, size_t requestedEpochSamples = requestDataSize);

virtual bool SupportsDistributedMBRead() const override;
virtual bool IsLegacyReader() const override;
virtual void StartDistributedMinibatchLoop(size_t mbSize, size_t epoch, size_t subsetNum, size_t numSubsets, size_t requestedEpochSamples = requestDataSize) override;

virtual void StartMinibatchLoop(size_t mbSize, size_t epoch, const std::unordered_set<InputStreamDescription>&, size_t requestedEpochSamples = requestDataSize) override;
29 changes: 29 additions & 0 deletions Source/Common/Include/MPIWrapper.h
@@ -138,6 +138,14 @@ class MPIWrapper : public std::enable_shared_from_this<MPIWrapper>
MPI_Comm_size(MPI_COMM_WORLD, &m_numMPINodes);
m_numNodesInUse = m_numMPINodes;

// Verify that the environment variable used by GetTotalNumberOfMPINodes()
// matches what the MPI API says. There are two possible cases:
// 1) when running under mpiexec, both values have to match;
// 2) when running without mpiexec, the former will return 0, and
//    the latter will be set to 1.
assert((GetTotalNumberOfMPINodes() == 0 && m_numNodesInUse == 1) ||
(GetTotalNumberOfMPINodes() == m_numNodesInUse));

// Applying MPI workaround
s_myRank = m_myRank;
atexit(&MPIWrapper::MPIWorkaroundAtExit);
@@ -160,6 +168,27 @@ class MPIWrapper : public std::enable_shared_from_this<MPIWrapper>
::Sleep((DWORD)(500 * CurrentNodeRank()));
}

// Note that this function specifically does not require MPI initialization.
// Moreover, it can be used without actually loading any MPI libs.
// TODO: Once we move to dynamic loading for MPI libs on Linux, move it to utilities.
static int GetTotalNumberOfMPINodes()
{
#ifdef WIN32
const char* p = std::getenv("PMI_SIZE");
#else
const char* p = std::getenv("OMPI_COMM_WORLD_SIZE");
#endif
if (!p)
{
return 0;
}
else
{
return std::stoi(string(p));
}
}

// Note: we don't clear the sub-communication here although we should, because in case of a crash, this prevents the EXE from terminating.
// It's OK since this class is a singleton anyway that gets instantiated exactly once at program startup.
~MPIWrapper()
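
The environment-variable lookup above is what makes the parallelTrain default available before MPI is initialized. Below is a self-contained sketch of the same check; the variable names PMI_SIZE and OMPI_COMM_WORLD_SIZE come from the function above, while the helper and output are illustrative only.

#include <cstdio>
#include <cstdlib>
#include <string>

// Same lookup as GetTotalNumberOfMPINodes(): the MPI launcher exports the
// world size into the environment, so it can be read before (or without)
// calling MPI_Init.
static int WorldSizeFromEnvironment()
{
#ifdef _WIN32
    const char* p = std::getenv("PMI_SIZE");              // set by MS-MPI's mpiexec
#else
    const char* p = std::getenv("OMPI_COMM_WORLD_SIZE");  // set by Open MPI's mpirun/mpiexec
#endif
    return p ? std::stoi(std::string(p)) : 0;             // 0: not launched by an MPI launcher
}

int main()
{
    int n = WorldSizeFromEnvironment();
    if (n > 1)
        std::printf("launched with %d MPI ranks -> parallelTrain defaults to true\n", n);
    else
        std::printf("single rank or no MPI launcher -> parallelTrain defaults to false\n");
    return 0;
}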
5 changes: 5 additions & 0 deletions Source/Readers/ReaderLib/ReaderShim.h
@@ -69,6 +69,11 @@ class ReaderShim : public IDataReader
return true;
}

virtual bool IsLegacyReader() const override
{
return false;
}

virtual bool GetMinibatch(StreamMinibatchInputs& matrices) override;

virtual bool DataEnd() override;
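
Taken together, the IsLegacyReader pieces above form a simple pattern: the IDataReader base class defaults to true, so every existing reader counts as legacy without any code change; the v2 ReaderShim overrides it to false; and the composite DataReader reports legacy if any of its underlying readers does. A self-contained sketch of that shape, with hypothetical class names standing in for the real ones:

#include <cstdio>
#include <memory>
#include <vector>

struct Reader                              // stands in for IDataReader
{
    virtual ~Reader() = default;
    virtual bool IsLegacyReader() const { return true; }    // old readers inherit this default
};

struct V2Reader : Reader                   // stands in for ReaderShim
{
    bool IsLegacyReader() const override { return false; }  // new readers opt out explicitly
};

struct CompositeReader : Reader            // stands in for the DataReader wrapper
{
    std::vector<std::unique_ptr<Reader>> parts;
    bool IsLegacyReader() const override
    {
        for (const auto& r : parts)        // legacy if any underlying reader is legacy
            if (r->IsLegacyReader())
                return true;
        return false;
    }
};

int main()
{
    CompositeReader c;
    c.parts.push_back(std::make_unique<V2Reader>());
    std::printf("all-v2 composite is legacy: %d\n", c.IsLegacyReader());  // 0
    c.parts.push_back(std::make_unique<Reader>());
    std::printf("mixed composite is legacy: %d\n", c.IsLegacyReader());   // 1
    return 0;
}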
3 changes: 2 additions & 1 deletion Source/SGDLib/DataReaderHelpers.h
@@ -89,7 +89,8 @@ namespace Microsoft { namespace MSR { namespace CNTK {
// TODO: This must be a runtime check, not an assert().
UNUSED(iter);
}


assert(trainSetDataReader.IsLegacyReader());
DecimateMinibatchInPlace<ElemType>(inputMatrices, mpi->NumNodesInUse(), mpi->CurrentNodeRank(), pMBLayout);
}

14 changes: 14 additions & 0 deletions Source/SGDLib/SGD.cpp
@@ -152,6 +152,19 @@ void SGD<ElemType>::TrainOrAdaptModel(int startEpoch, ComputationNetworkPtr net,
}
}

// This code is only relevant for the new (V2) readers. It exists because of
// a shortcoming in DecimateMinibatchInPlace, which does not yet work when inputs
// in the same minibatch have different layouts, something only V2 readers can produce.
if (m_enableDistributedMBReadingNotSpecified && m_mpi != nullptr && !trainSetDataReader->IsLegacyReader())
{
// We're running parallel training with a v2 reader;
// auto-enable distributed reading.
if (m_traceLevel > 0)
LOGPRINTF(stderr, "\"distributedMBReading\" is not explicitly specified, defaulting to 'true'.\n");
m_enableDistributedMBReading = true;
}

// determine evaluationNodes from GetEvalCriterionNodes(), ensuring each criterion is only logged once
std::vector<ComputationNodeBasePtr> evaluationNodes;
{
@@ -2710,6 +2723,7 @@ SGDParams::SGDParams(const ConfigRecordType& configSGD, size_t sizeofElemType)
if (m_parallelizationStartEpochNum < 0 /* sic */)
// Be explicit that user-facing epoch numbers are 1-based
InvalidArgument("parallelizationStartEpoch must be greater or equal to 1");
m_enableDistributedMBReadingNotSpecified = !configParallelTrain.Exists(L"distributedMBReading");
m_enableDistributedMBReading = configParallelTrain(L"distributedMBReading", false);
m_syncStatsTrace = configParallelTrain(L"syncPerfStats", (int) 0);

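
The m_enableDistributedMBReadingNotSpecified flag implements a small but useful pattern: record at parse time whether the user set a key, so the default can be revisited later once more context (the reader type, the MPI state) is known. Below is a minimal sketch of that pattern with hypothetical names; the real code checks configParallelTrain.Exists(L"distributedMBReading") and uses the members shown in the hunks above.

#include <cstdio>
#include <map>
#include <string>

struct ParallelTrainOptions
{
    bool distributedMBReading = false;
    bool distributedMBReadingNotSpecified = true;

    // Parse time: remember whether the key was present at all.
    void Parse(const std::map<std::string, bool>& config)
    {
        auto it = config.find("distributedMBReading");
        distributedMBReadingNotSpecified = (it == config.end());
        distributedMBReading = distributedMBReadingNotSpecified ? false : it->second;
    }

    // Later, once the reader type and MPI state are known: only override the
    // value if the user never specified it explicitly.
    void AutoEnableIfPossible(bool runningUnderMpi, bool legacyReader)
    {
        if (distributedMBReadingNotSpecified && runningUnderMpi && !legacyReader)
            distributedMBReading = true;   // parallel training with a v2 reader
    }
};

int main()
{
    ParallelTrainOptions opts;
    opts.Parse({});                          // key absent -> default false, but overridable
    opts.AutoEnableIfPossible(true, false);  // MPI run with a v2 reader
    std::printf("auto-enabled: %d\n", opts.distributedMBReading);                   // 1

    ParallelTrainOptions explicitOff;
    explicitOff.Parse({{"distributedMBReading", false}});                           // user said no
    explicitOff.AutoEnableIfPossible(true, false);
    std::printf("explicit off respected: %d\n", explicitOff.distributedMBReading);  // 0
    return 0;
}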
7 changes: 7 additions & 0 deletions Source/SGDLib/SGD.h
@@ -255,6 +255,13 @@ struct SGDParams : public ScriptableObjects::Object

ParallelizationMethod m_parallelizationMethod;
bool m_enableDistributedMBReading;
// Indicates whether we're using the default value of the m_enableDistributedMBReading flag
// (in which case it can potentially be overridden).
// This flag is only relevant for the new (V2) readers. It exists because of
// a shortcoming in DecimateMinibatchInPlace, which does not yet work when inputs
// in the same minibatch have different layouts, something only V2 readers can produce.
bool m_enableDistributedMBReadingNotSpecified;
int m_parallelizationStartEpochNum;

// decide if/how often we measure and show sync performance stats (seconds spend on sync, seconds since last sync etc.) ?
