Skip to content

Commit

Permalink
remove calibration & improve CO correction
Browse files Browse the repository at this point in the history
  • Loading branch information
wg committed Feb 21, 2015
1 parent 57f3b33 commit ef6a836
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 44 deletions.
2 changes: 2 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ wrk next
* Record latency for every request instead of random samples.
* Latency and requests in done() are now callable, not indexable.
* Only record timeouts when a response is actually received.
* Remove calibration phase and record rate at fixed interval.
* Improve correction of coordinated omission.
1 change: 0 additions & 1 deletion src/main.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ static void *thread_main(void *);
static int connect_socket(thread *, connection *);
static int reconnect_socket(thread *, connection *);

static int calibrate(aeEventLoop *, long long, void *);
static int record_rate(aeEventLoop *, long long, void *);

static void socket_connected(aeEventLoop *, int, void *, int);
Expand Down
12 changes: 12 additions & 0 deletions src/stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ int stats_record(stats *stats, uint64_t n) {
return 1;
}

void stats_correct(stats *stats, int64_t expected) {
for (uint64_t n = expected * 2; n <= stats->max; n++) {
uint64_t count = stats->data[n];
int64_t m = (int64_t) n - expected;
while (count && m > expected) {
stats->data[m] += count;
stats->count += count;
m -= expected;
}
}
}

long double stats_mean(stats *stats) {
if (stats->count == 0) return 0.0;

Expand Down
1 change: 1 addition & 0 deletions src/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ stats *stats_alloc(uint64_t);
void stats_free(stats *);

int stats_record(stats *, uint64_t);
void stats_correct(stats *, int64_t);

long double stats_mean(stats *);
long double stats_stdev(stats *stats, long double);
Expand Down
53 changes: 15 additions & 38 deletions src/wrk.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ int main(int argc, char **argv) {
thread *t = &threads[i];
t->loop = aeCreateEventLoop(10 + cfg.connections * 3);
t->connections = cfg.connections / cfg.threads;
t->latency = stats_alloc(cfg.timeout * 1000);

t->L = script_create(cfg.script, url, headers);
script_init(L, t, argc - optind, &argv[optind]);
Expand Down Expand Up @@ -161,6 +160,11 @@ int main(int argc, char **argv) {
long double req_per_s = complete / runtime_s;
long double bytes_per_s = bytes / runtime_s;

if (complete / cfg.connections > 0) {
int64_t interval = runtime_us / (complete / cfg.connections);
stats_correct(statistics.latency, interval);
}

print_stats_header();
print_stats("Latency", statistics.latency, format_time_us);
print_stats("Req/Sec", statistics.requests, format_metric);
Expand Down Expand Up @@ -212,19 +216,14 @@ void *thread_main(void *arg) {
}

aeEventLoop *loop = thread->loop;
aeCreateTimeEvent(loop, CALIBRATE_DELAY_MS, calibrate, thread, NULL);
aeCreateTimeEvent(loop, RECORD_INTERVAL_MS, record_rate, thread, NULL);

thread->start = time_us();
aeMain(loop);

aeDeleteEventLoop(loop);
zfree(thread->cs);

uint64_t max = thread->latency->max;
for (uint64_t i = 0; i < thread->missed; i++) {
stats_record(statistics.latency, max);
}

return NULL;
}

Expand Down Expand Up @@ -265,44 +264,22 @@ static int reconnect_socket(thread *thread, connection *c) {
return connect_socket(thread, c);
}

static int calibrate(aeEventLoop *loop, long long id, void *data) {
thread *thread = data;

long double latency = stats_percentile(thread->latency, 90.0) / 1000.0L;
long double interval = MAX(latency * 2, 10);
long double rate = (interval / latency) * thread->connections;

if (latency == 0 && !stop) return CALIBRATE_DELAY_MS;

stats_free(thread->latency);

thread->interval = interval;
thread->rate = ceil(rate / 10);
thread->start = time_us();
thread->requests = 0;
thread->latency = statistics.latency;

aeCreateTimeEvent(loop, thread->interval, record_rate, thread, NULL);

return AE_NOMORE;
}

static int record_rate(aeEventLoop *loop, long long id, void *data) {
thread *thread = data;

uint64_t elapsed_ms = (time_us() - thread->start) / 1000;
uint64_t requests = (thread->requests / (double) elapsed_ms) * 1000;
uint64_t missed = thread->rate - MIN(thread->rate, thread->latency->count);
if (thread->requests > 0) {
uint64_t elapsed_ms = (time_us() - thread->start) / 1000;
uint64_t requests = (thread->requests / (double) elapsed_ms) * 1000;

stats_record(statistics.requests, requests);
stats_record(statistics.requests, requests);

thread->missed += missed;
thread->requests = 0;
thread->start = time_us();
thread->requests = 0;
thread->start = time_us();
}

if (stop) aeStop(loop);

return thread->interval;
return RECORD_INTERVAL_MS;
}

static int header_field(http_parser *parser, const char *at, size_t len) {
Expand Down Expand Up @@ -351,7 +328,7 @@ static int response_complete(http_parser *parser) {
}

if (--c->pending == 0) {
if (!stats_record(thread->latency, now - c->start)) {
if (!stats_record(statistics.latency, now - c->start)) {
thread->errors.timeout++;
}
aeCreateFileEvent(thread->loop, c->fd, AE_WRITABLE, socket_writeable, c);
Expand Down
6 changes: 1 addition & 5 deletions src/wrk.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,17 @@

#define MAX_THREAD_RATE_S 10000000
#define SOCKET_TIMEOUT_MS 2000
#define CALIBRATE_DELAY_MS 500
#define RECORD_INTERVAL_MS 100

typedef struct {
pthread_t thread;
aeEventLoop *loop;
struct addrinfo *addr;
uint64_t connections;
int interval;
uint64_t complete;
uint64_t requests;
uint64_t bytes;
uint64_t start;
uint64_t rate;
uint64_t missed;
stats *latency;
lua_State *L;
errors errors;
struct connection *cs;
Expand Down

0 comments on commit ef6a836

Please sign in to comment.