dpif-netdev: Time based output batching.
This allows collecting packets from more than one RX burst
and sending them together at a configurable interval.

'other_config:tx-flush-interval' can be used to configure the
maximum time that a packet can wait in the output batch before
being sent.

'tx-flush-interval' has microsecond resolution.
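
For example (the value 50 is only illustrative),
'ovs-vsctl set Open_vSwitch . other_config:tx-flush-interval=50'
lets packets wait up to 50 microseconds in an output batch.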

Tested-by: Jan Scheurich <[email protected]>
Acked-by: Jan Scheurich <[email protected]>
Signed-off-by: Ilya Maximets <[email protected]>
Signed-off-by: Ian Stokes <[email protected]>
igsilya authored and istokes committed Jan 17, 2018
1 parent 58ed6df commit c71ea3c
Showing 2 changed files with 102 additions and 22 deletions.
108 changes: 86 additions & 22 deletions lib/dpif-netdev.c
@@ -83,6 +83,9 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
#define MAX_RECIRC_DEPTH 6
DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)

/* Use instant packet send by default. */
#define DEFAULT_TX_FLUSH_INTERVAL 0

/* Configuration parameters. */
enum { MAX_FLOWS = 65536 }; /* Maximum number of flows in flow table. */
enum { MAX_METERS = 65536 }; /* Maximum number of meters. */
@@ -269,6 +272,9 @@ struct dp_netdev {
struct hmap ports;
struct seq *port_seq; /* Incremented whenever a port changes. */

/* The time that a packet can wait in the output batch for sending. */
atomic_uint32_t tx_flush_interval;

/* Meters. */
struct ovs_mutex meter_locks[N_METER_LOCKS];
struct dp_meter *meters[MAX_METERS]; /* Meter bands. */
@@ -496,6 +502,7 @@ struct tx_port {
int qid;
long long last_used;
struct hmap_node node;
long long flush_time;
struct dp_packet_batch output_pkts;
struct dp_netdev_rxq *output_pkts_rxqs[NETDEV_MAX_BURST];
};
@@ -577,6 +584,9 @@ struct dp_netdev_pmd_thread {
* than 'cmap_count(dp->poll_threads)'. */
uint32_t static_tx_qid;

/* Number of filled output batches. */
int n_output_batches;

struct ovs_mutex port_mutex; /* Mutex for 'poll_list' and 'tx_ports'. */
/* List of rx queues to poll. */
struct hmap poll_list OVS_GUARDED;
@@ -666,8 +676,9 @@ static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
struct rxq_poll *poll)
OVS_REQUIRES(pmd->port_mutex);
static void
dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd);
static int
dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
bool force);

static void reconfigure_datapath(struct dp_netdev *dp)
OVS_REQUIRES(dp->port_mutex);
@@ -1241,6 +1252,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
conntrack_init(&dp->conntrack);

atomic_init(&dp->emc_insert_min, DEFAULT_EM_FLOW_INSERT_MIN);
atomic_init(&dp->tx_flush_interval, DEFAULT_TX_FLUSH_INTERVAL);

cmap_init(&dp->poll_threads);

@@ -2907,7 +2919,7 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
dp_packet_batch_init_packet(&pp, execute->packet);
dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
execute->actions, execute->actions_len);
dp_netdev_pmd_flush_output_packets(pmd);
dp_netdev_pmd_flush_output_packets(pmd, true);

if (pmd->core_id == NON_PMD_CORE_ID) {
ovs_mutex_unlock(&dp->non_pmd_mutex);
@@ -2956,6 +2968,16 @@ dpif_netdev_set_config(struct dpif *dpif, const struct smap *other_config)
smap_get_ullong(other_config, "emc-insert-inv-prob",
DEFAULT_EM_FLOW_INSERT_INV_PROB);
uint32_t insert_min, cur_min;
uint32_t tx_flush_interval, cur_tx_flush_interval;

tx_flush_interval = smap_get_int(other_config, "tx-flush-interval",
DEFAULT_TX_FLUSH_INTERVAL);
atomic_read_relaxed(&dp->tx_flush_interval, &cur_tx_flush_interval);
if (tx_flush_interval != cur_tx_flush_interval) {
atomic_store_relaxed(&dp->tx_flush_interval, tx_flush_interval);
VLOG_INFO("Flushing interval for tx queues set to %"PRIu32" us",
tx_flush_interval);
}

if (!nullable_string_is_equal(dp->pmd_cmask, cmask)) {
free(dp->pmd_cmask);
@@ -3150,7 +3172,7 @@ dp_netdev_rxq_get_intrvl_cycles(struct dp_netdev_rxq *rx, unsigned idx)
return processing_cycles;
}

static void
static int
dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
struct tx_port *p)
{
@@ -3160,6 +3182,7 @@ dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
bool dynamic_txqs;
struct cycle_timer timer;
uint64_t cycles;
uint32_t tx_flush_interval;

cycle_timer_start(&pmd->perf_stats, &timer);

@@ -3176,6 +3199,13 @@ dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs);
dp_packet_batch_init(&p->output_pkts);

/* Update time of the next flush. */
atomic_read_relaxed(&pmd->dp->tx_flush_interval, &tx_flush_interval);
p->flush_time = pmd->ctx.now + tx_flush_interval;

ovs_assert(pmd->n_output_batches > 0);
pmd->n_output_batches--;

pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_SENT_PKTS, output_cnt);
pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_SENT_BATCHES, 1);

@@ -3188,18 +3218,28 @@ dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
RXQ_CYCLES_PROC_CURR, cycles);
}
}

return output_cnt;
}

static void
dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd)
static int
dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
bool force)
{
struct tx_port *p;
int output_cnt = 0;

if (!pmd->n_output_batches) {
return 0;
}

HMAP_FOR_EACH (p, node, &pmd->send_port_cache) {
if (!dp_packet_batch_is_empty(&p->output_pkts)) {
dp_netdev_pmd_flush_output_on_port(pmd, p);
if (!dp_packet_batch_is_empty(&p->output_pkts)
&& (force || pmd->ctx.now >= p->flush_time)) {
output_cnt += dp_netdev_pmd_flush_output_on_port(pmd, p);
}
}
return output_cnt;
}

static int
@@ -3210,7 +3250,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
struct dp_packet_batch batch;
struct cycle_timer timer;
int error;
int batch_cnt = 0;
int batch_cnt = 0, output_cnt = 0;
uint64_t cycles;

/* Measure duration for polling and processing rx burst. */
@@ -3232,7 +3272,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
cycles = cycle_timer_stop(&pmd->perf_stats, &timer);
dp_netdev_rxq_add_cycles(rxq, RXQ_CYCLES_PROC_CURR, cycles);

dp_netdev_pmd_flush_output_packets(pmd);
output_cnt = dp_netdev_pmd_flush_output_packets(pmd, false);
} else {
/* Discard cycles. */
cycle_timer_stop(&pmd->perf_stats, &timer);
@@ -3246,7 +3286,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,

pmd->ctx.last_rxq = NULL;

return batch_cnt;
return batch_cnt + output_cnt;
}

static struct tx_port *
@@ -3869,6 +3909,7 @@ dpif_netdev_run(struct dpif *dpif)
struct dp_netdev *dp = get_dp_netdev(dpif);
struct dp_netdev_pmd_thread *non_pmd;
uint64_t new_tnl_seq;
bool need_to_flush = true;

ovs_mutex_lock(&dp->port_mutex);
non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
@@ -3879,13 +3920,22 @@ dpif_netdev_run(struct dpif *dpif)
int i;

for (i = 0; i < port->n_rxq; i++) {
dp_netdev_process_rxq_port(non_pmd,
&port->rxqs[i],
port->port_no);
if (dp_netdev_process_rxq_port(non_pmd,
&port->rxqs[i],
port->port_no)) {
need_to_flush = false;
}
}
}
}
pmd_thread_ctx_time_update(non_pmd);
if (need_to_flush) {
/* We didn't receive anything in the process loop.
* Check if we need to send something.
* There were no time updates on the current iteration. */
pmd_thread_ctx_time_update(non_pmd);
dp_netdev_pmd_flush_output_packets(non_pmd, false);
}

dpif_netdev_xps_revalidate_pmd(non_pmd, false);
ovs_mutex_unlock(&dp->non_pmd_mutex);

@@ -3936,6 +3986,8 @@ pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd)
{
struct tx_port *tx_port_cached;

/* Flush all the queued packets. */
dp_netdev_pmd_flush_output_packets(pmd, true);
/* Free all used tx queue ids. */
dpif_netdev_xps_revalidate_pmd(pmd, true);

@@ -4066,6 +4118,7 @@ pmd_thread_main(void *f_)
cycles_counter_update(s);
for (;;) {
uint64_t iter_packets = 0;

pmd_perf_start_iteration(s);
for (i = 0; i < poll_cnt; i++) {
process_packets =
@@ -4074,15 +4127,20 @@ pmd_thread_main(void *f_)
iter_packets += process_packets;
}

if (!iter_packets) {
/* We didn't receive anything in the process loop.
* Check if we need to send something.
* There were no time updates on the current iteration. */
pmd_thread_ctx_time_update(pmd);
iter_packets += dp_netdev_pmd_flush_output_packets(pmd, false);
}

if (lc++ > 1024) {
bool reload;

lc = 0;

coverage_try_clear();
/* It's possible that the time was not updated on current
* iteration, if there were no received packets. */
pmd_thread_ctx_time_update(pmd);
dp_netdev_pmd_try_optimize(pmd, poll_list, poll_cnt);
if (!ovsrcu_try_quiesce()) {
emc_cache_slow_sweep(&pmd->flow_cache);
@@ -4521,6 +4579,7 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
pmd->core_id = core_id;
pmd->numa_id = numa_id;
pmd->need_reload = false;
pmd->n_output_batches = 0;

ovs_refcount_init(&pmd->ref_cnt);
latch_init(&pmd->exit_latch);
@@ -4710,6 +4769,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,

tx->port = port;
tx->qid = -1;
tx->flush_time = 0LL;
dp_packet_batch_init(&tx->output_pkts);

hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
@@ -5404,12 +5464,16 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
dp_netdev_pmd_flush_output_on_port(pmd, p);
}
#endif
if (OVS_UNLIKELY(dp_packet_batch_size(&p->output_pkts)
+ dp_packet_batch_size(packets_) > NETDEV_MAX_BURST)) {
/* Some packets were generated during input batch processing.
* Flush here to avoid overflow. */
if (dp_packet_batch_size(&p->output_pkts)
+ dp_packet_batch_size(packets_) > NETDEV_MAX_BURST) {
/* Flush here to avoid overflow. */
dp_netdev_pmd_flush_output_on_port(pmd, p);
}

if (dp_packet_batch_is_empty(&p->output_pkts)) {
pmd->n_output_batches++;
}

DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
p->output_pkts_rxqs[dp_packet_batch_size(&p->output_pkts)] =
pmd->ctx.last_rxq;
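To make the new control flow easier to follow, here is a minimal
standalone sketch of the pattern the diff above implements. It is not
OVS code: 'struct out_batch', 'now_us()' and MAX_BURST are illustrative
stand-ins for 'p->output_pkts', 'pmd->ctx.now' and NETDEV_MAX_BURST.
Packets accumulate in a per-port batch; the batch is flushed when it
would overflow, when its 'flush_time' deadline has passed, or when a
flush is forced, and every flush arms the next deadline at now + interval.

#include <stdbool.h>
#include <stdio.h>
#include <time.h>

#define MAX_BURST 32                /* Stand-in for NETDEV_MAX_BURST. */

struct out_batch {
    int count;                      /* Packets queued for output. */
    long long flush_time;           /* Deadline in microseconds. */
};

static long long
now_us(void)
{
    struct timespec ts;

    clock_gettime(CLOCK_MONOTONIC, &ts);
    return ts.tv_sec * 1000000LL + ts.tv_nsec / 1000;
}

/* Sends everything queued and arms the next deadline, like
 * dp_netdev_pmd_flush_output_on_port(). */
static int
flush(struct out_batch *b, long long interval_us)
{
    int sent = b->count;

    printf("sent %d packets\n", sent);
    b->count = 0;
    b->flush_time = now_us() + interval_us;
    return sent;
}

/* Queues one packet, flushing early only if the batch would
 * overflow MAX_BURST. */
static void
enqueue(struct out_batch *b, long long interval_us)
{
    if (b->count + 1 > MAX_BURST) {
        flush(b, interval_us);
    }
    b->count++;
}

/* Per-iteration check, like dp_netdev_pmd_flush_output_packets():
 * flush a non-empty batch if forced or if its deadline has passed. */
static int
maybe_flush(struct out_batch *b, long long interval_us, bool force)
{
    if (b->count && (force || now_us() >= b->flush_time)) {
        return flush(b, interval_us);
    }
    return 0;
}

int
main(void)
{
    struct out_batch b = { 0, 0LL };    /* flush_time 0: flush at once. */
    long long interval_us = 50;         /* tx-flush-interval=50. */

    for (int i = 0; i < 100; i++) {
        enqueue(&b, interval_us);
        maybe_flush(&b, interval_us, false);
    }
    maybe_flush(&b, interval_us, true); /* Final forced flush. */
    return 0;
}

The 'force' flag mirrors the parameter added to
dp_netdev_pmd_flush_output_packets(): shutdown and non-PMD paths such
as pmd_free_cached_ports() flush unconditionally, while the
per-iteration check respects the deadline.
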
16 changes: 16 additions & 0 deletions vswitchd/vswitch.xml
@@ -359,6 +359,22 @@
</p>
</column>

<column name="other_config" key="tx-flush-interval"
type='{"type": "integer",
"minInteger": 0, "maxInteger": 1000000}'>
<p>
Specifies the time in microseconds that a packet can wait in an
output batch for sending, i.e. the amount of time that a packet can
spend in an intermediate output queue before being sent to the
netdev. This option can be used to balance throughput and latency.
Lower values decrease latency, while higher values may help to
achieve higher throughput.
</p>
<p>
Defaults to 0, i.e. instant packet sending (latency optimized).
</p>
</column>

<column name="other_config" key="n-handler-threads"
type='{"type": "integer", "minInteger": 1}'>
<p>
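As a usage sketch (the value 50 is illustrative): the option is set in
the Open_vSwitch table's other_config column, e.g.
'ovs-vsctl set Open_vSwitch . other_config:tx-flush-interval=50', and
removing the key, e.g.
'ovs-vsctl remove Open_vSwitch . other_config tx-flush-interval',
should restore the instant-send default. Because expired batches are
flushed once per iteration, the extra queueing delay for any packet is
bounded by the configured interval plus the duration of one PMD
iteration.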
