Skip to content

Commit

Permalink
net: Avoid enqueuing skb for default qdiscs
Browse files Browse the repository at this point in the history
dev_queue_xmit enqueue's a skb and calls qdisc_run which
dequeue's the skb and xmits it. In most cases, the skb that
is enqueue'd is the same one that is dequeue'd (unless the
queue gets stopped or multiple cpu's write to the same queue
and ends in a race with qdisc_run). For default qdiscs, we
can remove the redundant enqueue/dequeue and simply xmit the
skb since the default qdisc is work-conserving.

The patch uses a new flag - TCQ_F_CAN_BYPASS to identify the
default fast queue. The controversial part of the patch is
incrementing qlen when a skb is requeued - this is to avoid
checks like the second line below:

+  } else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
>>         !q->gso_skb &&
+          !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {

Results of a 2 hour testing for multiple netperf sessions (1,
2, 4, 8, 12 sessions on a 4 cpu system-X). The BW numbers are
aggregate Mb/s across iterations tested with this version on
System-X boxes with Chelsio 10gbps cards:

----------------------------------
Size |  ORG BW          NEW BW   |
----------------------------------
128K |  156964          159381   |
256K |  158650          162042   |
----------------------------------

Changes from ver1:

1. Move sch_direct_xmit declaration from sch_generic.h to
   pkt_sched.h
2. Update qdisc basic statistics for direct xmit path.
3. Set qlen to zero in qdisc_reset.
4. Changed some function names to more meaningful ones.

Signed-off-by: Krishna Kumar <[email protected]>
Signed-off-by: David S. Miller <[email protected]>
  • Loading branch information
krkumar authored and davem330 committed Aug 7, 2009
1 parent 9f519f6 commit bbd8a0d
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 51 deletions.
3 changes: 3 additions & 0 deletions include/net/pkt_sched.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ extern struct qdisc_rate_table *qdisc_get_rtab(struct tc_ratespec *r,
extern void qdisc_put_rtab(struct qdisc_rate_table *tab);
extern void qdisc_put_stab(struct qdisc_size_table *tab);
extern void qdisc_warn_nonwc(char *txt, struct Qdisc *qdisc);
extern int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
struct net_device *dev, struct netdev_queue *txq,
spinlock_t *root_lock);

extern void __qdisc_run(struct Qdisc *q);

Expand Down
15 changes: 13 additions & 2 deletions include/net/sch_generic.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct Qdisc
#define TCQ_F_BUILTIN 1
#define TCQ_F_THROTTLED 2
#define TCQ_F_INGRESS 4
#define TCQ_F_CAN_BYPASS 8
#define TCQ_F_WARN_NONWC (1 << 16)
int padded;
struct Qdisc_ops *ops;
Expand Down Expand Up @@ -182,6 +183,11 @@ struct qdisc_skb_cb {
char data[];
};

static inline int qdisc_qlen(struct Qdisc *q)
{
return q->q.qlen;
}

static inline struct qdisc_skb_cb *qdisc_skb_cb(struct sk_buff *skb)
{
return (struct qdisc_skb_cb *)skb->cb;
Expand Down Expand Up @@ -387,13 +393,18 @@ static inline int qdisc_enqueue_root(struct sk_buff *skb, struct Qdisc *sch)
return qdisc_enqueue(skb, sch) & NET_XMIT_MASK;
}

static inline void __qdisc_update_bstats(struct Qdisc *sch, unsigned int len)
{
sch->bstats.bytes += len;
sch->bstats.packets++;
}

static inline int __qdisc_enqueue_tail(struct sk_buff *skb, struct Qdisc *sch,
struct sk_buff_head *list)
{
__skb_queue_tail(list, skb);
sch->qstats.backlog += qdisc_pkt_len(skb);
sch->bstats.bytes += qdisc_pkt_len(skb);
sch->bstats.packets++;
__qdisc_update_bstats(sch, qdisc_pkt_len(skb));

return NET_XMIT_SUCCESS;
}
Expand Down
48 changes: 35 additions & 13 deletions net/core/dev.c
Original file line number Diff line number Diff line change
Expand Up @@ -1786,6 +1786,40 @@ static struct netdev_queue *dev_pick_tx(struct net_device *dev,
return netdev_get_tx_queue(dev, queue_index);
}

static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
struct net_device *dev,
struct netdev_queue *txq)
{
spinlock_t *root_lock = qdisc_lock(q);
int rc;

spin_lock(root_lock);
if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
kfree_skb(skb);
rc = NET_XMIT_DROP;
} else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
!test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {
/*
* This is a work-conserving queue; there are no old skbs
* waiting to be sent out; and the qdisc is not running -
* xmit the skb directly.
*/
__qdisc_update_bstats(q, skb->len);
if (sch_direct_xmit(skb, q, dev, txq, root_lock))
__qdisc_run(q);
else
clear_bit(__QDISC_STATE_RUNNING, &q->state);

rc = NET_XMIT_SUCCESS;
} else {
rc = qdisc_enqueue_root(skb, q);
qdisc_run(q);
}
spin_unlock(root_lock);

return rc;
}

