Skip to content

Commit

Permalink
tipc: transfer broadcast nacks in link state messages
Browse files Browse the repository at this point in the history
When we send broadcasts in clusters of more 70-80 nodes, we sometimes
see the broadcast link resetting because of an excessive number of
retransmissions. This is caused by a combination of two factors:

1) A 'NACK crunch", where loss of broadcast packets is discovered
   and NACK'ed by several nodes simultaneously, leading to multiple
   redundant broadcast retransmissions.

2) The fact that the NACKS as such also are sent as broadcast, leading
   to excessive load and packet loss on the transmitting switch/bridge.

This commit deals with the latter problem, by moving sending of
broadcast nacks from the dedicated BCAST_PROTOCOL/NACK message type
to regular unicast LINK_PROTOCOL/STATE messages. We allocate 10 unused
bits in word 8 of the said message for this purpose, and introduce a
new capability bit, TIPC_BCAST_STATE_NACK in order to keep the change
backwards compatible.

Reviewed-by: Ying Xue <[email protected]>
Signed-off-by: Jon Maloy <[email protected]>
Signed-off-by: David S. Miller <[email protected]>
  • Loading branch information
Jon Paul Maloy authored and davem330 committed Sep 3, 2016
1 parent 2c896fb commit 02d11ca
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 27 deletions.
8 changes: 5 additions & 3 deletions net/tipc/bcast.c
Original file line number Diff line number Diff line change
Expand Up @@ -269,18 +269,19 @@ void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked)
*
* RCU is locked, no other locks set
*/
void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
struct tipc_msg *hdr)
int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
struct tipc_msg *hdr)
{
struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
struct sk_buff_head xmitq;
int rc = 0;

__skb_queue_head_init(&xmitq);

tipc_bcast_lock(net);
if (msg_type(hdr) == STATE_MSG) {
tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq);
tipc_link_bc_sync_rcv(l, hdr, &xmitq);
rc = tipc_link_bc_sync_rcv(l, hdr, &xmitq);
} else {
tipc_link_bc_init_rcv(l, hdr);
}
Expand All @@ -291,6 +292,7 @@ void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
/* Any socket wakeup messages ? */
if (!skb_queue_empty(inputq))
tipc_sk_rcv(net, inputq);
return rc;
}

/* tipc_bcast_add_peer - add a peer node to broadcast link and bearer
Expand Down
4 changes: 2 additions & 2 deletions net/tipc/bcast.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ int tipc_bcast_get_mtu(struct net *net);
int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list);
int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked);
void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
struct tipc_msg *hdr);
int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
struct tipc_msg *hdr);
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
int tipc_bclink_reset_stats(struct net *net);
Expand Down
64 changes: 51 additions & 13 deletions net/tipc/link.c
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,18 @@ int tipc_link_bc_peers(struct tipc_link *l)
return l->ackers;
}

u16 link_bc_rcv_gap(struct tipc_link *l)
{
struct sk_buff *skb = skb_peek(&l->deferdq);
u16 gap = 0;

if (more(l->snd_nxt, l->rcv_nxt))
gap = l->snd_nxt - l->rcv_nxt;
if (skb)
gap = buf_seqno(skb) - l->rcv_nxt;
return gap;
}

void tipc_link_set_mtu(struct tipc_link *l, int mtu)
{
l->mtu = mtu;
Expand Down Expand Up @@ -1135,7 +1147,10 @@ int tipc_link_build_state_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
if (((l->rcv_nxt ^ tipc_own_addr(l->net)) & 0xf) != 0xf)
return 0;
l->rcv_unacked = 0;
return TIPC_LINK_SND_BC_ACK;

/* Use snd_nxt to store peer's snd_nxt in broadcast rcv link */
l->snd_nxt = l->rcv_nxt;
return TIPC_LINK_SND_STATE;
}

/* Unicast ACK */
Expand Down Expand Up @@ -1236,7 +1251,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
rc |= tipc_link_input(l, skb, l->inputq);
if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN))
rc |= tipc_link_build_state_msg(l, xmitq);
if (unlikely(rc & ~TIPC_LINK_SND_BC_ACK))
if (unlikely(rc & ~TIPC_LINK_SND_STATE))
break;
} while ((skb = __skb_dequeue(defq)));

