Skip to content

Commit

Permalink
stopwatch: Fix Windows incompatibility
Browse files Browse the repository at this point in the history
Stopwatch was implemented using a Unix-only pipe structure. This commit
changes to using a guarded list and latch in order to pass data between
threads.

Signed-off-by: Mark Michelson <[email protected]>
Signed-off-by: Ben Pfaff <[email protected]>
  • Loading branch information
putnopvut authored and blp committed Apr 10, 2018
1 parent a045e4b commit 484f7db
Showing 1 changed file with 54 additions and 38 deletions.
92 changes: 54 additions & 38 deletions lib/stopwatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <unistd.h>
#include "socket-util.h"
#include "util.h"
#include "latch.h"
#include "guarded-list.h"

VLOG_DEFINE_THIS_MODULE(stopwatch);

Expand Down Expand Up @@ -73,6 +75,7 @@ enum stopwatch_op {
};

struct stopwatch_packet {
struct ovs_list list_node;
enum stopwatch_op op;
char name[32];
unsigned long long time;
Expand All @@ -82,7 +85,8 @@ static struct shash stopwatches = SHASH_INITIALIZER(&stopwatches);
static struct ovs_mutex stopwatches_lock = OVS_MUTEX_INITIALIZER;
static pthread_cond_t stopwatches_sync = PTHREAD_COND_INITIALIZER;

static int stopwatch_pipe[2];
static struct latch stopwatch_latch;
static struct guarded_list stopwatch_commands;
static pthread_t stopwatch_thread_id;

static const char *unit_name[] = {
Expand Down Expand Up @@ -329,17 +333,33 @@ stopwatch_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
ds_destroy(&s);
}

static struct stopwatch_packet *
stopwatch_packet_create(enum stopwatch_op op)
{
struct stopwatch_packet *pkt;

pkt = xzalloc(sizeof *pkt);
pkt->op = op;

return pkt;
}

static void
stopwatch_packet_write(struct stopwatch_packet *pkt)
{
guarded_list_push_back(&stopwatch_commands, &pkt->list_node, SIZE_MAX);
latch_set(&stopwatch_latch);
}

static void
stopwatch_reset(struct unixctl_conn *conn, int argc OVS_UNUSED,
const char *argv[], void *aux OVS_UNUSED)
{
struct stopwatch_packet pkt = {
.op = OP_RESET,
};
struct stopwatch_packet *pkt = stopwatch_packet_create(OP_RESET);
if (argc > 1) {
ovs_strlcpy(pkt.name, argv[1], sizeof pkt.name);
ovs_strlcpy(pkt->name, argv[1], sizeof pkt->name);
}
ignore(write(stopwatch_pipe[1], &pkt, sizeof pkt));
stopwatch_packet_write(pkt);
unixctl_command_reply(conn, "");
}

Expand Down Expand Up @@ -406,31 +426,34 @@ stopwatch_thread(void *ign OVS_UNUSED)
bool should_exit = false;

while (!should_exit) {
struct stopwatch_packet pkt;
while (read(stopwatch_pipe[0], &pkt, sizeof pkt) > 0) {
ovs_mutex_lock(&stopwatches_lock);
switch (pkt.op) {
struct ovs_list command_list;
struct stopwatch_packet *pkt;

guarded_list_pop_all(&stopwatch_commands, &command_list);
ovs_mutex_lock(&stopwatches_lock);
LIST_FOR_EACH_POP (pkt, list_node, &command_list) {
switch (pkt->op) {
case OP_START_SAMPLE:
stopwatch_start_sample_protected(&pkt);
stopwatch_start_sample_protected(pkt);
break;
case OP_END_SAMPLE:
stopwatch_end_sample_protected(&pkt);
stopwatch_end_sample_protected(pkt);
break;
case OP_SYNC:
xpthread_cond_signal(&stopwatches_sync);
break;
case OP_RESET:
stopwatch_reset_protected(&pkt);
stopwatch_reset_protected(pkt);
break;
case OP_SHUTDOWN:
should_exit = true;
break;
}
ovs_mutex_unlock(&stopwatches_lock);
}
ovs_mutex_unlock(&stopwatches_lock);

if (!should_exit) {
poll_fd_wait(stopwatch_pipe[0], POLLIN);
latch_wait(&stopwatch_latch);
poll_block();
}
}
Expand All @@ -442,11 +465,8 @@ static void
stopwatch_exit(void)
{
struct shash_node *node, *node_next;
struct stopwatch_packet pkt = {
.op = OP_SHUTDOWN,
};

ignore(write(stopwatch_pipe[1], &pkt, sizeof pkt));
struct stopwatch_packet *pkt = stopwatch_packet_create(OP_SHUTDOWN);
stopwatch_packet_write(pkt);
xpthread_join(stopwatch_thread_id, NULL);

/* Process is exiting and we have joined the only
Expand All @@ -460,6 +480,8 @@ stopwatch_exit(void)
}
shash_destroy(&stopwatches);
ovs_mutex_destroy(&stopwatches_lock);
guarded_list_destroy(&stopwatch_commands);
latch_destroy(&stopwatch_latch);
}

static void
Expand All @@ -469,7 +491,8 @@ do_init_stopwatch(void)
stopwatch_show, NULL);
unixctl_command_register("stopwatch/reset", "[NAME]", 0, 1,
stopwatch_reset, NULL);
xpipe_nonblocking(stopwatch_pipe);
guarded_list_init(&stopwatch_commands);
latch_init(&stopwatch_latch);
stopwatch_thread_id = ovs_thread_create(
"stopwatch", stopwatch_thread, NULL);
atexit(stopwatch_exit);
Expand Down Expand Up @@ -503,34 +526,27 @@ stopwatch_create(const char *name, enum stopwatch_units units)
void
stopwatch_start(const char *name, unsigned long long ts)
{
struct stopwatch_packet pkt = {
.op = OP_START_SAMPLE,
.time = ts,
};
ovs_strlcpy(pkt.name, name, sizeof pkt.name);
ignore(write(stopwatch_pipe[1], &pkt, sizeof pkt));
struct stopwatch_packet *pkt = stopwatch_packet_create(OP_START_SAMPLE);
ovs_strlcpy(pkt->name, name, sizeof pkt->name);
pkt->time = ts;
stopwatch_packet_write(pkt);
}

void
stopwatch_stop(const char *name, unsigned long long ts)
{
struct stopwatch_packet pkt = {
.op = OP_END_SAMPLE,
.time = ts,
};
ovs_strlcpy(pkt.name, name, sizeof pkt.name);
ignore(write(stopwatch_pipe[1], &pkt, sizeof pkt));
struct stopwatch_packet *pkt = stopwatch_packet_create(OP_END_SAMPLE);
ovs_strlcpy(pkt->name, name, sizeof pkt->name);
pkt->time = ts;
stopwatch_packet_write(pkt);
}

void
stopwatch_sync(void)
{
struct stopwatch_packet pkt = {
.op = OP_SYNC,
};

struct stopwatch_packet *pkt = stopwatch_packet_create(OP_SYNC);
ovs_mutex_lock(&stopwatches_lock);
ignore(write(stopwatch_pipe[1], &pkt, sizeof pkt));
stopwatch_packet_write(pkt);
ovs_mutex_cond_wait(&stopwatches_sync, &stopwatches_lock);
ovs_mutex_unlock(&stopwatches_lock);
}

0 comments on commit 484f7db

Please sign in to comment.