Skip to content

Commit

Permalink
Don't LogFlush() in foreground threads
Browse files Browse the repository at this point in the history
Summary: So fflush() takes a lock which is heavyweight. I added flush_pending_, but more importantly, I removed LogFlush() from foreground threads.

Test Plan: ./db_test

Reviewers: dhruba, haobo

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D14535
  • Loading branch information
igorcanadi committed Dec 10, 2013
1 parent 4815468 commit 19f5463
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
3 changes: 0 additions & 3 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2643,7 +2643,6 @@ Status DBImpl::GetImpl(const ReadOptions& options,
delete m;
for (MemTable* v: to_delete) delete v;

LogFlush(options_.info_log);
// Note, tickers are atomic now - no lock protection needed any more.
RecordTick(options_.statistics.get(), NUMBER_KEYS_READ);
RecordTick(options_.statistics.get(), BYTES_READ, value->size());
Expand Down Expand Up @@ -2729,7 +2728,6 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
delete m;
for (MemTable* v: to_delete) delete v;

LogFlush(options_.info_log);
RecordTick(options_.statistics.get(), NUMBER_MULTIGET_CALLS);
RecordTick(options_.statistics.get(), NUMBER_MULTIGET_KEYS_READ, numKeys);
RecordTick(options_.statistics.get(), NUMBER_MULTIGET_BYTES_READ, bytesRead);
Expand Down Expand Up @@ -2877,7 +2875,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
SetTickerCount(options_.statistics.get(),
SEQUENCE_NUMBER, last_sequence);
}
LogFlush(options_.info_log);
mutex_.Lock();
if (status.ok()) {
versions_->SetLastSequence(last_sequence);
Expand Down
10 changes: 8 additions & 2 deletions util/posix_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,19 @@ class PosixLogger : public Logger {
const static uint64_t flush_every_seconds_ = 5;
std::atomic_uint_fast64_t last_flush_micros_;
Env* env_;
bool flush_pending_;
public:
PosixLogger(FILE* f, uint64_t (*gettid)(), Env* env) :
file_(f), gettid_(gettid), log_size_(0), fd_(fileno(f)),
last_flush_micros_(0), env_(env) { }
last_flush_micros_(0), env_(env), flush_pending_(false) { }
virtual ~PosixLogger() {
fclose(file_);
}
virtual void Flush() {
fflush(file_);
if (flush_pending_) {
flush_pending_ = false;
fflush(file_);
}
last_flush_micros_ = env_->NowMicros();
}
virtual void Logv(const char* format, va_list ap) {
Expand Down Expand Up @@ -124,13 +128,15 @@ class PosixLogger : public Logger {
#endif

size_t sz = fwrite(base, 1, write_size, file_);
flush_pending_ = true;
assert(sz == write_size);
if (sz > 0) {
log_size_ += write_size;
}
uint64_t now_micros = static_cast<uint64_t>(now_tv.tv_sec) * 1000000 +
now_tv.tv_usec;
if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) {
flush_pending_ = false;
fflush(file_);
last_flush_micros_ = now_micros;
}
Expand Down

0 comments on commit 19f5463

Please sign in to comment.