Skip to content

Commit

Permalink
db_bench periodically writes QPS to CSV file
Browse files Browse the repository at this point in the history
Summary:
This is part of an effort to better understand and optimize RocksDB stalls under high load. I added a feature to db_bench to periodically write QPS to CSV files. That way we can nicely see how our QPS changes in time (especially when DB is stalled) and can do a better job of evaluating our stall system (i.e. we want the QPS to be as constant as possible, as opposed to having bunch of stalls)

Cool part of CSV files is that we can easily graph them -- there are a bunch of tools available.

Test Plan:
Ran ./db_bench --report_interval_seconds=10 --benchmarks=fillrandom --num=10000000
and observed this in report.csv:

secs_elapsed,interval_qps
10,2725860
20,1980480
30,1863456
40,1454359
50,1460389

Reviewers: sdong, MarkCallaghan, rven, yhchiang

Reviewed By: yhchiang

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D40047
  • Loading branch information
igorcanadi committed Jun 12, 2015
1 parent 46296cc commit d59d90b
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 0 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### New Features
* Added experimental support for optimistic transactions. See include/rocksdb/utilities/optimistic_transaction.h for more info.
* Added a new way to report QPS from db_bench (check out --report_file and --report_interval_seconds)

### Public API changes
* EventListener::OnFlushCompleted() now passes FlushJobInfo instead of a list of parameters.
Expand Down
119 changes: 119 additions & 0 deletions db/db_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ int main() {
#include <stdio.h>
#include <stdlib.h>
#include <gflags/gflags.h>

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

#include "db/db_impl.h"
#include "db/version_set.h"
#include "rocksdb/options.h"
Expand Down Expand Up @@ -522,6 +528,14 @@ DEFINE_int64(stats_interval_seconds, 0, "Report stats every N seconds. This "
DEFINE_int32(stats_per_interval, 0, "Reports additional stats per interval when"
" this is greater than 0.");

DEFINE_int64(report_interval_seconds, 0,
"If greater than zero, it will write simple stats in CVS format "
"to --report_file every N seconds");

DEFINE_string(report_file, "report.csv",
"Filename where some simple stats are reported to (if "
"--report_interval_seconds is bigger than 0)");

DEFINE_int32(thread_status_per_interval, 0,
"Takes and report a snapshot of the current status of each thread"
" when this is greater than 0.");
Expand Down Expand Up @@ -950,6 +964,96 @@ struct DBWithColumnFamilies {
}
};

// a class that reports stats to CSV file
class ReporterAgent {
public:
ReporterAgent(Env* env, const std::string& fname,
uint64_t report_interval_secs)
: env_(env),
total_ops_done_(0),
last_report_(0),
report_interval_secs_(report_interval_secs),
stop_(false) {
auto s = env_->NewWritableFile(fname, &report_file_, EnvOptions());
if (s.ok()) {
s = report_file_->Append(Header() + "\n");
}
if (s.ok()) {
s = report_file_->Flush();
}
if (!s.ok()) {
fprintf(stderr, "Can't open %s: %s\n", fname.c_str(),
s.ToString().c_str());
abort();
}

reporting_thread_ = std::thread([&]() { SleepAndReport(); });
}

~ReporterAgent() {
{
std::unique_lock<std::mutex> lk(mutex_);
stop_ = true;
stop_cv_.notify_all();
}
reporting_thread_.join();
}

// thread safe
void ReportFinishedOps(int64_t num_ops) {
total_ops_done_.fetch_add(num_ops);
}

private:
std::string Header() const { return "secs_elapsed,interval_qps"; }
void SleepAndReport() {
uint64_t kMicrosInSecond = 1000 * 1000;
auto time_started = env_->NowMicros();
while (true) {
{
std::unique_lock<std::mutex> lk(mutex_);
if (stop_ ||
stop_cv_.wait_for(lk, std::chrono::seconds(report_interval_secs_),
[&]() { return stop_; })) {
// stopping
break;
}
// else -> timeout, which means time for a report!
}
auto total_ops_done_snapshot = total_ops_done_.load();
// round the seconds elapsed
auto secs_elapsed =
(env_->NowMicros() - time_started + kMicrosInSecond / 2) /
kMicrosInSecond;
std::string report = ToString(secs_elapsed) + "," +
ToString(total_ops_done_snapshot - last_report_) +
"\n";
auto s = report_file_->Append(report);
if (s.ok()) {
s = report_file_->Flush();
}
if (!s.ok()) {
fprintf(stderr,
"Can't write to report file (%s), stopping the reporting\n",
s.ToString().c_str());
break;
}
last_report_ = total_ops_done_snapshot;
}
}

Env* env_;
std::unique_ptr<WritableFile> report_file_;
std::atomic<int64_t> total_ops_done_;
int64_t last_report_;
const uint64_t report_interval_secs_;
std::thread reporting_thread_;
std::mutex mutex_;
// will notify on stop
std::condition_variable stop_cv_;
bool stop_;
};

class Stats {
private:
int id_;
Expand All @@ -965,10 +1069,15 @@ class Stats {
HistogramImpl hist_;
std::string message_;
bool exclude_from_merge_;
ReporterAgent* reporter_agent_; // does not own

public:
Stats() { Start(-1); }

void SetReporterAgent(ReporterAgent* reporter_agent) {
reporter_agent_ = reporter_agent;
}

void Start(int id) {
id_ = id;
next_report_ = FLAGS_stats_interval ? FLAGS_stats_interval : 100;
Expand Down Expand Up @@ -1044,6 +1153,9 @@ class Stats {
}

void FinishedOps(DBWithColumnFamilies* db_with_cfh, DB* db, int64_t num_ops) {
if (reporter_agent_) {
reporter_agent_->ReportFinishedOps(num_ops);
}
if (FLAGS_histogram) {
double now = FLAGS_env->NowMicros();
double micros = now - last_op_finish_;
Expand Down Expand Up @@ -1839,6 +1951,12 @@ class Benchmark {
shared.num_done = 0;
shared.start = false;

std::unique_ptr<ReporterAgent> reporter_agent;
if (FLAGS_report_interval_seconds > 0) {
reporter_agent.reset(new ReporterAgent(FLAGS_env, FLAGS_report_file,
FLAGS_report_interval_seconds));
}

ThreadArg* arg = new ThreadArg[n];

for (int i = 0; i < n; i++) {
Expand All @@ -1863,6 +1981,7 @@ class Benchmark {
arg[i].method = method;
arg[i].shared = &shared;
arg[i].thread = new ThreadState(i);
arg[i].thread->stats.SetReporterAgent(reporter_agent.get());
arg[i].thread->shared = &shared;
FLAGS_env->StartThread(ThreadBody, &arg[i]);
}
Expand Down

0 comments on commit d59d90b

Please sign in to comment.