Skip to content

Commit

Permalink
WORKING VERSION 1.0
Browse files Browse the repository at this point in the history
Eddajeeeeee
  • Loading branch information
matteomedioli authored Jan 12, 2020
1 parent c791107 commit 2608773
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 38 deletions.
3 changes: 1 addition & 2 deletions include/barrier.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ class Barrier
std::condition_variable barrier_cv ;
std::mutex barrier_mutex;
public:
Barrier(int workers, std::string name);
Barrier(int workers);
~Barrier() noexcept;
void wait();
std::string get_name();
};

#endif
1 change: 1 addition & 0 deletions include/superstep.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class SuperStep {
void communication(std::function<F(Args...)> b, std::vector<std::pair<int,std::vector<int>>> protocol);
void set_barrier(std::shared_ptr<Barrier> b);
void sync();
std::vector<T> get_results();
};

#endif
7 changes: 1 addition & 6 deletions src/barrier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include "../include/barrier.hpp"


Barrier::Barrier(int workers, std::string n):active_workers(workers),name(n)
Barrier::Barrier(int workers):active_workers(workers)
{
assert(0 != active_workers);
}
Expand All @@ -32,8 +32,3 @@ void Barrier::wait()
barrier_cv.wait(lock, [this]() { return 0 == active_workers; });
}
}

std::string Barrier::get_name()
{
return name;
}
145 changes: 118 additions & 27 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,15 @@ int main()
//std::vector<int> data_vector = generate_data(n);
std::vector<int> data_vector{21,18,16,1,3,20,2,10,15,4,17,5,9,19,6,11,14,7,12,8,13};

/* DEFINE COMPUTATION BARRIER AND BODY COMPUTATION THREAD */
/* COMPUTE TSEQ */
{
std::vector<int> seq=generate_data(n);
Utimer t("T_SEQ: ");
std::sort(seq.begin(), seq.end(), std::less<int>());
}


/* DEFINE COMPUTATION BODY THREAD */
std::function<std::vector<int>(std::vector<int>)> sort_and_separators = [nw](std::vector<int> data)
{
std::sort(data.begin(), data.end(), std::less<int>());
Expand All @@ -47,56 +54,140 @@ int main()
return sample;
};

/* DEFINE COMPUTATION BODY THREAD */
std::function<std::vector<int>(std::vector<int>)> sort = [](std::vector<int> data)
{
std::sort(data.begin(), data.end(), std::less<int>());
return data;
};

/* DEFINE COMMUNICATION BARRIER AND BODY COMMUNICATION THREAD */

std::function<std::vector<int>(std::vector<int>)> send = [](std::vector<int> data)
/* DEFINE COMMUNICATION BODY THREAD */
std::function<std::vector<int>(std::vector<int>)> void_comp = [](std::vector<int> data)
{
return data;
};
/* DEFINE COMMUNICATION BODY THREAD */
std::function<std::vector<int>(std::vector<int>, int dest)> void_comm = [](std::vector<int> data, int dest)
{
return data;
};


/* COMPUTE TSEQ */
std::function<std::vector<int>(std::vector<int>,int)> distribute_by_bound = [sort_and_separators,nw](std::vector<int> data, int dest)
{
std::vector<int> seq=generate_data(n);
Utimer t("T_SEQ: ");
std::sort(seq.begin(), seq.end(), std::less<int>());
std::vector<int> filtered;
std::vector<int> boundaries=sort_and_separators(data);
int infer = boundaries[dest];
int super = boundaries[dest+1];
if (dest!=nw-1)
std::copy_if (data.begin(), data.end(), std::back_inserter(filtered),[infer,super](int i){return infer<=i && i<super;});
else
std::copy_if (data.begin(), data.end(), std::back_inserter(filtered),[infer,super](int i){return infer<=i && i<=super;});
return filtered;
};

/* COMMUNICATION PROTOCOLS */
std::vector<std::pair<int,std::vector<int>>> to_itself;
for(int i=0; i<nw; i++)
{
std::vector<int> d{i};
to_itself.push_back(std::make_pair(i,d));
}

std::vector<std::pair<int,std::vector<int>>> protocol;
for(int i=0; i<nw; i++)
{
std::vector<int> d{i};
protocol.push_back(std::make_pair(i,d));
}

