Skip to content

Commit

Permalink
dpif-netdev: Allow multi-rx-queue, multi-pmd-thread configuration.
Browse files Browse the repository at this point in the history
This commit adds multithreading functionality to the OVS dpdk
module.  Users are able to create multiple pmd threads and set
their cpu affinity via specifying the cpu mask string similar
to the EAL '-c COREMASK' option.

Also, the number of rx queues for each dpdk interface is made
configurable to help distribution of rx packets among multiple
pmd threads.

Signed-off-by: Alex Wang <[email protected]>
Acked-by: Pravin B Shelar <[email protected]>
  • Loading branch information
yew011 committed Sep 19, 2014
1 parent 8db2f89 commit f2eee18
Show file tree
Hide file tree
Showing 11 changed files with 198 additions and 8 deletions.
122 changes: 114 additions & 8 deletions lib/dpif-netdev.c
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ struct dp_netdev {
/* Each pmd thread will store its pointer to
* 'struct dp_netdev_pmd_thread' in 'per_pmd_key'. */
ovsthread_key_t per_pmd_key;

/* Number of rx queues for each dpdk interface and the cpu mask
* for pin of pmd threads. */
size_t n_dpdk_rxqs;
char *pmd_cmask;
};

static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
Expand Down Expand Up @@ -396,10 +401,12 @@ static void dp_netdev_disable_upcall(struct dp_netdev *);
static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
struct dp_netdev *dp, int index,
int core_id, int numa_id);
static void dp_netdev_set_nonpmd(struct dp_netdev *dp);
static struct dp_netdev_pmd_thread *dp_netdev_get_nonpmd(struct dp_netdev *dp);
static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id);
static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id);
static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp);

static void emc_clear_entry(struct emc_entry *ce);

Expand Down Expand Up @@ -539,7 +546,6 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
OVS_REQUIRES(dp_netdev_mutex)
{
struct dp_netdev *dp;
struct dp_netdev_pmd_thread *non_pmd;
int error;

dp = xzalloc(sizeof *dp);
Expand Down Expand Up @@ -572,9 +578,8 @@ create_dp_netdev(const char *name, const struct dpif_class *class,

/* Reserves the core NON_PMD_CORE_ID for all non-pmd threads. */
ovs_numa_try_pin_core_specific(NON_PMD_CORE_ID);
non_pmd = xzalloc(sizeof *non_pmd);
dp_netdev_configure_pmd(non_pmd, dp, 0, NON_PMD_CORE_ID,
OVS_NUMA_UNSPEC);
dp_netdev_set_nonpmd(dp);
dp->n_dpdk_rxqs = NR_QUEUE;

ovs_mutex_lock(&dp->port_mutex);
error = do_add_port(dp, name, "internal", ODPP_LOCAL);
Expand Down Expand Up @@ -649,6 +654,7 @@ dp_netdev_free(struct dp_netdev *dp)
cmap_destroy(&dp->ports);
fat_rwlock_destroy(&dp->upcall_rwlock);

free(dp->pmd_cmask);
free(CONST_CAST(char *, dp->name));
free(dp);
}
Expand Down Expand Up @@ -778,8 +784,8 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
return ENOENT;
}
/* There can only be ovs_numa_get_n_cores() pmd threads,
* so creates a tx_q for each. */
error = netdev_set_multiq(netdev, n_cores, NR_QUEUE);
* so creates a txq for each. */
error = netdev_set_multiq(netdev, n_cores, dp->n_dpdk_rxqs);
if (error) {
VLOG_ERR("%s, cannot set multiq", devname);
return errno;
Expand Down Expand Up @@ -1904,6 +1910,77 @@ dpif_netdev_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops)
}
}

/* Reports whether the requested pmd configuration differs from the one
 * currently recorded in 'dp'.
 *
 * 'rxqs' is compared against 'dp->n_dpdk_rxqs' and 'cmask' against
 * 'dp->pmd_cmask'.  Either mask may be NULL; a NULL mask only matches
 * another NULL mask, and two non-NULL masks match when their strings are
 * identical. */
static bool
pmd_config_changed(const struct dp_netdev *dp, size_t rxqs, const char *cmask)
{
    const char *cur_mask = dp->pmd_cmask;

    /* A different rx queue count is a change regardless of the masks. */
    if (dp->n_dpdk_rxqs != rxqs) {
        return true;
    }

    /* At least one mask is absent: changed unless both are absent. */
    if (!cur_mask || !cmask) {
        return cur_mask || cmask;
    }

    /* Both masks are present, so compare their contents. */
    return strcmp(cur_mask, cmask) != 0;
}

