Skip to content

Commit

Permalink
ovsdb-server: Make database connections configurable from database it…
Browse files Browse the repository at this point in the history
…self.

Most importantly this adds a "managers" column to the vswitch database
that specifies where the ovsdb-server should connect.
  • Loading branch information
blp committed Jan 4, 2010
1 parent 4931f33 commit 0b1fae1
Show file tree
Hide file tree
Showing 12 changed files with 297 additions and 167 deletions.
2 changes: 1 addition & 1 deletion debian/openvswitch-switch.init
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ case "$1" in
set -- "$@" --verbose=ANY:console:emer --verbose=ANY:syslog:err
set -- "$@" --log-file
set -- "$@" --detach --pidfile
set -- "$@" --listen punix:/var/run/ovsdb-server
set -- "$@" --remote punix:/var/run/ovsdb-server
set -- "$@" /etc/openvswitch-switch/conf
set -- "$@" $OVSDB_SERVER_OPTS
echo -n "Starting ovsdb-server: "
Expand Down
213 changes: 132 additions & 81 deletions ovsdb/jsonrpc-server.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright (c) 2009 Nicira Networks
/* Copyright (c) 2009, 2010 Nicira Networks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,15 +36,18 @@
#define THIS_MODULE VLM_ovsdb_jsonrpc_server
#include "vlog.h"

struct ovsdb_jsonrpc_remote;
struct ovsdb_jsonrpc_session;

/* Message rate-limiting. */
struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);

/* Sessions. */
static void ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *,
const char *name);
static void ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *,
struct stream *);
static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *);
static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *);
static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *);
static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *);
static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *);

/* Triggers. */
static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
Expand All @@ -69,89 +72,141 @@ static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);

struct ovsdb_jsonrpc_server {
struct ovsdb *db;

struct list sessions; /* List of "struct ovsdb_jsonrpc_session"s. */
unsigned int n_sessions, max_sessions;
struct shash remotes; /* Contains "struct ovsdb_jsonrpc_remote *"s. */
};

struct pstream **listeners;
size_t n_listeners, allocated_listeners;
/* A configured remote. This is either a passive stream listener plus a list
* of the currently connected sessions, or a list of exactly one active
* session. */
struct ovsdb_jsonrpc_remote {
struct ovsdb_jsonrpc_server *server;
struct pstream *listener; /* Listener, if passive. */
struct list sessions; /* List of "struct ovsdb_jsonrpc_session"s. */
};

static void ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *,
const char *name);
static void ovsdb_jsonrpc_server_del_remote(struct shash_node *);

struct ovsdb_jsonrpc_server *
ovsdb_jsonrpc_server_create(struct ovsdb *db)
{
struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
server->db = db;
server->max_sessions = 64;
list_init(&server->sessions);
shash_init(&server->remotes);
return server;
}

/* Sets 'svr''s current set of remotes to the names in 'new_remotes'. The data
* values in 'new_remotes' are ignored.
*
* A remote is an active or passive stream connection method, e.g. "pssl:" or
* "tcp:1.2.3.4". */
void
ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *svr,
struct pstream *pstream)
ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr,
const struct shash *new_remotes)
{
if (svr->n_listeners >= svr->allocated_listeners) {
svr->listeners = x2nrealloc(svr->listeners, &svr->allocated_listeners,
sizeof *svr->listeners);
struct shash_node *node, *next;

SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
if (!shash_find(new_remotes, node->name)) {
ovsdb_jsonrpc_server_del_remote(node);
}
}
SHASH_FOR_EACH (node, new_remotes) {
if (!shash_find(&svr->remotes, node->name)) {
ovsdb_jsonrpc_server_add_remote(svr, node->name);
}
}
svr->listeners[svr->n_listeners++] = pstream;
}

