From a5a17236e23080afae785bb0608435ac3a23f68c Mon Sep 17 00:00:00 2001 From: Amit Agarwal <amitaga@microsoft.com> Date: Tue, 22 Mar 2016 23:19:02 -0700 Subject: [PATCH] Addressed some pending CR feedback --- Source/CNTK/CNTK.cpp | 4 ++-- Source/Common/Include/MPIWrapper.h | 7 ++++-- Source/Common/Include/ProgressTracing.h | 18 +++++++++++++++ .../ComputationNetworkEvaluation.cpp | 3 +-- Source/SGDLib/DataReaderHelpers.h | 2 +- Source/SGDLib/IDistGradAggregator.h | 4 ++-- Source/SGDLib/MASGD.h | 6 ++--- Source/SGDLib/SGD.cpp | 11 +--------- Source/SGDLib/SGD.h | 4 ++-- Source/SGDLib/SimpleDistGradAggregator.h | 2 +- Source/SGDLib/SimpleEvaluator.h | 15 +++---------- Source/SGDLib/SimpleOutputWriter.h | 22 ++----------------- 12 files changed, 41 insertions(+), 57 deletions(-) diff --git a/Source/CNTK/CNTK.cpp b/Source/CNTK/CNTK.cpp index 3345ef340a8a..1e0af6deea7e 100644 --- a/Source/CNTK/CNTK.cpp +++ b/Source/CNTK/CNTK.cpp @@ -197,7 +197,7 @@ void DoCommands(const ConfigParameters& config, const shared_ptr<MPIWrapper>& mp std::cerr << "CNTKCommandTrainInfo: CNTKNoMoreCommands_Total : " << fullTotalMaxEpochs << endl; // set up progress tracing for compute cluster management - if (progressTracing && ((mpi == nullptr) || mpi->IsMainNode())) + if (progressTracing && (!mpi || mpi->IsMainNode())) { ProgressTracing::SetTracingFlag(); ProgressTracing::TraceTotalNumberOfSteps(fullTotalMaxEpochs); // enable tracing, using this as the total number of epochs @@ -304,7 +304,7 @@ void DoCommands(const ConfigParameters& config, const shared_ptr<MPIWrapper>& mp ndlScript.ClearGlobal(); // clear global macros between commands // Synchronize all ranks before proceeding to next action/command - if (mpi != nullptr) + if (mpi) mpi->WaitAll(); } } diff --git a/Source/Common/Include/MPIWrapper.h b/Source/Common/Include/MPIWrapper.h index b03d8d007b7a..0411da1bddb3 100644 --- a/Source/Common/Include/MPIWrapper.h +++ b/Source/Common/Include/MPIWrapper.h @@ -50,6 +50,9 @@ static int operator||(int rc, const MpiFail &what)
RuntimeError("%s", what.c_str()); } +class MPIWrapper; +typedef std::shared_ptr<MPIWrapper> MPIWrapperPtr; + class MPIWrapper : public std::enable_shared_from_this<MPIWrapper> { int m_myRank; @@ -59,7 +62,7 @@ class MPIWrapper : public std::enable_shared_from_this<MPIWrapper> // MPI communicator that reflects the current subset selection MPI_Comm m_currentComm; - static std::shared_ptr<MPIWrapper> s_mpi; + static MPIWrapperPtr s_mpi; // MPI_Init() with delay-loading the msmpi.dll (possibly causing a failure if missing; we want to catch that) int MPI_Init_DL() @@ -223,7 +226,7 @@ class MPIWrapper : public std::enable_shared_from_this<MPIWrapper> public: - static std::shared_ptr<MPIWrapper> GetInstance(bool create = false) + static MPIWrapperPtr GetInstance(bool create = false) { static bool initialized = false; if (create) diff --git a/Source/Common/Include/ProgressTracing.h b/Source/Common/Include/ProgressTracing.h index 15afd78fe0d7..fba0b8e3cf34 100644 --- a/Source/Common/Include/ProgressTracing.h +++ b/Source/Common/Include/ProgressTracing.h @@ -112,5 +112,23 @@ namespace Microsoft { namespace MSR { namespace CNTK { printf("EVALERR: %.7f%%\n", err); } + + // This prints a PROGRESS message with a percentage value of 0 to prevent timeouts on Philly + // when executing long running non-training operations like PreCompute, CV, Eval, and Write + static size_t TraceFakeProgress(size_t numIterationsBeforePrintingProgress, size_t numItersSinceLastPrintOfProgress) + { + size_t newNumItersSinceLastPrintOfProgress = numItersSinceLastPrintOfProgress; + if (GetTracingFlag()) + { + newNumItersSinceLastPrintOfProgress++; + if (newNumItersSinceLastPrintOfProgress >= numIterationsBeforePrintingProgress) + { + printf("PROGRESS: %.2f%%\n", 0.0f); + newNumItersSinceLastPrintOfProgress = 0; + } + } + + return newNumItersSinceLastPrintOfProgress; + } }; } } } diff --git a/Source/ComputationNetworkLib/ComputationNetworkEvaluation.cpp b/Source/ComputationNetworkLib/ComputationNetworkEvaluation.cpp index ec38d6bf6b18..25f873e22a5c 100644 --- 
a/Source/ComputationNetworkLib/ComputationNetworkEvaluation.cpp +++ b/Source/ComputationNetworkLib/ComputationNetworkEvaluation.cpp @@ -678,8 +678,7 @@ void ComputationNetwork::MarkValueNonSharableNodes() std::list<ComputationNodeBasePtr> allPreComputeNodes; for (const auto& node : nodes) { - auto pcnode = dynamic_pointer_cast<IPreComputeNode>(node); - if (pcnode) + if (node->Is<IPreComputeNode>()) allPreComputeNodes.push_back(node); } diff --git a/Source/SGDLib/DataReaderHelpers.h b/Source/SGDLib/DataReaderHelpers.h index 2044e64f86b1..8d496dd4a32c 100644 --- a/Source/SGDLib/DataReaderHelpers.h +++ b/Source/SGDLib/DataReaderHelpers.h @@ -31,7 +31,7 @@ namespace Microsoft { namespace MSR { namespace CNTK { bool useParallelTrain, StreamMinibatchInputs& inputMatrices, size_t& actualMBSize, - const std::shared_ptr<MPIWrapper>& mpi) + const MPIWrapperPtr& mpi) { auto pMBLayout = net->GetMBLayoutPtr(); // Reading consists of a sequence of Reader API calls: diff --git a/Source/SGDLib/IDistGradAggregator.h b/Source/SGDLib/IDistGradAggregator.h index 5ab7283c6ed4..2b9445fa4061 100644 --- a/Source/SGDLib/IDistGradAggregator.h +++ b/Source/SGDLib/IDistGradAggregator.h @@ -9,7 +9,7 @@ template <class ElemType> class IDistGradAggregator { public: - IDistGradAggregator(const std::shared_ptr<MPIWrapper>& mpi) + IDistGradAggregator(const MPIWrapperPtr& mpi) : m_mpi(mpi) { } @@ -37,7 +37,7 @@ class IDistGradAggregator } protected: - std::shared_ptr<MPIWrapper> m_mpi; + MPIWrapperPtr m_mpi; }; #define UsingIDistGradAggregatorMembers \ diff --git a/Source/SGDLib/MASGD.h b/Source/SGDLib/MASGD.h index 92ac5ca312bc..e0b123e9bdfa 100644 --- a/Source/SGDLib/MASGD.h +++ b/Source/SGDLib/MASGD.h @@ -102,7 +102,7 @@ namespace Microsoft { namespace MSR { namespace CNTK { { typedef shared_ptr<ComputationNode<ElemType>> ComputationNodePtr; public: - IMASGD(const std::shared_ptr<MPIWrapper>& pMPI, size_t perfReportFreq) + IMASGD(const MPIWrapperPtr& pMPI, size_t perfReportFreq) :m_MAworkerStatus(pMPI->NumNodesInUse(), MAWorkerStatus::NOTSTARTED), m_numSyncPerformed(0), m_numWorkers(pMPI->NumNodesInUse()), @@ -286,7 +286,7 @@ namespace 
Microsoft { namespace MSR { namespace CNTK { size_t m_numWorkers; size_t m_myRank; MASGDPerfStats m_perfReporter; - std::shared_ptr<MPIWrapper> m_pMPI; + MPIWrapperPtr m_pMPI; }; @@ -299,7 +299,7 @@ namespace Microsoft { namespace MSR { namespace CNTK { using Base::DownCast; public: - BasicModelAveragingSGD(const std::shared_ptr<MPIWrapper>& pMPI, size_t reportFreq) + BasicModelAveragingSGD(const MPIWrapperPtr& pMPI, size_t reportFreq) :Base(pMPI, reportFreq) {} diff --git a/Source/SGDLib/SGD.cpp b/Source/SGDLib/SGD.cpp index eda012b95559..463f2aa37c1e 100644 --- a/Source/SGDLib/SGD.cpp +++ b/Source/SGDLib/SGD.cpp @@ -1323,16 +1323,7 @@ bool SGD<ElemType>::PreCompute(ComputationNetworkPtr net, net->ForwardProp(nodes); - if (ProgressTracing::GetTracingFlag()) - { - numItersSinceLastPrintOfProgress++; - if (numItersSinceLastPrintOfProgress >= numIterationsBeforePrintingProgress) - { - // TODO: For now just print 0.0 instead of calculating actual progress - printf("PROGRESS: %.2f%%\n", 0.0f); - numItersSinceLastPrintOfProgress = 0; - } - } + numItersSinceLastPrintOfProgress = ProgressTracing::TraceFakeProgress(numIterationsBeforePrintingProgress, numItersSinceLastPrintOfProgress); } // finalize diff --git a/Source/SGDLib/SGD.h b/Source/SGDLib/SGD.h index 2ac75433e861..c0e503786ecc 100644 --- a/Source/SGDLib/SGD.h +++ b/Source/SGDLib/SGD.h @@ -236,7 +236,7 @@ struct SGDParams : public ScriptableObjects::Object bool m_useAllDataForPreComputedNode; // Parallel training - std::shared_ptr<MPIWrapper> m_mpi; + MPIWrapperPtr m_mpi; ParallelizationMethod m_parallelizationMethod; bool m_enableDistributedMBReading; @@ -315,7 +315,7 @@ class SGD : public SGDParams { } - void InitMPI(const std::shared_ptr<MPIWrapper>& mpi) + void InitMPI(const MPIWrapperPtr& mpi) { m_mpi = mpi; diff --git a/Source/SGDLib/SimpleDistGradAggregator.h b/Source/SGDLib/SimpleDistGradAggregator.h index 58bf271bb5e1..04d8a558ace1 100644 --- a/Source/SGDLib/SimpleDistGradAggregator.h +++ b/Source/SGDLib/SimpleDistGradAggregator.h @@ -15,7 +15,7 @@ class 
SimpleDistGradAggregator : public IDistGradAggregator<ElemType> UsingIDistGradAggregatorMembers; public: - SimpleDistGradAggregator(const std::shared_ptr<MPIWrapper>& mpi, bool useAsyncAggregation, int syncStatsTrace) + SimpleDistGradAggregator(const MPIWrapperPtr& mpi, bool useAsyncAggregation, int syncStatsTrace) : IDistGradAggregator<ElemType>(mpi), m_useAsyncAggregation(useAsyncAggregation), m_currentEpochNumber(-1), m_bufferedGradHeader(nullptr), m_syncStatsTrace(syncStatsTrace), m_iterationCount(0) { } diff --git a/Source/SGDLib/SimpleEvaluator.h b/Source/SGDLib/SimpleEvaluator.h index 079e4ca6f6ea..60fa66aab42f 100644 --- a/Source/SGDLib/SimpleEvaluator.h +++ b/Source/SGDLib/SimpleEvaluator.h @@ -31,7 +31,7 @@ template <class ElemType> class SimpleEvaluator { public: - SimpleEvaluator(ComputationNetworkPtr net, const std::shared_ptr<MPIWrapper>& mpi, const size_t numMBsToShowResult = 100, const int traceLevel = 0, const size_t maxSamplesInRAM = SIZE_MAX, + SimpleEvaluator(ComputationNetworkPtr net, const MPIWrapperPtr& mpi, const size_t numMBsToShowResult = 100, const int traceLevel = 0, const size_t maxSamplesInRAM = SIZE_MAX, const size_t numSubminiBatches = 1) : m_net(net), m_numMBsToShowResult(numMBsToShowResult), @@ -210,16 +210,7 @@ class SimpleEvaluator } - if (ProgressTracing::GetTracingFlag()) - { - numItersSinceLastPrintOfProgress++; - if (numItersSinceLastPrintOfProgress >= numIterationsBeforePrintingProgress) - { - // TODO: For now just print 0.0 instead of calculating actual progress - printf("PROGRESS: %.2f%%\n", 0.0f); - numItersSinceLastPrintOfProgress = 0; - } - } + numItersSinceLastPrintOfProgress = ProgressTracing::TraceFakeProgress(numIterationsBeforePrintingProgress, numItersSinceLastPrintOfProgress); // call DataEnd to check if end of sentence is reached // datareader will do its necessary/specific process for sentence ending @@ -289,7 +280,7 @@ class SimpleEvaluator size_t m_numMBsToShowResult; size_t m_maxSamplesInRAM; size_t m_numSubminiBatches; - std::shared_ptr<MPIWrapper> m_mpi; + MPIWrapperPtr m_mpi; 
shared_ptr<IDistGradAggregator<ElemType>> m_distGradAgg; struct DistGradHeader* m_gradHeader; diff --git a/Source/SGDLib/SimpleOutputWriter.h b/Source/SGDLib/SimpleOutputWriter.h index 620b2bdaa0d5..f17404f39fcb 100644 --- a/Source/SGDLib/SimpleOutputWriter.h +++ b/Source/SGDLib/SimpleOutputWriter.h @@ -131,16 +131,7 @@ class SimpleOutputWriter totalEpochSamples += actualMBSize; - if (ProgressTracing::GetTracingFlag()) - { - numItersSinceLastPrintOfProgress++; - if (numItersSinceLastPrintOfProgress >= numIterationsBeforePrintingProgress) - { - // TODO: For now just print 0.0 instead of calculating actual progress - printf("PROGRESS: %.2f%%\n", 0.0f); - numItersSinceLastPrintOfProgress = 0; - } - } + numItersSinceLastPrintOfProgress = ProgressTracing::TraceFakeProgress(numIterationsBeforePrintingProgress, numItersSinceLastPrintOfProgress); // call DataEnd function in dataReader to do // reader specific process if sentence ending is reached @@ -292,16 +283,7 @@ class SimpleOutputWriter fprintf(stderr, "Minibatch[%lu]: ActualMBSize = %lu\n", ++numMBsRun, actualMBSize); - if (ProgressTracing::GetTracingFlag()) - { - numItersSinceLastPrintOfProgress++; - if (numItersSinceLastPrintOfProgress >= numIterationsBeforePrintingProgress) - { - // TODO: For now just print 0.0 instead of calculating actual progress - printf("PROGRESS: %.2f%%\n", 0.0f); - numItersSinceLastPrintOfProgress = 0; - } - } + numItersSinceLastPrintOfProgress = ProgressTracing::TraceFakeProgress(numIterationsBeforePrintingProgress, numItersSinceLastPrintOfProgress); // call DataEnd function in dataReader to do // reader specific process if sentence ending is reached