/**
* dev_queue_xmit - transmit a buffer
* @skb: buffer to transmit
Expand Down Expand Up @@ -1859,19 +1893,7 @@ int dev_queue_xmit(struct sk_buff *skb)
skb->tc_verd = SET_TC_AT(skb->tc_verd,AT_EGRESS);
#endif
if (q->enqueue) {
spinlock_t *root_lock = qdisc_lock(q);

spin_lock(root_lock);

if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
kfree_skb(skb);
rc = NET_XMIT_DROP;
} else {
rc = qdisc_enqueue_root(skb, q);
qdisc_run(q);
}
spin_unlock(root_lock);

rc = __dev_xmit_skb(skb, q, dev, txq);
goto out;
}

Expand Down
93 changes: 57 additions & 36 deletions net/sched/sch_generic.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,11 @@
* - updates to tree and tree walking are only done under the rtnl mutex.
*/

static inline int qdisc_qlen(struct Qdisc *q)
{
return q->q.qlen;
}

static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
{
q->gso_skb = skb;
q->qstats.requeues++;
q->q.qlen++; /* it's still part of the queue */
__netif_schedule(q);

return 0;
Expand All @@ -61,9 +57,11 @@ static inline struct sk_buff *dequeue_skb(struct Qdisc *q)

/* check the reason of requeuing without tx lock first */
txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
if (!netif_tx_queue_stopped(txq) && !netif_tx_queue_frozen(txq))
if (!netif_tx_queue_stopped(txq) &&
!netif_tx_queue_frozen(txq)) {
q->gso_skb = NULL;
else
q->q.qlen--;
} else
skb = NULL;
} else {
skb = q->dequeue(q);
Expand Down Expand Up @@ -103,44 +101,23 @@ static inline int handle_dev_cpu_collision(struct sk_buff *skb,
}

/*
* NOTE: Called under qdisc_lock(q) with locally disabled BH.
*
* __QDISC_STATE_RUNNING guarantees only one CPU can process
* this qdisc at a time. qdisc_lock(q) serializes queue accesses for
* this queue.
*
* netif_tx_lock serializes accesses to device driver.
*
* qdisc_lock(q) and netif_tx_lock are mutually exclusive,
* if one is grabbed, another must be free.
*
* Note, that this procedure can be called by a watchdog timer
* Transmit one skb, and handle the return status as required. Holding the
* __QDISC_STATE_RUNNING bit guarantees that only one CPU can execute this
* function.
*
* Returns to the caller:
* 0 - queue is empty or throttled.
* >0 - queue is not empty.
*
*/
static inline int qdisc_restart(struct Qdisc *q)
int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
struct net_device *dev, struct netdev_queue *txq,
spinlock_t *root_lock)
{
struct netdev_queue *txq;
int ret = NETDEV_TX_BUSY;
struct net_device *dev;
spinlock_t *root_lock;
struct sk_buff *skb;

/* Dequeue packet */
if (unlikely((skb = dequeue_skb(q)) == NULL))
return 0;

root_lock = qdisc_lock(q);

/* And release qdisc */
spin_unlock(root_lock);

dev = qdisc_dev(q);
txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));

HARD_TX_LOCK(dev, txq, smp_processor_id());
if (!netif_tx_queue_stopped(txq) &&
!netif_tx_queue_frozen(txq))
Expand Down Expand Up @@ -177,6 +154,44 @@ static inline int qdisc_restart(struct Qdisc *q)
return ret;
}

/*
* NOTE: Called under qdisc_lock(q) with locally disabled BH.
*
* __QDISC_STATE_RUNNING guarantees only one CPU can process
* this qdisc at a time. qdisc_lock(q) serializes queue accesses for
* this queue.
*
* netif_tx_lock serializes accesses to device driver.
*
* qdisc_lock(q) and netif_tx_lock are mutually exclusive,
* if one is grabbed, another must be free.
*
* Note, that this procedure can be called by a watchdog timer
*
* Returns to the caller:
* 0 - queue is empty or throttled.
* >0 - queue is not empty.
*
*/
static inline int qdisc_restart(struct Qdisc *q)
{
struct netdev_queue *txq;
struct net_device *dev;
spinlock_t *root_lock;
struct sk_buff *skb;

/* Dequeue packet */
skb = dequeue_skb(q);
if (unlikely(!skb))
return 0;

root_lock = qdisc_lock(q);
dev = qdisc_dev(q);
txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));

return sch_direct_xmit(skb, q, dev, txq, root_lock);
}

void __qdisc_run(struct Qdisc *q)
{
unsigned long start_time = jiffies;
Expand Down Expand Up @@ -547,8 +562,11 @@ void qdisc_reset(struct Qdisc *qdisc)
if (ops->reset)
ops->reset(qdisc);

kfree_skb(qdisc->gso_skb);
qdisc->gso_skb = NULL;
if (qdisc->gso_skb) {
kfree_skb(qdisc->gso_skb);
qdisc->gso_skb = NULL;
qdisc->q.qlen = 0;
}
}
EXPORT_SYMBOL(qdisc_reset);

Expand Down Expand Up @@ -605,6 +623,9 @@ static void attach_one_default_qdisc(struct net_device *dev,
printk(KERN_INFO "%s: activation failed\n", dev->name);
return;
}

/* Can by-pass the queue discipline for default qdisc */
qdisc->flags |= TCQ_F_CAN_BYPASS;
} else {
qdisc = &noqueue_qdisc;
}
Expand Down

0 comments on commit bbd8a0d

Please sign in to comment.