std::shared_ptr<Barrier> computation(new Barrier(nw+1,"COMPUTATION_1"));
std::shared_ptr<Barrier> communication(new Barrier(nw+1, "COMMUNICATION_1"));
/* COMMUNICATION PROTOCOLS */
std::vector<std::pair<int,std::vector<int>>> to_all;
std::vector<int> d;
for(int i=0; i<nw; i++)
d.push_back(i);
for(int i=0; i<nw; i++)
to_all.push_back(std::make_pair(i,d));


/* BARRIER */
std::shared_ptr<Barrier> barrier(new Barrier(nw+1));


/* SUPERSTEP 1 */
std::cout<<std::endl;
SuperStep<int> s1(nw, data_vector);
//S1 COMPUTATION PHASE
s1.set_barrier(computation);
s1.set_barrier(barrier);
{
Utimer t("COMP_S1:");
s1.computation(sort_and_separators,true);
s1.sync();
}

//RESET BARRIER FOR COMMUNICATION
barrier.reset(new Barrier(nw+1));
//S1 COMMUNICATION PHASE
s1.set_barrier(communication);
s1.set_barrier(barrier);
{
Utimer t("COMM_S1:");
s1.communication(send,protocol);
s1.communication(void_comm,to_itself);
s1.sync();
}
barrier.reset(new Barrier(nw+1));

computation.reset(new Barrier(nw+1,"COMPUTATION_2"));
communication.reset(new Barrier(nw+1,"COMMUNICATION_2"));
for (int i=0;i<nw;i++)
{
for(auto a: s1.get_output(i)->get_vector())
std::cout<<a<< " ";
for(auto a:s1.get_results())
std::cout<<a<<" ";
std::cout<<std::endl;


/* SUPERSTEP 2 */
std::cout<<std::endl;
SuperStep<int> s2(nw, s1.get_results());
//S1 COMPUTATION PHASE
s2.set_barrier(barrier);
{
Utimer t("COMP_S2:");
s2.computation(void_comp,false);
s2.sync();
}

//RESET BARRIER FOR COMMUNICATION
barrier.reset(new Barrier(nw+1));
//S1 COMMUNICATION PHASE
s2.set_barrier(barrier);
{
Utimer t("COMM_S2:");
s2.communication(distribute_by_bound,to_itself);
s2.sync();
}

barrier.reset(new Barrier(nw+1));

for(auto a:s2.get_results())
std::cout<<a<<" ";
std::cout<<std::endl;

/* SUPERSTEP 3 */
std::cout<<std::endl;
SuperStep<int> s3(nw, s2.get_results());
//S1 COMPUTATION PHASE
s3.set_barrier(barrier);
{
Utimer t("COMP_S3:");
s3.computation(sort,true);
s3.sync();
}

//RESET BARRIER FOR COMMUNICATION
barrier.reset(new Barrier(nw+1));
//S1 COMMUNICATION PHASE
s3.set_barrier(barrier);
{
Utimer t("COMM_S3:");
s3.communication(void_comm,to_itself);
s3.sync();
}

barrier.reset(new Barrier(nw+1));
std::vector<int> result = s3.get_results();

for(auto a:result)
std::cout<<a<<" ";
std::cout<<std::endl;

}

}
10 changes: 9 additions & 1 deletion src/superstep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ std::vector<T> SuperStep<T>::get_input()
template<typename T>
void SuperStep<T>::set_barrier(std::shared_ptr<Barrier> b)
{

barrier=b;
}

Expand Down Expand Up @@ -84,3 +83,12 @@ void SuperStep<T>::sync()
}


template<typename T>
std::vector<T> SuperStep<T>::get_results()
{
std::vector<T> result;
for(int i=0; i<nw; i++)
result.insert(std::end(result), std::begin(output[i]->get_vector()), std::end(output[i]->get_vector()));
return result;
}

4 changes: 2 additions & 2 deletions src/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ void Worker<T>::send(std::function<F(Args...)> body, std::vector<std::pair<int,s
{
auto it = std::find_if( protocol.begin(), protocol.end(),[this](const std::pair<int, std::vector<int>>& element){return element.first == id;});
std::vector<int> destination = it->second;
for(auto des : destination)
ss->get_output(des)->append(body(output));
for(auto dest : destination)
ss->get_output(dest)->append(body(output,dest));
ss->get_barrier()->wait();
}
};
Expand Down

0 comments on commit 2608773

Please sign in to comment.