/* Applies a new pmd configuration to the datapath behind 'dpif'.
 *
 * 'n_rxqs' is the number of rx queues requested for each pmd port and
 * 'cmask' is the cpu mask string for the pmd threads (may be NULL).  When
 * pmd_config_changed() reports no difference from the configuration stored
 * in the datapath, nothing is done.  Otherwise all pmd threads are
 * destroyed, the rx queues of every pmd port are closed, reconfigured and
 * reopened, the new settings are stored in the datapath, and the non-pmd
 * and pmd threads are set up again.
 *
 * Returns 0 on success, or the error returned by netdev_set_multiq() for
 * the first port that fails. */
static int
dpif_netdev_pmd_set(struct dpif *dpif, unsigned int n_rxqs, const char *cmask)
{
    struct dp_netdev *dp = get_dp_netdev(dpif);
    struct dp_netdev_port *port;

    if (!pmd_config_changed(dp, n_rxqs, cmask)) {
        return 0;
    }

    /* The pmd threads go away before any rx queue is touched. */
    dp_netdev_destroy_all_pmds(dp);

    CMAP_FOR_EACH (port, node, &dp->ports) {
        int error;
        int q;

        if (!netdev_is_pmd(port->netdev)) {
            continue;
        }

        /* Closes the rx queues opened under the old configuration. */
        for (q = 0; q < netdev_n_rxq(port->netdev); q++) {
            netdev_rxq_close(port->rxq[q]);
            port->rxq[q] = NULL;
        }

        /* Requests one tx queue per core and 'n_rxqs' rx queues.
         * NOTE(review): 'n_rxqs' is passed through unmodified, so a value
         * of 0 reaches netdev_set_multiq() -- confirm that it maps 0 to
         * the documented default of one rx queue. */
        error = netdev_set_multiq(port->netdev, ovs_numa_get_n_cores(),
                                  n_rxqs);
        if (error) {
            VLOG_ERR("Failed to set dpdk interface %s rx_queue to:"
                     " %u", netdev_get_name(port->netdev),
                     n_rxqs);
            /* NOTE(review): this returns with the pmd threads destroyed
             * and this port's rx queues closed, and without reaching the
             * restore steps below -- confirm callers can cope. */
            return error;
        }

        /* Resizes the rxq array to the new queue count and reopens each
         * queue.
         * NOTE(review): the result of netdev_rxq_open() is not checked. */
        port->rxq = xrealloc(port->rxq, sizeof *port->rxq
                             * netdev_n_rxq(port->netdev));
        for (q = 0; q < netdev_n_rxq(port->netdev); q++) {
            netdev_rxq_open(port->netdev, &port->rxq[q], q);
        }
    }
    dp->n_dpdk_rxqs = n_rxqs;

    /* Installs the new cpu mask and keeps a private copy of it. */
    ovs_numa_set_cpu_mask(cmask);
    free(dp->pmd_cmask);
    if (cmask) {
        dp->pmd_cmask = xstrdup(cmask);
    } else {
        dp->pmd_cmask = NULL;
    }

    /* Sets up the non-pmd thread structure first, then the pmd threads. */
    dp_netdev_set_nonpmd(dp);
    dp_netdev_reset_pmd_threads(dp);

    return 0;
}

static int
dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
uint32_t queue_id, uint32_t *priority)
Expand Down Expand Up @@ -2155,6 +2232,17 @@ dp_netdev_get_nonpmd(struct dp_netdev *dp)
return pmd;
}

/* Creates the 'struct dp_netdev_pmd_thread' used by non-pmd threads.
 *
 * Allocates a zeroed structure and passes it to dp_netdev_configure_pmd()
 * for datapath 'dp' with index 0, core NON_PMD_CORE_ID and numa id
 * OVS_NUMA_UNSPEC.  The pointer is not kept here; presumably
 * dp_netdev_configure_pmd() registers it with 'dp' (verify there). */
static void
dp_netdev_set_nonpmd(struct dp_netdev *dp)
{
    struct dp_netdev_pmd_thread *non_pmd = xzalloc(sizeof *non_pmd);

    dp_netdev_configure_pmd(non_pmd, dp, 0, NON_PMD_CORE_ID,
                            OVS_NUMA_UNSPEC);
}

/* Configures the 'pmd' based on the input argument. */
static void
dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
Expand Down Expand Up @@ -2247,8 +2335,9 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
return;
}

/* Tries creating NR_PMD_THREADS pmd threads on the numa node. */
can_have = MIN(n_unpinned, NR_PMD_THREADS);
/* If cpu mask is specified, uses all unpinned cores, otherwise
* tries creating NR_PMD_THREADS pmd threads. */
can_have = dp->pmd_cmask ? n_unpinned : MIN(n_unpinned, NR_PMD_THREADS);
for (i = 0; i < can_have; i++) {
struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
int core_id = ovs_numa_get_unpinned_core_on_numa(numa_id);
Expand All @@ -2271,6 +2360,22 @@ dp_netdev_flow_stats_new_cb(void)
return bucket;
}

