dpif-netdev: Use hmap for poll_list in pmd threads.
A future commit will use this to determine if a queue is already
contained in a pmd thread.

To keep the behavior unaltered, we now have to sort the queues before
printing them in pmd_info_show_rxq(): unlike the list it replaces, an
hmap has no defined iteration order.

This commit also introduces 'struct polled_queue', which will be used
exclusively in the fast path; makes 'struct rxq_poll' point to a
'struct dp_netdev_rxq' instead of a port and queue pair; and adopts the
naming convention 'rx' for a 'netdev_rxq' and 'rxq' for a
'dp_netdev_rxq'.
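
To illustrate that future use, here is a hypothetical sketch (the helper
below is invented for illustration and is not part of this commit): with
'poll_list' as an hmap keyed on (port number, queue id), a membership
test only needs to walk a single hash bucket rather than scan a list:

    static bool
    pmd_polls_rxq(struct dp_netdev_pmd_thread *pmd, struct dp_netdev_rxq *rxq)
        OVS_REQUIRES(pmd->port_mutex)
    {
        int qid = netdev_rxq_get_queue_id(rxq->rx);
        uint32_t hash = hash_2words(odp_to_u32(rxq->port->port_no), qid);
        struct rxq_poll *poll;

        /* Visit only the entries whose key hashes to 'hash'. */
        HMAP_FOR_EACH_WITH_HASH (poll, node, hash, &pmd->poll_list) {
            if (poll->rxq == rxq) {
                return true;
            }
        }
        return false;
    }

The hash matches the one dp_netdev_add_rxq_to_pmd() computes when
inserting, which is what turns the old O(n) list scan into an O(1)
average-case lookup.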

Signed-off-by: Daniele Di Proietto <[email protected]>
Acked-by: Ilya Maximets <[email protected]>
ddiproietto committed Jan 16, 2017
1 parent 90f9f83 commit 947dc56
Showing 1 changed file with 112 additions and 56 deletions.
lib/dpif-netdev.c
@@ -280,8 +280,12 @@ enum pmd_cycles_counter_type {

/* Contained by struct dp_netdev_port's 'rxqs' member. */
struct dp_netdev_rxq {
struct netdev_rxq *rxq;
unsigned core_id; /* Сore to which this queue is pinned. */
struct dp_netdev_port *port;
struct netdev_rxq *rx;
unsigned core_id; /* Core to which this queue should be
pinned. OVS_CORE_UNSPEC if the
queue doesn't need to be pinned to a
particular core. */
};

/* A port in a netdev-based datapath. */
@@ -415,11 +419,15 @@ struct dp_netdev_pmd_cycles {
atomic_ullong n[PMD_N_CYCLES];
};

struct polled_queue {
struct netdev_rxq *rx;
odp_port_t port_no;
};

/* Contained by struct dp_netdev_pmd_thread's 'poll_list' member. */
struct rxq_poll {
struct dp_netdev_port *port;
struct netdev_rxq *rx;
struct ovs_list node;
struct dp_netdev_rxq *rxq;
struct hmap_node node;
};

/* Contained by struct dp_netdev_pmd_thread's 'send_port_cache',
@@ -500,9 +508,7 @@ struct dp_netdev_pmd_thread {

struct ovs_mutex port_mutex; /* Mutex for 'poll_list' and 'tx_ports'. */
/* List of rx queues to poll. */
struct ovs_list poll_list OVS_GUARDED;
/* Number of elements in 'poll_list' */
int poll_cnt;
struct hmap poll_list OVS_GUARDED;
/* Map of 'tx_port's used for transmission. Written by the main thread,
* read by the pmd thread. */
struct hmap tx_ports OVS_GUARDED;
@@ -586,8 +592,8 @@ static void dp_netdev_add_port_to_pmds(struct dp_netdev *dp,
static void dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
struct dp_netdev_port *port);
static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
struct dp_netdev_port *port,
struct netdev_rxq *rx);
struct dp_netdev_rxq *rxq)
OVS_REQUIRES(pmd->port_mutex);
static struct dp_netdev_pmd_thread *
dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id);
static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
@@ -783,34 +789,80 @@ pmd_info_clear_stats(struct ds *reply OVS_UNUSED,
}
}

static int
compare_poll_list(const void *a_, const void *b_)
{
const struct rxq_poll *a = a_;
const struct rxq_poll *b = b_;

const char *namea = netdev_rxq_get_name(a->rxq->rx);
const char *nameb = netdev_rxq_get_name(b->rxq->rx);

int cmp = strcmp(namea, nameb);
if (!cmp) {
return netdev_rxq_get_queue_id(a->rxq->rx)
- netdev_rxq_get_queue_id(b->rxq->rx);
} else {
return cmp;
}
}

static void
sorted_poll_list(struct dp_netdev_pmd_thread *pmd, struct rxq_poll **list,
size_t *n)
{
struct rxq_poll *ret, *poll;
size_t i;

*n = hmap_count(&pmd->poll_list);
if (!*n) {
ret = NULL;
} else {
ret = xcalloc(*n, sizeof *ret);
i = 0;
HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
ret[i] = *poll;
i++;
}
ovs_assert(i == *n);
}

qsort(ret, *n, sizeof *ret, compare_poll_list);

*list = ret;
}

static void
pmd_info_show_rxq(struct ds *reply, struct dp_netdev_pmd_thread *pmd)
{
if (pmd->core_id != NON_PMD_CORE_ID) {
struct rxq_poll *poll;
const char *prev_name = NULL;
struct rxq_poll *list;
size_t i, n;

ds_put_format(reply,
"pmd thread numa_id %d core_id %u:\n\tisolated : %s\n",
pmd->numa_id, pmd->core_id, (pmd->isolated)
? "true" : "false");

ovs_mutex_lock(&pmd->port_mutex);
LIST_FOR_EACH (poll, node, &pmd->poll_list) {
const char *name = netdev_get_name(poll->port->netdev);
sorted_poll_list(pmd, &list, &n);
for (i = 0; i < n; i++) {
const char *name = netdev_rxq_get_name(list[i].rxq->rx);

if (!prev_name || strcmp(name, prev_name)) {
if (prev_name) {
ds_put_cstr(reply, "\n");
}
ds_put_format(reply, "\tport: %s\tqueue-id:",
netdev_get_name(poll->port->netdev));
ds_put_format(reply, "\tport: %s\tqueue-id:", name);
}
ds_put_format(reply, " %d", netdev_rxq_get_queue_id(poll->rx));
ds_put_format(reply, " %d",
netdev_rxq_get_queue_id(list[i].rxq->rx));
prev_name = name;
}
ovs_mutex_unlock(&pmd->port_mutex);
ds_put_cstr(reply, "\n");
free(list);
}
}
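
With the sort in place, the rxq listing stays deterministic across runs.
Given the format strings above, the output of
'ovs-appctl dpif-netdev/pmd-rxq-show' (the command that ends up in
pmd_info_show_rxq()) would look like this (port names here are
hypothetical):

    pmd thread numa_id 0 core_id 2:
            isolated : false
            port: dpdk0     queue-id: 0 1
            port: dpdk1     queue-id: 0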

@@ -1295,7 +1347,8 @@ port_create(const char *devname, const char *type,
port->dynamic_txqs = dynamic_txqs;

for (i = 0; i < port->n_rxq; i++) {
error = netdev_rxq_open(netdev, &port->rxqs[i].rxq, i);
port->rxqs[i].port = port;
error = netdev_rxq_open(netdev, &port->rxqs[i].rx, i);
if (error) {
VLOG_ERR("%s: cannot receive packets on this network device (%s)",
devname, ovs_strerror(errno));
@@ -1317,7 +1370,7 @@

out_rxq_close:
for (i = 0; i < n_open_rxqs; i++) {
netdev_rxq_close(port->rxqs[i].rxq);
netdev_rxq_close(port->rxqs[i].rx);
}
ovs_mutex_destroy(&port->txq_used_mutex);
free(port->type);
@@ -1455,7 +1508,7 @@ port_destroy(struct dp_netdev_port *port)
netdev_restore_flags(port->sf);

for (unsigned i = 0; i < port->n_rxq; i++) {
netdev_rxq_close(port->rxqs[i].rxq);
netdev_rxq_close(port->rxqs[i].rx);
}
ovs_mutex_destroy(&port->txq_used_mutex);
free(port->rxq_affinity_list);
@@ -2916,27 +2969,27 @@ cycles_count_end(struct dp_netdev_pmd_thread *pmd,

static void
dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
struct dp_netdev_port *port,
struct netdev_rxq *rxq)
struct netdev_rxq *rx,
odp_port_t port_no)
{
struct dp_packet_batch batch;
int error;

dp_packet_batch_init(&batch);
cycles_count_start(pmd);
error = netdev_rxq_recv(rxq, &batch);
error = netdev_rxq_recv(rx, &batch);
cycles_count_end(pmd, PMD_CYCLES_POLLING);
if (!error) {
*recirc_depth_get() = 0;

cycles_count_start(pmd);
dp_netdev_input(pmd, &batch, port->port_no);
dp_netdev_input(pmd, &batch, port_no);
cycles_count_end(pmd, PMD_CYCLES_PROCESSING);
} else if (error != EAGAIN && error != EOPNOTSUPP) {
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);

VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
netdev_get_name(port->netdev), ovs_strerror(error));
netdev_rxq_get_name(rx), ovs_strerror(error));
}
}
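
The new signature mirrors 'struct polled_queue' field for field, so the
pmd fast path further down in this diff can feed it straight from the
flattened array, with no port structure dereference per iteration:

    dp_netdev_process_rxq_port(pmd, poll_list[i].rx, poll_list[i].port_no);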

@@ -2952,8 +3005,8 @@ port_reconfigure(struct dp_netdev_port *port)

/* Closes the existing 'rxq's. */
for (i = 0; i < port->n_rxq; i++) {
netdev_rxq_close(port->rxqs[i].rxq);
port->rxqs[i].rxq = NULL;
netdev_rxq_close(port->rxqs[i].rx);
port->rxqs[i].rx = NULL;
}
port->n_rxq = 0;

@@ -2972,7 +3025,8 @@ port_reconfigure(struct dp_netdev_port *port)
port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);

for (i = 0; i < netdev_n_rxq(netdev); i++) {
err = netdev_rxq_open(netdev, &port->rxqs[i].rxq, i);
port->rxqs[i].port = port;
err = netdev_rxq_open(netdev, &port->rxqs[i].rx, i);
if (err) {
return err;
}
@@ -3057,8 +3111,8 @@ dpif_netdev_run(struct dpif *dpif)
int i;

for (i = 0; i < port->n_rxq; i++) {
dp_netdev_process_rxq_port(non_pmd, port,
port->rxqs[i].rxq);
dp_netdev_process_rxq_port(non_pmd, port->rxqs[i].rx,
port->port_no);
}
}
}
@@ -3098,7 +3152,7 @@ dpif_netdev_wait(struct dpif *dpif)
int i;

for (i = 0; i < port->n_rxq; i++) {
netdev_rxq_wait(port->rxqs[i].rxq);
netdev_rxq_wait(port->rxqs[i].rx);
}
}
}
@@ -3152,18 +3206,21 @@ pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)

static int
pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
struct rxq_poll **ppoll_list)
struct polled_queue **ppoll_list)
{
struct rxq_poll *poll_list = *ppoll_list;
struct polled_queue *poll_list = *ppoll_list;
struct rxq_poll *poll;
int i;

ovs_mutex_lock(&pmd->port_mutex);
poll_list = xrealloc(poll_list, pmd->poll_cnt * sizeof *poll_list);
poll_list = xrealloc(poll_list, hmap_count(&pmd->poll_list)
* sizeof *poll_list);

i = 0;
LIST_FOR_EACH (poll, node, &pmd->poll_list) {
poll_list[i++] = *poll;
HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
poll_list[i].rx = poll->rxq->rx;
poll_list[i].port_no = poll->rxq->port->port_no;
i++;
}

pmd_load_cached_ports(pmd);
@@ -3179,7 +3236,7 @@ pmd_thread_main(void *f_)
{
struct dp_netdev_pmd_thread *pmd = f_;
unsigned int lc = 0;
struct rxq_poll *poll_list;
struct polled_queue *poll_list;
bool exiting;
int poll_cnt;
int i;
@@ -3197,7 +3254,7 @@
/* List port/core affinity */
for (i = 0; i < poll_cnt; i++) {
VLOG_DBG("Core %d processing port \'%s\' with queue-id %d\n",
pmd->core_id, netdev_get_name(poll_list[i].port->netdev),
pmd->core_id, netdev_rxq_get_name(poll_list[i].rx),
netdev_rxq_get_queue_id(poll_list[i].rx));
}

@@ -3211,7 +3268,8 @@

for (;;) {
for (i = 0; i < poll_cnt; i++) {
dp_netdev_process_rxq_port(pmd, poll_list[i].port, poll_list[i].rx);
dp_netdev_process_rxq_port(pmd, poll_list[i].rx,
poll_list[i].port_no);
}

if (lc++ > 1024) {
@@ -3370,7 +3428,6 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
pmd->dp = dp;
pmd->core_id = core_id;
pmd->numa_id = numa_id;
pmd->poll_cnt = 0;

*CONST_CAST(int *, &pmd->static_tx_qid) = (core_id == NON_PMD_CORE_ID)
? ovs_numa_get_n_cores()
@@ -3388,7 +3445,7 @@
cmap_init(&pmd->flow_table);
cmap_init(&pmd->classifiers);
pmd->next_optimization = time_msec() + DPCLS_OPTIMIZATION_INTERVAL;
ovs_list_init(&pmd->poll_list);
hmap_init(&pmd->poll_list);
hmap_init(&pmd->tx_ports);
hmap_init(&pmd->tnl_port_cache);
hmap_init(&pmd->send_port_cache);
@@ -3410,6 +3467,7 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
hmap_destroy(&pmd->send_port_cache);
hmap_destroy(&pmd->tnl_port_cache);
hmap_destroy(&pmd->tx_ports);
hmap_destroy(&pmd->poll_list);
/* All flows (including their dpcls_rules) have been deleted already */
CMAP_FOR_EACH (cls, node, &pmd->classifiers) {
dpcls_destroy(cls);
@@ -3520,10 +3578,9 @@ dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd)
struct tx_port *port;

ovs_mutex_lock(&pmd->port_mutex);
LIST_FOR_EACH_POP (poll, node, &pmd->poll_list) {
HMAP_FOR_EACH_POP (poll, node, &pmd->poll_list) {
free(poll);
}
pmd->poll_cnt = 0;
HMAP_FOR_EACH_POP (port, node, &pmd->tx_ports) {
free(port);
}
@@ -3556,11 +3613,10 @@ dp_netdev_del_port_from_pmd__(struct dp_netdev_port *port,
bool found = false;

ovs_mutex_lock(&pmd->port_mutex);
LIST_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
if (poll->port == port) {
HMAP_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
if (poll->rxq->port == port) {
found = true;
ovs_list_remove(&poll->node);
pmd->poll_cnt--;
hmap_remove(&pmd->poll_list, &poll->node);
free(poll);
}
}
@@ -3629,8 +3685,8 @@ dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)

CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
if (!pmd->isolated && pmd->numa_id == numa_id
&& (min_cnt > pmd->poll_cnt || res == NULL)) {
min_cnt = pmd->poll_cnt;
&& (min_cnt > hmap_count(&pmd->poll_list) || res == NULL)) {
min_cnt = hmap_count(&pmd->poll_list);
res = pmd;
}
}
@@ -3641,16 +3697,16 @@ dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
/* Adds rx queue to poll_list of PMD thread. */
static void
dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
struct dp_netdev_port *port, struct netdev_rxq *rx)
struct dp_netdev_rxq *rxq)
OVS_REQUIRES(pmd->port_mutex)
{
struct rxq_poll *poll = xmalloc(sizeof *poll);

poll->port = port;
poll->rx = rx;
int qid = netdev_rxq_get_queue_id(rxq->rx);
uint32_t hash = hash_2words(odp_to_u32(rxq->port->port_no), qid);
struct rxq_poll *poll;

ovs_list_push_back(&pmd->poll_list, &poll->node);
pmd->poll_cnt++;
poll = xmalloc(sizeof *poll);
poll->rxq = rxq;
hmap_insert(&pmd->poll_list, &poll->node, hash);
}
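
A hypothetical counterpart (not part of this commit): keying the map on
(port number, queue id) means a later change could unpoll a single queue
by walking only its hash bucket, instead of iterating the whole map the
way dp_netdev_del_port_from_pmd__() above does for an entire port:

    /* Sketch: 'hash' computed exactly as in dp_netdev_add_rxq_to_pmd(). */
    HMAP_FOR_EACH_WITH_HASH (poll, node, hash, &pmd->poll_list) {
        if (poll->rxq == rxq) {
            hmap_remove(&pmd->poll_list, &poll->node);
            free(poll);
            break;
        }
    }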

/* Add 'port' to the tx port cache of 'pmd', which must be reloaded for the
@@ -3715,7 +3771,7 @@ dp_netdev_add_port_rx_to_pmds(struct dp_netdev *dp,
}

ovs_mutex_lock(&pmd->port_mutex);
dp_netdev_add_rxq_to_pmd(pmd, port, port->rxqs[i].rxq);
dp_netdev_add_rxq_to_pmd(pmd, &port->rxqs[i]);
ovs_mutex_unlock(&pmd->port_mutex);

hmapx_add(to_reload, pmd);
