From f2eee189118139995d61468cecb85654dd45bb36 Mon Sep 17 00:00:00 2001 From: Alex Wang Date: Mon, 8 Sep 2014 15:22:26 -0700 Subject: [PATCH] dpif-netdev: Allow multi-rx-queue, multi-pmd-thread configuration. This commit adds multithreading functionality to the OVS dpdk module. Users are able to create multiple pmd threads and set their cpu affinity by specifying a cpu mask string similar to the EAL '-c COREMASK' option. Also, the number of rx queues for each dpdk interface is made configurable to help distribute rx packets among multiple pmd threads. Signed-off-by: Alex Wang Acked-by: Pravin B Shelar --- lib/dpif-netdev.c | 122 ++++++++++++++++++++++++++++++++++--- lib/dpif-netlink.c | 1 + lib/dpif-provider.h | 7 +++ lib/dpif.c | 18 ++++++ lib/dpif.h | 2 + ofproto/ofproto-dpif.c | 2 + ofproto/ofproto-provider.h | 6 ++ ofproto/ofproto.c | 16 +++++ ofproto/ofproto.h | 2 + vswitchd/bridge.c | 3 + vswitchd/vswitch.xml | 27 ++++++++ 11 files changed, 198 insertions(+), 8 deletions(-) diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index 7e27acf2ba4..90fe01cbb03 100644 --- a/lib/dpif-netdev.c +++ b/lib/dpif-netdev.c @@ -206,6 +206,11 @@ struct dp_netdev { /* Each pmd thread will store its pointer to * 'struct dp_netdev_pmd_thread' in 'per_pmd_key'. */ ovsthread_key_t per_pmd_key; + + /* Number of rx queues for each dpdk interface and the cpu mask + * for pin of pmd threads. 
*/ + size_t n_dpdk_rxqs; + char *pmd_cmask; }; static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp, @@ -396,10 +401,12 @@ static void dp_netdev_disable_upcall(struct dp_netdev *); static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp, int index, int core_id, int numa_id); +static void dp_netdev_set_nonpmd(struct dp_netdev *dp); static struct dp_netdev_pmd_thread *dp_netdev_get_nonpmd(struct dp_netdev *dp); static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp); static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id); static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id); +static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp); static void emc_clear_entry(struct emc_entry *ce); @@ -539,7 +546,6 @@ create_dp_netdev(const char *name, const struct dpif_class *class, OVS_REQUIRES(dp_netdev_mutex) { struct dp_netdev *dp; - struct dp_netdev_pmd_thread *non_pmd; int error; dp = xzalloc(sizeof *dp); @@ -572,9 +578,8 @@ create_dp_netdev(const char *name, const struct dpif_class *class, /* Reserves the core NON_PMD_CORE_ID for all non-pmd threads. */ ovs_numa_try_pin_core_specific(NON_PMD_CORE_ID); - non_pmd = xzalloc(sizeof *non_pmd); - dp_netdev_configure_pmd(non_pmd, dp, 0, NON_PMD_CORE_ID, - OVS_NUMA_UNSPEC); + dp_netdev_set_nonpmd(dp); + dp->n_dpdk_rxqs = NR_QUEUE; ovs_mutex_lock(&dp->port_mutex); error = do_add_port(dp, name, "internal", ODPP_LOCAL); @@ -649,6 +654,7 @@ dp_netdev_free(struct dp_netdev *dp) cmap_destroy(&dp->ports); fat_rwlock_destroy(&dp->upcall_rwlock); + free(dp->pmd_cmask); free(CONST_CAST(char *, dp->name)); free(dp); } @@ -778,8 +784,8 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type, return ENOENT; } /* There can only be ovs_numa_get_n_cores() pmd threads, - * so creates a tx_q for each. */ - error = netdev_set_multiq(netdev, n_cores, NR_QUEUE); + * so creates a txq for each. 
*/ + error = netdev_set_multiq(netdev, n_cores, dp->n_dpdk_rxqs); if (error) { VLOG_ERR("%s, cannot set multiq", devname); return errno; @@ -1904,6 +1910,77 @@ dpif_netdev_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops) } } +/* Returns true if the configuration for rx queues or cpu mask + * is changed. */ +static bool +pmd_config_changed(const struct dp_netdev *dp, size_t rxqs, const char *cmask) +{ + if (dp->n_dpdk_rxqs != rxqs) { + return true; + } else { + if (dp->pmd_cmask != NULL && cmask != NULL) { + return strcmp(dp->pmd_cmask, cmask); + } else { + return (dp->pmd_cmask != NULL || cmask != NULL); + } + } +} + +/* Resets pmd threads if the configuration for 'rxq's or cpu mask changes. */ +static int +dpif_netdev_pmd_set(struct dpif *dpif, unsigned int n_rxqs, const char *cmask) +{ + struct dp_netdev *dp = get_dp_netdev(dpif); + + if (pmd_config_changed(dp, n_rxqs, cmask)) { + struct dp_netdev_port *port; + + dp_netdev_destroy_all_pmds(dp); + + CMAP_FOR_EACH (port, node, &dp->ports) { + if (netdev_is_pmd(port->netdev)) { + int i, err; + + /* Closes the existing 'rxq's. */ + for (i = 0; i < netdev_n_rxq(port->netdev); i++) { + netdev_rxq_close(port->rxq[i]); + port->rxq[i] = NULL; + } + + /* Sets the new rx queue config. */ + err = netdev_set_multiq(port->netdev, ovs_numa_get_n_cores(), + n_rxqs); + if (err) { + VLOG_ERR("Failed to set dpdk interface %s rx_queue to:" + " %u", netdev_get_name(port->netdev), + n_rxqs); + return err; + } + + /* If the set_multiq() above succeeds, reopens the 'rxq's. */ + port->rxq = xrealloc(port->rxq, sizeof *port->rxq + * netdev_n_rxq(port->netdev)); + for (i = 0; i < netdev_n_rxq(port->netdev); i++) { + netdev_rxq_open(port->netdev, &port->rxq[i], i); + } + } + } + dp->n_dpdk_rxqs = n_rxqs; + + /* Reconfigures the cpu mask. */ + ovs_numa_set_cpu_mask(cmask); + free(dp->pmd_cmask); + dp->pmd_cmask = cmask ? xstrdup(cmask) : NULL; + + /* Restores the non-pmd. 
*/ + dp_netdev_set_nonpmd(dp); + /* Restores all pmd threads. */ + dp_netdev_reset_pmd_threads(dp); + } + + return 0; +} + static int dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED, uint32_t queue_id, uint32_t *priority) @@ -2155,6 +2232,17 @@ dp_netdev_get_nonpmd(struct dp_netdev *dp) return pmd; } +/* Sets the 'struct dp_netdev_pmd_thread' for non-pmd threads. */ +static void +dp_netdev_set_nonpmd(struct dp_netdev *dp) +{ + struct dp_netdev_pmd_thread *non_pmd; + + non_pmd = xzalloc(sizeof *non_pmd); + dp_netdev_configure_pmd(non_pmd, dp, 0, NON_PMD_CORE_ID, + OVS_NUMA_UNSPEC); +} + /* Configures the 'pmd' based on the input argument. */ static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp, @@ -2247,8 +2335,9 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id) return; } - /* Tries creating NR_PMD_THREADS pmd threads on the numa node. */ - can_have = MIN(n_unpinned, NR_PMD_THREADS); + /* If cpu mask is specified, uses all unpinned cores, otherwise + * tries creating NR_PMD_THREADS pmd threads. */ + can_have = dp->pmd_cmask ? n_unpinned : MIN(n_unpinned, NR_PMD_THREADS); for (i = 0; i < can_have; i++) { struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd); int core_id = ovs_numa_get_unpinned_core_on_numa(numa_id); @@ -2271,6 +2360,22 @@ dp_netdev_flow_stats_new_cb(void) return bucket; } +/* Called after pmd threads config change. Restarts pmd threads with + * new configuration. 
*/ +static void +dp_netdev_reset_pmd_threads(struct dp_netdev *dp) +{ + struct dp_netdev_port *port; + + CMAP_FOR_EACH (port, node, &dp->ports) { + if (netdev_is_pmd(port->netdev)) { + int numa_id = netdev_get_numa_id(port->netdev); + + dp_netdev_set_pmds_on_numa(dp, numa_id); + } + } +} + static void dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow, int cnt, int size, @@ -2834,6 +2939,7 @@ const struct dpif_class dpif_netdev_class = { dpif_netdev_operate, NULL, /* recv_set */ NULL, /* handlers_set */ + dpif_netdev_pmd_set, dpif_netdev_queue_to_priority, NULL, /* recv */ NULL, /* recv_wait */ diff --git a/lib/dpif-netlink.c b/lib/dpif-netlink.c index d43aab0e2e9..029c4d1b5ea 100644 --- a/lib/dpif-netlink.c +++ b/lib/dpif-netlink.c @@ -1925,6 +1925,7 @@ const struct dpif_class dpif_netlink_class = { dpif_netlink_operate, dpif_netlink_recv_set, dpif_netlink_handlers_set, + NULL, /* poll_thread_set */ dpif_netlink_queue_to_priority, dpif_netlink_recv, dpif_netlink_recv_wait, diff --git a/lib/dpif-provider.h b/lib/dpif-provider.h index 3f57049c9d8..65cf505f91a 100644 --- a/lib/dpif-provider.h +++ b/lib/dpif-provider.h @@ -300,6 +300,13 @@ struct dpif_class { * */ int (*handlers_set)(struct dpif *dpif, uint32_t n_handlers); + /* If 'dpif' creates its own I/O polling threads, refreshes poll threads + * configuration. 'n_rxqs' configures the number of rx_queues, which + * are distributed among threads. 'cmask' configures the cpu mask + * for setting the polling threads' cpu affinity. */ + int (*poll_threads_set)(struct dpif *dpif, unsigned int n_rxqs, + const char *cmask); + /* Translates OpenFlow queue ID 'queue_id' (in host byte order) into a * priority value used for setting packet priority. 
*/ int (*queue_to_priority)(const struct dpif *dpif, uint32_t queue_id, diff --git a/lib/dpif.c b/lib/dpif.c index 4449202cc87..bdefdcc564d 100644 --- a/lib/dpif.c +++ b/lib/dpif.c @@ -1300,6 +1300,24 @@ dpif_print_packet(struct dpif *dpif, struct dpif_upcall *upcall) } } +/* If 'dpif' creates its own I/O polling threads, refreshes poll threads + * configuration. */ +int +dpif_poll_threads_set(struct dpif *dpif, unsigned int n_rxqs, + const char *cmask) +{ + int error = 0; + + if (dpif->dpif_class->poll_threads_set) { + error = dpif->dpif_class->poll_threads_set(dpif, n_rxqs, cmask); + if (error) { + log_operation(dpif, "poll_threads_set", error); + } + } + + return error; +} + /* Polls for an upcall from 'dpif' for an upcall handler. Since there * there can be multiple poll loops, 'handler_id' is needed as index to * identify the corresponding poll loop. If successful, stores the upcall diff --git a/lib/dpif.h b/lib/dpif.h index be1bc4f7fa9..c57c8b01fb2 100644 --- a/lib/dpif.h +++ b/lib/dpif.h @@ -769,6 +769,8 @@ void dpif_register_upcall_cb(struct dpif *, upcall_callback *, void *aux); int dpif_recv_set(struct dpif *, bool enable); int dpif_handlers_set(struct dpif *, uint32_t n_handlers); +int dpif_poll_threads_set(struct dpif *, unsigned int n_rxqs, + const char *cmask); int dpif_recv(struct dpif *, uint32_t handler_id, struct dpif_upcall *, struct ofpbuf *); void dpif_recv_purge(struct dpif *); diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c index 6a590987c70..6cc97898a46 100644 --- a/ofproto/ofproto-dpif.c +++ b/ofproto/ofproto-dpif.c @@ -532,6 +532,8 @@ type_run(const char *type) udpif_set_threads(backer->udpif, n_handlers, n_revalidators); } + dpif_poll_threads_set(backer->dpif, n_dpdk_rxqs, pmd_cpu_mask); + if (backer->need_revalidate) { struct ofproto_dpif *ofproto; struct simap_node *node; diff --git a/ofproto/ofproto-provider.h b/ofproto/ofproto-provider.h index de354ec4274..158f86eaf48 100644 --- a/ofproto/ofproto-provider.h +++ 
b/ofproto/ofproto-provider.h @@ -451,6 +451,12 @@ extern unsigned ofproto_max_idle; * ofproto-dpif implementation. */ extern size_t n_handlers, n_revalidators; +/* Number of rx queues to be created for each dpdk interface. */ +extern size_t n_dpdk_rxqs; + +/* Cpu mask for pmd threads. */ +extern char *pmd_cpu_mask; + static inline struct rule *rule_from_cls_rule(const struct cls_rule *); void ofproto_rule_expire(struct rule *rule, uint8_t reason) diff --git a/ofproto/ofproto.c b/ofproto/ofproto.c index 754d290c10d..5233a4ddd1b 100644 --- a/ofproto/ofproto.c +++ b/ofproto/ofproto.c @@ -304,6 +304,8 @@ unsigned ofproto_flow_limit = OFPROTO_FLOW_LIMIT_DEFAULT; unsigned ofproto_max_idle = OFPROTO_MAX_IDLE_DEFAULT; size_t n_handlers, n_revalidators; +size_t n_dpdk_rxqs; +char *pmd_cpu_mask; /* Map from datapath name to struct ofproto, for use by unixctl commands. */ static struct hmap all_ofprotos = HMAP_INITIALIZER(&all_ofprotos); @@ -730,6 +732,20 @@ ofproto_port_set_mcast_snooping(struct ofproto *ofproto, void *aux, bool flood) : EOPNOTSUPP); } +void +ofproto_set_n_dpdk_rxqs(int n_rxqs) +{ + n_dpdk_rxqs = MAX(n_rxqs, 0); +} + +void +ofproto_set_cpu_mask(const char *cmask) +{ + free(pmd_cpu_mask); + + pmd_cpu_mask = cmask ? 
xstrdup(cmask) : NULL; +} + void ofproto_set_threads(int n_handlers_, int n_revalidators_) { diff --git a/ofproto/ofproto.h b/ofproto/ofproto.h index d60b1982c1d..40bb3b7d23c 100644 --- a/ofproto/ofproto.h +++ b/ofproto/ofproto.h @@ -299,6 +299,8 @@ int ofproto_set_mcast_snooping(struct ofproto *ofproto, int ofproto_port_set_mcast_snooping(struct ofproto *ofproto, void *aux, bool flood); void ofproto_set_threads(int n_handlers, int n_revalidators); +void ofproto_set_n_dpdk_rxqs(int n_rxqs); +void ofproto_set_cpu_mask(const char *cmask); void ofproto_set_dp_desc(struct ofproto *, const char *dp_desc); int ofproto_set_snoops(struct ofproto *, const struct sset *snoops); int ofproto_set_netflow(struct ofproto *, diff --git a/vswitchd/bridge.c b/vswitchd/bridge.c index 1060719ab13..3dd7f8ad432 100644 --- a/vswitchd/bridge.c +++ b/vswitchd/bridge.c @@ -537,6 +537,9 @@ bridge_reconfigure(const struct ovsrec_open_vswitch *ovs_cfg) OFPROTO_FLOW_LIMIT_DEFAULT)); ofproto_set_max_idle(smap_get_int(&ovs_cfg->other_config, "max-idle", OFPROTO_MAX_IDLE_DEFAULT)); + ofproto_set_n_dpdk_rxqs(smap_get_int(&ovs_cfg->other_config, + "n-dpdk-rxqs", 0)); + ofproto_set_cpu_mask(smap_get(&ovs_cfg->other_config, "pmd-cpu-mask")); ofproto_set_threads( smap_get_int(&ovs_cfg->other_config, "n-handler-threads", 0), diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml index d07d54fb736..b00f74d6643 100644 --- a/vswitchd/vswitch.xml +++ b/vswitchd/vswitch.xml @@ -152,6 +152,33 @@

+ +

+ Specifies the number of rx queues to be created for each dpdk + interface. If not specified or set to 0, one rx queue will + be created for each dpdk interface by default. +

+
+ + +

+ Specifies the CPU mask for setting the cpu affinity of PMD (Poll + Mode Driver) threads. The value should be a hex string, + similar to the dpdk EAL '-c COREMASK' option input or the 'taskset' + mask input. +

+

+ The lowest-order bit corresponds to the first CPU core. A set bit + means the corresponding core is available. If the input does not + cover all cores, the uncovered cores are treated as not set. +

+

+ If not specified, one pmd thread will be created for each numa node + and pinned to any available core on the numa node by default. +

+
+