diff --git a/Examples/Image/Classification/ConvNet/Python/ConvNet_CIFAR10_DataAug_Distributed.py b/Examples/Image/Classification/ConvNet/Python/ConvNet_CIFAR10_DataAug_Distributed.py index b3f48550af9d..f14ca3884128 100644 --- a/Examples/Image/Classification/ConvNet/Python/ConvNet_CIFAR10_DataAug_Distributed.py +++ b/Examples/Image/Classification/ConvNet/Python/ConvNet_CIFAR10_DataAug_Distributed.py @@ -115,7 +115,7 @@ def create_trainer(network, epoch_size, num_quantization_bits, block_size, warm_ return cntk.Trainer(network['output'], (network['ce'], network['pe']), parameter_learner) # Train and test -def train_and_test(network, trainer, train_source, test_source, progress_printer, minibatch_size, epoch_size, restore, profiling=False): +def train_and_test(network, trainer, train_source, test_source, progress_writers, minibatch_size, epoch_size, restore, profiling=False): # define mapping from intput streams to network inputs input_map = { @@ -128,7 +128,7 @@ def train_and_test(network, trainer, train_source, test_source, progress_printer trainer = trainer, model_inputs_to_mb_source_mapping = input_map, mb_size_schedule = cntk.minibatch_size_schedule(minibatch_size), - progress_printer = progress_printer, + progress_printer = progress_writers, checkpoint_frequency = epoch_size, checkpoint_filename = os.path.join(model_path, "ConvNet_CIFAR10_DataAug"), # save_all_checkpoints = False, @@ -149,10 +149,12 @@ def train_and_test(network, trainer, train_source, test_source, progress_printer # Train and evaluate the network. def convnet_cifar10_dataaug(train_data, test_data, mean_data, minibatch_size=64, epoch_size=50000, num_quantization_bits=32, - block_size=3200, warm_up=0, max_epochs=2, restore=False, log_to_file=None, - num_mbs_per_log=None, gen_heartbeat=False, profiling=False): + block_size=3200, warm_up=0, max_epochs=2, restore=False, log_to_file=None, + num_mbs_per_log=None, gen_heartbeat=False, profiling=False, tensorboard_logdir=None): _cntk_py.set_computation_network_trace_level(0) + network = create_conv_network() + progress_printer = cntk.utils.ProgressPrinter( freq=num_mbs_per_log, tag='Training', @@ -161,11 +163,17 @@ def convnet_cifar10_dataaug(train_data, test_data, mean_data, minibatch_size=64, gen_heartbeat=gen_heartbeat, num_epochs=max_epochs) - network = create_conv_network() + tensorboard_writer = cntk.utils.TensorBoardProgressWriter( + freq=num_mbs_per_log, + log_dir=tensorboard_logdir if tensorboard_logdir is not None else 'log', + rank=cntk.distributed.Communicator.rank(), + model=network['output']) + trainer = create_trainer(network, epoch_size, num_quantization_bits, block_size, warm_up) train_source = create_image_mb_source(train_data, mean_data, train=True, total_number_of_samples=max_epochs * epoch_size) test_source = create_image_mb_source(test_data, mean_data, train=False, total_number_of_samples=cntk.io.FULL_DATA_SWEEP) - train_and_test(network, trainer, train_source, test_source, progress_printer, minibatch_size, epoch_size, restore, profiling) + train_and_test(network, trainer, train_source, test_source, [progress_printer, tensorboard_writer], minibatch_size, + epoch_size, restore, profiling) if __name__=='__main__': @@ -176,6 +184,7 @@ def convnet_cifar10_dataaug(train_data, test_data, mean_data, minibatch_size=64, parser.add_argument('-datadir', '--datadir', help='Data directory where the CIFAR dataset is located', required=False, default=data_path) parser.add_argument('-outputdir', '--outputdir', help='Output directory for checkpoints and models', required=False, default=None) parser.add_argument('-logdir', '--logdir', help='Log file', required=False, default=None) + parser.add_argument('-tensorboard_logdir', '--tensorboard_logdir', help='Directory where to tensorboard logs should be written', required=False, default='log') parser.add_argument('-n', '--num_epochs', help='Total number of epochs to train', type=int, required=False, default='160') parser.add_argument('-m', '--minibatch_size', help='Minibatch size', type=int, required=False, default='64') parser.add_argument('-e', '--epoch_size', help='Epoch size', type=int, required=False, default='50000') @@ -192,8 +201,6 @@ def convnet_cifar10_dataaug(train_data, test_data, mean_data, minibatch_size=64, model_path = args['outputdir'] + "/models" if args['datadir'] is not None: data_path = args['datadir'] - if args['logdir'] is not None: - log_dir = args['logdir'] if args['device'] is not None: cntk.device.set_default_device(cntk.device.gpu(args['device'])) @@ -213,7 +220,8 @@ def convnet_cifar10_dataaug(train_data, test_data, mean_data, minibatch_size=64, log_to_file=args['logdir'], num_mbs_per_log=100, gen_heartbeat=False, - profiling=args['profile']) + profiling=args['profile'], + tensorboard_logdir=args['tensorboard_logdir']) finally: cntk.distributed.Communicator.finalize() diff --git a/Examples/Tensorboard/LanguageUnderstanding.py b/Examples/Tensorboard/LanguageUnderstanding.py index 5f8ccfa753c2..e786a1834b8d 100644 --- a/Examples/Tensorboard/LanguageUnderstanding.py +++ b/Examples/Tensorboard/LanguageUnderstanding.py @@ -85,7 +85,12 @@ def train(reader, model, max_epochs): low_memory=True, gradient_clipping_threshold_per_sample=15, gradient_clipping_with_truncation=True) - trainer = Trainer(z, (ce, pe), [learner]) + # more detailed logging + progress_printer = ProgressPrinter(freq=100, first=10, tag='Training') + #progress_printer = ProgressPrinter(tag='Training') + tensorboard_writer = TensorBoardProgressWriter(freq=100, log_dir='atis_log', model=z) + + trainer = Trainer(z, (ce, pe), [learner], [progress_printer, tensorboard_writer]) # define mapping from reader streams to network inputs input_map = { @@ -95,9 +100,6 @@ def train(reader, model, max_epochs): # process minibatches and perform model training log_number_of_parameters(z) ; print() - # more detailed logging - progress_printer = ProgressPrinter(freq=100, first=10, tag='Training', tensorboard_log_dir='atis_log', model=z) - #progress_printer = ProgressPrinter(tag='Training') t = 0 for epoch in range(max_epochs): # loop over epochs @@ -107,16 +109,15 @@ def train(reader, model, max_epochs): data = reader.next_minibatch(min(minibatch_size, epoch_end-t), input_map=input_map) # fetch minibatch trainer.train_minibatch(data) # update model with it t += trainer.previous_minibatch_sample_count # count samples processed so far - progress_printer.update_with_trainer(trainer, with_metric=True) # log progress #def trace_node(name): # nl = [n for n in z.parameters if n.name() == name] # if len(nl) > 0: # print (name, np.asarray(nl[0].value)) #trace_node('W') #trace_node('stabilizer_param') - loss, metric, actual_samples = progress_printer.epoch_summary(with_metric=True) + trainer.summarize_training_progress() - return loss, metric + tensorboard_writer.close() ############################# # main function boilerplate # diff --git a/Examples/Tensorboard/SimpleMNIST.py b/Examples/Tensorboard/SimpleMNIST.py index fa2cd96f3f48..50cbb37f5f36 100644 --- a/Examples/Tensorboard/SimpleMNIST.py +++ b/Examples/Tensorboard/SimpleMNIST.py @@ -11,7 +11,7 @@ from cntk.learner import sgd, learning_rate_schedule, UnitType from cntk.ops import input_variable, cross_entropy_with_softmax, classification_error, relu, element_times, constant, \ reduce_max, reduce_mean, reduce_min -from cntk.utils import ProgressPrinter +from cntk.utils import * abs_path = os.path.dirname(os.path.abspath(__file__)) sys.path.append(os.path.join(abs_path, "..", "..")) @@ -68,13 +68,15 @@ def simple_mnist(): label: reader_train.streams.labels } - lr_per_minibatch = learning_rate_schedule(0.2, UnitType.minibatch) - # Instantiate the trainer object to drive the model training - trainer = Trainer(netout, (ce, pe), sgd(netout.parameters, lr=lr_per_minibatch)) + # Instantiate progress writers. + logdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "mnist_log") + tensorboard_writer = TensorBoardProgressWriter(freq=1, log_dir=logdir, model=netout) + progress_printer = ProgressPrinter(freq=10, tag='Training') - # Instantiate a ProgressPrinter. - logdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "mnist_log") - progress_printer = ProgressPrinter(tag='Training', freq=1, tensorboard_log_dir=logdir, model=netout) + # Instantiate the trainer object to drive the model training + lr_per_minibatch = learning_rate_schedule(0.2, UnitType.minibatch) + learner = sgd(netout.parameters, lr=lr_per_minibatch) + trainer = Trainer(netout, (ce, pe), learner, [tensorboard_writer, progress_printer]) # Get minibatches of images to train with and perform model training minibatch_size = 64 @@ -85,16 +87,15 @@ def simple_mnist(): for minibatch_idx in range(0, int(num_minibatches_to_train)): trainer.train_minibatch(reader_train.next_minibatch(minibatch_size, input_map=input_map)) - # Take snapshot of loss and eval criterion for the previous minibatch. - progress_printer.update_with_trainer(trainer, with_metric=True) - # Log max/min/mean of each parameter tensor, so that we can confirm that the parameters change indeed. # Don't want to do that very often though, otherwise will spend too much time computing min/max/mean. if minibatch_idx % 10 == 9: for p in netout.parameters: - progress_printer.update_value("mb_" + p.uid + "_max", reduce_max(p).eval(), minibatch_idx) - progress_printer.update_value("mb_" + p.uid + "_min", reduce_min(p).eval(), minibatch_idx) - progress_printer.update_value("mb_" + p.uid + "_mean", reduce_mean(p).eval(), minibatch_idx) + tensorboard_writer.write_value(p.uid + "/max", reduce_max(p).eval(), minibatch_idx) + tensorboard_writer.write_value(p.uid + "/min", reduce_min(p).eval(), minibatch_idx) + tensorboard_writer.write_value(p.uid + "/mean", reduce_mean(p).eval(), minibatch_idx) + + trainer.summarize_training_progress() # Load test data try: @@ -122,6 +123,7 @@ def simple_mnist(): test_result += trainer.test_minibatch(mb) # Average of evaluation errors of all test minibatches + trainer.summarize_test_progress() return test_result / num_minibatches_to_test if __name__ == '__main__': diff --git a/Makefile b/Makefile index 6e88e63cbdcf..66fcaacad65f 100644 --- a/Makefile +++ b/Makefile @@ -464,6 +464,7 @@ CNTKLIBRARY_COMMON_SRC =\ $(SOURCEDIR)/CNTKv2LibraryDll/DistributedLearnerBase.cpp \ $(SOURCEDIR)/CNTKv2LibraryDll/TrainingSession.cpp \ $(SOURCEDIR)/CNTKv2LibraryDll/DataParallelDistributedLearner.cpp \ + $(SOURCEDIR)/CNTKv2LibraryDll/ProgressWriter.cpp \ $(SOURCEDIR)/CNTKv2LibraryDll/proto/CNTK.pb.cc \ $(SOURCEDIR)/CNTKv2LibraryDll/tensorboard/tensorboard.pb.cc \ $(SOURCEDIR)/CNTKv2LibraryDll/tensorboard/TensorBoardFileWriter.cpp \ diff --git a/Source/CNTKv2LibraryDll/API/CNTKLibrary.h b/Source/CNTKv2LibraryDll/API/CNTKLibrary.h index a9474787f107..0c1f13cda2e1 100755 --- a/Source/CNTKv2LibraryDll/API/CNTKLibrary.h +++ b/Source/CNTKv2LibraryDll/API/CNTKLibrary.h @@ -404,6 +404,7 @@ namespace CNTK friend class LearnerBase; friend class Variable; friend class Value; + friend class Accumulator; friend class PackedValue; friend class MPICommunicatorImpl; friend class BlockMomentumDistributedLearner; @@ -2423,6 +2424,12 @@ namespace CNTK } } + /// + /// If the value stored is a scalar, returns it. Otherwise, throws an error. + /// + template + ElementType AsScalar() const; + private: template static void AppendSparseSequenceData(const NDArrayViewPtr& sequenceData, std::vector& colStarts, std::vector& rowIndices, std::vector& nonZeroValues, size_t maxSequenceLength); @@ -4169,34 +4176,24 @@ namespace CNTK size_t PreviousMinibatchSampleCount() const { return m_prevMinibatchNumSamples; } /// - /// Returns the average training loss per sample for accumulated training loss. - /// - CNTK_API double AccumulatedLossAverage() const; - - /// - /// Returns the average evaluation criterion value per sample for accumulated eval criterion. - /// - CNTK_API double AccumulatedEvaluationAverage() const; - - /// - /// Returns the number of samples accumulated + /// Learners associated with this Trainer for updating the model's parameters using computed gradients. /// - size_t AccumulatedSampleCount() const { return m_accumulatedNumSamples; } + CNTK_API const std::vector& ParameterLearners() const; /// - /// Reset the accumulation + /// Total number of samples seen from the begining of the training. /// - CNTK_API void ResetAccumulation(); + CNTK_API size_t TotalNumberOfSamplesSeen() const; /// - /// Learners associated with this Trainer for updating the model's parameters using computed gradients. + /// Writes the summary of training progress and resets the accumulators. /// - CNTK_API const std::vector& ParameterLearners() const; + CNTK_API void SummarizeTrainingProgress(); /// - /// Total number of samples seen from the beginnign of the training. + /// Writes the summary of test progress and resets the accumulators. /// - CNTK_API size_t TotalNumberOfSamplesSeen() const; + CNTK_API void SummarizeTestProgress(); private: template @@ -4208,8 +4205,10 @@ namespace CNTK // TODO: change the public interface to return pair(error, sampleCount) instead of average error. double TestMinibatch(const std::unordered_map& arguments, const DeviceDescriptor& computeDevice, size_t& sampleCount); - Trainer(const FunctionPtr& model, const FunctionPtr& lossFunction, const std::vector& parameterLearners); - Trainer(const FunctionPtr& model, const FunctionPtr& lossFunction, const FunctionPtr& evaluationFunction, const std::vector& parameterLearners); + Trainer(const FunctionPtr& model, const FunctionPtr& lossFunction, const std::vector& parameterLearners, + const std::vector& progressWriters = {}); + Trainer(const FunctionPtr& model, const FunctionPtr& lossFunction, const FunctionPtr& evaluationFunction, const std::vector& parameterLearners, + const std::vector& progressWriters = {}); void ExecuteForwardBackward( const std::unordered_map& arguments, @@ -4222,7 +4221,9 @@ namespace CNTK void Save(const std::wstring& modelFilePath, const std::vector& learnerState, const Dictionary& externalState); - void AccumulatePrevMinibatch(const DeviceDescriptor& computeDevice); + void UpdateTrainingProgress(size_t numSamples, const ValuePtr& loss, const ValuePtr& evalCriterion, const DeviceDescriptor& computeDevice); + void UpdateTestProgress(size_t numSamples, const ValuePtr& evalCriterion, const DeviceDescriptor& computeDevice); + void AddProgressWriters(const std::vector& progressWriters); FunctionPtr m_combinedTrainingFunction; FunctionPtr m_model; @@ -4242,23 +4243,27 @@ namespace CNTK ValuePtr m_prevMinibatchAggregateTrainingLossValue; ValuePtr m_prevMinibatchAggregateEvalCriterionValue; - size_t m_accumulatedNumSamples; - ValuePtr m_accumulatedTrainingLossValue; - ValuePtr m_accumulatedEvalCriterionValue; + AccumulatorPtr m_aggregatedTrainingLossValue; + AccumulatorPtr m_aggregatedTrainingEvalCriterionValue; + AccumulatorPtr m_aggregatedTestEvalCriterionValue; + + std::unordered_set m_progressWriters; }; /// /// Construct a Trainer to train the specified 'model' with the specified 'trainingLoss' Variable as the training criterion /// and using the specified set of 'parameterLearners' for updating the model's parameters using computed gradients. /// - CNTK_API TrainerPtr CreateTrainer(const FunctionPtr& model, const FunctionPtr& lossFunction, const std::vector& parameterLearners); + CNTK_API TrainerPtr CreateTrainer(const FunctionPtr& model, const FunctionPtr& lossFunction, const std::vector& parameterLearners, + const std::vector& progressWriters = {}); /// /// Construct a Trainer to train the specified 'model' with the specified 'trainingLoss' as the training criterion, /// the specified 'evaluationFunction' as the criterion for evaluating the trained model's quality, and using the specified set /// of 'parameterLearners' for updating the model's parameters using computed gradients. /// - CNTK_API TrainerPtr CreateTrainer(const FunctionPtr& model, const FunctionPtr& lossFunction, const FunctionPtr& evaluationFunction, const std::vector& parameterLearners); + CNTK_API TrainerPtr CreateTrainer(const FunctionPtr& model, const FunctionPtr& lossFunction, const FunctionPtr& evaluationFunction, const std::vector& parameterLearners, + const std::vector& progressWriters = {}); } namespace std { @@ -4688,7 +4693,8 @@ namespace CNTK bool restoreFromCheckpointIfExists = true, bool keepExistingCheckpoints = false, size_t maxNumberOfTrainingSamples = std::numeric_limits::max(), - size_t progressFrequency = std::numeric_limits::max()); + size_t progressFrequency = std::numeric_limits::max(), + const std::vector& progressWriters = {}); /// /// Runs the session. @@ -4742,11 +4748,6 @@ namespace CNTK /// CNTK_API virtual void OnCrossValidationEnd(size_t /*validationIndex*/, double /*averageError*/, size_t /*numberOfSamples*/, size_t /*numberOfMinibatches*/) {}; - /// - /// Optionally overridable callback that is invoked with progress frequency. - /// - CNTK_API virtual void OnProgress(size_t /*index*/) {}; - protected: /// /// Accessors. @@ -4809,10 +4810,107 @@ namespace CNTK bool restoreFromCheckpointIfExists = true, bool keepExistingCheckpoints = false, size_t maxNumberOfTrainingSamples = std::numeric_limits::max(), - size_t progressFrequency = std::numeric_limits::max()); - + size_t progressFrequency = std::numeric_limits::max(), + const std::vector& progressWriters = {}); CNTK_API void PrintBuiltInfo(); + + /// + /// Base class for all classes that want to record training/evaluation progress. + /// + class ProgressWriter + { + public: + /// + /// Constructor. + /// + /// The frequency arguments control a schedule on which the training/evaluation progress updates are written. + /// The frequency value of 0 specifies geometric schedule, i.e. write progress after 1, 2, 4, 8, 16... updates. + /// The frequency value other than zero specifies arithmetic schedule, i.e. write progress after each + /// 'frequency' updates. + /// + /// The firstUpdatesToWrite arguments only apply on arithemetic schedule. If specified, the first + /// 'firstUpdatesToWrite' updates will be written explicitly before using an arithmetic schedule. + /// + CNTK_API ProgressWriter(size_t trainingUpdateWriteFrequency, size_t trainingFirstUpdatesToWrite, + size_t testUpdateWriteFrequency, size_t testFirstUpdatesToWrite); + + /// + /// Destructor. + /// + CNTK_API virtual ~ProgressWriter(); + + /// + /// Actually outputs information about the update in training progress. Overridable in derived classes. + /// + CNTK_API virtual void OnWriteTrainingUpdate(const std::pair& /*samples*/, + const std::pair& /*updates*/, + const std::pair& /*aggregateLoss*/, + const std::pair& /*aggregateMetric*/) {}; + + /// + /// Actually outputs information about the update in evaluation progress. Overridable in derived classes. + /// + CNTK_API virtual void OnWriteTestUpdate(const std::pair& /*samples*/, + const std::pair& /*updates*/, + const std::pair& /*aggregateMetric*/) {}; + + /// + /// Called after each training update, regardless whether the actual write is needed. + /// + CNTK_API virtual void OnTrainingUpdateEnd() {}; + + /// + /// Actually outputs information about the summary of training progress. Overridable in derived classes. + /// + CNTK_API virtual void OnWriteTrainingSummary(size_t /*samples*/, size_t /*updates*/, size_t /*summaries*/, + double /*aggregateLoss*/, double /*aggregateMetric*/, + size_t /*elapsedMilliseconds*/) {}; + + /// + /// Actually outputs information about the summary of evaluation progress. Overridable in derived classes. + /// + CNTK_API virtual void OnWriteTestSummary(size_t /*samples*/, size_t /*updates*/, size_t /*summaries*/, + double /*aggregateMetric*/, size_t /*elapsedMilliseconds*/) {}; + + /// + /// Returns the total number of training progress updates received by the progress writer. + /// + CNTK_API size_t TotalTrainingUpdates() const; + + /// + /// Returns the total number of evaluation progress updates received by the progress writer. + /// + CNTK_API size_t TotalTestUpdates() const; + + /// + /// Updates the writer with the accumulated loss/metric since the start of training. + /// + void UpdateTraining(size_t numSamples, const ValuePtr& accumulatedLoss, const ValuePtr& accumulatedMetric); + + /// + /// Updates the writer with the accumulated metric since the start of evaluation. + /// + void UpdateTest(size_t numSamples, const ValuePtr& accumulatedMetric); + + /// + /// Writes a summary of training progress since the last call to this function. + /// + void WriteTrainingSummary(const ValuePtr& accumulatedLoss, const ValuePtr& accumulatedMetric); + + /// + /// Writes a summary of evaluation progress since the last call to this function. + /// + void WriteTestSummary(const ValuePtr& accumulatedMetric); + + private: + // Disallow copy and move construction and assignment + ProgressWriter(const ProgressWriter&) = delete; ProgressWriter(ProgressWriter&&) = delete; ProgressWriter& operator=(const ProgressWriter&) = delete; ProgressWriter& operator=(ProgressWriter&&) = delete; + + class Impl; + std::unique_ptr m_training; + std::unique_ptr m_test; + }; } diff --git a/Source/CNTKv2LibraryDll/API/CNTKLibraryInternals.h b/Source/CNTKv2LibraryDll/API/CNTKLibraryInternals.h index a238d8841a79..ceedb739ecb3 100644 --- a/Source/CNTKv2LibraryDll/API/CNTKLibraryInternals.h +++ b/Source/CNTKv2LibraryDll/API/CNTKLibraryInternals.h @@ -201,6 +201,12 @@ namespace CNTK class Trainer; typedef std::shared_ptr TrainerPtr; + class ProgressWriter; + typedef std::shared_ptr ProgressWriterPtr; + + class Accumulator; + typedef std::shared_ptr AccumulatorPtr; + namespace Internal { CNTK_API FunctionPtr IsWithin(const Variable& operand, int offset, const std::wstring& name = L""); diff --git a/Source/CNTKv2LibraryDll/CNTKv2LibraryDll.vcxproj b/Source/CNTKv2LibraryDll/CNTKv2LibraryDll.vcxproj index 1f2397097236..9e1763ee4747 100644 --- a/Source/CNTKv2LibraryDll/CNTKv2LibraryDll.vcxproj +++ b/Source/CNTKv2LibraryDll/CNTKv2LibraryDll.vcxproj @@ -168,6 +168,7 @@ Create + diff --git a/Source/CNTKv2LibraryDll/CNTKv2LibraryDll.vcxproj.filters b/Source/CNTKv2LibraryDll/CNTKv2LibraryDll.vcxproj.filters index 6a6263d1e22d..aa6fe5615626 100644 --- a/Source/CNTKv2LibraryDll/CNTKv2LibraryDll.vcxproj.filters +++ b/Source/CNTKv2LibraryDll/CNTKv2LibraryDll.vcxproj.filters @@ -34,6 +34,7 @@ tensorboard + diff --git a/Source/CNTKv2LibraryDll/ProgressWriter.cpp b/Source/CNTKv2LibraryDll/ProgressWriter.cpp new file mode 100644 index 000000000000..b972c7c02bf1 --- /dev/null +++ b/Source/CNTKv2LibraryDll/ProgressWriter.cpp @@ -0,0 +1,192 @@ +// +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE.md file in the project root for full license information. +// + +#include "stdafx.h" +#include "CNTKLibrary.h" +#include "Utils.h" + +#include + +namespace CNTK +{ + class ProgressWriter::Impl + { + public: + Impl(size_t updateWriteFrequency, size_t firstUpdatesToWrite) + : m_frequency(updateWriteFrequency), m_firstN(firstUpdatesToWrite), + m_totalUpdates(0), m_totalSummaries(0) + { + Reset(); + } + + template + void Update(size_t samples, const ValuePtr& accumulatedLoss, const ValuePtr& accumulatedMetric, + OnWriteUpdateFunc callback) + { + if (samples == 0) + { + return; + } + + m_samples.second += samples; + m_updates.second++; + m_totalUpdates++; + + if (ShouldWriteUpdate(m_updates.second)) + { + // Time to output the accumulated updates. + // Note that we take snapshot of the accumulated loss/metric only when we want to write. + // We do it this way on purpose, since accumulated loss/metric may be stored on a GPU + // and we want to minimize the number of GPU->CPU data transfers. + if (accumulatedLoss) + { + m_loss.second = accumulatedLoss->AsScalar(); + } + + if (accumulatedMetric) + { + m_metric.second = accumulatedMetric->AsScalar(); + } + + callback(m_samples, m_updates, m_loss, m_metric); + + // Reset the window. + m_loss.first = m_loss.second; + m_metric.first = m_metric.second; + m_samples.first = m_samples.second; + m_updates.first = m_updates.second; + } + } + + template + void WriteSummary(const ValuePtr& accumulatedLoss, const ValuePtr& accumulatedMetric, + OnWriteSummaryFunc callback) + { + if (accumulatedLoss && m_samples.second > 0) + { + m_loss.second = accumulatedLoss->AsScalar(); + } + + if (accumulatedMetric && m_samples.second > 0) + { + m_metric.second = accumulatedMetric->AsScalar(); + } + + m_totalSummaries++; + auto now = std::chrono::high_resolution_clock::now(); + size_t durationMs = std::chrono::duration_cast(now - m_lastResetTime).count(); + + callback(m_samples.second, m_updates.second, m_totalSummaries, m_loss.second, m_metric.second, durationMs); + + Reset(); + } + + size_t TotalUpdates() const + { + return m_totalUpdates; + } + + private: + bool ShouldWriteUpdate(size_t update) const + { + if (m_frequency == 0) + { + // Geometric schedule - write at every 2^(i) steps, with i = 1, 2, 3, ... + return ((update + 1) & update) == 0; + } + + // Arithmetic schedule - write at every m_frequency steps or if the update is one of the first m_firstN + // updates. + return update % m_frequency == 0 || update <= m_firstN; + } + + void Reset() + { + m_loss = { 0.0, 0.0 }; + m_metric = { 0.0, 0.0 }; + m_samples = { 0, 0 }; + m_updates = { 0, 0 }; + m_lastResetTime = std::chrono::high_resolution_clock::now(); + } + + const size_t m_frequency; + const size_t m_firstN; + + // (start, end) values in the current window to be reported. + std::pair m_loss; + std::pair m_metric; + std::pair m_samples; + std::pair m_updates; + + size_t m_totalUpdates; + size_t m_totalSummaries; + std::chrono::time_point m_lastResetTime; + }; + + ProgressWriter::ProgressWriter(size_t trainingUpdateWriteFrequency, size_t trainingFirstUpdatesToWrite, + size_t testUpdateWriteFrequency, size_t testFirstUpdatesToWrite) + : m_training(std::make_unique(trainingUpdateWriteFrequency, trainingFirstUpdatesToWrite)), + m_test(std::make_unique(testUpdateWriteFrequency, testFirstUpdatesToWrite)) + { + } + + ProgressWriter::~ProgressWriter() + { + } + + void ProgressWriter::UpdateTraining(size_t samples, const ValuePtr& accumulatedLoss, + const ValuePtr& accumulatedMetric) + { + m_training->Update(samples, accumulatedLoss, accumulatedMetric, + [this](const std::pair samples, std::pair updates, + const std::pair aggregateLoss, std::pair aggregateMetric) + { + OnWriteTrainingUpdate(samples, updates, aggregateLoss, aggregateMetric); + }); + OnTrainingUpdateEnd(); + } + + void ProgressWriter::UpdateTest(size_t samples, const ValuePtr& accumulatedMetric) + { + m_test->Update(samples, nullptr, accumulatedMetric, + [this](const std::pair samples, std::pair updates, + const std::pair /*aggregateLoss*/, std::pair aggregateMetric) + { + OnWriteTestUpdate(samples, updates, aggregateMetric); + }); + } + + void ProgressWriter::WriteTrainingSummary(const ValuePtr& accumulatedLoss, const ValuePtr& accumulatedMetric) + { + m_training->WriteSummary( + accumulatedLoss, accumulatedMetric, + [this](size_t samples, size_t updates, size_t summaries, double aggregateLoss, double aggregateMetric, + uint64_t elapsedMs) + { + OnWriteTrainingSummary(samples, updates, summaries, aggregateLoss, aggregateMetric, elapsedMs); + }); + } + + void ProgressWriter::WriteTestSummary(const ValuePtr& accumulatedMetric) + { + m_test->WriteSummary( + nullptr, accumulatedMetric, + [this](size_t samples, size_t updates, size_t summaries, double /*aggregateLoss*/, double aggregateMetric, + uint64_t elapsedMs) + { + OnWriteTestSummary(samples, updates, summaries, aggregateMetric, elapsedMs); + }); + } + + size_t ProgressWriter::TotalTrainingUpdates() const + { + return m_training->TotalUpdates(); + } + + size_t ProgressWriter::TotalTestUpdates() const + { + return m_test->TotalUpdates(); + } +} diff --git a/Source/CNTKv2LibraryDll/Trainer.cpp b/Source/CNTKv2LibraryDll/Trainer.cpp index 547f0a861986..e3bcaa61e39a 100644 --- a/Source/CNTKv2LibraryDll/Trainer.cpp +++ b/Source/CNTKv2LibraryDll/Trainer.cpp @@ -17,18 +17,25 @@ namespace namespace CNTK { - Trainer::Trainer(const FunctionPtr& model, const FunctionPtr& lossFunction, const std::vector& parameterLearners) - : Trainer(model, lossFunction, nullptr, parameterLearners) + Trainer::Trainer(const FunctionPtr& model, const FunctionPtr& lossFunction, + const std::vector& parameterLearners, + const std::vector& progressWriters) + : Trainer(model, lossFunction, nullptr, parameterLearners, progressWriters) {} - Trainer::Trainer(const FunctionPtr& model, const FunctionPtr& lossFunction, const FunctionPtr& evaluationFunction, const std::vector& parameterLearners) + Trainer::Trainer(const FunctionPtr& model, const FunctionPtr& lossFunction, const FunctionPtr& evaluationFunction, + const std::vector& parameterLearners, + const std::vector& progressWriters) : m_model(model), m_lossFunction(lossFunction), m_evaluationFunction(evaluationFunction), m_parameterLearners(std::make_shared(parameterLearners)), m_prevMinibatchNumSamples(1), m_distributed(false), - m_accumulatedNumSamples(0) + m_aggregatedTrainingLossValue(std::make_shared()), + m_aggregatedTrainingEvalCriterionValue(), + m_aggregatedTestEvalCriterionValue(), + m_progressWriters(progressWriters.begin(), progressWriters.end()) { // By default we set the number of threads to hardware concurrency. if (!Internal::MaxNumCPUThreadsSet()) @@ -69,6 +76,9 @@ namespace CNTK if ((m_testSampleCountVar != m_trainingSampleCountVar) && (model->Output() != m_testSampleCountVar)) combinedFunctionArgs.push_back(m_testSampleCountVar); } + + m_aggregatedTrainingEvalCriterionValue = std::make_shared(); + m_aggregatedTestEvalCriterionValue = std::make_shared(); } m_combinedTrainingFunction = Combine(combinedFunctionArgs); @@ -98,35 +108,6 @@ namespace CNTK m_distributed = m_parameterLearners->IsDistributed(); } - static double GetScalarValue(const ValuePtr& value) - { - if (value->Mask()) - LogicError("Scalar Value object cannot have an associated mask"); - - auto scalarData = value->Data(); - if (scalarData->Shape().TotalSize() != 1) - LogicError("Scalar Value object's has a size > 1"); - - double scalar = std::numeric_limits::quiet_NaN(); - NDArrayViewPtr cpuData; - if (scalarData->Device() == DeviceDescriptor::CPUDevice()) - cpuData = scalarData; - else - { - cpuData = std::make_shared(scalarData->GetDataType(), scalarData->Shape(), CNTK::DeviceDescriptor::CPUDevice()); - cpuData->CopyFrom(*scalarData); - } - - if (scalarData->GetDataType() == DataType::Float) - scalar = *(cpuData->DataBuffer()); - else if (scalarData->GetDataType() == DataType::Double) - scalar = *(cpuData->DataBuffer()); - else - LogicError("Unsupported DataType of training loss value"); - - return scalar; - } - static size_t GetSampleCount(const Variable& var, const ValuePtr& value) { auto valueDataShape = value->Shape(); @@ -164,8 +145,7 @@ namespace CNTK double Trainer::TestMinibatch(const std::unordered_map& arguments, const DeviceDescriptor& computeDevice /*= DeviceDescriptor::UseDefaultDevice()*/) { size_t sampleCount = 0; - auto accumulatedError = TestMinibatch(arguments, computeDevice, sampleCount); - return accumulatedError / sampleCount; + return TestMinibatch(arguments, computeDevice, sampleCount); } double Trainer::TestMinibatch(const std::unordered_map& arguments, const DeviceDescriptor& computeDevice, size_t& sampleCount) @@ -177,8 +157,15 @@ namespace CNTK std::unordered_map outputs = { { m_aggregatedEvaluationFunction, nullptr }, { m_testSampleCountVar, nullptr } }; m_combinedTrainingFunction->Forward(arguments, outputs, computeDevice); + const ValuePtr& aggregateEvalCriterionValue = outputs[m_aggregatedEvaluationFunction]; sampleCount = GetSampleCount(m_testSampleCountVar, outputs[m_testSampleCountVar]); - return GetScalarValue(outputs[m_aggregatedEvaluationFunction]); + + UpdateTestProgress(sampleCount, aggregateEvalCriterionValue, computeDevice); + + // TODO: it is not optimal to return average evaluation after each minibatch, since it potentially requires a + // roundtrip to GPU. A better approach would be to have a separate method to return the average evaluation on + // demand, as done for training. However, removing the below return is an API breaking change. + return aggregateEvalCriterionValue->AsScalar() / sampleCount; } bool Trainer::TrainMinibatch(const std::unordered_map& arguments, const DeviceDescriptor& computeDevice /*= DeviceDescriptor::UseDefaultDevice()*/) @@ -191,9 +178,14 @@ namespace CNTK { auto profMinibatch = Microsoft::MSR::CNTK::ScopeProfile(Microsoft::MSR::CNTK::profilerEvtMainMinibatch); - if (!m_distributed) - return TrainLocalMinibatch(GetInputs(arguments), outputsToFetch, IsAtSweepEnd(arguments), computeDevice); - return TrainDistributedMinibatch(GetInputs(arguments), outputsToFetch, IsAtSweepEnd(arguments), computeDevice); + bool result = (!m_distributed) ? + TrainLocalMinibatch(GetInputs(arguments), outputsToFetch, IsAtSweepEnd(arguments), computeDevice) : + TrainDistributedMinibatch(GetInputs(arguments), outputsToFetch, IsAtSweepEnd(arguments), computeDevice); + + // TODO: exclude updating progress writers from profiling? + UpdateTrainingProgress(m_prevMinibatchNumSamples, m_prevMinibatchAggregateTrainingLossValue, + m_prevMinibatchAggregateEvalCriterionValue, computeDevice); + return result; } bool Trainer::TrainMinibatch(const std::unordered_map& arguments, const DeviceDescriptor& computeDevice /*= DeviceDescriptor::UseDefaultDevice()*/) @@ -206,9 +198,14 @@ namespace CNTK { auto profMinibatch = Microsoft::MSR::CNTK::ScopeProfile(Microsoft::MSR::CNTK::profilerEvtMainMinibatch); - if (!m_distributed) - return TrainLocalMinibatch(arguments, outputsToFetch, false, computeDevice); - return TrainDistributedMinibatch(arguments, outputsToFetch, false, computeDevice); + bool result = (!m_distributed) ? + TrainLocalMinibatch(arguments, outputsToFetch, false, computeDevice) : + TrainDistributedMinibatch(arguments, outputsToFetch, false, computeDevice); + + // TODO: exclude updating progress writers from profiling? + UpdateTrainingProgress(m_prevMinibatchNumSamples, m_prevMinibatchAggregateTrainingLossValue, + m_prevMinibatchAggregateEvalCriterionValue, computeDevice); + return result; } bool Trainer::TrainLocalMinibatch(const std::unordered_map& arguments, std::unordered_map& outputsToFetch, bool sweepEnd, const DeviceDescriptor& computeDevice /*= DeviceDescriptor::UseDefaultDevice()*/) @@ -220,8 +217,6 @@ namespace CNTK std::unordered_map parameterGradients; ExecuteForwardBackward(arguments, outputsToFetch, computeDevice, parameterGradients); - AccumulatePrevMinibatch(computeDevice); - auto profWeights = Microsoft::MSR::CNTK::ScopeProfile(Microsoft::MSR::CNTK::profilerEvtMainWeights); std::unordered_map gradients; @@ -268,85 +263,79 @@ namespace CNTK m_prevMinibatchAggregateTrainingLossValue = std::make_shared(info.trainingLossValue); } - // must accumulate after aggregation - AccumulatePrevMinibatch(computeDevice); - return updated; } - double Trainer::AccumulatedLossAverage() const + void Trainer::UpdateTrainingProgress(size_t numSamples, const ValuePtr& loss, const ValuePtr& evalCriterion, + const DeviceDescriptor& computeDevice) { - return m_accumulatedNumSamples == 0 ? 0 : (GetScalarValue(m_accumulatedTrainingLossValue) / m_accumulatedNumSamples); - } + if (numSamples == 0) + { + return; + } - double Trainer::AccumulatedEvaluationAverage() const - { - if (!m_evaluationFunction) - InvalidArgument("Trainer::AccumulatedEvaluationAverage: Cannot get evaluation criterion value when no evaluation function was specified during 'this' trainer's construction"); + m_aggregatedTrainingLossValue->Update(loss, computeDevice); + + if (m_aggregatedTrainingEvalCriterionValue) + { + m_aggregatedTrainingEvalCriterionValue->Update(evalCriterion, computeDevice); + } - return m_accumulatedNumSamples == 0 ? 0 : (GetScalarValue(m_accumulatedEvalCriterionValue) / m_accumulatedNumSamples); + for (auto& progressWriter : m_progressWriters) + { + progressWriter->UpdateTraining(numSamples, m_aggregatedTrainingLossValue, m_aggregatedTrainingEvalCriterionValue); + } } - static void ResetToZero(ValuePtr& v) + void Trainer::SummarizeTrainingProgress() { - if (v == nullptr) return; + for (auto& progressWriter : m_progressWriters) + { + progressWriter->WriteTrainingSummary(m_aggregatedTrainingLossValue, m_aggregatedTrainingEvalCriterionValue); + } - if (v->GetDataType() == DataType::Float) - v->Data()->SetValue(0.0f); - else - v->Data()->SetValue(0.0); - } + m_aggregatedTrainingLossValue->Reset(); - void Trainer::ResetAccumulation() - { - ResetToZero(m_accumulatedTrainingLossValue); - ResetToZero(m_accumulatedEvalCriterionValue); - m_accumulatedNumSamples = 0; + if (m_aggregatedTrainingEvalCriterionValue) + { + m_aggregatedTrainingEvalCriterionValue->Reset(); + } } - void Trainer::AccumulatePrevMinibatch(const DeviceDescriptor& computeDevice) + void Trainer::UpdateTestProgress(size_t numSamples, const ValuePtr& evalCriterion, const DeviceDescriptor& computeDevice) { - if (m_prevMinibatchNumSamples == 0) return; - - auto CreateIfDifferent_Add = [computeDevice](ValuePtr& accumulated, const ValuePtr& value) -> bool + if (numSamples == 0) { - bool created = false; - - if (!accumulated || - accumulated->GetDataType() != value->GetDataType() || - accumulated->Shape() != value->Shape() || - accumulated->Device() != computeDevice || - accumulated->Mask() != value->Mask()) - { - created = true; - accumulated = MakeSharedObject( - MakeSharedObject( - value->GetDataType(), - value->Shape(), - computeDevice), - value->Mask()); - - ResetToZero(accumulated); - } + return; + } - if (accumulated->GetDataType() == DataType::Float) - accumulated->Data()->GetWritableTensorView()->AddCopyOf(*value->Data()->GetTensorView()); - else - accumulated->Data()->GetWritableTensorView()->AddCopyOf(*value->Data()->GetTensorView()); + if (m_aggregatedTestEvalCriterionValue) + { + m_aggregatedTestEvalCriterionValue->Update(evalCriterion, computeDevice); + } - return created; - }; + for (auto& progressWriter : m_progressWriters) + { + progressWriter->UpdateTest(numSamples, m_aggregatedTestEvalCriterionValue); + } + } - bool createdLoss = CreateIfDifferent_Add(m_accumulatedTrainingLossValue, m_prevMinibatchAggregateTrainingLossValue); - bool createdEval = - m_aggregatedEvaluationFunction ? - CreateIfDifferent_Add(m_accumulatedEvalCriterionValue, m_prevMinibatchAggregateEvalCriterionValue) : - false; + void Trainer::SummarizeTestProgress() + { + for (auto& progressWriter : m_progressWriters) + { + progressWriter->WriteTestSummary(m_aggregatedTestEvalCriterionValue); + } - if ((createdLoss || createdEval) && m_accumulatedNumSamples != 0) - RuntimeError("Accumulation values are created when accumulated num samples not zero"); + if (m_aggregatedTestEvalCriterionValue) + { + m_aggregatedTestEvalCriterionValue->Reset(); + } + } - m_accumulatedNumSamples += m_prevMinibatchNumSamples; + void Trainer::AddProgressWriters(const std::vector& progressWriters) + { + m_progressWriters.insert(progressWriters.begin(), progressWriters.end()); } void Trainer::ExecuteForwardBackward(const std::unordered_map& arguments, std::unordered_map& outputsToFetch, const DeviceDescriptor& computeDevice, std::unordered_map& parameterGradients) @@ -478,7 +467,7 @@ namespace CNTK if (m_prevMinibatchNumSamples == 0) RuntimeError("There was no preceeding call to TrainMinibatch or the minibatch was empty."); - return (GetScalarValue(m_prevMinibatchAggregateTrainingLossValue) / m_prevMinibatchNumSamples); + return m_prevMinibatchAggregateTrainingLossValue->AsScalar() / m_prevMinibatchNumSamples; } double Trainer::PreviousMinibatchEvaluationAverage() const @@ -489,7 +478,7 @@ namespace CNTK if (m_prevMinibatchNumSamples == 0) RuntimeError("There was no preceeding call to TrainMinibatch or the minibatch was empty."); - return (GetScalarValue(m_prevMinibatchAggregateEvalCriterionValue) / m_prevMinibatchNumSamples); + return m_prevMinibatchAggregateEvalCriterionValue->AsScalar() / m_prevMinibatchNumSamples; } const std::vector& Trainer::ParameterLearners() const @@ -502,13 +491,15 @@ namespace CNTK return m_parameterLearners->ParameterLearners().front()->TotalNumberOfSamplesSeen(); } - TrainerPtr CreateTrainer(const FunctionPtr& model, const FunctionPtr& lossFunction, const std::vector& parameterLearners) + TrainerPtr CreateTrainer(const FunctionPtr& model, const FunctionPtr& lossFunction, const std::vector& parameterLearners, + const std::vector& progressWriters) { - return MakeSharedObject(model, lossFunction, parameterLearners); + return MakeSharedObject(model, lossFunction, parameterLearners, progressWriters); } - TrainerPtr CreateTrainer(const FunctionPtr& model, const FunctionPtr& lossFunction, const FunctionPtr& evaluationFunction, const std::vector& parameterLearners) + TrainerPtr CreateTrainer(const FunctionPtr& model, const FunctionPtr& lossFunction, const FunctionPtr& evaluationFunction, const std::vector& parameterLearners, + const std::vector& progressWriters) { - return MakeSharedObject(model, lossFunction, evaluationFunction, parameterLearners); + return MakeSharedObject(model, lossFunction, evaluationFunction, parameterLearners, progressWriters); } } diff --git a/Source/CNTKv2LibraryDll/TrainingSession.cpp b/Source/CNTKv2LibraryDll/TrainingSession.cpp index b45f704681c5..bce7693affc2 100644 --- a/Source/CNTKv2LibraryDll/TrainingSession.cpp +++ b/Source/CNTKv2LibraryDll/TrainingSession.cpp @@ -35,7 +35,8 @@ namespace CNTK bool restoreFromCheckpointIfExists, bool saveAllCheckpoints, size_t maxNumberOfSamples, - size_t progressFrequency) + size_t progressFrequency, + const std::vector& progressWriters) { return MakeSharedObject(trainingSource, trainer, @@ -49,7 +50,8 @@ namespace CNTK restoreFromCheckpointIfExists, saveAllCheckpoints, maxNumberOfSamples, - progressFrequency); + progressFrequency, + progressWriters); } TrainingSession::TrainingSession( @@ -65,7 +67,8 @@ namespace CNTK bool restoreFromCheckpointIfExists, bool saveAllCheckpoints, size_t maxNumberOfSamples, - size_t progressFrequencyInSamples) : + size_t progressFrequencyInSamples, + const std::vector& progressWriters) : m_trainingSource(trainingSource), m_trainer(trainer), m_modelInputToMinibatchSourceStream(modelInputToMinibatchSourceStream), @@ -123,7 +126,7 @@ namespace CNTK m_actions.push_back({ checkpointFrequencyInSamples, 0, 0, [this](size_t currentIndex, const DeviceDescriptor&) { - SaveCheckpoint(currentIndex); + SaveCheckpoint(currentIndex); // enable profiler after the first checkpoint // This has effect only if the profiler is globally enabled by StartProfiler() Microsoft::MSR::CNTK::ProfilerEnable(true); @@ -136,6 +139,8 @@ namespace CNTK if (progressFrequencyInSamples != 0) m_actions.push_back({ progressFrequencyInSamples, 0, 0, [this](size_t currentIndex, const DeviceDescriptor&) { ReportProgress(currentIndex); } }); + + m_trainer->AddProgressWriters(progressWriters); } void TrainingSession::Train(const DeviceDescriptor& computeDevice) @@ -158,9 +163,12 @@ namespace CNTK size_t samplesLeft = m_maxNumberOfSamples > m_trainer->TotalNumberOfSamplesSeen() ? m_maxNumberOfSamples - m_trainer->TotalNumberOfSamplesSeen() : 0; + + // Note that in case of distributed training we don't want to stop if the local minibatch + // is empty - it is possible that the other workers are still processing their minibatches. GetTrainingMinibatch(minibatch, samplesLeft, computeDevice); - // Train on the minibatch + // Train on the minibatch. OnMinibatchStart(); shouldTrain = m_trainer->TrainMinibatch(minibatch, computeDevice); OnMinibatchEnd(); @@ -212,19 +220,21 @@ namespace CNTK size_t sampleCount = 0; while(GetCrossValidationMinibatch(minibatch, m_crossValidationSchedule[sampleCount], computeDevice), !minibatch.empty()) { + // TODO: it may be slow to rely on TestMinibatch to return error each time, since it may require transfer + // of error from the GPU each time. error = m_trainer->TestMinibatch(minibatch, computeDevice, sampleCount); - accumulatedError += error; + accumulatedError += error * sampleCount; totalNumberOfSamples += sampleCount; numberOfMinibatches++; } m_crossValidationSource->RestoreFromCheckpoint(checkpoint); - + m_trainer->SummarizeTestProgress(); OnCrossValidationEnd(currentIndex, accumulatedError / totalNumberOfSamples, totalNumberOfSamples, numberOfMinibatches); } - inline void TrainingSession::ReportProgress(size_t currentIndex) + inline void TrainingSession::ReportProgress(size_t /*currentIndex*/) { - this->OnProgress(currentIndex); + m_trainer->SummarizeTrainingProgress(); } void TrainingSession::GetTrainingMinibatch(std::unordered_map& minibatch, size_t maxMbSize, const DeviceDescriptor& computeDevice) @@ -256,6 +266,7 @@ namespace CNTK if (mbSize == 0) return; + // TODO: is copy really necessary here? auto minibatchData = source->GetNextMinibatch(0 /*numberOfSequences*/, mbSize, numberOfWorkers, workerRank, computeDevice); if (minibatchData.empty()) return; diff --git a/Source/CNTKv2LibraryDll/Utils.cpp b/Source/CNTKv2LibraryDll/Utils.cpp index 18136e297e1b..1dfccff4f875 100644 --- a/Source/CNTKv2LibraryDll/Utils.cpp +++ b/Source/CNTKv2LibraryDll/Utils.cpp @@ -893,4 +893,64 @@ namespace CNTK template ValuePtr Utils::GetValueObjectFromCNTKImplMatrixAndMBLayout(const Variable& var, const Matrix& matrix, const MBLayoutPtr& layout, bool readOnly /*= true*/); template ValuePtr Utils::GetValueObjectFromCNTKImplMatrixAndMBLayout(const Variable& var, const Matrix& matrix, const MBLayoutPtr& layout, bool readOnly /*= true*/); + + void Accumulator::Update(const ValuePtr& delta, const DeviceDescriptor& device) + { + if (!delta) + { + InvalidArgument("Attempting to add a null value"); + } + + bool copied = false; + if (!Data() || + GetDataType() != delta->GetDataType() || + Shape() != delta->Shape() || + Device() != device || + Mask() != delta->Mask()) + { + copied = true; + m_data = MakeSharedObject(delta->GetDataType(), delta->Shape(), device); + m_mask = delta->Mask(); + ResetToZero(); + } + + if (delta->GetDataType() == DataType::Float) + { + Data()->GetWritableTensorView()->AddCopyOf(*delta->Data()->GetTensorView()); + } + else + { + Data()->GetWritableTensorView()->AddCopyOf(*delta->Data()->GetTensorView()); + } + + if (copied && m_numUpdates != 0) + { + RuntimeError("Accumulation values are created when accumulated num updates not zero"); + } + + m_numUpdates++; + } + + void Accumulator::Reset() + { + ResetToZero(); + m_numUpdates = 0; + } + + void Accumulator::ResetToZero() + { + if (Data() == nullptr) + { + return; + } + + if (GetDataType() == DataType::Float) + { + Data()->SetValue(0.0f); + } + else + { + Data()->SetValue(0.0); + } + } } diff --git a/Source/CNTKv2LibraryDll/Utils.h b/Source/CNTKv2LibraryDll/Utils.h index 39b0179cdca4..ec58b1b7c81b 100644 --- a/Source/CNTKv2LibraryDll/Utils.h +++ b/Source/CNTKv2LibraryDll/Utils.h @@ -582,4 +582,18 @@ namespace CNTK return namedListString; } + + class Accumulator : public Value + { + public: + Accumulator() : Value(nullptr), m_numUpdates(0) {} + + void Update(const ValuePtr& delta, const DeviceDescriptor& device); + void Reset(); + + private: + void ResetToZero(); + + size_t m_numUpdates; + }; } diff --git a/Source/CNTKv2LibraryDll/Value.cpp b/Source/CNTKv2LibraryDll/Value.cpp index 520a25ee668f..f402bfd8daf5 100644 --- a/Source/CNTKv2LibraryDll/Value.cpp +++ b/Source/CNTKv2LibraryDll/Value.cpp @@ -560,6 +560,35 @@ namespace CNTK return std::pair(maxSequenceLength, numSequences); } + template + ElementType Value::AsScalar() const + { + if (Mask()) + LogicError("Scalar Value object cannot have an associated mask"); + + auto scalarData = Data(); + if (scalarData->Shape().TotalSize() != 1) + LogicError("Scalar Value object's has a size > 1"); + + ElementType scalar = std::numeric_limits::quiet_NaN(); + NDArrayViewPtr cpuData; + if (scalarData->Device() == DeviceDescriptor::CPUDevice()) + cpuData = scalarData; + else + { + cpuData = std::make_shared(scalarData->GetDataType(), scalarData->Shape(), CNTK::DeviceDescriptor::CPUDevice()); + cpuData->CopyFrom(*scalarData); + } + + if (scalarData->GetDataType() == DataType::Float) + scalar = *(cpuData->DataBuffer()); + else if (scalarData->GetDataType() == DataType::Double) + scalar = static_cast(*(cpuData->DataBuffer())); + else + LogicError("Unsupported DataType"); + + return scalar; + } void PackedValue::Unpack() const { @@ -659,4 +688,6 @@ namespace CNTK template CNTK_API void Value::CopyVariableValueToVector(const Variable& outputVariable, std::vector>& sequences); template CNTK_API void Value::CopyVariableValueToVector(const Variable& outputVariable, std::vector>& sequences); template CNTK_API void Value::CopyVariableValueToVector(const Variable& outputVariable, std::vector>& sequences); + template float Value::AsScalar() const; + template double Value::AsScalar() const; } diff --git a/Source/CNTKv2LibraryDll/tensorboard/TensorBoardFileWriter.cpp b/Source/CNTKv2LibraryDll/tensorboard/TensorBoardFileWriter.cpp index 94bd3def477f..aab7d4bd9043 100644 --- a/Source/CNTKv2LibraryDll/tensorboard/TensorBoardFileWriter.cpp +++ b/Source/CNTKv2LibraryDll/tensorboard/TensorBoardFileWriter.cpp @@ -84,7 +84,7 @@ namespace CNTK void TensorBoardFileWriter::Init() { time_t time = std::time(0); - std::wstring filePath = Internal::GetNewFilePath(m_dir, time); + std::wstring filePath = GetNewFilePath(m_dir, time); msra::files::make_intermediate_dirs(filePath); @@ -114,7 +114,7 @@ namespace CNTK summaryValue->set_tag(ToString(name)); summaryValue->set_simple_value(value); - WriteRecord(Internal::Serialize(event)); + WriteRecord(Serialize(event)); } void TensorBoardFileWriter::WriteModel() @@ -123,7 +123,7 @@ namespace CNTK // Convert the model to tensorflow GraphDef first. tensorflow::GraphDef graph; - TensorBoardUtils::CreateGraph(m_model->RootFunction(), graph); + CreateTensorBoardGraph(m_model->RootFunction(), graph); std::string graphStr; graph.AppendToString(&graphStr); @@ -133,7 +133,7 @@ namespace CNTK event.set_wall_time(static_cast(std::time(0))); event.set_graph_def(graphStr); - WriteRecord(Internal::Serialize(event)); + WriteRecord(Serialize(event)); } void TensorBoardFileWriter::WriteRecord(const std::string& data) @@ -145,12 +145,12 @@ namespace CNTK // Header: record length (uint64_t) + masked CRC of that (uint32_t). char header[sizeof(uint64_t) + sizeof(uint32_t)]; - Internal::Encode(header, static_cast(data.size())); - Internal::Encode(header + sizeof(uint64_t), Internal::GetMaskedCrc(header, sizeof(uint64_t))); + Encode(header, static_cast(data.size())); + Encode(header + sizeof(uint64_t), GetMaskedCrc(header, sizeof(uint64_t))); // Footer: marked CRC of the actual record. char footer[sizeof(uint32_t)]; - Internal::Encode(footer, Internal::GetMaskedCrc(data.data(), data.size())); + Encode(footer, GetMaskedCrc(data.data(), data.size())); try { @@ -178,7 +178,7 @@ namespace CNTK event.set_wall_time(static_cast(time)); event.set_file_version("brain.Event:2"); - WriteRecord(Internal::Serialize(event)); + WriteRecord(Serialize(event)); } bool TensorBoardFileWriter::Flush() diff --git a/Source/CNTKv2LibraryDll/tensorboard/TensorBoardUtils.cpp b/Source/CNTKv2LibraryDll/tensorboard/TensorBoardUtils.cpp index 7c9181f48c7d..2482faca90d3 100644 --- a/Source/CNTKv2LibraryDll/tensorboard/TensorBoardUtils.cpp +++ b/Source/CNTKv2LibraryDll/tensorboard/TensorBoardUtils.cpp @@ -131,12 +131,12 @@ namespace CNTK if (src.IsOutput()) { result = CreateFunctionNode(src.Owner(), dst, functionNodes, variableNodes, placeholders, - placeholderInputs, scope); + placeholderInputs, scope); } else { result = dst.add_node(); - Internal::PopulateNodeDef(scope, src, *result); + PopulateNodeDef(scope, src, *result); } variableNodes.emplace(src, result); @@ -162,14 +162,14 @@ namespace CNTK tensorflow::NodeDef* functionNode = nullptr; if (src->IsBlock()) { - std::wstring newScope = Internal::GetScopedName(scope, src); + std::wstring newScope = GetScopedName(scope, src); functionNode = CreateFunctionNode(src->BlockRoot(), dst, functionNodes, - variableNodes, placeholders, placeholderInputs, newScope); + variableNodes, placeholders, placeholderInputs, newScope); } else { functionNode = dst.add_node(); - Internal::PopulateNodeDef(scope, src, *functionNode); + PopulateNodeDef(scope, src, *functionNode); } // Note that we map the block function to its root node here (on purpose). @@ -185,7 +185,7 @@ namespace CNTK // We don't create placeholders immediately, just register them so we can later trace the actual // input. inputNode = CreateVariableNode(input, dst, functionNodes, variableNodes, placeholders, - placeholderInputs, scope); + placeholderInputs, scope); } if (!src->IsBlock()) @@ -214,47 +214,43 @@ namespace CNTK return functionNode; } - namespace TensorBoardUtils + void CreateTensorBoardGraph(const FunctionPtr& src, tensorflow::GraphDef& dst) { - void CreateGraph(const FunctionPtr& src, tensorflow::GraphDef& dst) + // For each function/variable visited, contains a matching tensorflow node. + std::unordered_map functionNodes; + std::unordered_map variableNodes; + + // For each (function, placeholder input) found, contains (tensorflow_node, (placeholder, scope)). + std::unordered_multimap placeholders; + // For each placeholder found, contains a (placeholder, (replacement variable, scope)). + std::unordered_map placeholderInputs; + + // Create all nodes in the graph, except placeholders. + CreateFunctionNode(src, dst, functionNodes, variableNodes, placeholders, placeholderInputs, L""); + + // For each function that had a placeholder as its input, add a link to the actual input if it was + // found. + for (auto placeholder : placeholders) { - // For each function/variable visited, contains a matching tensorflow node. - std::unordered_map functionNodes; - std::unordered_map variableNodes; - - // For each (function, placeholder input) found, contains (tensorflow_node, (placeholder, scope)). - std::unordered_multimap placeholders; - // For each placeholder found, contains a (placeholder, (replacement variable, scope)). - std::unordered_map placeholderInputs; - - // Create all nodes in the graph, except placeholders. - Internal::CreateFunctionNode(src, dst, functionNodes, variableNodes, - placeholders, placeholderInputs, L""); - - // For each function that had a placeholder as its input, add a link to the actual input if it was - // found. - for (auto placeholder : placeholders) - { - // Follow the placeholder chain until the end. - Internal::VariableWithScope* finalValue = &placeholder.second; + // Follow the placeholder chain until the end. + VariableWithScope* finalValue = &placeholder.second; - do + do + { + auto nextInput = placeholderInputs.find(finalValue->first); + if (nextInput == placeholderInputs.end()) { - auto nextInput = placeholderInputs.find(finalValue->first); - if (nextInput == placeholderInputs.end()) - { - break; - } - - finalValue = &nextInput->second; - } while (true); - - // Create a node for the finalValue and add it as an input of the function. - tensorflow::NodeDef* inputNode = Internal::CreateVariableNode( - finalValue->first, dst, functionNodes, variableNodes, placeholders, placeholderInputs, - finalValue->second); - placeholder.first->add_input(inputNode->name()); - } + break; + } + + finalValue = &nextInput->second; + } while (true); + + // Create a node for the finalValue and add it as an input of the function. + tensorflow::NodeDef* inputNode = CreateVariableNode( + finalValue->first, dst, functionNodes, variableNodes, placeholders, placeholderInputs, + finalValue->second); + placeholder.first->add_input(inputNode->name()); } } } diff --git a/Source/CNTKv2LibraryDll/tensorboard/TensorBoardUtils.h b/Source/CNTKv2LibraryDll/tensorboard/TensorBoardUtils.h index dfb00daea11f..5f7d3b3f318f 100644 --- a/Source/CNTKv2LibraryDll/tensorboard/TensorBoardUtils.h +++ b/Source/CNTKv2LibraryDll/tensorboard/TensorBoardUtils.h @@ -17,12 +17,9 @@ namespace CNTK { namespace Internal { - namespace TensorBoardUtils - { - /// - /// Populates the given TensorFlow GraphDef with the graph of the given CNTK function. - /// - void CreateGraph(const FunctionPtr& src, tensorflow::GraphDef& dst); - } + /// + /// Populates the given TensorBoard GraphDef with the graph of the given CNTK function. + /// + void CreateTensorBoardGraph(const FunctionPtr& src, tensorflow::GraphDef& dst); } } \ No newline at end of file diff --git a/Tests/EndToEndTests/CNTKv2Python/Examples/ConvNet_CIFAR10_DataAug_Distributed_test.py b/Tests/EndToEndTests/CNTKv2Python/Examples/ConvNet_CIFAR10_DataAug_Distributed_test.py index 9adeb46e4ad7..0de2bd3c0a32 100644 --- a/Tests/EndToEndTests/CNTKv2Python/Examples/ConvNet_CIFAR10_DataAug_Distributed_test.py +++ b/Tests/EndToEndTests/CNTKv2Python/Examples/ConvNet_CIFAR10_DataAug_Distributed_test.py @@ -40,7 +40,7 @@ def mpiexec_test(device_id, script, params, expected_test_error, match_exactly=T os.kill(p.pid, signal.CTRL_C_EVENT) raise RuntimeError('Timeout in mpiexec, possibly hang') str_out = out.decode(sys.getdefaultencoding()) - results = re.findall("Cross Validation \[.+?\]: Minibatch\[.+?\]: errs = (.+?)%", str_out) + results = re.findall("Finished Evaluation \[.+?\]: Minibatch\[.+?\]: metric = (.+?)%", str_out) assert len(results) == 2 print(results) diff --git a/bindings/csharp/Swig/cntk_cs.i b/bindings/csharp/Swig/cntk_cs.i index 545fdd005515..8985315a9aea 100755 --- a/bindings/csharp/Swig/cntk_cs.i +++ b/bindings/csharp/Swig/cntk_cs.i @@ -240,6 +240,8 @@ %ignore_function CNTK::CreateDataParallelDistributedTrainer; %ignore_function CNTK::CreateQuantizedDataParallelDistributedTrainer; +%ignore_class CNTK::ProgressWriter; + %ignore_struct std::hash<::CNTK::DistributedWorkerDescriptor>; // Todo: add correct typemap as they might be useful for C# in future. diff --git a/bindings/python/cntk/cntk_py.i b/bindings/python/cntk/cntk_py.i index d8422af0c76f..879f295e6634 100644 --- a/bindings/python/cntk/cntk_py.i +++ b/bindings/python/cntk/cntk_py.i @@ -123,6 +123,8 @@ %template() std::vector>; %template() std::vector>; %template() std::vector>; +%template() std::vector>; +%template() std::pair; %template() std::pair; %template() std::pair; %template() std::pair; @@ -624,6 +626,12 @@ public: %feature("nodirector") CNTK::TrainingSession::OnCheckpointStart; %feature("nodirector") CNTK::TrainingSession::GetMinibatchSize; +%feature("director") CNTK::ProgressWriter; +%ignore CNTK::ProgressWriter::UpdateTraining; +%ignore CNTK::ProgressWriter::UpdateTest; +%ignore CNTK::ProgressWriter::WriteTrainingSummary; +%ignore CNTK::ProgressWriter::WriteTestSummary; + // // NDShape // @@ -1312,7 +1320,6 @@ std::unordered_map& parameterLearners) + // TODO: do progressWriters below also have a similar issue? + CNTK::TrainerPtr TrainerImpl(const ::CNTK::FunctionPtr& model, const ::CNTK::FunctionPtr& lossFunction, const ::CNTK::FunctionPtr& evaluationFunction, + const std::vector& parameterLearners, const std::vector& progressWriters) { std::vector learners; learners.reserve(parameterLearners.size()); for(const auto& l : parameterLearners) learners.push_back(l); - return CreateTrainer(model, lossFunction, evaluationFunction, learners); + return CreateTrainer(model, lossFunction, evaluationFunction, learners, progressWriters); } - CNTK::TrainerPtr TrainerImpl(const ::CNTK::FunctionPtr& model, const ::CNTK::FunctionPtr& lossFunction, const ::CNTK::FunctionPtr& evaluationFunction, const std::vector& parameterLearners) + CNTK::TrainerPtr TrainerImpl(const ::CNTK::FunctionPtr& model, const ::CNTK::FunctionPtr& lossFunction, const ::CNTK::FunctionPtr& evaluationFunction, + const std::vector& parameterLearners, const std::vector& progressWriters) { - return CreateTrainer(model, lossFunction, evaluationFunction, parameterLearners); + return CreateTrainer(model, lossFunction, evaluationFunction, parameterLearners, progressWriters); } // Global rank of current worker diff --git a/bindings/python/cntk/tests/training_session_test.py b/bindings/python/cntk/tests/training_session_test.py index 154ad609d8b9..d9da6112d800 100644 --- a/bindings/python/cntk/tests/training_session_test.py +++ b/bindings/python/cntk/tests/training_session_test.py @@ -56,9 +56,9 @@ def mb_source(tmpdir, fileprefix, epoch_size=FULL_DATA_SWEEP): f.write(ctf_data) mbs = MinibatchSource(CTFDeserializer(ctf_file, StreamDefs( - features = StreamDef(field='S0', shape=input_dim, is_sparse=True), - labels = StreamDef(field='S1', shape=input_dim, is_sparse=True) - )), + features = StreamDef(field='S0', shape=input_dim, is_sparse=True), + labels = StreamDef(field='S1', shape=input_dim, is_sparse=True) + )), randomize=False, epoch_size=epoch_size) return mbs @@ -76,47 +76,37 @@ def trainer(device): return { 'trainer':trainer, 'input':in1, - 'label':labels + 'label':labels, + 'model':z, + 'criteria':(ce, errs), + 'learners':[learner] } -class MockProgressPrinter: - def __init__(self, trainer, expected_cv=None, epoch_summary_counter=0): - self.epoch_summary_counter = epoch_summary_counter - self.trainer = trainer + +class MockProgressWriter(cntk_py.ProgressWriter): + def __init__(self, expected_cv=None, training_summary_counter=0): + super(MockProgressWriter, self).__init__(1, 0, 1, 0) + self.training_summary_counter = training_summary_counter + self.cv_summary_counter = 0 self.expected_cv = expected_cv self.minibatch_info = [] - def update_with_trainer(self, trainer, with_metric): + def on_write_training_update(self, samples, updates, aggregate_loss, aggregate_metric): + mb_samples = samples[1] - samples[0] + avg_loss = (aggregate_loss[1] - aggregate_loss[0]) / mb_samples + avg_metric = (aggregate_metric[1] - aggregate_metric[0]) / mb_samples self.minibatch_info.append( - (self.epoch_summary_counter, - (trainer.previous_minibatch_loss_average, - trainer.previous_minibatch_evaluation_average, - trainer.previous_minibatch_sample_count, - trainer.total_number_of_samples_seen))) - - def epoch_summary(self, with_metric): - self.epoch_summary_counter += 1 - - def log(self, msg): - results = re.findall("Cross Validation \[(.+?)\]: Minibatch\[.+?\]: errs = (.+?)% \* (\d+)", msg) - assert(len(results) == 1) - validation_index = int(results[0][0]) - 1 - assert(self.expected_cv[validation_index][0] == float(results[0][1])) - assert(self.expected_cv[validation_index][1] == int(results[0][2])) + (self.training_summary_counter, (avg_loss, avg_metric, mb_samples))) -def test_session_sanity_check(tmpdir, device_id): + def on_write_training_summary(self, samples, updates, summaries, aggregate_loss, aggregate_metric, + elapsed_milliseconds): + self.training_summary_counter += 1 - device=cntk_device(device_id) - t = trainer(device) - mbs = mb_source(tmpdir, "training") + def on_write_test_summary(self, samples, updates, summaries, aggregate_metric, elapsed_milliseconds): + assert (self.expected_cv[self.cv_summary_counter][0] == float(aggregate_metric / samples * 100.0)) + assert (self.expected_cv[self.cv_summary_counter][1] == int(samples)) + self.cv_summary_counter += 1 - input_map = { - t['input'] : mbs.streams.features, - t['label'] : mbs.streams.labels - } - - session = training_session(mbs, t['trainer'], minibatch_size_schedule(4), model_inputs_to_mb_source_mapping=input_map) - session.train(device) def test_session_sanity_check(tmpdir, device_id): device=cntk_device(device_id) @@ -124,8 +114,8 @@ def test_session_sanity_check(tmpdir, device_id): mbs = mb_source(tmpdir, "training") input_map = { - t['input'] : mbs.streams.features, - t['label'] : mbs.streams.labels + t['input']: mbs.streams.features, + t['label']: mbs.streams.labels } session = training_session(mbs, t['trainer'], minibatch_size_schedule(4), model_inputs_to_mb_source_mapping=input_map) @@ -141,8 +131,8 @@ def test_session_max_samples(tmpdir, device_id): t['label'] : mbs.streams.labels } - session = training_session(mbs, t['trainer'], minibatch_size_schedule(4), - model_inputs_to_mb_source_mapping=input_map, max_training_samples=20) + session = training_session(mbs, t['trainer'], minibatch_size_schedule(4), + model_inputs_to_mb_source_mapping=input_map, max_training_samples=20) session.train(device) assert(t['trainer'].total_number_of_samples_seen == 21) @@ -158,13 +148,14 @@ def test_session_cross_validation_at_end(tmpdir, device_id): t['label'] : mbs.streams.labels } - printer = MockProgressPrinter(t['trainer'], expected_cv=[[92, 25]]) - session = training_session(mbs, t['trainer'], minibatch_size_schedule(4), - model_inputs_to_mb_source_mapping=input_map, - max_training_samples=20, cv_source=mbs1, progress_printer=printer) + writer = MockProgressWriter(expected_cv=[[92, 25]]) + session = training_session(mbs, t['trainer'], minibatch_size_schedule(4), + model_inputs_to_mb_source_mapping=input_map, + max_training_samples=20, cv_source=mbs1, progress_printer=[writer]) session.train(device) assert(t['trainer'].total_number_of_samples_seen == 21) + assert(writer.cv_summary_counter == 1) def test_session_cross_validation_3_times(tmpdir, device_id): device=cntk_device(device_id) @@ -177,14 +168,15 @@ def test_session_cross_validation_3_times(tmpdir, device_id): t['label'] : mbs.streams.labels } - printer = MockProgressPrinter(t['trainer'], expected_cv=[[92, 25], [92, 25], [92, 25]]) - session = training_session(mbs, t['trainer'], minibatch_size_schedule(4), - model_inputs_to_mb_source_mapping=input_map, - max_training_samples=60, cv_source=mbs1, cv_frequency=20, - cv_mb_size_schedule=minibatch_size_schedule(2), progress_printer=printer) + writer = MockProgressWriter(expected_cv=[[92, 25], [92, 25], [92, 25]]) + session = training_session(mbs, t['trainer'], minibatch_size_schedule(4), + model_inputs_to_mb_source_mapping=input_map, + max_training_samples=60, cv_source=mbs1, cv_frequency=20, + cv_mb_size_schedule=minibatch_size_schedule(2), progress_printer=[writer]) session.train(device) assert(t['trainer'].total_number_of_samples_seen == 61) + assert(writer.cv_summary_counter == 3) def test_session_cross_validation_3_times_checkpoints_2_save_all(tmpdir, device_id): @@ -203,18 +195,18 @@ def test_session_cross_validation_3_times_checkpoints_2_save_all(tmpdir, device_ test_dir = str(tmpdir) - printer = MockProgressPrinter(t['trainer'], expected_cv=[[92, 25], [92, 25], [92, 25]]) + writer = MockProgressWriter(expected_cv=[[92, 25], [92, 25], [92, 25]]) session = training_session( training_minibatch_source = mbs, - trainer = t['trainer'], - mb_size_schedule=minibatch_size_schedule(4), - model_inputs_to_mb_source_mapping = input_map, - max_training_samples = 60, - cv_source = mbs1, - cv_frequency = 20, - progress_printer = printer, + trainer = t['trainer'], + mb_size_schedule = minibatch_size_schedule(4), + model_inputs_to_mb_source_mapping = input_map, + max_training_samples = 60, + cv_source = mbs1, + cv_frequency = 20, + progress_printer = [writer], checkpoint_frequency = 35, - checkpoint_filename = str(tmpdir/"checkpoint_save_all"), + checkpoint_filename = str(tmpdir / "checkpoint_save_all"), save_all_checkpoints = True) session.train(device) @@ -229,6 +221,8 @@ def test_session_cross_validation_3_times_checkpoints_2_save_all(tmpdir, device_ assert("checkpoint_save_all" in candidates) assert("checkpoint_save_all.ckp" in candidates) + assert(writer.cv_summary_counter == 3) + def test_session_progress_print(tmpdir, device_id): from os import listdir from os.path import isfile, join @@ -244,20 +238,19 @@ def test_session_progress_print(tmpdir, device_id): test_dir = str(tmpdir) - printer = MockProgressPrinter(t['trainer']) + writer = MockProgressWriter() session = training_session( training_minibatch_source = mbs, - trainer = t['trainer'], - mb_size_schedule=minibatch_size_schedule(4), - model_inputs_to_mb_source_mapping = input_map, - max_training_samples = 60, - progress_printer = printer, + trainer = t['trainer'], + mb_size_schedule = minibatch_size_schedule(4), + model_inputs_to_mb_source_mapping = input_map, + max_training_samples = 60, + progress_printer = [writer], progress_frequency = 10) session.train(device) - assert(printer.epoch_summary_counter == 6) - + assert(writer.training_summary_counter == 6) def test_session_restart_from_checkpoint(tmpdir, device_id): from os import listdir @@ -274,17 +267,17 @@ def test_session_restart_from_checkpoint(tmpdir, device_id): } test_dir = str(tmpdir) - printer = MockProgressPrinter(t['trainer']) + writer = MockProgressWriter() session = training_session( training_minibatch_source = mbs, - trainer = t['trainer'], - mb_size_schedule=minibatch_size_schedule(4), - model_inputs_to_mb_source_mapping = input_map, - max_training_samples = 60, + trainer = t['trainer'], + mb_size_schedule = minibatch_size_schedule(4), + model_inputs_to_mb_source_mapping = input_map, + max_training_samples = 60, checkpoint_frequency = 35, - checkpoint_filename = str(tmpdir/"restart_from_checkpoint"), - progress_printer=printer, + checkpoint_filename=str(tmpdir/"restart_from_checkpoint"), + progress_printer = [writer], progress_frequency = 35, save_all_checkpoints = True) @@ -309,19 +302,19 @@ def test_session_restart_from_checkpoint(tmpdir, device_id): os.remove(str(tmpdir/f)) # restoring from a particular checkpoint and again save everything from the second epoch - printer2 = MockProgressPrinter(t['trainer'], epoch_summary_counter=1) + writer2 = MockProgressWriter(training_summary_counter=1) session = training_session( training_minibatch_source=mbs, - trainer=t['trainer'], + trainer=Trainer(t['model'], t['criteria'], t['learners']), mb_size_schedule=minibatch_size_schedule(4), - model_inputs_to_mb_source_mapping = input_map, - progress_printer=printer2, - checkpoint_frequency = 35, - progress_frequency = 35, + model_inputs_to_mb_source_mapping=input_map, + progress_printer=[writer2], + checkpoint_frequency=35, + progress_frequency=35, max_training_samples=60, - checkpoint_filename = str(tmpdir/"saved_restart_from_checkpoint0"), - restore = True, - save_all_checkpoints= True) + checkpoint_filename=str(tmpdir/"saved_restart_from_checkpoint0"), + restore=True, + save_all_checkpoints=True) session.train(device) candidates = [f for f in listdir(test_dir) if isfile(join(test_dir, f)) and f.startswith("saved_restart_from_checkpoint0")] @@ -336,6 +329,6 @@ def test_session_restart_from_checkpoint(tmpdir, device_id): assert("saved_restart_from_checkpoint0.ckp" in candidates) # remove information about 0 epoch from the mock printer - first_run_minibatch_info = [i for i in printer.minibatch_info if i[0] != 0] - - assert(first_run_minibatch_info == printer2.minibatch_info) + first_run_minibatch_info = [i for i in writer.minibatch_info if i[0] != 0] + + assert(first_run_minibatch_info == writer2.minibatch_info) diff --git a/bindings/python/cntk/trainer.py b/bindings/python/cntk/trainer.py index 29641aa0dfd5..7d59f9d1fcce 100644 --- a/bindings/python/cntk/trainer.py +++ b/bindings/python/cntk/trainer.py @@ -28,8 +28,10 @@ class Trainer(cntk_py.Trainer): criteria (Python tuple of :class:`~cntk.ops.functions.Function`, or :class:`~cntk.ops.functions.Function` or ): loss and metric function, given as a either Python tuple or tuple-valued CNTK Function parameter_learners (list): list of learners from :mod:`cntk.learner` + progress_writers (list): optionally, list of progress writers from :mod:`cntk.utils` to automatically track + training progress. ''' - def __init__(self, model, criteria, parameter_learners): + def __init__(self, model, criteria, parameter_learners, progress_writers=None): if isinstance(criteria, cntk_py.Function): criteria = criteria.outputs # turn CNTK Function into a tuple loss_function, eval_function = criteria # destructure the tuple @@ -40,8 +42,12 @@ def __init__(self, model, criteria, parameter_learners): eval_function = sanitize_function(eval_function) if not isinstance(parameter_learners, list): parameter_learners = [parameter_learners] + if progress_writers is None: + progress_writers = [] + elif not isinstance(progress_writers, list): + progress_writers = [progress_writers] - trainer = cntk_py.trainer_impl(model, loss_function, eval_function, parameter_learners) + trainer = cntk_py.trainer_impl(model, loss_function, eval_function, parameter_learners, progress_writers) # transplant into this class instance self.__dict__ = trainer.__dict__ @@ -118,7 +124,6 @@ def train_minibatch(self, arguments, outputs=None, device=None): return updated - def test_minibatch(self, arguments, device=None): ''' Test the model on the specified batch of samples using the evaluation @@ -243,29 +248,16 @@ def total_number_of_samples_seen(self): ''' return super(Trainer, self).total_number_of_samples_seen() - @property - def accumulated_loss_average(self): - ''' - The average training loss per sample since the last reset_accumulation() - ''' - return super(Trainer, self).accumulated_loss_average() - - @property - def accumulated_evaluation_average(self): - ''' - The average evaluation criterion value per sample since the last reset_accumulation() - ''' - return super(Trainer, self).accumulated_evaluation_average() - - @property - def accumulated_sample_count(self): + def summarize_training_progress(self): ''' - The number of samples since last reset_accumulation + Updates the progress writers with the summary of training progress since start and resets the internal + accumulators. ''' - return super(Trainer, self).accumulated_sample_count() + return super(Trainer, self).summarize_training_progress() - def reset_accumulation(self): + def summarize_test_progress(self): ''' - Reset accumulated loss and evaluation criterion + Updates the progress writers with the summary of test progress since start and resets the internal + accumulators. ''' - return super(Trainer, self).reset_accumulation() + return super(Trainer, self).summarize_test_progress() \ No newline at end of file diff --git a/bindings/python/cntk/training_session.py b/bindings/python/cntk/training_session.py index c747750757a6..6c12952a347e 100644 --- a/bindings/python/cntk/training_session.py +++ b/bindings/python/cntk/training_session.py @@ -30,7 +30,7 @@ class TrainingSession(cntk_py.TrainingSession): training_minibatch_source (:class:`~cntk.io.MinibatchSource`): minibatch source used for training trainer (:class:`~cntk.trainer.Trainer`): trainer mb_size_schedule (:class:`~cntk.cntk_py.minibatch_size_schedule`): minibatch schedule for training - progress_printer (:class:`~cntk.utils.progress_print.ProgressPrinter`): progress printer + progress_printer (list): list of progress writers from :mod:`cntk.utils` model_inputs_to_mb_source_mapping (dict): mapping between input variables and input streams checkpoint_frequency (int): checkpoint frequency in samples. If 0, no checkpointing takes place. If ``sys.maxsize``, a single checkpoint is taken at the end of the training. @@ -50,8 +50,8 @@ def __init__(self, training_minibatch_source, trainer, mb_size_schedule, checkpoint_frequency, checkpoint_filename, save_all_checkpoints, restore, progress_frequency, cv_source, cv_frequency, cv_mb_size_schedule, max_training_samples): - self.progress_printer = progress_printer - self.trainer = trainer + # TODO: rename progress_printer argument to progress_writers + progress_writers = progress_printer if not isinstance(mb_size_schedule, cntk_py.minibatch_size_schedule): raise ValueError('mb_size_schedule type (%s) not supported. ' @@ -87,6 +87,11 @@ def __init__(self, training_minibatch_source, trainer, mb_size_schedule, if cv_mb_size_schedule is None: cv_mb_size_schedule = minibatch_size_schedule(1) + if progress_writers is None: + progress_writers = [] + elif not isinstance(progress_writers, list): + progress_writers = [progress_writers] + super(TrainingSession, self).__init__( training_minibatch_source, trainer, @@ -100,7 +105,8 @@ def __init__(self, training_minibatch_source, trainer, mb_size_schedule, restore, save_all_checkpoints, max_training_samples, - progress_frequency) + progress_frequency, + progress_writers) @typemap def train(self, device=None): @@ -117,24 +123,6 @@ def train(self, device=None): super(TrainingSession, self).train(device) - def on_minibatch_end(self): - ''' - Callback that gets executed at the end of each minibatch. - ''' - if self.progress_printer and self.trainer.total_number_of_samples_seen != 0: - self.progress_printer.update_with_trainer( - self.trainer, with_metric=True) - - def on_progress(self, index): - ''' - Callback that gets executed with the ``progress_frequency`` frequency in samples. - - Args: - index (int): index of the current callback. - ''' - if self.progress_printer: - self.progress_printer.epoch_summary(with_metric=True) - def on_cross_validation_end(self, index, average_error, num_samples, num_minibatches): ''' Callback that gets executed at the end of cross validation. @@ -145,10 +133,7 @@ def on_cross_validation_end(self, index, average_error, num_samples, num_minibat num_samples (int): number of samples in cross validation num_minibatches (int): number of minibatch in cross validation ''' - if self.progress_printer: - msg = "Cross Validation [{}]: Minibatch[1-{}]: errs = {:0.2f}% * {}".format( - index + 1, num_minibatches, average_error * 100, num_samples) - self.progress_printer.log(msg) + pass @typemap @@ -216,7 +201,7 @@ def training_session(training_minibatch_source, training_minibatch_source (:class:`~cntk.io.MinibatchSource`): minibatch source used for training trainer (:class:`~cntk.trainer.Trainer`): trainer mb_size_schedule (:class:`~cntk.cntk_py.minibatch_size_schedule`): minibatch schedule for training - progress_printer (:class:`~cntk.utils.progress_print.ProgressPrinter`): progress printer + progress_printer (list): list of progress writers from :mod:`cntk.utils` model_inputs_to_mb_source_mapping (dict): mapping between input variables and input streams checkpoint_filename (str): checkpoint file name. checkpoint_frequency (int): checkpoint frequency in samples. If 0, no checkpointing takes place. diff --git a/bindings/python/cntk/utils/progress_print.py b/bindings/python/cntk/utils/progress_print.py index 426770314923..de43661d4035 100644 --- a/bindings/python/cntk/utils/progress_print.py +++ b/bindings/python/cntk/utils/progress_print.py @@ -5,46 +5,61 @@ # ============================================================================== from __future__ import print_function import os +import sys import time -from cntk.cntk_py import TensorBoardFileWriter, print_built_info +from cntk import cntk_py + +def _warn_deprecated(message): + from warnings import warn + warn('DEPRECATED: ' + message, DeprecationWarning, stacklevel=2) + + +def _avg(numerator, denominator): + if isinstance(numerator, tuple): + numerator = numerator[1] - numerator[0] + if isinstance(denominator, tuple): + denominator = denominator[1] - denominator[0] + return (numerator / denominator) if denominator > 0 else 0.0 + # TODO: Let's switch to import logging in the future instead of print. [ebarsoum] -class ProgressPrinter(object): +class ProgressPrinter(cntk_py.ProgressWriter): ''' - Allows tracking various training time statistics (e.g. loss and metric) - and output them as training progresses. - - It provides the number of samples, average loss and average metric - since the last output or since the start of accumulation. - - Args: - freq (int or None, default None): determines how often - printing will occur. The value of 0 means an geometric - schedule (1,2,4,...). A value > 0 means a arithmetic schedule - (a log print for minibatch number: ``freq``, a log print for minibatch number: 2*``freq``, a log print for minibatch number: 3*``freq``,...), and a value of None means no per-minibatch log. - first (int, default 0): Only start logging after the minibatch number is greater or equal to ``first``. - tag (string, default EmptyString): prepend minibatch log lines with your own string - log_to_file (string or None, default None): if None, output log data to stdout. If a string is passed, the string is path to a file for log data. - gen_heartbeat (bool, default False): If True output a progress message every 10 seconds or so to stdout. - num_epochs (int, default 300): The total number of epochs to be trained. Used for some metadata. This parameter is optional. - tensorboard_log_dir (string or None, default None): if a string is passed, logs statistics to the TensorBoard events file in the given directory. - model (:class:`~cntk.ops.Function` or None, default None): if a Function is passed and ``tensorboard_log_dir`` is not None, records model graph to a TensorBoard events file. + Allows printing various training time statistics (e.g. loss and metric) and printing them as training progresses. ''' def __init__(self, freq=None, first=0, tag='', log_to_file=None, rank=None, gen_heartbeat=False, num_epochs=300, - tensorboard_log_dir=None, model=None): + test_freq=None, test_first=0): ''' - Constructor. The optional ``freq`` parameter determines how often - printing will occur. The value of 0 means an geometric - schedule (1,2,4,...). A value > 0 means a arithmetic schedule - (freq, 2*freq, 3*freq,...), and a value of None means no per-minibatch log. - set log_to_file if you want the output to go file instead of stdout. - set rank to distributed.rank if you are using distibuted parallelism -- each rank's log will go to seperate file. + Constructor. + + Args: + freq (`int` or `None`, default `None`): determines how often + printing will occur. The value of 0 means an geometric + schedule (1,2,4,...). A value > 0 means a arithmetic schedule + (a log print for minibatch number: ``freq``, a log print for minibatch number: 2*``freq``, + a log print for minibatch number: 3*``freq``,...), and a value of None means no per-minibatch log. + first (`int`, default 0): Only start logging after the minibatch number is greater or equal to ``first``. + tag (`string`, default EmptyString): prepend minibatch log lines with your own string + log_to_file (`string` or `None`, default `None`): if None, output log data to stdout. + If a string is passed, the string is path to a file for log data. + rank (`int` or `None`, default `None`): set this to distributed.rank if you are using distributed + parallelism -- each rank's log will go to separate file. + gen_heartbeat (`bool`, default `False`): If True output a progress message every 10 seconds or so to stdout. + num_epochs (`int`, default 300): The total number of epochs to be trained. Used for some metadata. + This parameter is optional. + test_freq (`int` or `None`, default `None`): similar to ``freq``, but applies to printing intermediate + test results. + test_first (`int`, default 0): similar to ``first``, but applies to printing intermediate test results. ''' - from sys import maxsize if freq is None: - freq = maxsize + freq = sys.maxsize + + if test_freq is None: + test_freq = sys.maxsize + + super(ProgressPrinter, self).__init__(freq, first, test_freq, test_first) self.loss_since_start = 0 self.metric_since_start = 0 @@ -57,34 +72,22 @@ def __init__(self, freq=None, first=0, tag='', log_to_file=None, rank=None, gen_ self.epochs = 0 self.freq = freq self.first = first + self.test_freq = test_freq self.tag = '' if not tag else "[{}] ".format(tag) self.epoch_start_time = time.time() self.progress_timer_time = 0 self.log_to_file = log_to_file - self.rank = rank self.gen_heartbeat = gen_heartbeat - self.num_epochs = num_epochs - self.trainer = None - - # print out data about CNTK build - print_built_info() + self.num_epochs = num_epochs - # Create TensorBoardFileWriter if the path to a log directory was provided. - self.tensorboard_writer = None - if tensorboard_log_dir is not None: - tb_run_name = tag.lower() if tag else '' - if self.rank is not None: - tb_run_name += 'rank' + str(self.rank) - - if tb_run_name: - tensorboard_log_dir = os.path.join(tensorboard_log_dir, tb_run_name) - self.tensorboard_writer = TensorBoardFileWriter(tensorboard_log_dir, model) + # print out data about CNTK build + cntk_py.print_built_info() self.logfilename = None if self.log_to_file is not None: self.logfilename = self.log_to_file - if self.rank != None: + if rank is not None: self.logfilename = self.logfilename + 'rank' + str(self.rank) # print to stdout @@ -97,68 +100,88 @@ def __init__(self, freq=None, first=0, tag='', log_to_file=None, rank=None, gen_ self.___logprint('CNTKCommandTrainInfo: CNTKNoMoreCommands_Total : ' + str(num_epochs)) self.___logprint('CNTKCommandTrainBegin: train') - if freq==0: + if freq == 0: self.___logprint(' average since average since examples') self.___logprint(' loss last metric last ') self.___logprint(' ------------------------------------------------------') def end_progress_print(self, msg=""): + ''' + Prints the given message signifying the end of training. + + Args: + msg (`string`, default ''): message to print. + ''' self.___logprint('CNTKCommandTrainEnd: train') if msg != "" and self.log_to_file is not None: self.___logprint(msg) - if self.tensorboard_writer is not None: - self.tensorboard_writer.close() - - def flush(self): - if self.tensorboard_writer is not None: - self.tensorboard_writer.flush() def avg_loss_since_start(self): ''' + DEPRECATED. + Returns: the average loss since the start of accumulation ''' - return self.loss_since_start/self.samples_since_start + _warn_deprecated('The method was deprecated.') + return _avg(self.loss_since_start, self.samples_since_start) def avg_metric_since_start(self): ''' + DEPRECATED. + Returns: the average metric since the start of accumulation ''' - return self.metric_since_start/self.samples_since_start + _warn_deprecated('The method was deprecated.') + return _avg(self.metric_since_start, self.samples_since_start) def avg_loss_since_last(self): ''' + DEPRECATED. + Returns: the average loss since the last print ''' - return self.loss_since_last/self.samples_since_last + _warn_deprecated('The method was deprecated.') + return _avg(self.loss_since_last, self.samples_since_last) def avg_metric_since_last(self): ''' + DEPRECATED. + Returns: the average metric since the last print ''' - return self.metric_since_last/self.samples_since_last + _warn_deprecated('The method was deprecated.') + return _avg(self.metric_since_last, self.samples_since_last) def reset_start(self): ''' + DEPRECATED. + Resets the 'start' accumulators Returns: tuple of (average loss since start, average metric since start, samples since start) ''' + _warn_deprecated('The method was deprecated.') ret = self.avg_loss_since_start(), self.avg_metric_since_start(), self.samples_since_start - self.loss_since_start = 0 - self.metric_since_start = 0 + self.loss_since_start = 0 + self.metric_since_start = 0 self.samples_since_start = 0 self.updates_since_start = 0 return ret def reset_last(self): ''' + DEPRECATED. + Resets the 'last' accumulators Returns: tuple of (average loss since last, average metric since last, samples since last) ''' + if self.total_updates == 0: + # Only warn once to avoid flooding with warnings. + _warn_deprecated('The method was deprecated.') ret = self.avg_loss_since_last(), self.avg_metric_since_last(), self.samples_since_last - self.loss_since_last = 0 - self.metric_since_last = 0 + self.loss_since_last = 0 + self.metric_since_last = 0 self.samples_since_last = 0 return ret @@ -170,56 +193,42 @@ def ___logprint(self, logline): # to named file. if distributed, one file per rank with open(self.logfilename, "a") as logfile: logfile.write(logline + "\n") - + def epoch_summary(self, with_metric=False): ''' + DEPRECATED. + If on an arithmetic schedule print an epoch summary using the 'start' accumulators. If on a geometric schedule does nothing. Args: with_metric (`bool`): if `False` it only prints the loss, otherwise it prints both the loss and the metric ''' - self.update_with_trainer(self.trainer, with_metric, force_update=True) + _warn_deprecated('The method was deprecated.') self.epochs += 1 - if self.freq > 0: - epoch_end_time = time.time() - time_delta = epoch_end_time - self.epoch_start_time - speed = 0 - avg_loss, avg_metric, samples = (0, 0, 0) - if self.samples_since_start != 0: - avg_loss, avg_metric, samples = self.reset_start() - if (time_delta > 0): - speed = samples / time_delta - self.epoch_start_time = epoch_end_time - if with_metric: - self.___logprint("Finished Epoch[{} of {}]: {}loss = {:0.6f} * {}, metric = {:0.1f}% * {} {:0.3f}s ({:5.1f} samples per second);".format(self.epochs, self.num_epochs, self.tag, avg_loss, samples, avg_metric*100.0, samples, time_delta, speed)) - else: - self.___logprint("Finished Epoch[{} of {}]: {}loss = {:0.6f} * {} {:0.3f}s ({:5.1f} samples per second);".format(self.epochs, self.num_epochs, self.tag, avg_loss, samples, time_delta, speed)) + epoch_end_time = time.time() + elapsed_milliseconds = (epoch_end_time - self.epoch_start_time) * 1000 - # For logging to TensorBoard, we use self.total_updates as it does not reset after each epoch. - self.update_value('epoch_avg_loss', avg_loss, self.epochs) - if with_metric: - self.update_value('epoch_avg_metric', avg_metric * 100.0, self.epochs) + metric_since_start = self.metric_since_start if with_metric else None + self.on_write_training_summary(self.samples_since_start, self.updates_since_start, self.epochs, + self.loss_since_start, metric_since_start, elapsed_milliseconds) - self.loss_since_last = 0 - self.metric_since_last = 0 - self.samples_since_last = 0 - return avg_loss, avg_metric, samples # BUGBUG: for freq=0, we don't return anything here + if self.freq > 0: + return self.reset_start() def ___generate_progress_heartbeat(self): timer_delta = time.time() - self.progress_timer_time - + # print progress no sooner than 10s apart if timer_delta > 10 and self.gen_heartbeat: # print to stdout print("PROGRESS: 0.00%") self.progress_timer_time = time.time() - def log(self, message): - self.___logprint(message) - - def update(self, loss, minibatch_size, metric=None, updates_inc=True): + def update(self, loss, minibatch_size, metric=None): ''' + DEPRECATED. + Updates the accumulators using the loss, the minibatch_size and the optional metric. Args: @@ -228,90 +237,239 @@ def update(self, loss, minibatch_size, metric=None, updates_inc=True): metric (`float` or `None`): if `None` do not update the metric accumulators, otherwise update with the given value ''' + if self.total_updates == 0: + # Only warn once to avoid flooding with warnings. + _warn_deprecated('The method was deprecated.') + + if minibatch_size == 0: + return + self.samples_since_start += minibatch_size - self.samples_since_last += minibatch_size - self.loss_since_start += loss * minibatch_size - self.loss_since_last += loss * minibatch_size - - if updates_inc: - self.updates_since_start += 1 - self.total_updates += 1 + self.samples_since_last += minibatch_size + self.loss_since_start += loss * minibatch_size + self.loss_since_last += loss * minibatch_size + self.updates_since_start += 1 + self.total_updates += 1 if metric is not None: self.metric_since_start += metric * minibatch_size - self.metric_since_last += metric * minibatch_size + self.metric_since_last += metric * minibatch_size self.___generate_progress_heartbeat() - if self.freq == 0 and (self.updates_since_start+1) & self.updates_since_start == 0: - avg_loss, avg_metric, samples = self.reset_last() - if metric is not None: - self.___logprint(' {:8.3g} {:8.3g} {:8.3g} {:8.3g} {:10d}'.format( - self.avg_loss_since_start(), avg_loss, - self.avg_metric_since_start(), avg_metric, - self.samples_since_start)) - else: - self.___logprint(' {:8.3g} {:8.3g} {:8s} {:8s} {:10d}'.format( - self.avg_loss_since_start(), avg_loss, - '', '', self.samples_since_start)) - elif self.freq > 0 and (self.updates_since_start % self.freq == 0 or self.updates_since_start <= self.first): - avg_loss, avg_metric, samples = self.reset_last() - - if self.updates_since_start <= self.first: # printing individual MBs - first_mb = self.updates_since_start - else: - first_mb = max(self.updates_since_start - self.freq + 1, self.first+1) + if ((self.freq == 0 and (self.updates_since_start + 1) & self.updates_since_start == 0) or + self.freq > 0 and (self.updates_since_start % self.freq == 0 or self.updates_since_start <= self.first)): + samples = (self.samples_since_start - self.samples_since_last, self.samples_since_start) + updates = None + if self.freq > 0: + if self.updates_since_start <= self.first: # printing individual MBs + first_update = self.updates_since_start + else: + first_update = max(self.updates_since_start - self.freq, self.first) + updates = (first_update, self.updates_since_start) + + aggregate_loss = (self.loss_since_start - self.loss_since_last, self.loss_since_start) + aggregate_metric = None if metric is not None: - self.___logprint(' Minibatch[{:4d}-{:4d}]: loss = {:0.6f} * {:d}, metric = {:0.1f}% * {:d};'.format( - first_mb, self.updates_since_start, avg_loss, samples, avg_metric*100.0, samples)) - else: - self.___logprint(' Minibatch[{:4d}-{:4d}]: loss = {:0.6f} * {:d};'.format( - first_mb, self.updates_since_start, avg_loss, samples)) + aggregate_metric = (self.metric_since_start - self.metric_since_last, self.metric_since_start) - if self.updates_since_start > self.first: - # For logging to TensorBoard, we use self.total_updates as it does not reset after each epoch. - self.update_value('mb_avg_loss', avg_loss, self.total_updates) - if metric is not None: - self.update_value('mb_avg_metric', avg_metric * 100.0, self.total_updates) + self.on_write_training_update(samples, updates, aggregate_loss, aggregate_metric) + self.reset_last() - def update_with_trainer(self, trainer, with_metric=False, force_update=False): + def update_with_trainer(self, trainer, with_metric=False): ''' - Updates the accumulators using the loss, the minibatch_size and optionally the metric - using the information from the ``trainer``. + DEPRECATED. Use :func:`cntk.utils.ProgressPrinter.update_training` instead. + + Update the current loss, the minibatch size and optionally the metric using the information from the + ``trainer``. Args: trainer (:class:`cntk.trainer.Trainer`): trainer from which information is gathered with_metric (`bool`): whether to update the metric accumulators ''' - if trainer == None or trainer.previous_minibatch_sample_count == 0: + if self.total_updates == 0: + # Only warn once to avoid flooding with warnings. + _warn_deprecated('Use ProgressPrinter.update_progress() instead.') + + if trainer is not None and trainer.previous_minibatch_sample_count != 0: + self.update( + trainer.previous_minibatch_loss_average, + trainer.previous_minibatch_sample_count, + trainer.previous_minibatch_evaluation_average if with_metric else None) + + def on_write_training_update(self, samples, updates, aggregate_loss, aggregate_metric): + # Override for ProgressWriter.on_write_training_update. + self.___write_progress_update(samples, updates, aggregate_loss, aggregate_metric, self.freq, '') + + def on_training_update_end(self): + # Override for ProgressWriter.on_training_update_end. + self.___generate_progress_heartbeat() + + def on_write_test_update(self, samples, updates, aggregate_metric): + # Override for ProgressWriter.on_write_test_update. + self.___write_progress_update(samples, updates, None, aggregate_metric, self.test_freq, 'Evaluation ') + + def ___write_progress_update(self, samples, updates, aggregate_loss, aggregate_metric, frequency, name): + format_str = ' ' + format_args = [] + + if frequency == 0: + if aggregate_loss is not None: + format_str += '{:8.3g} {:8.3g} ' + format_args.extend([_avg(aggregate_loss[1], samples[1]), _avg(aggregate_loss, samples)]) + else: + format_str += '{:8s} {:8s} ' + format_args.extend(['', '']) + + if aggregate_metric is not None: + format_str += '{:8.3g} {:8.3g} ' + format_args.extend([_avg(aggregate_metric[1], samples[1]), _avg(aggregate_metric, samples)]) + else: + format_str += '{:8s} {:8s} ' + format_args.extend(['', '']) + + format_str += ' {:10d}' + format_args.append(samples[1]) + else: + format_str += '{}Minibatch[{:4d}-{:4d}]: ' + format_args.extend([name, updates[0] + 1, updates[1]]) + + if aggregate_loss is not None: + format_str += 'loss = {:0.6f} * {:d}' + format_args.extend([_avg(aggregate_loss, samples), samples[1] - samples[0]]) + + if aggregate_metric is not None: + if aggregate_loss is not None: + format_str += ', ' + format_str += 'metric = {:0.2f}% * {:d}' + format_args.extend([_avg(aggregate_metric, samples) * 100.0, samples[1] - samples[0]]) + + format_str += ';' + + self.___logprint(format_str.format(*format_args)) + + def on_write_training_summary(self, samples, updates, summaries, aggregate_loss, aggregate_metric, + elapsed_milliseconds): + # Override for ProgressWriter.on_write_training_summary. + if self.freq == 0: + # Only log training summary when on arithmetic schedule. return - #remember the trainer for epoch_summary - self.trainer = trainer + elapsed_seconds = elapsed_milliseconds / 1000 + speed = _avg(samples, elapsed_seconds) + avg_loss = _avg(aggregate_loss, samples) - self.updates_since_start += 1 - self.total_updates += 1 - - if force_update or (self.freq == 0 and (self.updates_since_start+1) & self.updates_since_start == 0) or (self.freq > 0 and (self.updates_since_start % self.freq == 0 or self.updates_since_start <= self.first)): - self.update( - trainer.accumulated_loss_average, - trainer.accumulated_sample_count, - trainer.accumulated_evaluation_average if with_metric else None, - updates_inc=False) - trainer.reset_accumulation() + if aggregate_metric is not None: + avg_metric = _avg(aggregate_metric, samples) + msg = "Finished Epoch[{} of {}]: {}loss = {:0.6f} * {}, metric = {:0.2f}% * {} {:0.3f}s ({:5.1f} samples per second);".format( + summaries, self.num_epochs, self.tag, avg_loss, samples, avg_metric * 100.0, samples, + elapsed_seconds, speed) + else: + msg = "Finished Epoch[{} of {}]: {}loss = {:0.6f} * {} {:0.3f}s ({:5.1f} samples per second);".format( + summaries, self.num_epochs, self.tag, avg_loss, samples, elapsed_seconds, speed) - def update_value(self, name, value, step): + self.___logprint(msg) + + def on_write_test_summary(self, samples, updates, summaries, aggregate_metric, elapsed_milliseconds): + # Override for ProgressWriter.on_write_test_summary. + self.___logprint("Finished Evaluation [{}]: Minibatch[1-{}]: metric = {:0.2f}% * {};".format( + summaries, updates, _avg(aggregate_metric, samples) * 100.0, samples)) + + +class TensorBoardProgressWriter(cntk_py.ProgressWriter): + ''' + Allows tracking various training time statistics (e.g. loss and metric) and write them as TensorBoard event files. + The generated files can be opened in TensorBoard to visualize the progress. + ''' + + def __init__(self, freq=None, log_dir='.', rank=None, model=None): + ''' + Constructor. + + Args: + freq (`int` or `None`, default `None`): frequency at which progress is logged. + For example, the value of 2 will cause the progress to be logged every second time when + `:func:cntk.util.TensorBoardFileWriter.update_with_trainer` is invoked. + None indicates that progress is logged only when + `:func:cntk.util.TensorBoardFileWriter.summarize_progress` is invoked. + Must be a positive integer otherwise. + log_dir (`string`, default '.'): directory where to create a TensorBoard event file. + rank (`int` or `None`, default `None`): rank of a worker when using distributed training, or `None` if + training locally. If not `None`, event files will be created in log_dir/rank[rank] rather than log_dir. + model (:class:`cntk.ops.Function` or `None`, default `None`): model graph to plot. + ''' + if freq is None: + freq = sys.maxsize + + super(TensorBoardProgressWriter, self).__init__(freq, 0, sys.maxsize, 0) + + # Only log either when rank is not specified or when rank is 0. + self.writer = cntk_py.TensorBoardFileWriter(log_dir, model) if not rank else None + self.closed = False + + def write_value(self, name, value, step): ''' - Updates a named value at the given step. + Record value of a scalar variable at the given time step. Args: - name (string): name of the value to update. - value (float): the updated value. - step (int): step at which the value is recorded. + name (`string`): name of a variable. + value (`float`): value of the variable. + step (`int`): time step at which the value is recorded. ''' - if self.tensorboard_writer is not None: - self.tensorboard_writer.write_value(str(name), float(value), int(step)) + if self.closed: + raise RuntimeError('Attempting to use a closed TensorBoardProgressWriter') + + if self.writer: + self.writer.write_value(str(name), float(value), int(step)) + + def flush(self): + '''Make sure that any outstanding records are immediately persisted.''' + if self.closed: + raise RuntimeError('Attempting to use a closed TensorBoardProgressWriter') + + if self.writer: + self.writer.flush() + + def close(self): + ''' + Make sure that any outstanding records are immediately persisted, then close any open files. + Any subsequent attempt to use the object will cause a RuntimeError. + ''' + if self.closed: + raise RuntimeError('Attempting to use a closed TensorBoardProgressWriter') + + if self.writer: + self.writer.close() + self.closed = True + + def on_write_training_update(self, samples, updates, aggregate_loss, aggregate_metric): + # Override for ProgressWriter.on_write_training_update(). + self.write_value('minibatch/avg_loss', _avg(aggregate_loss, samples), self.total_training_updates()) + self.write_value('minibatch/avg_metric', _avg(aggregate_metric, samples), self.total_training_updates()) + + def on_write_test_update(self, samples, updates, aggregate_metric): + # Override for ProgressWriter.on_write_test_update(). + # It is not particularly useful to record per-minibatch test results in TensorBoard, + # hence it is not currently supported. + raise NotImplementedError( + 'TensorBoardProgressWriter does not support recording per-minibatch cross-validation results') + + def on_write_training_summary(self, samples, updates, summaries, aggregate_loss, aggregate_metric, + elapsed_milliseconds): + # Override for BaseProgressWriter.on_write_training_summary(). + self.write_value('summary/avg_loss', _avg(aggregate_loss, samples), summaries) + self.write_value('summary/avg_metric', _avg(aggregate_metric, samples), summaries) + + def on_write_test_summary(self, samples, updates, summaries, aggregate_metric, elapsed_milliseconds): + # Override for BaseProgressWriter.on_write_test_summary(). + avg_metric = _avg(aggregate_metric, samples) + if self.total_training_updates() != 0: + # Record test summary using training minibatches as a step. + # This allows to easier correlate the training and test metric graphs in TensorBoard. + self.write_value('minibatch/test_avg_metric', avg_metric, self.total_training_updates()) + else: + self.write_value('summary/test_avg_metric', avg_metric, self.summaries) # print the total number of parameters to log @@ -325,4 +483,4 @@ def log_number_of_parameters(model, trace_level=0): if trace_level > 0: print() for p in parameters: - print("\t{}".format(p.shape)) + print("\t{}".format(p.shape)) \ No newline at end of file