Skip to content

Commit 7d335ba

Browse files
author
tanabe takayuki
committed
update about backoff and maintain codes.
1 parent 8c3a838 commit 7d335ba

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+438
-263
lines changed

cicada/Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
PROG1 = cicada.exe
2-
CICADA_SRCS1 := cicada.cc transaction.cc util.cc
2+
CICADA_SRCS1 := cicada.cc transaction.cc util.cc result.cc
33

44
REL := ../common/
55
include $(REL)Makefile

cicada/cicada.cc

+39-17
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,17 @@
2727
#include "../include/util.hh"
2828
#include "../include/zipf.hh"
2929
#include "include/common.hh"
30+
#include "include/result.hh"
3031
#include "include/transaction.hh"
3132
#include "include/util.hh"
3233

3334
using namespace std;
3435

35-
void worker(size_t thid, char& ready, const bool& start, const bool& quit,
36-
std::vector<Result>& res) {
36+
void worker(size_t thid, char& ready, const bool& start, const bool& quit) {
3737
Xoroshiro128Plus rnd;
3838
rnd.init();
39-
TxExecutor trans(thid, (Result*)&res[thid]);
40-
Result& myres = std::ref(res[thid]);
39+
TxExecutor trans(thid, (Result*)&CicadaResult[thid]);
40+
Result& myres = std::ref(CicadaResult[thid]);
4141
FastZipf zipf(&rnd, ZIPF_SKEW, TUPLE_NUM);
4242
Backoff backoff(CLOCKS_PER_US);
4343

@@ -80,7 +80,12 @@ void worker(size_t thid, char& ready, const bool& start, const bool& quit,
8080
#endif
8181

8282
RETRY:
83-
if (thid == 0) leaderWork(std::ref(backoff), std::ref(res));
83+
if (thid == 0) {
84+
leaderWork(std::ref(backoff));
85+
#if BACK_OFF
86+
leaderBackoffWork(backoff, CicadaResult);
87+
#endif
88+
}
8489
if (loadAcquire(quit)) break;
8590

8691
trans.tbegin();
@@ -117,9 +122,16 @@ void worker(size_t thid, char& ready, const bool& start, const bool& quit,
117122
// write phase execute logging and commit pending versions, but r-only tx
118123
// can skip it.
119124
if ((*trans.pro_set_.begin()).ronly_) {
120-
++res[thid].local_commit_counts_;
125+
/**
126+
* local_commit_counts is used at ../include/backoff.hh to calcurate about
127+
* backoff.
128+
*/
129+
storeRelease(myres.local_commit_counts_,
130+
loadAcquire(myres.local_commit_counts_) + 1);
121131
} else {
122-
// Validation phase
132+
/**
133+
* Validation phase
134+
*/
123135
if (!trans.validation()) {
124136
trans.abort();
125137
#if SINGLE_EXEC
@@ -129,13 +141,23 @@ void worker(size_t thid, char& ready, const bool& start, const bool& quit,
129141
goto RETRY;
130142
}
131143

132-
// Write phase
144+
/**
145+
* Write phase
146+
*/
133147
trans.writePhase();
134-
135-
// Maintenance
136-
// Schedule garbage collection
137-
// Declare quiescent state
138-
// Collect garbage created by prior transactions
148+
/**
149+
* local_commit_counts is used at ../include/backoff.hh to calcurate about
150+
* backoff.
151+
*/
152+
storeRelease(myres.local_commit_counts_,
153+
loadAcquire(myres.local_commit_counts_) + 1);
154+
155+
/**
156+
* Maintenance
157+
* Schedule garbage collection
158+
* Declare quiescent state
159+
* Collect garbage created by prior transactions
160+
*/
139161
#if SINGLE_EXEC
140162
#else
141163
trans.mainte();
@@ -154,12 +176,12 @@ int main(int argc, char* argv[]) try {
154176

155177
alignas(CACHE_LINE_SIZE) bool start = false;
156178
alignas(CACHE_LINE_SIZE) bool quit = false;
157-
alignas(CACHE_LINE_SIZE) std::vector<Result> res(THREAD_NUM);
179+
initResult();
158180
std::vector<char> readys(THREAD_NUM);
159181
std::vector<std::thread> thv;
160182
for (size_t i = 0; i < THREAD_NUM; ++i)
161183
thv.emplace_back(worker, i, std::ref(readys[i]), std::ref(start),
162-
std::ref(quit), std::ref(res));
184+
std::ref(quit));
163185
waitForReady(readys);
164186
storeRelease(start, true);
165187
for (size_t i = 0; i < EXTIME; ++i) {
@@ -169,10 +191,10 @@ int main(int argc, char* argv[]) try {
169191
for (auto& th : thv) th.join();
170192

171193
for (unsigned int i = 0; i < THREAD_NUM; ++i) {
172-
res[0].addLocalAllResult(res[i]);
194+
CicadaResult[0].addLocalAllResult(CicadaResult[i]);
173195
}
174196
ShowOptParameters();
175-
res[0].displayAllResult(CLOCKS_PER_US, EXTIME, THREAD_NUM);
197+
CicadaResult[0].displayAllResult(CLOCKS_PER_US, EXTIME, THREAD_NUM);
176198
deleteDB();
177199

178200
return 0;

cicada/include/common.hh

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ GLOBAL size_t PRE_RESERVE_VERSION;
5050
/**
5151
* worker 1 insert delay in the end of read phase[us].
5252
*/
53-
GLOBAL size_t WORKER1_INSERT_DELAY_RPHASE_US;
53+
GLOBAL size_t WORKER1_INSERT_DELAY_RPHASE_US;
5454
GLOBAL size_t EXTIME;
5555

5656
alignas(CACHE_LINE_SIZE) GLOBAL uint64_t_64byte *ThreadWtsArray;

cicada/include/result.hh

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#pragma once
2+
3+
#include <vector>
4+
5+
#include "../../include/result.hh"
6+
7+
extern std::vector<Result> CicadaResult;
8+
9+
extern void initResult();

cicada/include/transaction.hh

+1-3
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ class TxExecutor {
100100
void displayWriteSet();
101101
void earlyAbort();
102102
void mainte(); // maintenance
103-
void gcpv(); // group commit pending versions
103+
void gcpv(); // group commit pending versions
104104
void precpv(); // pre-commit pending versions
105105
void pwal(); // parallel write ahead log.
106106
void swal();
@@ -148,7 +148,6 @@ class TxExecutor {
148148
}
149149
}
150150

151-
152151
#if INLINE_VERSION_OPT
153152
#if INLINE_VERSION_PROMOTION
154153
void inlineVersionPromotion(const uint64_t key, Tuple* tuple,
@@ -167,7 +166,6 @@ class TxExecutor {
167166
#endif
168167
#endif
169168

170-
171169
Version* newVersionGeneration([[maybe_unused]] Tuple* tuple) {
172170
#if INLINE_VERSION_OPT
173171
if (tuple->getInlineVersionRight()) {

cicada/include/util.hh

+5-3
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@ extern void displayThreadRtsArray();
2020

2121
extern void displaySLogSet();
2222

23-
extern void leaderWork([[maybe_unused]] Backoff &backoff, [[maybe_unused]] std::vector<Result> &res);
23+
extern void leaderWork([[maybe_unused]] Backoff &backoff);
2424

2525
extern void makeDB(uint64_t *initial_wts);
2626

27-
extern void partTableDelete([[maybe_unused]] size_t thid, uint64_t start, uint64_t end);
27+
extern void partTableDelete([[maybe_unused]] size_t thid, uint64_t start,
28+
uint64_t end);
2829

29-
extern void partTableInit([[maybe_unused]] size_t thid, uint64_t initts, uint64_t start, uint64_t end);
30+
extern void partTableInit([[maybe_unused]] size_t thid, uint64_t initts,
31+
uint64_t start, uint64_t end);
3032

3133
extern void ShowOptParameters();

cicada/result.cc

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#include "include/result.hh"
2+
#include "include/common.hh"
3+
4+
#include "../include/cache_line_size.hh"
5+
#include "../include/result.hh"
6+
7+
using namespace std;
8+
9+
alignas(CACHE_LINE_SIZE) std::vector<Result> CicadaResult;
10+
11+
void initResult() { CicadaResult.resize(THREAD_NUM); }

cicada/transaction.cc

+17-13
Original file line numberDiff line numberDiff line change
@@ -165,17 +165,17 @@ void TxExecutor::twrite(const uint64_t key) {
165165
if (re) {
166166
rmw = true;
167167
tuple = re->rcdptr_;
168-
/* 現時点で,シンプルな改造で re->later_ver_
168+
/* 現時点で,シンプルな改造で re->later_ver_
169169
* 情報を受け取って利用することは難しい.
170170
* 何故なら,read only tx で発生した inline version promotion による
171171
* write であれば,read-only 専用のタイムスタンプによるサーチにおいて
172172
* later_ver であるが,write の later_ver には不適格.
173173
* ここを考慮してゴニョゴニョしても,利益が小さくて性能が上がらなかった.*/
174174
/* Now, it is difficult to use re->later_ver_ by simple customize.
175-
* Because if this write is from inline version promotion which is from read only tx,
176-
* the re->later_ver is suitable for read only search by read only timestamp.
177-
* Of course, it is unsuitable for search for write.
178-
* It is high cost to consider these things.
175+
* Because if this write is from inline version promotion which is from read
176+
* only tx, the re->later_ver is suitable for read only search by read only
177+
* timestamp. Of course, it is unsuitable for search for write. It is high
178+
* cost to consider these things.
179179
* I try many somethings but it can't improve performance. cost > profit.*/
180180
} else {
181181
#if MASSTREE_USE
@@ -321,8 +321,10 @@ bool TxExecutor::validation() {
321321
;
322322
}
323323
/* この部分は,オリジナル手法と異なる.
324-
* 現在 view となるバージョンが,read operation 時の view と同一かをチェックしている.
325-
* オリジナル手法はこれがないため,この実装より性能は良いが view が壊れている.*/
324+
* 現在 view となるバージョンが,read operation 時の view
325+
* と同一かをチェックしている.
326+
* オリジナル手法はこれがないため,この実装より性能は良いが view
327+
* が壊れている.*/
326328
if ((*itr).ver_ != ver) {
327329
result = false;
328330
goto FINISH_VALIDATION;
@@ -442,10 +444,13 @@ inline void TxExecutor::cpv() // commit pending versions
442444
* 従って, commit 確定前に書く場合,read phase 区間が伸びることで,
443445
* updater に割り込まれやすくなって read validation failure 頻度が高くなり,
444446
* 性能劣化する性質と,逆に read validation failure 頻度が高くなることで
445-
* committed updater が減少し,競合が減少して性能向上という複雑な二面性を持つ.
446-
* 確定後に書く気持ちは,read phase 区間をなるべく短くして,read validation failure
447+
* committed updater
448+
* が減少し,競合が減少して性能向上という複雑な二面性を持つ.
449+
* 確定後に書く気持ちは,read phase 区間をなるべく短くして,read validation
450+
* failure
447451
* を起こしにくくして性能向上させる.しかし,この場合は区間がオーバーラップする
448-
* tx が増えるため競合増加して性能劣化するかもしれないという複雑な二面性を持つ.
452+
* tx
453+
* が増えるため競合増加して性能劣化するかもしれないという複雑な二面性を持つ.
449454
* 両方試してみた結果,特に変わらなかったため,確定後に書く.*/
450455
memcpy((*itr).new_ver_->val_, write_val_, VAL_SIZE);
451456
#if SINGLE_EXEC
@@ -597,8 +602,8 @@ void TxExecutor::mainte() {
597602
}
598603

599604
this->gcstop_ = rdtscp();
600-
if (chkClkSpan(this->gcstart_, this->gcstop_, GC_INTER_US * CLOCKS_PER_US)
601-
&& (loadAcquire(GCFlag[thid_].obj_) == 0)) {
605+
if (chkClkSpan(this->gcstart_, this->gcstop_, GC_INTER_US * CLOCKS_PER_US) &&
606+
(loadAcquire(GCFlag[thid_].obj_) == 0)) {
602607
storeRelease(GCFlag[thid_].obj_, 1);
603608
this->gcstart_ = this->gcstop_;
604609
}
@@ -626,7 +631,6 @@ void TxExecutor::writePhase() {
626631
this->wts_.set_clockBoost(0);
627632
read_set_.clear();
628633
write_set_.clear();
629-
++cres_->local_commit_counts_;
630634
#if ADD_ANALYSIS
631635
cres_->local_commit_latency_ += rdtscp() - start;
632636
#endif

cicada/util.cc

+16-24
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ void chkArg(const int argc, char *argv[]) {
3333
if (argc != 17) {
3434
cout << "usage: ./cicada.exe TUPLE_NUM MAX_OPE THREAD_NUM RRATIO RMW "
3535
"ZIPF_SKEW YCSB WAL GROUP_COMMIT CPU_MHZ IO_TIME_NS "
36-
"GROUP_COMMIT_TIMEOUT_US GC_INTER_US PRE_RESERVE_VERSION WORKER1_INSERT_DELAY_RPHASE_US EXTIME"
36+
"GROUP_COMMIT_TIMEOUT_US GC_INTER_US PRE_RESERVE_VERSION "
37+
"WORKER1_INSERT_DELAY_RPHASE_US EXTIME"
3738
<< endl
3839
<< endl;
3940
cout << "example:./main 200 10 24 50 off 0 on off off 2100 5 2 10 10000 0 3"
@@ -63,7 +64,9 @@ void chkArg(const int argc, char *argv[]) {
6364
cout << "GC_INTER_US: garbage collection interval [usec]" << endl;
6465
cout << "PRE_RESERVE_VERSION: pre-prepare memory for version generation."
6566
<< endl;
66-
cout << "WORKER1_INSERT_DELAY_RPHASE_US : worker 1 insert delay in the end of read phase[us]." << endl;
67+
cout << "WORKER1_INSERT_DELAY_RPHASE_US : worker 1 insert delay in the end "
68+
"of read phase[us]."
69+
<< endl;
6770
cout << "EXTIME: execution time [sec]" << endl << endl;
6871
ShowOptParameters();
6972
exit(0);
@@ -374,8 +377,7 @@ void makeDB(uint64_t *initial_wts) {
374377
for (auto &th : thv) th.join();
375378
}
376379

377-
void leaderWork([[maybe_unused]] Backoff &backoff,
378-
[[maybe_unused]] std::vector<Result> &res) {
380+
void leaderWork([[maybe_unused]] Backoff &backoff) {
379381
bool gc_update = true;
380382
for (unsigned int i = 0; i < THREAD_NUM; ++i) {
381383
// check all thread's flag raising
@@ -418,27 +420,17 @@ void leaderWork([[maybe_unused]] Backoff &backoff,
418420
__atomic_store_n(&(GCExecuteFlag[i].obj_), 1, __ATOMIC_RELEASE);
419421
}
420422
}
421-
422-
#if BACK_OFF
423-
leaderBackoffWork(backoff, res);
424-
#endif
425423
}
426424

427-
void
428-
ShowOptParameters()
429-
{
425+
void ShowOptParameters() {
430426
cout << "ShowOptParameters()"
431-
<< ": ADD_ANALYSIS " << ADD_ANALYSIS
432-
<< ": BACK_OFF " << BACK_OFF
433-
<< ": INLINE_VERSION_OPT " << INLINE_VERSION_OPT
434-
<< ": INLINE_VERSION_PROMOTION " << INLINE_VERSION_PROMOTION
435-
<< ": MASSTREE_USE " << MASSTREE_USE
436-
<< ": PARTITION_TABLE " << PARTITION_TABLE
437-
<< ": REUSE_VERSION " << REUSE_VERSION
438-
<< ": SINGLE_EXEC " << SINGLE_EXEC
439-
<< ": KEY_SIZE " << KEY_SIZE
440-
<< ": VAL_SIZE " << VAL_SIZE
441-
<< ": WRITE_LATEST_ONLY " << WRITE_LATEST_ONLY
442-
<< ": WORKER1_INSERT_DELAY_RPHASE " << WORKER1_INSERT_DELAY_RPHASE
443-
<< endl;
427+
<< ": ADD_ANALYSIS " << ADD_ANALYSIS << ": BACK_OFF " << BACK_OFF
428+
<< ": INLINE_VERSION_OPT " << INLINE_VERSION_OPT
429+
<< ": INLINE_VERSION_PROMOTION " << INLINE_VERSION_PROMOTION
430+
<< ": MASSTREE_USE " << MASSTREE_USE << ": PARTITION_TABLE "
431+
<< PARTITION_TABLE << ": REUSE_VERSION " << REUSE_VERSION
432+
<< ": SINGLE_EXEC " << SINGLE_EXEC << ": KEY_SIZE " << KEY_SIZE
433+
<< ": VAL_SIZE " << VAL_SIZE << ": WRITE_LATEST_ONLY "
434+
<< WRITE_LATEST_ONLY << ": WORKER1_INSERT_DELAY_RPHASE "
435+
<< WORKER1_INSERT_DELAY_RPHASE << endl;
444436
}

common/result.cc

+10-10
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,14 @@ void Result::displayBackoffLatencyRate(size_t clocks_per_us, size_t extime,
8484

8585
void Result::displayEarlyAbortRate() {
8686
if (total_early_aborts_) {
87-
cout << fixed << setprecision(4) << "early_abort_rate:\t" <<
88-
(long double)total_early_aborts_ / (long double)total_abort_counts_ << endl;
87+
cout << fixed << setprecision(4) << "early_abort_rate:\t"
88+
<< (long double)total_early_aborts_ / (long double)total_abort_counts_
89+
<< endl;
8990
}
9091
}
9192

9293
void Result::displayExtraReads() {
93-
cout << "extra_reads:\t" << total_extra_reads_ << endl;
94+
cout << "extra_reads:\t" << total_extra_reads_ << endl;
9495
}
9596

9697
void Result::displayGCCounts() {
@@ -136,7 +137,7 @@ void Result::displayMakeProcedureLatencyRate(size_t clocks_per_us,
136137
void Result::displayMemcpys() {
137138
if (total_memcpys) {
138139
cout << "memcpys:\t" << total_memcpys << endl;
139-
}
140+
}
140141
}
141142

142143
void Result::displayOtherWorkLatencyRate(size_t clocks_per_us, size_t extime,
@@ -284,7 +285,7 @@ void Result::displayValidationFailureByWritelockRate() {
284285
}
285286

286287
void Result::displayVersionMalloc() {
287-
cout << "version_malloc:\t" << total_version_malloc_ << endl;
288+
cout << "version_malloc:\t" << total_version_malloc_ << endl;
288289
}
289290

290291
void Result::displayVersionReuse() {
@@ -357,9 +358,7 @@ void Result::addLocalMakeProcedureLatency(const uint64_t count) {
357358
total_make_procedure_latency_ += count;
358359
}
359360

360-
void Result::addLocalMemcpys(const uint64_t count) {
361-
total_memcpys += count;
362-
}
361+
void Result::addLocalMemcpys(const uint64_t count) { total_memcpys += count; }
363362

364363
void Result::addLocalPreemptiveAbortsCounts(const uint64_t count) {
365364
total_preemptive_aborts_counts_ += count;
@@ -424,8 +423,9 @@ void Result::addLocalWriteLatency(const uint64_t count) {
424423
}
425424
#endif
426425

427-
void Result::displayAllResult(size_t clocks_per_us, size_t extime,
428-
size_t thread_num) {
426+
void Result::displayAllResult([[maybe_unused]] size_t clocks_per_us,
427+
size_t extime,
428+
[[maybe_unused]] size_t thread_num) {
429429
#if ADD_ANALYSIS
430430
displayAbortByOperationRate();
431431
displayAbortByValidationRate();

0 commit comments

Comments
 (0)