/* Brings the pmd threads of 'dp' back up after a configuration change.
 *
 * Walks every port of 'dp' and, for each port whose netdev is a pmd
 * netdev, calls dp_netdev_set_pmds_on_numa() for that netdev's numa node.
 * The call is made once per pmd port, so a numa node that hosts several
 * pmd ports is passed more than once. */
static void
dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
{
    struct dp_netdev_port *port;

    CMAP_FOR_EACH (port, node, &dp->ports) {
        if (!netdev_is_pmd(port->netdev)) {
            continue;
        }

        dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(port->netdev));
    }
}

static void
dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow,
int cnt, int size,
Expand Down Expand Up @@ -2834,6 +2939,7 @@ const struct dpif_class dpif_netdev_class = {
dpif_netdev_operate,
NULL, /* recv_set */
NULL, /* handlers_set */
dpif_netdev_pmd_set,
dpif_netdev_queue_to_priority,
NULL, /* recv */
NULL, /* recv_wait */
Expand Down
1 change: 1 addition & 0 deletions lib/dpif-netlink.c
Original file line number Diff line number Diff line change
Expand Up @@ -1925,6 +1925,7 @@ const struct dpif_class dpif_netlink_class = {
dpif_netlink_operate,
dpif_netlink_recv_set,
dpif_netlink_handlers_set,
NULL, /* poll_thread_set */
dpif_netlink_queue_to_priority,
dpif_netlink_recv,
dpif_netlink_recv_wait,
Expand Down
7 changes: 7 additions & 0 deletions lib/dpif-provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,13 @@ struct dpif_class {
* */
int (*handlers_set)(struct dpif *dpif, uint32_t n_handlers);

/* If 'dpif' creates its own I/O polling threads, refreshes poll threads
* configuration. 'n_rxqs' configures the number of rx_queues, which
* are distributed among threads. 'cmask' configures the cpu mask
* for setting the polling threads' cpu affinity. */
int (*poll_threads_set)(struct dpif *dpif, unsigned int n_rxqs,
const char *cmask);

/* Translates OpenFlow queue ID 'queue_id' (in host byte order) into a
* priority value used for setting packet priority. */
int (*queue_to_priority)(const struct dpif *dpif, uint32_t queue_id,
Expand Down
18 changes: 18 additions & 0 deletions lib/dpif.c
Original file line number Diff line number Diff line change
Expand Up @@ -1300,6 +1300,24 @@ dpif_print_packet(struct dpif *dpif, struct dpif_upcall *upcall)
}
}

/* Refreshes the polling-thread configuration of 'dpif', if its class
 * provides a 'poll_threads_set' hook.
 *
 * 'n_rxqs' and 'cmask' are forwarded unchanged to the hook.  Returns 0 when
 * the class has no such hook; otherwise returns the hook's result, and logs
 * the operation through log_operation() when that result is nonzero. */
int
dpif_poll_threads_set(struct dpif *dpif, unsigned int n_rxqs,
                      const char *cmask)
{
    int error;

    /* Classes without the hook have nothing to refresh. */
    if (!dpif->dpif_class->poll_threads_set) {
        return 0;
    }

    error = dpif->dpif_class->poll_threads_set(dpif, n_rxqs, cmask);
    if (error) {
        log_operation(dpif, "poll_threads_set", error);
    }

    return error;
}

/* Polls for an upcall from 'dpif' for an upcall handler. Since there
* can be multiple poll loops, 'handler_id' is needed as index to
* identify the corresponding poll loop. If successful, stores the upcall
Expand Down
2 changes: 2 additions & 0 deletions lib/dpif.h
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,8 @@ void dpif_register_upcall_cb(struct dpif *, upcall_callback *, void *aux);

int dpif_recv_set(struct dpif *, bool enable);
int dpif_handlers_set(struct dpif *, uint32_t n_handlers);
int dpif_poll_threads_set(struct dpif *, unsigned int n_rxqs,
const char *cmask);
int dpif_recv(struct dpif *, uint32_t handler_id, struct dpif_upcall *,
struct ofpbuf *);
void dpif_recv_purge(struct dpif *);
Expand Down
2 changes: 2 additions & 0 deletions ofproto/ofproto-dpif.c
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,8 @@ type_run(const char *type)
udpif_set_threads(backer->udpif, n_handlers, n_revalidators);
}

dpif_poll_threads_set(backer->dpif, n_dpdk_rxqs, pmd_cpu_mask);

if (backer->need_revalidate) {
struct ofproto_dpif *ofproto;
struct simap_node *node;
Expand Down
6 changes: 6 additions & 0 deletions ofproto/ofproto-provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,12 @@ extern unsigned ofproto_max_idle;
* ofproto-dpif implementation. */
extern size_t n_handlers, n_revalidators;

/* Number of rx queues to be created for each dpdk interface. */
extern size_t n_dpdk_rxqs;

/* Cpu mask for pmd threads. */
extern char *pmd_cpu_mask;

static inline struct rule *rule_from_cls_rule(const struct cls_rule *);

void ofproto_rule_expire(struct rule *rule, uint8_t reason)
Expand Down
16 changes: 16 additions & 0 deletions ofproto/ofproto.c
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ unsigned ofproto_flow_limit = OFPROTO_FLOW_LIMIT_DEFAULT;
unsigned ofproto_max_idle = OFPROTO_MAX_IDLE_DEFAULT;

size_t n_handlers, n_revalidators;
size_t n_dpdk_rxqs;
char *pmd_cpu_mask;

/* Map from datapath name to struct ofproto, for use by unixctl commands. */
static struct hmap all_ofprotos = HMAP_INITIALIZER(&all_ofprotos);
Expand Down Expand Up @@ -730,6 +732,20 @@ ofproto_port_set_mcast_snooping(struct ofproto *ofproto, void *aux, bool flood)
: EOPNOTSUPP);
}

