Skip to content

Commit

Permalink
Modify: modify std::sort to parallelsort using openmp
Browse files Browse the repository at this point in the history
  • Loading branch information
root committed Dec 24, 2017
1 parent 3a8c6a8 commit d9d14f1
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 102 deletions.
36 changes: 27 additions & 9 deletions Database/Database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2017,7 +2017,9 @@ Database::build_s2xx(ID_TUPLE* _p_id_tuples)
{
//NOTICE: STL sort() is generally fatser than C qsort, especially when qsort is very slow
//STL sort() not only use qsort algorithm, it can also choose heap-sort method
sort(_p_id_tuples, _p_id_tuples + this->triples_num, Util::spo_cmp_idtuple);
//sort(_p_id_tuples, _p_id_tuples + this->triples_num, Util::spo_cmp_idtuple);
omp_set_num_threads(thread_num);
__gnu_parallel::sort(_p_id_tuples, _p_id_tuples + this->triples_num, Util::spo_cmp_idtuple);
//qsort(_p_id_tuples, this->triples_num, sizeof(int*), Util::_spo_cmp);
this->kvstore->build_subID2values(_p_id_tuples, this->triples_num);

Expand Down Expand Up @@ -2102,7 +2104,9 @@ Database::build_s2xx(ID_TUPLE* _p_id_tuples)
void
Database::build_o2xx(ID_TUPLE* _p_id_tuples)
{
sort(_p_id_tuples, _p_id_tuples + this->triples_num, Util::ops_cmp_idtuple);
//sort(_p_id_tuples, _p_id_tuples + this->triples_num, Util::ops_cmp_idtuple);
omp_set_num_threads(thread_num);
__gnu_parallel::sort(_p_id_tuples, _p_id_tuples + this->triples_num, Util::ops_cmp_idtuple);
//qsort(_p_id_tuples, this->triples_num, sizeof(int*), Util::_ops_cmp);
this->kvstore->build_objID2values(_p_id_tuples, this->triples_num);

Expand Down Expand Up @@ -2220,7 +2224,9 @@ Database::build_o2xx(ID_TUPLE* _p_id_tuples)
void
Database::build_p2xx(ID_TUPLE* _p_id_tuples)
{
sort(_p_id_tuples, _p_id_tuples + this->triples_num, Util::pso_cmp_idtuple);
//sort(_p_id_tuples, _p_id_tuples + this->triples_num, Util::pso_cmp_idtuple);
omp_set_num_threads(thread_num);
__gnu_parallel::sort(_p_id_tuples, _p_id_tuples + this->triples_num, Util::pso_cmp_idtuple);
//qsort(_p_id_tuples, this->triples_num, sizeof(int*), Util::_pso_cmp);
this->kvstore->build_preID2values(_p_id_tuples, this->triples_num);
}
Expand Down Expand Up @@ -3398,7 +3404,9 @@ Database::insert(const TripleWithObjType* _triples, TYPE_TRIPLE_NUM _triple_num,
#ifdef DEBUG
cout << "update s2o: " << _sub_id << " " << oidlist_s.size() << endl;
#endif
sort(oidlist_s.begin(), oidlist_s.end());
//sort(oidlist_s.begin(), oidlist_s.end());
omp_set_num_threads(thread_num);
__gnu_parallel::sort(oidlist_s.begin(), oidlist_s.end());
//this->kvstore->updateInsert_s2o(_sub_id, oidlist_s);
oidlist_s.clear();
}
Expand Down Expand Up @@ -3454,7 +3462,9 @@ Database::insert(const TripleWithObjType* _triples, TYPE_TRIPLE_NUM _triple_num,
#ifdef DEBUG
cout << "update o2s: " << _obj_id << " " << sidlist_o.size() << endl;
#endif
sort(sidlist_o.begin(), sidlist_o.end());
//sort(sidlist_o.begin(), sidlist_o.end());
omp_set_num_threads(thread_num);
__gnu_parallel::sort(sidlist_o.begin(), sidlist_o.end());
//this->kvstore->updateInsert_o2s(_obj_id, sidlist_o);
sidlist_o.clear();

Expand Down Expand Up @@ -3517,7 +3527,9 @@ Database::insert(const TripleWithObjType* _triples, TYPE_TRIPLE_NUM _triple_num,
#ifdef DEBUG
cout << "update p2o: " << _pre_id << " " << oidlist_p.size() << endl;
#endif
sort(oidlist_p.begin(), oidlist_p.end());
//sort(oidlist_p.begin(), oidlist_p.end());
omp_set_num_threads(thread_num);
__gnu_parallel::sort(oidlist_p.begin(), oidlist_p.end());
//this->kvstore->updateInsert_p2o(_pre_id, oidlist_p);
oidlist_p.clear();

Expand Down Expand Up @@ -3729,7 +3741,9 @@ Database::remove(const TripleWithObjType* _triples, TYPE_TRIPLE_NUM _triple_num,
this->kvstore->updateRemove_s2po(_sub_id, pidoidlist_s);
pidoidlist_s.clear();

sort(oidlist_s.begin(), oidlist_s.end());
//sort(oidlist_s.begin(), oidlist_s.end());
omp_set_num_threads(thread_num);
__gnu_parallel::sort(oidlist_s.begin(), oidlist_s.end());
this->kvstore->updateRemove_s2o(_sub_id, oidlist_s);
oidlist_s.clear();

Expand Down Expand Up @@ -3802,7 +3816,9 @@ Database::remove(const TripleWithObjType* _triples, TYPE_TRIPLE_NUM _triple_num,

if (_obj_change)
{
sort(sidlist_o.begin(), sidlist_o.end());
//sort(sidlist_o.begin(), sidlist_o.end());
omp_set_num_threads(thread_num);
__gnu_parallel::sort(sidlist_o.begin(), sidlist_o.end());
this->kvstore->updateRemove_o2s(_obj_id, sidlist_o);
sidlist_o.clear();
this->kvstore->updateRemove_o2ps(_obj_id, pidsidlist_o);
Expand Down Expand Up @@ -3896,7 +3912,9 @@ Database::remove(const TripleWithObjType* _triples, TYPE_TRIPLE_NUM _triple_num,
this->kvstore->updateRemove_p2s(_pre_id, sidlist_p);
sidlist_p.clear();

sort(oidlist_p.begin(), oidlist_p.end());
//sort(oidlist_p.begin(), oidlist_p.end());
omp_set_num_threads(thread_num);
__gnu_parallel::sort(oidlist_p.begin(), oidlist_p.end());
this->kvstore->updateRemove_p2o(_pre_id, oidlist_p);
oidlist_p.clear();

Expand Down
4 changes: 3 additions & 1 deletion Query/IDList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ IDList::to_str()
int
IDList::sort()
{
std::sort(id_list.begin(), id_list.end());
//std::sort(id_list.begin(), id_list.end());
omp_set_num_threads(thread_num);
__gnu_parallel::sort(id_list.begin(), id_list.end());
return 0;
}

Expand Down
15 changes: 9 additions & 6 deletions Query/QueryCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ bool QueryCache::getMinimalRepresentation(const Patterns &triple_pattern, Patter
}
}

sort(temp_repre.begin(), temp_repre.end());

//sort(temp_repre.begin(), temp_repre.end());
omp_set_num_threads(thread_num);
__gnu_parallel::sort(temp_repre.begin(), temp_repre.end());
if (i == 0)
{
minimal_repre = temp_repre;
Expand Down Expand Up @@ -130,8 +131,9 @@ bool QueryCache::tryCaching(const Patterns &triple_pattern, const TempResult &te
unordered_varset.addVar(minimal_mapping[temp_result.id_varset.vars[i]]);

Varset ordered_varset = unordered_varset;
sort(ordered_varset.vars.begin(), ordered_varset.vars.end());

//sort(ordered_varset.vars.begin(), ordered_varset.vars.end());
omp_set_num_threads(thread_num);
__gnu_parallel::sort(ordered_varset.vars.begin(), ordered_varset.vars.end());
vector<int> unordered2ordered = unordered_varset.mapTo(ordered_varset);

if (cache.count(minimal_repre) == 0)
Expand Down Expand Up @@ -183,8 +185,9 @@ bool QueryCache::checkCached(const Patterns &triple_pattern, const Varset &varse
unordered_varset.addVar(minimal_mapping[varset.vars[i]]);

Varset ordered_varset = unordered_varset;
sort(ordered_varset.vars.begin(), ordered_varset.vars.end());

//sort(ordered_varset.vars.begin(), ordered_varset.vars.end());
omp_set_num_threads(thread_num);
__gnu_parallel::sort(ordered_varset.vars.begin(), ordered_varset.vars.end());
vector<int> unordered2ordered = unordered_varset.mapTo(ordered_varset);

if (cache.count(minimal_repre) != 0)
Expand Down
5 changes: 3 additions & 2 deletions StringIndex/StringIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ void StringIndexFile::trySequenceAccess()
{
cout << "sequence access." << endl;

sort(this->request.begin(), this->request.end());

//sort(this->request.begin(), this->request.end());
omp_set_num_threads(thread_num);
__gnu_parallel::sort(this->request.begin(), this->request.end());
int pos = 0;
char *block = new char[MAX_BLOCK_SIZE];

Expand Down
8 changes: 6 additions & 2 deletions Util/Stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ Stream::outputCache()
{
//DEBUG1
//sort and output to file
stable_sort(this->tempst.begin(), this->tempst.end(), mycmp);
//stable_sort(this->tempst.begin(), this->tempst.end(), mycmp);
omp_set_num_threads(thread_num);
__gnu_parallel::stable_sort(this->tempst.begin(), this->tempst.end(), mycmp);
unsigned size = this->tempst.size();
for(unsigned i = 0; i < size; ++i)
{
Expand Down Expand Up @@ -468,7 +470,9 @@ Stream::setEnd()
if(this->needSort)
{
//DEBUG2
stable_sort(this->ansMem, this->ansMem + this->rownum, mycmp);
//stable_sort(this->ansMem, this->ansMem + this->rownum, mycmp);
omp_set_num_threads(thread_num);
__gnu_parallel::stable_sort(this->ansMem, this->ansMem + this->rownum, mycmp);
}
return;
}
Expand Down
5 changes: 5 additions & 0 deletions Util/Util.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ in the sparql query can point to the same node in data graph)
#include <random>
#include <type_traits>

//Added for __gnu_parallel::sort
#include <omp.h>
#include <parallel/algorithm>
#define thread_num 10

//NOTICE: hpp is different from static library(*.a) or dynamic library(*.so)
//It places the implementations totally in header file, hpp = *.h + *.cpp

Expand Down
4 changes: 3 additions & 1 deletion VSTree/VSTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,9 @@ VSTree::split(VNode* _p_node_being_split, const SigEntry& _insert_entry, VNode*

//label the child being removed with -1,
//and update the old node's entry.
sort(entryIndex_nearA.begin(), entryIndex_nearA.end(), less<int>());
//sort(entryIndex_nearA.begin(), entryIndex_nearA.end(), less<int>());
omp_set_num_threads(thread_num);
__gnu_parallel::sort(entryIndex_nearA.begin(), entryIndex_nearA.end(), less<int>());

#ifdef DEBUG_VSTREE
//stringstream _ss1;
Expand Down
4 changes: 2 additions & 2 deletions api/http/cpp/example/Benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include <cstring>
#include <fstream>
using namespace std;
#define tnum 4000
#define tnum 3000
bool correctness = true;
pthread_mutex_t mutex;

Expand All @@ -33,7 +33,7 @@ void* MyThread_run(void* thread_args)
CHttpClient hc;
string res;
int ret;
ret = hc.Get("http://172.31.222.94:9000/?operation=query&format=json&sparql="+args->sparql,res);
ret = hc.Get("http://172.31.222.78:9000/?operation=query&format=json&sparql="+args->sparql,res);
int m = 0;
for(int i = 0; i<args->sparql.length(); ++i)
{
Expand Down
Loading

0 comments on commit d9d14f1

Please sign in to comment.