Skip to content

Commit

Permalink
connmgr: Generalize ofproto_packet_in to ofproto_async_msg.
Browse files Browse the repository at this point in the history
An upcoming commit will add another kind of asynchronous message that
should be handled in the same way as packet-ins.

Signed-off-by: Ben Pfaff <[email protected]>
  • Loading branch information
blp committed Jan 20, 2016
1 parent 9bfe933 commit a2b53de
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 80 deletions.
30 changes: 19 additions & 11 deletions ofproto/connmgr.c
Original file line number Diff line number Diff line change
Expand Up @@ -1653,31 +1653,32 @@ connmgr_send_flow_removed(struct connmgr *mgr,
*
* The caller doesn't need to fill in pin->buffer_id or pin->total_len. */
void
connmgr_send_packet_in(struct connmgr *mgr,
const struct ofproto_packet_in *pin)
connmgr_send_async_msg(struct connmgr *mgr,
const struct ofproto_async_msg *am)
{
struct ofconn *ofconn;

LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
enum ofputil_protocol protocol = ofconn_get_protocol(ofconn);
if (protocol == OFPUTIL_P_NONE || !rconn_is_connected(ofconn->rconn)
|| ofconn->controller_id != pin->controller_id
|| !ofconn_receives_async_msg(ofconn, OAM_PACKET_IN,
pin->up.reason)) {
|| ofconn->controller_id != am->controller_id
|| !ofconn_receives_async_msg(ofconn, am->oam,
am->pin.up.reason)) {
continue;
}

struct ofpbuf *msg = ofputil_encode_packet_in(
&pin->up, protocol, ofconn->packet_in_format,
pin->max_len >= 0 ? pin->max_len : ofconn->miss_send_len,
&am->pin.up, protocol, ofconn->packet_in_format,
am->pin.max_len >= 0 ? am->pin.max_len : ofconn->miss_send_len,
ofconn->pktbuf);

struct ovs_list txq;
bool is_miss = (pin->up.reason == OFPR_NO_MATCH ||
pin->up.reason == OFPR_EXPLICIT_MISS ||
pin->up.reason == OFPR_IMPLICIT_MISS);
bool is_miss = (am->pin.up.reason == OFPR_NO_MATCH ||
am->pin.up.reason == OFPR_EXPLICIT_MISS ||
am->pin.up.reason == OFPR_IMPLICIT_MISS);
pinsched_send(ofconn->schedulers[is_miss],
pin->up.flow_metadata.flow.in_port.ofp_port, msg, &txq);
am->pin.up.flow_metadata.flow.in_port.ofp_port,
msg, &txq);
do_send_packet_ins(ofconn, &txq);
}
}
Expand Down Expand Up @@ -2242,3 +2243,10 @@ ofmonitor_wait(struct connmgr *mgr)
}
ovs_mutex_unlock(&ofproto_mutex);
}

void
ofproto_async_msg_free(struct ofproto_async_msg *am)
{
free(CONST_CAST(void *, am->pin.up.packet));
free(am);
}
20 changes: 14 additions & 6 deletions ofproto/connmgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,21 @@ enum ofconn_type {
OFCONN_SERVICE /* A service connection, e.g. "ovs-ofctl". */
};