Expand All @@ -1250,10 +1265,11 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
u16 rcvgap, int tolerance, int priority,
struct sk_buff_head *xmitq)
{
struct tipc_link *bcl = l->bc_rcvlink;
struct sk_buff *skb;
struct tipc_msg *hdr;
struct sk_buff_head *dfq = &l->deferdq;
bool node_up = link_is_up(l->bc_rcvlink);
bool node_up = link_is_up(bcl);
struct tipc_mon_state *mstate = &l->mon_state;
int dlen = 0;
void *data;
Expand Down Expand Up @@ -1281,7 +1297,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
msg_set_net_plane(hdr, l->net_plane);
msg_set_next_sent(hdr, l->snd_nxt);
msg_set_ack(hdr, l->rcv_nxt - 1);
msg_set_bcast_ack(hdr, l->bc_rcvlink->rcv_nxt - 1);
msg_set_bcast_ack(hdr, bcl->rcv_nxt - 1);
msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1);
msg_set_link_tolerance(hdr, tolerance);
msg_set_linkprio(hdr, priority);
Expand All @@ -1291,6 +1307,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,

if (mtyp == STATE_MSG) {
msg_set_seq_gap(hdr, rcvgap);
msg_set_bc_gap(hdr, link_bc_rcv_gap(bcl));
msg_set_probe(hdr, probe);
tipc_mon_prep(l->net, data, &dlen, mstate, l->bearer_id);
msg_set_size(hdr, INT_H_SIZE + dlen);
Expand Down Expand Up @@ -1575,49 +1592,68 @@ void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr)

/* tipc_link_bc_sync_rcv - update rcv link according to peer's send state
*/
void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
struct sk_buff_head *xmitq)
int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
struct sk_buff_head *xmitq)
{
u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
u16 from = msg_bcast_ack(hdr) + 1;
u16 to = from + msg_bc_gap(hdr) - 1;
int rc = 0;

if (!link_is_up(l))
return;
return rc;

if (!msg_peer_node_is_up(hdr))
return;
return rc;

/* Open when peer ackowledges our bcast init msg (pkt #1) */
if (msg_ack(hdr))
l->bc_peer_is_up = true;

if (!l->bc_peer_is_up)
return;
return rc;

/* Ignore if peers_snd_nxt goes beyond receive window */
if (more(peers_snd_nxt, l->rcv_nxt + l->window))
return;
return rc;

if (!less(to, from)) {
rc = tipc_link_retrans(l->bc_sndlink, from, to, xmitq);
l->stats.recv_nacks++;
}

l->snd_nxt = peers_snd_nxt;
if (link_bc_rcv_gap(l))
rc |= TIPC_LINK_SND_STATE;

/* Return now if sender supports nack via STATE messages */
if (l->peer_caps & TIPC_BCAST_STATE_NACK)
return rc;

/* Otherwise, be backwards compatible */

if (!more(peers_snd_nxt, l->rcv_nxt)) {
l->nack_state = BC_NACK_SND_CONDITIONAL;
return;
return 0;
}

/* Don't NACK if one was recently sent or peeked */
if (l->nack_state == BC_NACK_SND_SUPPRESS) {
l->nack_state = BC_NACK_SND_UNCONDITIONAL;
return;
return 0;
}

/* Conditionally delay NACK sending until next synch rcv */
if (l->nack_state == BC_NACK_SND_CONDITIONAL) {
l->nack_state = BC_NACK_SND_UNCONDITIONAL;
if ((peers_snd_nxt - l->rcv_nxt) < TIPC_MIN_LINK_WIN)
return;
return 0;
}

/* Send NACK now but suppress next one */
tipc_link_build_bc_proto_msg(l, true, peers_snd_nxt, xmitq);
l->nack_state = BC_NACK_SND_SUPPRESS;
return 0;
}

void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
Expand Down Expand Up @@ -1654,6 +1690,8 @@ void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
}