void
ovsdb_jsonrpc_server_connect(struct ovsdb_jsonrpc_server *svr,
const char *name)
static void
ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
const char *name)
{
ovsdb_jsonrpc_session_create_active(svr, name);
struct ovsdb_jsonrpc_remote *remote;
struct pstream *listener;
int error;

error = pstream_open(name, &listener);
if (error && error != EAFNOSUPPORT) {
VLOG_ERR_RL(&rl, "%s: listen failed: %s", name, strerror(error));
return;
}

remote = xmalloc(sizeof *remote);
remote->server = svr;
remote->listener = listener;
list_init(&remote->sessions);
shash_add(&svr->remotes, name, remote);

if (!listener) {
ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name));
}
}

static void
ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
{
struct ovsdb_jsonrpc_remote *remote = node->data;

ovsdb_jsonrpc_session_close_all(remote);
pstream_close(remote->listener);
shash_delete(&remote->server->remotes, node);
free(remote);
}

void
ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
{
size_t i;
struct shash_node *node;

/* Accept new connections. */
for (i = 0; i < svr->n_listeners && svr->n_sessions < svr->max_sessions;) {
struct pstream *listener = svr->listeners[i];
struct stream *stream;
int error;

error = pstream_accept(listener, &stream);
if (!error) {
ovsdb_jsonrpc_session_create_passive(svr, stream);
} else if (error == EAGAIN) {
i++;
} else if (error) {
VLOG_WARN("%s: accept failed: %s",
pstream_get_name(listener), strerror(error));
pstream_close(listener);
svr->listeners[i] = svr->listeners[--svr->n_listeners];
SHASH_FOR_EACH (node, &svr->remotes) {
struct ovsdb_jsonrpc_remote *remote = node->data;

if (remote->listener && svr->n_sessions < svr->max_sessions) {
struct stream *stream;
int error;

error = pstream_accept(remote->listener, &stream);
if (!error) {
struct jsonrpc_session *js;
js = jsonrpc_session_open_unreliably(jsonrpc_open(stream));
ovsdb_jsonrpc_session_create(remote, js);
} else if (error != EAGAIN) {
VLOG_WARN_RL(&rl, "%s: accept failed: %s",
pstream_get_name(remote->listener),
strerror(error));
}
}
}

/* Handle each session. */
ovsdb_jsonrpc_session_run_all(svr);
ovsdb_jsonrpc_session_run_all(remote);
}
}

void
ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
{
if (svr->n_sessions < svr->max_sessions) {
size_t i;
struct shash_node *node;

SHASH_FOR_EACH (node, &svr->remotes) {
struct ovsdb_jsonrpc_remote *remote = node->data;

for (i = 0; i < svr->n_listeners; i++) {
pstream_wait(svr->listeners[i]);
if (remote->listener && svr->n_sessions < svr->max_sessions) {
pstream_wait(remote->listener);
}
}

ovsdb_jsonrpc_session_wait_all(svr);
ovsdb_jsonrpc_session_wait_all(remote);
}
}

/* JSON-RPC database server session. */

struct ovsdb_jsonrpc_session {
struct ovsdb_jsonrpc_server *server;
struct list node; /* Element in server's sessions list. */
struct ovsdb_jsonrpc_remote *remote;
struct list node; /* Element in remote's sessions list. */

/* Triggers. */
struct hmap triggers; /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
Expand All @@ -174,46 +229,31 @@ static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
struct jsonrpc_msg *);

static struct ovsdb_jsonrpc_session *
ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr,
ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
struct jsonrpc_session *js)
{
struct ovsdb_jsonrpc_session *s;

s = xzalloc(sizeof *s);
s->server = svr;
list_push_back(&svr->sessions, &s->node);
s->remote = remote;
list_push_back(&remote->sessions, &s->node);
hmap_init(&s->triggers);
hmap_init(&s->monitors);
list_init(&s->completions);
s->js = js;
s->js_seqno = jsonrpc_session_get_seqno(js);

svr->n_sessions++;
remote->server->n_sessions++;

return s;
}

static void
ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *svr,
const char *name)
{
ovsdb_jsonrpc_session_create(svr, jsonrpc_session_open(name));
}

