Dist mint setup cache lists
jayashreemohan29 committed Apr 12, 2020
1 parent 1862acb commit d009873
Showing 3 changed files with 65 additions and 4 deletions.
50 changes: 47 additions & 3 deletions dali/operators/reader/loader/file_loader.h
@@ -154,9 +154,45 @@ class FileLoader : public Loader<CPUBackend, ImageLabelWrapper> {
// Create a shuffled list for caching
// Sort it so that search becomes easier
if (!caching_done_ && cache_size_ > 0){
outfile << "Seed is " << seed_ << endl;
outfile << "Seed is " << shuffle_seed_ << endl;

//Get the cache list for other nodes
if ( num_nodes_ > 1) {
shm_cache_index_list_other_nodes.resize(num_nodes_);
for (int nid = 0; nid < num_nodes_; nid ++ ){
if (nid == node_id_){
// We are in the current node; do nothing
continue;
}
vector<int>& nid_list = shm_cache_index_list_other_nodes[nid];  // reference, so the stored per-node list is filled in place
outfile << "For node " << nid << std::endl;
// Resize list to the total size of shards in this node
//nid_list.resize(Size()/num_nodes_);
std::mt19937 gen(shuffle_seed_);
for (int sh = 0; sh < num_shards_per_node_; sh ++){
// global shard index of shard `sh` on node `nid`
Index shard_start_idx = start_index(nid*num_shards_per_node_ + sh, num_shards_, Size());
Index shard_end_idx = shard_start_idx + Size()/num_shards_;
Index shard_size = shard_end_idx - shard_start_idx;
vector<int> cache_list_per_shard(shard_size);
outfile << "\tShard " << nid*num_shards_per_node_ + sh << ", size " << shard_size << std::endl;
outfile << "\t\t Index begin " << shard_start_idx << ", index end " << shard_end_idx << std::endl;
std::iota(cache_list_per_shard.begin(), cache_list_per_shard.end(), shard_start_idx);
std::shuffle(cache_list_per_shard.begin(), cache_list_per_shard.end(), gen);
cache_list_per_shard.resize(cache_size_);
nid_list.insert(nid_list.end(), cache_list_per_shard.begin(), cache_list_per_shard.end());
}
std::sort (nid_list.begin(), nid_list.end());
}
}
for (int nid = 0; nid < num_nodes_; nid ++){
outfile << "For node " << nid << endl;
if (shm_cache_index_list_other_nodes[nid].size() > 0){
for (int i = 0; i < static_cast<int>(shm_cache_index_list_other_nodes[nid].size()); i++)
outfile << "\t" << i << " : " << shm_cache_index_list_other_nodes[nid][i] << std::endl;
}
}
//shm_cache_index_list_.resize(cache_size_);
std::mt19937 gen(seed_);
std::mt19937 gen(shuffle_seed_);
//std::uniform_int_distribution<int> distr(index_start_, index_end_);
//std::generate(shm_cache_index_list_.begin(), shm_cache_index_list_.end(), [&](){ return distr(gen); });
shm_cache_index_list_.resize(Size()/num_shards_);
@@ -165,7 +201,7 @@ class FileLoader : public Loader<CPUBackend, ImageLabelWrapper> {
shm_cache_index_list_.resize(cache_size_);
std::sort (shm_cache_index_list_.begin(), shm_cache_index_list_.end());

outfile << "Index list to cache : " << endl;
outfile << "Index list to cache for this shard : " << endl;
for (int i = 0; i < static_cast<int>(shm_cache_index_list_.size()); i++)
outfile << i << " : " << shm_cache_index_list_[i] << std::endl;
}
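
The block above relies on one design choice worth spelling out: every node seeds std::mt19937 with the same shuffle_seed_, so each node can deterministically reconstruct which sample indices every other node will cache without any communication; each per-shard list is then truncated to cache_size_ and sorted so later lookups stay cheap. A minimal, self-contained sketch of that per-shard selection (the names ShardStart and BuildCacheList are illustrative, not DALI API):

// Sketch only: rebuild one shard's cache index list from a shared seed.
#include <algorithm>
#include <cstdint>
#include <numeric>
#include <random>
#include <vector>

using Index = int64_t;

// Same convention as DALI's start_index(): shard i begins at size * i / num_shards.
static Index ShardStart(Index shard_id, Index num_shards, Index size) {
  return size * shard_id / num_shards;
}

// Draw cache_size indices from one shard, using a generator seeded once per
// node with the shared shuffle seed, and keep the result sorted.
static std::vector<Index> BuildCacheList(Index shard_id, Index num_shards,
                                         Index size, Index cache_size,
                                         std::mt19937 &gen) {
  Index begin = ShardStart(shard_id, num_shards, size);
  Index end = begin + size / num_shards;
  std::vector<Index> indices(end - begin);
  std::iota(indices.begin(), indices.end(), begin);   // every index in the shard
  std::shuffle(indices.begin(), indices.end(), gen);  // random subset ...
  indices.resize(std::min<Index>(cache_size, static_cast<Index>(indices.size())));
  std::sort(indices.begin(), indices.end());          // ... kept in ascending order
  return indices;
}

Any node that runs this with the same seed and the same shard geometry reproduces the same lists, which is what allows shm_cache_index_list_other_nodes to be filled locally.
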
@@ -183,8 +219,11 @@ class FileLoader : public Loader<CPUBackend, ImageLabelWrapper> {
using Loader<CPUBackend, ImageLabelWrapper>::shuffle_seed_;
using Loader<CPUBackend, ImageLabelWrapper>::cache_size_;
using Loader<CPUBackend, ImageLabelWrapper>::num_shards_;
using Loader<CPUBackend, ImageLabelWrapper>::num_nodes_;
using Loader<CPUBackend, ImageLabelWrapper>::node_id_;
using Loader<CPUBackend, ImageLabelWrapper>::seed_;
using Loader<CPUBackend, ImageLabelWrapper>::outfile;
using Loader<CPUBackend, ImageLabelWrapper>::num_shards_per_node_;

string file_root_, file_list_;

@@ -196,7 +235,12 @@ class FileLoader : public Loader<CPUBackend, ImageLabelWrapper> {
bool caching_done_;
int index_start_;
int index_end_;
//int num_shards_per_node_ = num_shards_ / num_nodes_;
//vector<int> current_shards_;
//vector<int> current_shards_(num_shards_per_node_);
//std::iota(current_shards_.begin(), current_shards_.end(), node_id_*num_shards_per_node_);
vector<int> shm_cache_index_list_;
vector<vector<int>> shm_cache_index_list_other_nodes;
vector<std::string> shm_cached_items_;
FileStream::FileStreamMappinReserver mmap_reserver;
};
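
Since both shm_cache_index_list_ and each entry of shm_cache_index_list_other_nodes are kept sorted (the "Sort it so that search becomes easier" comment above), checking whether a given sample is slated to be cached on some node can be a binary search; a hypothetical helper in that spirit (not part of this commit) might look like:

// Hypothetical helper, not in this commit: is global sample index `idx`
// scheduled to be cached by node `nid`?  O(log n) because the lists are sorted.
#include <algorithm>
#include <vector>

bool IsCachedOnNode(const std::vector<std::vector<int>> &lists, int nid, int idx) {
  const std::vector<int> &node_list = lists[nid];
  return std::binary_search(node_list.begin(), node_list.end(), idx);
}
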
12 changes: 12 additions & 0 deletions dali/operators/reader/loader/loader.cc
@@ -26,6 +26,10 @@ to sequentially read data and then randomly sample it to form a batch.)code", fa
this parameter is ignored.)code", 1024)
.AddOptionalArg("num_shards",
R"code(Partition the data into this many parts (used for multiGPU training).)code", 1)
.AddOptionalArg("num_nodes",
R"code(Number of physical nodes involved in multi-GPU training.)code", 1)
.AddOptionalArg("node_id",
R"code(ID of the node in multi-GPU training.)code", 0)
.AddOptionalArg("shard_id",
R"code(Id of the part to read.)code", 0)
.AddOptionalArg("cache_size",
@@ -63,6 +67,14 @@ size_t start_index(const size_t shard_id,
return size * shard_id / shard_num;
}


size_t start_index_for_node(const size_t node_id,
const size_t num_nodes,
const size_t size) {
return size * node_id / num_nodes;
}


Index num_samples(const size_t shard_num,
const size_t size) {
return static_cast<Index>(std::ceil(size * 1.0 / shard_num));
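
As a sanity check on this sharding arithmetic, a small standalone test (illustrative numbers, not taken from the DALI test suite) pins down the boundary behaviour:

// Self-contained check of the proportional-split formulas above.
#include <cassert>
#include <cmath>
#include <cstddef>

static size_t start_index(size_t shard_id, size_t shard_num, size_t size) {
  return size * shard_id / shard_num;
}
static size_t start_index_for_node(size_t node_id, size_t num_nodes, size_t size) {
  return size * node_id / num_nodes;
}
static long num_samples(size_t shard_num, size_t size) {
  return static_cast<long>(std::ceil(size * 1.0 / shard_num));
}

int main() {
  // 10 samples over 4 shards: shard boundaries fall at 0, 2, 5, 7.
  assert(start_index(0, 4, 10) == 0);
  assert(start_index(1, 4, 10) == 2);
  assert(start_index(2, 4, 10) == 5);
  assert(start_index(3, 4, 10) == 7);
  // Each shard reads at most ceil(10 / 4) == 3 samples.
  assert(num_samples(4, 10) == 3);
  // Two nodes split the same 10 samples at index 5.
  assert(start_index_for_node(1, 2, 10) == 5);
  return 0;
}
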
7 changes: 6 additions & 1 deletion dali/operators/reader/loader/loader.h
@@ -65,6 +65,8 @@ class Loader {
shuffle_seed_(options.GetArgument<int>("shuffle_seed")),
shard_id_(options.GetArgument<int>("shard_id")),
num_shards_(options.GetArgument<int>("num_shards")),
num_nodes_(options.GetArgument<int>("num_nodes")),
node_id_(options.GetArgument<int>("node_id")),
copy_read_data_(false),
read_ahead_(options.GetArgument<bool>("read_ahead")),
stick_to_shard_(options.GetArgument<bool>("stick_to_shard")),
@@ -345,6 +347,8 @@ class Loader {
// sharding
const int shard_id_;
const int num_shards_;
const int num_nodes_;
const int node_id_;

// if read data need to be copied or can be just shared with tensor
bool copy_read_data_;
@@ -366,7 +370,7 @@ class Loader {
bool lazy_init_;
bool loading_flag_;

// Image cache
std::once_flag fetch_cache_;
std::shared_ptr<ImageCache> cache_;

@@ -388,6 +392,7 @@ class Loader {
};

std::deque<ShardBoundaries> shards_;
const int num_shards_per_node_ = num_shards_ / num_nodes_;
};

template<typename T, typename... Args>
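
One caveat on the new num_shards_per_node_ member: it is computed with integer division, so the setup implicitly assumes num_shards is a multiple of num_nodes (8 shards over 2 nodes gives 4 per node, but 8 over 3 would silently leave 2 shards unassigned). A defensive check in that direction (hypothetical, not part of this commit) could make the assumption explicit:

// Hypothetical guard, not in this commit: fail fast when shards do not
// divide evenly across nodes, since num_shards_per_node_ truncates toward zero.
DALI_ENFORCE(num_shards_ % num_nodes_ == 0,
             "num_shards must be a multiple of num_nodes");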