/* tipc_link_bc_nack_rcv(): receive broadcast nack message
* This function is here for backwards compatibility, since
* no BCAST_PROTOCOL/STATE messages occur from TIPC v2.5.
*/
int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
struct sk_buff_head *xmitq)
Expand Down
6 changes: 3 additions & 3 deletions net/tipc/link.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ enum {
enum {
TIPC_LINK_UP_EVT = 1,
TIPC_LINK_DOWN_EVT = (1 << 1),
TIPC_LINK_SND_BC_ACK = (1 << 2)
TIPC_LINK_SND_STATE = (1 << 2)
};

/* Starting value for maximum packet size negotiation on unicast links
Expand Down Expand Up @@ -138,8 +138,8 @@ void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
void tipc_link_build_bc_sync_msg(struct tipc_link *l,
struct sk_buff_head *xmitq);
void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr);
void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
struct sk_buff_head *xmitq);
int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
struct sk_buff_head *xmitq);
int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
struct sk_buff_head *xmitq);
#endif
10 changes: 10 additions & 0 deletions net/tipc/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,16 @@ static inline char *msg_media_addr(struct tipc_msg *m)
return (char *)&m->hdr[TIPC_MEDIA_INFO_OFFSET];
}

static inline u32 msg_bc_gap(struct tipc_msg *m)
{
return msg_bits(m, 8, 0, 0x3ff);
}

static inline void msg_set_bc_gap(struct tipc_msg *m, u32 n)
{
msg_set_bits(m, 8, 0, 0x3ff, n);
}

/*
* Word 9
*/
Expand Down
32 changes: 30 additions & 2 deletions net/tipc/node.c
Original file line number Diff line number Diff line change
Expand Up @@ -1262,6 +1262,34 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
kfree_skb(skb);
}

static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr,
int bearer_id, struct sk_buff_head *xmitq)
{
struct tipc_link *ucl;
int rc;

rc = tipc_bcast_sync_rcv(n->net, n->bc_entry.link, hdr);

if (rc & TIPC_LINK_DOWN_EVT) {
tipc_bearer_reset_all(n->net);
return;
}

if (!(rc & TIPC_LINK_SND_STATE))
return;

/* If probe message, a STATE response will be sent anyway */
if (msg_probe(hdr))
return;

/* Produce a STATE message carrying broadcast NACK */
tipc_node_read_lock(n);
ucl = n->links[bearer_id].link;
if (ucl)
tipc_link_build_state_msg(ucl, xmitq);
tipc_node_read_unlock(n);
}

/**
* tipc_node_bc_rcv - process TIPC broadcast packet arriving from off-node
* @net: the applicable net namespace
Expand Down Expand Up @@ -1298,7 +1326,7 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
rc = tipc_bcast_rcv(net, be->link, skb);

/* Broadcast ACKs are sent on a unicast link */
if (rc & TIPC_LINK_SND_BC_ACK) {
if (rc & TIPC_LINK_SND_STATE) {
tipc_node_read_lock(n);
tipc_link_build_state_msg(le->link, &xmitq);
tipc_node_read_unlock(n);
Expand Down Expand Up @@ -1505,7 +1533,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)

/* Ensure broadcast reception is in synch with peer's send state */
if (unlikely(usr == LINK_PROTOCOL))
tipc_bcast_sync_rcv(net, n->bc_entry.link, hdr);
tipc_node_bc_sync_rcv(n, hdr, bearer_id, &xmitq);
else if (unlikely(tipc_link_acked(n->bc_entry.link) != bc_ack))
tipc_bcast_ack_rcv(net, n->bc_entry.link, bc_ack);

Expand Down
11 changes: 7 additions & 4 deletions net/tipc/node.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* net/tipc/node.h: Include file for TIPC node management routines
*
* Copyright (c) 2000-2006, 2014-2015, Ericsson AB
* Copyright (c) 2000-2006, 2014-2016, Ericsson AB
* Copyright (c) 2005, 2010-2014, Wind River Systems
* All rights reserved.
*
Expand Down Expand Up @@ -45,11 +45,14 @@
/* Optional capabilities supported by this code version
*/
enum {
TIPC_BCAST_SYNCH = (1 << 1),
TIPC_BLOCK_FLOWCTL = (2 << 1)
TIPC_BCAST_SYNCH = (1 << 1),
TIPC_BCAST_STATE_NACK = (1 << 2),
TIPC_BLOCK_FLOWCTL = (1 << 3)
};

#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | TIPC_BLOCK_FLOWCTL)
#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \
TIPC_BCAST_STATE_NACK | \
TIPC_BLOCK_FLOWCTL)
#define INVALID_BEARER_ID -1

void tipc_node_stop(struct net *net);
Expand Down

0 comments on commit 02d11ca

Please sign in to comment.