static void
ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *svr,
struct stream *stream)
{
ovsdb_jsonrpc_session_create(
svr, jsonrpc_session_open_unreliably(jsonrpc_open(stream)));
}

static void
ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
{
jsonrpc_session_close(s->js);
list_remove(&s->node);
s->server->n_sessions--;
s->remote->server->n_sessions--;
}

static int
Expand Down Expand Up @@ -248,12 +288,12 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
}

static void
ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *svr)
ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
{
struct ovsdb_jsonrpc_session *s, *next;

LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
&svr->sessions) {
&remote->sessions) {
int error = ovsdb_jsonrpc_session_run(s);
if (error) {
ovsdb_jsonrpc_session_close(s);
Expand All @@ -271,15 +311,26 @@ ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
}

static void
ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *svr)
ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote)
{
struct ovsdb_jsonrpc_session *s;

LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &svr->sessions) {
LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &remote->sessions) {
ovsdb_jsonrpc_session_wait(s);
}
}

static void
ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote)
{
struct ovsdb_jsonrpc_session *s, *next;

LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
&remote->sessions) {
ovsdb_jsonrpc_session_close(s);
}
}

static struct jsonrpc_msg *
execute_transaction(struct ovsdb_jsonrpc_session *s,
struct jsonrpc_msg *request)
Expand All @@ -306,7 +357,7 @@ ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
request->id);
} else if (!strcmp(request->method, "get_schema")) {
reply = jsonrpc_create_reply(
ovsdb_schema_to_json(s->server->db->schema), request->id);
ovsdb_schema_to_json(s->remote->server->db->schema), request->id);
} else if (!strcmp(request->method, "echo")) {
reply = jsonrpc_create_reply(json_clone(request->params), request->id);
} else {
Expand Down Expand Up @@ -380,7 +431,7 @@ ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,

/* Insert into trigger table. */
t = xmalloc(sizeof *t);
ovsdb_trigger_init(s->server->db,
ovsdb_trigger_init(s->remote->server->db,
&t->trigger, params, &s->completions,
time_msec());
t->session = s;
Expand Down Expand Up @@ -541,7 +592,7 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s,

m = xzalloc(sizeof *m);
ovsdb_replica_init(&m->replica, &ovsdb_jsonrpc_replica_class);
ovsdb_add_replica(s->server->db, &m->replica);
ovsdb_add_replica(s->remote->server->db, &m->replica);
m->session = s;
hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
m->monitor_id = json_clone(monitor_id);
Expand All @@ -553,7 +604,7 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s,
const struct json *columns_json, *select_json;
struct ovsdb_parser parser;

table = ovsdb_get_table(s->server->db, node->name);
table = ovsdb_get_table(s->remote->server->db, node->name);
if (!table) {
error = ovsdb_syntax_error(NULL, NULL,
"no table named %s", node->name);
Expand Down Expand Up @@ -620,7 +671,7 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s,

error:
if (m) {
ovsdb_remove_replica(s->server->db, &m->replica);
ovsdb_remove_replica(s->remote->server->db, &m->replica);
}

json = ovsdb_error_to_json(error);
Expand All @@ -644,7 +695,7 @@ ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
return jsonrpc_create_error(json_string_create("unknown monitor"),
request_id);
} else {
ovsdb_remove_replica(s->server->db, &m->replica);
ovsdb_remove_replica(s->remote->server->db, &m->replica);
return jsonrpc_create_reply(json_object_create(), request_id);
}
}
Expand All @@ -657,7 +708,7 @@ ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)

HMAP_FOR_EACH_SAFE (m, next,
struct ovsdb_jsonrpc_monitor, node, &s->monitors) {
ovsdb_remove_replica(s->server->db, &m->replica);
ovsdb_remove_replica(s->remote->server->db, &m->replica);
}
}

Expand Down
Loading

0 comments on commit 0b1fae1

Please sign in to comment.