/* Records the number of rx queues to create for each dpdk interface in the
 * global 'n_dpdk_rxqs'.  A negative 'n_rxqs' is stored as 0. */
void
ofproto_set_n_dpdk_rxqs(int n_rxqs)
{
    n_dpdk_rxqs = n_rxqs > 0 ? n_rxqs : 0;
}

/* Replaces the global 'pmd_cpu_mask' with a private copy of 'cmask'.
 *
 * The previously stored mask is freed.  A NULL 'cmask' clears the stored
 * mask.  The caller keeps ownership of 'cmask'. */
void
ofproto_set_cpu_mask(const char *cmask)
{
    free(pmd_cpu_mask);

    if (cmask) {
        pmd_cpu_mask = xstrdup(cmask);
    } else {
        pmd_cpu_mask = NULL;
    }
}

void
ofproto_set_threads(int n_handlers_, int n_revalidators_)
{
Expand Down
2 changes: 2 additions & 0 deletions ofproto/ofproto.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ int ofproto_set_mcast_snooping(struct ofproto *ofproto,
int ofproto_port_set_mcast_snooping(struct ofproto *ofproto, void *aux,
bool flood);
void ofproto_set_threads(int n_handlers, int n_revalidators);
void ofproto_set_n_dpdk_rxqs(int n_rxqs);
void ofproto_set_cpu_mask(const char *cmask);
void ofproto_set_dp_desc(struct ofproto *, const char *dp_desc);
int ofproto_set_snoops(struct ofproto *, const struct sset *snoops);
int ofproto_set_netflow(struct ofproto *,
Expand Down
3 changes: 3 additions & 0 deletions vswitchd/bridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,9 @@ bridge_reconfigure(const struct ovsrec_open_vswitch *ovs_cfg)
OFPROTO_FLOW_LIMIT_DEFAULT));
ofproto_set_max_idle(smap_get_int(&ovs_cfg->other_config, "max-idle",
OFPROTO_MAX_IDLE_DEFAULT));
ofproto_set_n_dpdk_rxqs(smap_get_int(&ovs_cfg->other_config,
"n-dpdk-rxqs", 0));
ofproto_set_cpu_mask(smap_get(&ovs_cfg->other_config, "pmd-cpu-mask"));

ofproto_set_threads(
smap_get_int(&ovs_cfg->other_config, "n-handler-threads", 0),
Expand Down
27 changes: 27 additions & 0 deletions vswitchd/vswitch.xml
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,33 @@
</p>
</column>

<column name="other_config" key="n-dpdk-rxqs"
type='{"type": "integer", "minInteger": 1}'>
<p>
Specifies the number of rx queues to be created for each dpdk
interface. If not specified or set to 0, one rx queue will
be created for each dpdk interface by default.
</p>
</column>

<column name="other_config" key="pmd-cpu-mask">
<p>
Specifies CPU mask for setting the cpu affinity of PMD (Poll
Mode Driver) threads. Value should be in the form of hex string,
similar to the dpdk EAL '-c COREMASK' option input or the 'taskset'
mask input.
</p>
<p>
The lowest order bit corresponds to the first CPU core. A set bit
means the corresponding core is available. If the input does not
cover all cores, those uncovered cores are considered not set.
</p>
<p>
If not specified, one pmd thread will be created for each numa node
and pinned to any available core on the numa node by default.
</p>
</column>

<column name="other_config" key="n-handler-threads"
type='{"type": "integer", "minInteger": 1}'>
<p>
Expand Down

0 comments on commit f2eee18

Please sign in to comment.