
Commit

ReaderLib: add option to use multiple threads to BlockRandomizer and NoRandomizer

Turn it on for ImageReader to speed up image decoding / transformation.
mahilleb-msft committed Apr 19, 2016
1 parent 343fba0 commit 079846e
Showing 6 changed files with 87 additions and 57 deletions.
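
At its core, the change factors the per-sequence work of GetNextSequences into a process lambda and runs that lambda either from an OpenMP parallel loop or from a plain serial loop, selected by a new constructor flag (multithreadedGetNextSequences). The following is a minimal, self-contained sketch of that pattern only; the class and function names are illustrative stand-ins, not the CNTK types, and the actual loops in the diffs below additionally use "ordered schedule(dynamic)" on the OpenMP pragma.

#include <cstddef>
#include <vector>

// Illustrative stand-ins for the real reader types (not part of CNTK).
struct Sequence { /* decoded sample data would live here */ };

class SequenceProvider
{
public:
    explicit SequenceProvider(bool multithreadedGetNextSequences)
        : m_multithreadedGetNextSequences(multithreadedGetNextSequences) {}

    std::vector<Sequence> GetNextSequences(std::size_t count)
    {
        std::vector<Sequence> result(count);

        // The per-item work (deserialize + transform) is factored into a lambda so the
        // same body can be driven by either loop below.
        auto process = [&](int i) { result[i] = LoadAndTransform(i); };

        if (m_multithreadedGetNextSequences)
        {
            // With OpenMP enabled (-fopenmp or /openmp) the items are processed on multiple
            // threads; without OpenMP the pragma is ignored and the loop runs serially.
#pragma omp parallel for schedule(dynamic)
            for (int i = 0; i < static_cast<int>(count); ++i)
                process(i);
        }
        else
        {
            for (int i = 0; i < static_cast<int>(count); ++i)
                process(i);
        }
        return result;
    }

private:
    Sequence LoadAndTransform(int /*index*/) { return Sequence{}; } // stand-in for real decoding
    bool m_multithreadedGetNextSequences;
};

int main()
{
    SequenceProvider provider(/*multithreadedGetNextSequences*/ true);
    return provider.GetNextSequences(256).empty() ? 1 : 0;
}
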
Source/Readers/CNTKTextFormatReader/TextParser.cpp (71 changes: 32 additions & 39 deletions)
@@ -219,8 +219,6 @@ void TextParser<ElemType>::TextDataChunk::GetSequence(size_t sequenceId, std::vector<SequenceDataPtr>& result)
 {
     auto it = m_sequencePtrMap.find(sequenceId);
     assert(it != m_sequencePtrMap.end());
-    //TODO: Remove pragma once new randomizer is in master.
-#pragma omp atomic
     ++m_sequenceRequestCount;
     result.reserve(it->second.size());
     copy(it->second.begin(), it->second.end(), back_inserter(result));
@@ -230,52 +228,47 @@ template <class ElemType>
 ChunkPtr TextParser<ElemType>::GetChunk(size_t chunkId)
 {
     ChunkPtr chunk;
-    //TODO: Remove pragma once new randomizer is in master.
-#pragma omp critical
+    auto it = m_chunkCache.find(chunkId);
+    if (it != m_chunkCache.end())
     {
-        auto it = m_chunkCache.find(chunkId);
-        if (it != m_chunkCache.end())
-        {
-            chunk = it->second;
-        }
-        else
-        {
-            const auto& chunkDescriptor = m_indexer->GetIndex()[chunkId];
-            auto textChunk = make_shared<TextDataChunk>(chunkDescriptor);
+        chunk = it->second;
+    }
+    else
+    {
+        const auto& chunkDescriptor = m_indexer->GetIndex()[chunkId];
+        auto textChunk = make_shared<TextDataChunk>(chunkDescriptor);

-            attempt(5, [this, &textChunk, &chunkDescriptor]()
-            {
-                LoadChunk(textChunk, chunkDescriptor);
-            });
+        attempt(5, [this, &textChunk, &chunkDescriptor]()
+        {
+            LoadChunk(textChunk, chunkDescriptor);
+        });

-            if (m_chunkCacheSize > 0 && m_chunkCache.size() == m_chunkCacheSize)
+        if (m_chunkCacheSize > 0 && m_chunkCache.size() == m_chunkCacheSize)
+        {
+            size_t candidateId = SIZE_MAX;
+            size_t minNumSequencesLeft = SIZE_MAX;
+            for (const auto& it : m_chunkCache)
             {
-                size_t candidateId = SIZE_MAX;
-                size_t minNumSequencesLeft = SIZE_MAX;
-                for (const auto& it : m_chunkCache)
+                const auto& chunk = *(it.second.get());
+                size_t numSequencesUsed = 0;
+                numSequencesUsed += chunk.m_sequenceRequestCount;
+                size_t numSequencesLeft = chunk.m_sequences.size() - numSequencesUsed;
+                if (numSequencesLeft < minNumSequencesLeft)
                 {
-                    const auto& chunk = *(it.second.get());
-                    size_t numSequencesUsed = 0;
-#pragma omp atomic
-                    numSequencesUsed += chunk.m_sequenceRequestCount;
-                    size_t numSequencesLeft = chunk.m_sequences.size() - numSequencesUsed;
-                    if (numSequencesLeft < minNumSequencesLeft)
-                    {
-                        minNumSequencesLeft = numSequencesLeft;
-                        candidateId = it.first;
-                    }
+                    minNumSequencesLeft = numSequencesLeft;
+                    candidateId = it.first;
                 }
-                assert(candidateId != SIZE_MAX);
-                m_chunkCache.erase(candidateId);
             }
+            assert(candidateId != SIZE_MAX);
+            m_chunkCache.erase(candidateId);
+        }

-            if (m_chunkCacheSize > 0)
-            {
-                m_chunkCache[chunkId] = textChunk;
-            }
+        if (m_chunkCacheSize > 0)
+        {
+            m_chunkCache[chunkId] = textChunk;
+        }

-            chunk = textChunk;
-        }
+        chunk = textChunk;
     }
     return chunk;
 }
Source/Readers/ImageReader/ImageReader.cpp (7 changes: 5 additions & 2 deletions)
@@ -38,13 +38,16 @@ ImageReader::ImageReader(MemoryProviderPtr provider,
     auto deserializer = std::make_shared<ImageDataDeserializer>(config);

     TransformerPtr randomizer;
+    // Request multi-threaded randomizer operation to speed up CPU-intensive image-decoding and transformations.
+    const bool multithreadedGetNextSequences = true;
     if (configHelper.ShouldRandomize())
     {
-        randomizer = std::make_shared<BlockRandomizer>(0, 1, deserializer, BlockRandomizer::DecimationMode::sequence, false);
+        bool useLegacyRandomization = false;
+        randomizer = std::make_shared<BlockRandomizer>(0, 1, deserializer, BlockRandomizer::DecimationMode::sequence, useLegacyRandomization, multithreadedGetNextSequences);
     }
     else
     {
-        randomizer = std::make_shared<NoRandomizer>(deserializer);
+        randomizer = std::make_shared<NoRandomizer>(deserializer, multithreadedGetNextSequences);
     }

     randomizer->Initialize(nullptr, config);
Source/Readers/ReaderLib/BlockRandomizer.cpp (27 changes: 19 additions & 8 deletions)
@@ -21,7 +21,8 @@ BlockRandomizer::BlockRandomizer(
     size_t randomizationRangeInSamples,
     IDataDeserializerPtr deserializer,
     DecimationMode decimationMode,
-    bool useLegacyRandomization)
+    bool useLegacyRandomization,
+    bool multithreadedGetNextSequence)
     : m_verbosity(verbosity),
     m_deserializer(deserializer),
     m_decimationMode(decimationMode),
@@ -31,7 +32,8 @@ BlockRandomizer::BlockRandomizer(
     m_epochStartPosition(0),
     m_sweepTotalNumberOfSamples(0),
     m_lastSeenChunkId(SIZE_MAX),
-    m_chunkRandomizer(std::make_shared<ChunkRandomizer>(deserializer, randomizationRangeInSamples, useLegacyRandomization))
+    m_chunkRandomizer(std::make_shared<ChunkRandomizer>(deserializer, randomizationRangeInSamples, useLegacyRandomization)),
+    m_multithreadedGetNextSequences(multithreadedGetNextSequence)
 {
     assert(deserializer != nullptr);

@@ -116,11 +118,7 @@ Sequences BlockRandomizer::GetNextSequences(size_t sampleCount)

     result.m_data.resize(m_streams.size(), std::vector<SequenceDataPtr>(decimated.size()));

-    // TODO: This will be changed, when we move transformers under the randomizer.
-    // TODO: Randomizer won't should not deal with multithreading.
-#pragma omp parallel for ordered schedule(dynamic)
-    for (int i = 0; i < decimated.size(); ++i)
-    {
+    auto process = [&](int i) -> void {
         const auto& description = decimated[i];
         std::vector<SequenceDataPtr> sequence;
         auto it = m_chunks.find(description.m_chunk->m_chunkId);
@@ -134,6 +132,19 @@ Sequences BlockRandomizer::GetNextSequences(size_t sampleCount)
         {
             result.m_data[j][i] = sequence[j];
         }
+    };
+
+    // TODO: This will be changed, when we move transformers under the randomizer, should not deal with multithreading here.
+    if (m_multithreadedGetNextSequences)
+    {
+#pragma omp parallel for ordered schedule(dynamic)
+        for (int i = 0; i < decimated.size(); ++i)
+            process(i);
+    }
+    else
+    {
+        for (int i = 0; i < decimated.size(); ++i)
+            process(i);
     }

     m_sequenceRandomizer->ReleaseChunks();
@@ -214,7 +225,7 @@ void BlockRandomizer::RetrieveDataChunks()
     m_lastSeenChunkId = window.back().m_chunkId;

     // in the loop we are building a new map of currently loaded chunks:
-    // we are iterating thru all chunks in the window and if they are not in m_chunks map - 
+    // we are iterating thru all chunks in the window and if they are not in m_chunks map -
     // they get requested from the deserializer.
     // There could be some chunks in the m_chunks that are not required anymore, by swapping the chunks with m_chunks, we are removing those.
     std::map<size_t, ChunkPtr> chunks;
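
The parallel loops above use schedule(dynamic), which hands loop iterations to threads as they become free instead of pre-splitting the index range evenly; that choice presumably matters here because per-item cost is uneven for image decoding and transformation (the workload this commit targets). A tiny standalone illustration of the scheduling idiom, unrelated to the CNTK code itself:

#include <chrono>
#include <cstdio>
#include <thread>
#include <vector>

// Simulate work items whose cost varies widely, as image decode/transform does.
static void ProcessItem(int costMs)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(costMs));
}

int main()
{
    const std::vector<int> costs = {1, 30, 2, 25, 3, 40, 1, 5};

    // schedule(dynamic): each thread grabs the next unprocessed item when it finishes its
    // current one, so a single slow item does not leave the other threads idle the way a
    // static split of the index range might.
#pragma omp parallel for schedule(dynamic)
    for (int i = 0; i < static_cast<int>(costs.size()); ++i)
        ProcessItem(costs[i]);

    std::puts("done");
    return 0;
}
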
Source/Readers/ReaderLib/BlockRandomizer.h (11 changes: 8 additions & 3 deletions)
@@ -20,10 +20,10 @@ namespace Microsoft { namespace MSR { namespace CNTK {
 // The code is based on the old block randomizer and it preserves the same behavior to pass all available tests.
 // The high-level algorithm is:
 // When next sequences are requested (limited by the sampleCount), the following steps are performed:
-// 1) if a new sweep is entered, randomize chunk descriptions using ChunkRandomizer, also precalculate randomization windows for all 
+// 1) if a new sweep is entered, randomize chunk descriptions using ChunkRandomizer, also precalculate randomization windows for all
 //    chunk descriptions
 // 2) if a new chunk is entered, using SequenceRandomizer identify a window of chunks and requested their sequence descriptions from deserializer.
-// 3) randomize sequence descriptions inside the window 
+// 3) randomize sequence descriptions inside the window
 // 4) return sequence descriptions not exceeding sampleCount/minibatch limit
 // 5) decimate sequence descriptions based on the worker rank
 // 6) request chunks of data based on decimated sequences and return sequence data
@@ -47,7 +47,8 @@ class BlockRandomizer : public Transformer
         size_t randomizationRangeInSamples,
         IDataDeserializerPtr deserializer,
         DecimationMode decimationMode = DecimationMode::chunk,
-        bool useLegacyRandomization = false);
+        bool useLegacyRandomization = false,
+        bool multithreadedGetNextSequences = false);

     virtual void Initialize(TransformerPtr, const ConfigParameters&) override {};

@@ -118,6 +119,10 @@ class BlockRandomizer : public Transformer
     // Decimation mode.
     DecimationMode m_decimationMode;

+    // Whether to get sequences using multiple threads.
+    // TODO temporary; should go away when transformers are moved closer to the deserializer
+    bool m_multithreadedGetNextSequences;
+
     // General configuration
     int m_verbosity;
 };
Source/Readers/ReaderLib/NoRandomizer.cpp (22 changes: 18 additions & 4 deletions)
@@ -11,13 +11,14 @@

 namespace Microsoft { namespace MSR { namespace CNTK {

-NoRandomizer::NoRandomizer(IDataDeserializerPtr deserializer)
+NoRandomizer::NoRandomizer(IDataDeserializerPtr deserializer, bool multithreadedGetNextSequences)
     : m_deserializer(deserializer),
     m_samplePositionInEpoch(0),
     m_currentChunkPosition(SIZE_MAX),
     m_globalSamplePosition(0),
     m_totalNumberOfSamples(0),
-    m_currentSequencePositionInChunk(0)
+    m_currentSequencePositionInChunk(0),
+    m_multithreadedGetNextSequences(multithreadedGetNextSequences)
 {
     assert(deserializer != nullptr);
     m_streams = m_deserializer->GetStreamDescriptions();
@@ -172,8 +173,8 @@ Sequences NoRandomizer::GetNextSequences(size_t sampleCount)
     }

     result.m_data.resize(m_streams.size(), std::vector<SequenceDataPtr>(subsetSize));
-    for (int i = 0; i < subsetSize; ++i)
-    {
+
+    auto process = [&](int i) -> void {
         std::vector<SequenceDataPtr> sequence;
         const auto& sequenceDescription = descriptions[start + i];
         if (sequenceDescription.m_chunkId != m_currentChunkId)
@@ -187,6 +188,19 @@
         {
             result.m_data[j][i] = sequence[j];
         }
+    };
+
+    // TODO: This will be changed, when we move transformers under the (no-) randomizer, should not deal with multithreading here.
+    if (m_multithreadedGetNextSequences)
+    {
+#pragma omp parallel for ordered schedule(dynamic)
+        for (int i = 0; i < subsetSize; ++i)
+            process(i);
+    }
+    else
+    {
+        for (int i = 0; i < subsetSize; ++i)
+            process(i);
     }

     return result;
Source/Readers/ReaderLib/NoRandomizer.h (6 changes: 5 additions & 1 deletion)
@@ -21,7 +21,7 @@ namespace Microsoft { namespace MSR { namespace CNTK {
 class NoRandomizer : public Transformer
 {
 public:
-    NoRandomizer(IDataDeserializerPtr deserializer);
+    NoRandomizer(IDataDeserializerPtr deserializer, bool multithreadedGetNextSequences = false);

     virtual void Initialize(TransformerPtr next, const ConfigParameters& readerConfig) override;
     virtual void StartEpoch(const EpochConfiguration& config) override;
@@ -43,6 +43,10 @@ class NoRandomizer : public Transformer

     IDataDeserializerPtr m_deserializer;

+    // Whether to get sequences using multiple threads.
+    // TODO temporary; should go away when transformers are moved closer to the deserializer
+    bool m_multithreadedGetNextSequences;
+
     // Stream descriptions
     std::vector<StreamDescriptionPtr> m_streams;