/* A packet_in, with extra members to assist in queuing and routing it. */
struct ofproto_packet_in {
struct ofputil_packet_in up;
/* An asynchronous message that might need to be queued between threads. */
struct ofproto_async_msg {
struct ovs_list list_node; /* For queuing. */
uint16_t controller_id; /* Controller ID to send to. */
int max_len; /* From action, or -1 if none. */

enum ofputil_async_msg_type oam;
union {
/* OAM_PACKET_IN. */
struct {
struct ofputil_packet_in up;
int max_len; /* From action, or -1 if none. */
} pin;
};
};
void ofproto_async_msg_free(struct ofproto_async_msg *);

/* Basics. */
struct connmgr *connmgr_create(struct ofproto *ofproto,
Expand Down Expand Up @@ -140,8 +148,8 @@ void connmgr_send_port_status(struct connmgr *, struct ofconn *source,
const struct ofputil_phy_port *, uint8_t reason);
void connmgr_send_flow_removed(struct connmgr *,
const struct ofputil_flow_removed *);
void connmgr_send_packet_in(struct connmgr *,
const struct ofproto_packet_in *);
void connmgr_send_async_msg(struct connmgr *,
const struct ofproto_async_msg *);
void ofconn_send_role_status(struct ofconn *ofconn, uint32_t role,
uint8_t reason);

Expand Down
28 changes: 16 additions & 12 deletions ofproto/fail-open.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,23 @@ send_bogus_packet_ins(struct fail_open *fo)
eth_addr_nicira_random(&mac);
compose_rarp(&b, mac);

struct ofproto_packet_in pin = {
.up = {
.packet = dp_packet_data(&b),
.len = dp_packet_size(&b),
.flow_metadata = MATCH_CATCHALL_INITIALIZER,
.flow_metadata.flow.in_port.ofp_port = OFPP_LOCAL,
.flow_metadata.wc.masks.in_port.ofp_port = u16_to_ofp(UINT16_MAX),
.reason = OFPR_NO_MATCH,
.cookie = OVS_BE64_MAX,
},
.max_len = UINT16_MAX,
struct ofproto_async_msg am = {
.oam = OAM_PACKET_IN,
.pin = {
.up = {
.packet = dp_packet_data(&b),
.len = dp_packet_size(&b),
.flow_metadata = MATCH_CATCHALL_INITIALIZER,
.flow_metadata.flow.in_port.ofp_port = OFPP_LOCAL,
.flow_metadata.wc.masks.in_port.ofp_port
= u16_to_ofp(UINT16_MAX),
.reason = OFPR_NO_MATCH,
.cookie = OVS_BE64_MAX,
},
.max_len = UINT16_MAX,
}
};
connmgr_send_packet_in(fo->connmgr, &pin);
connmgr_send_async_msg(fo->connmgr, &am);

dp_packet_uninit(&b);
}
Expand Down
32 changes: 17 additions & 15 deletions ofproto/ofproto-dpif-xlate.c
Original file line number Diff line number Diff line change
Expand Up @@ -3568,7 +3568,6 @@ execute_controller_action(struct xlate_ctx *ctx, int len,
enum ofp_packet_in_reason reason,
uint16_t controller_id)
{
struct ofproto_packet_in *pin;
struct dp_packet *packet;

ctx->xout->slow |= SLOW_CONTROLLER;
Expand All @@ -3592,21 +3591,24 @@ execute_controller_action(struct xlate_ctx *ctx, int len,

size_t packet_len = dp_packet_size(packet);

pin = xmalloc(sizeof *pin);
*pin = (struct ofproto_packet_in) {
struct ofproto_async_msg *am = xmalloc(sizeof *am);
*am = (struct ofproto_async_msg) {
.controller_id = controller_id,
.up = {
.packet = dp_packet_steal_data(packet),
.len = packet_len,
.reason = reason,
.table_id = ctx->table_id,
.cookie = ctx->rule_cookie,
.oam = OAM_PACKET_IN,
.pin = {
.up = {
.packet = dp_packet_steal_data(packet),
.len = packet_len,
.reason = reason,
.table_id = ctx->table_id,
.cookie = ctx->rule_cookie,
},
.max_len = len,
},
.max_len = len,
};
flow_get_metadata(&ctx->xin->flow, &pin->up.flow_metadata);
flow_get_metadata(&ctx->xin->flow, &am->pin.up.flow_metadata);

ofproto_dpif_send_packet_in(ctx->xbridge->ofproto, pin);
ofproto_dpif_send_async_msg(ctx->xbridge->ofproto, am);
dp_packet_delete(packet);
}

Expand Down Expand Up @@ -4141,8 +4143,8 @@ recirc_put_unroll_xlate(struct xlate_ctx *ctx)

/* Copy remaining actions to the action_set to be executed after recirculation.
* UNROLL_XLATE action is inserted, if not already done so, before actions that
* may generate PACKET_INs from the current table and without matching another
* rule. */
* may generate asynchronous messages from the current table and without
* matching another rule. */
static void
recirc_unroll_actions(const struct ofpact *ofpacts, size_t ofpacts_len,
struct xlate_ctx *ctx)
Expand All @@ -4151,7 +4153,7 @@ recirc_unroll_actions(const struct ofpact *ofpacts, size_t ofpacts_len,

OFPACT_FOR_EACH (a, ofpacts, ofpacts_len) {
switch (a->type) {
/* May generate PACKET INs. */
/* May generate asynchronous messages. */
case OFPACT_OUTPUT_REG:
case OFPACT_GROUP:
case OFPACT_OUTPUT:
Expand Down
63 changes: 30 additions & 33 deletions ofproto/ofproto-dpif.c
Original file line number Diff line number Diff line change
Expand Up @@ -341,9 +341,9 @@ struct ofproto_dpif {
uint64_t change_seq; /* Connectivity status changes. */

/* Work queues. */
struct guarded_list pins; /* Contains "struct ofputil_packet_in"s. */
struct seq *pins_seq; /* For notifying 'pins' reception. */
uint64_t pins_seqno;
struct guarded_list ams; /* Contains "struct ofproto_async_msgs"s. */
struct seq *ams_seq; /* For notifying 'ams' reception. */
uint64_t ams_seqno;
};

/* All existing ofproto_dpif instances, indexed by ->up.name. */
Expand Down Expand Up @@ -400,20 +400,19 @@ ofproto_dpif_flow_mod(struct ofproto_dpif *ofproto,
ofproto_flow_mod(&ofproto->up, &ofm);
}

/* Appends 'pin' to the queue of "packet ins" to be sent to the controller.
* Takes ownership of 'pin' and pin->packet. */
/* Appends 'am' to the queue of asynchronous messages to be sent to the
* controller. Takes ownership of 'am' and any data it points to. */
void
ofproto_dpif_send_packet_in(struct ofproto_dpif *ofproto,
struct ofproto_packet_in *pin)
ofproto_dpif_send_async_msg(struct ofproto_dpif *ofproto,
struct ofproto_async_msg *am)
{
if (!guarded_list_push_back(&ofproto->pins, &pin->list_node, 1024)) {
if (!guarded_list_push_back(&ofproto->ams, &am->list_node, 1024)) {
COVERAGE_INC(packet_in_overflow);
free(CONST_CAST(void *, pin->up.packet));
free(pin);
ofproto_async_msg_free(am);
}

/* Wakes up main thread for packet-in I/O. */
seq_change(ofproto->pins_seq);
seq_change(ofproto->ams_seq);
}

/* The default "table-miss" behaviour for OpenFlow1.3+ is to drop the
Expand Down Expand Up @@ -1329,7 +1328,7 @@ construct(struct ofproto *ofproto_)
ovs_mutex_init_adaptive(&ofproto->stats_mutex);
ovs_mutex_init(&ofproto->vsp_mutex);

guarded_list_init(&ofproto->pins);
guarded_list_init(&ofproto->ams);

hmap_init(&ofproto->vlandev_map);
hmap_init(&ofproto->realdev_vid_map);
Expand All @@ -1339,8 +1338,8 @@ construct(struct ofproto *ofproto_)
sset_init(&ofproto->port_poll_set);
ofproto->port_poll_errno = 0;
ofproto->change_seq = 0;
ofproto->pins_seq = seq_create();
ofproto->pins_seqno = seq_read(ofproto->pins_seq);
ofproto->ams_seq = seq_create();
ofproto->ams_seqno = seq_read(ofproto->ams_seq);


SHASH_FOR_EACH_SAFE (node, next, &init_ofp_ports) {
Expand Down Expand Up @@ -1444,10 +1443,10 @@ static void
destruct(struct ofproto *ofproto_)
{
struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
struct ofproto_packet_in *pin;
struct ofproto_async_msg *am;
struct rule_dpif *rule;
struct oftable *table;
struct ovs_list pins;
struct ovs_list ams;

ofproto->backer->need_revalidate = REV_RECONFIGURE;
xlate_txn_start();
Expand All @@ -1467,12 +1466,11 @@ destruct(struct ofproto *ofproto_)
}
ofproto_group_delete_all(&ofproto->up);

guarded_list_pop_all(&ofproto->pins, &pins);
LIST_FOR_EACH_POP (pin, list_node, &pins) {
free(CONST_CAST(void *, pin->up.packet));
free(pin);
guarded_list_pop_all(&ofproto->ams, &ams);
LIST_FOR_EACH_POP (am, list_node, &ams) {
ofproto_async_msg_free(am);
}
guarded_list_destroy(&ofproto->pins);
guarded_list_destroy(&ofproto->ams);

recirc_free_ofproto(ofproto, ofproto->up.name);

Expand All @@ -1495,7 +1493,7 @@ destruct(struct ofproto *ofproto_)
ovs_mutex_destroy(&ofproto->stats_mutex);
ovs_mutex_destroy(&ofproto->vsp_mutex);

seq_destroy(ofproto->pins_seq);
seq_destroy(ofproto->ams_seq);

close_dpif_backer(ofproto->backer);
}
Expand All @@ -1514,23 +1512,22 @@ run(struct ofproto *ofproto_)
mcast_snooping_mdb_flush(ofproto->ms);
}

/* Always updates the ofproto->pins_seqno to avoid frequent wakeup during
/* Always updates the ofproto->ams_seqno to avoid frequent wakeup during
* flow restore. Even though nothing is processed during flow restore,
* all queued 'pins' will be handled immediately when flow restore
* all queued 'ams' will be handled immediately when flow restore
* completes. */
ofproto->pins_seqno = seq_read(ofproto->pins_seq);
ofproto->ams_seqno = seq_read(ofproto->ams_seq);

/* Do not perform any periodic activity required by 'ofproto' while
* waiting for flow restore to complete. */
if (!ofproto_get_flow_restore_wait()) {
struct ofproto_packet_in *pin;
struct ovs_list pins;
struct ofproto_async_msg *am;
struct ovs_list ams;

guarded_list_pop_all(&ofproto->pins, &pins);
LIST_FOR_EACH_POP (pin, list_node, &pins) {
connmgr_send_packet_in(ofproto->up.connmgr, pin);
free(CONST_CAST(void *, pin->up.packet));
free(pin);
guarded_list_pop_all(&ofproto->ams, &ams);
LIST_FOR_EACH_POP (am, list_node, &ams) {
connmgr_send_async_msg(ofproto->up.connmgr, am);
ofproto_async_msg_free(am);
}
}

Expand Down Expand Up @@ -1641,7 +1638,7 @@ wait(struct ofproto *ofproto_)
}

seq_wait(udpif_dump_seq(ofproto->backer->udpif), ofproto->dump_seq);
seq_wait(ofproto->pins_seq, ofproto->pins_seqno);
seq_wait(ofproto->ams_seq, ofproto->ams_seqno);
}

static void
Expand Down
6 changes: 3 additions & 3 deletions ofproto/ofproto-dpif.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
union user_action_cookie;
struct dpif_flow_stats;
struct ofproto;
struct ofproto_async_msg;
struct ofproto_dpif;
struct ofproto_packet_in;
struct ofport_dpif;
struct dpif_backer;
struct OVS_LOCKABLE rule_dpif;
Expand Down Expand Up @@ -157,8 +157,8 @@ int ofproto_dpif_execute_actions__(struct ofproto_dpif *, const struct flow *,
struct rule_dpif *, const struct ofpact *,
size_t ofpacts_len, int recurse,
int resubmits, struct dp_packet *);
void ofproto_dpif_send_packet_in(struct ofproto_dpif *,
struct ofproto_packet_in *);
void ofproto_dpif_send_async_msg(struct ofproto_dpif *,
struct ofproto_async_msg *);
bool ofproto_dpif_wants_packet_in_on_miss(struct ofproto_dpif *);
int ofproto_dpif_send_packet(const struct ofport_dpif *, struct dp_packet *);
void ofproto_dpif_flow_mod(struct ofproto_dpif *,
Expand Down

0 comments on commit a2b53de

Please sign in to comment.