Skip to content

Commit

Permalink
rio_cm: use worker thread instead of receive tasklet
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandre Bounine <[email protected]>
  • Loading branch information
Alexandre Bounine committed Sep 1, 2016
1 parent 6393605 commit 9c5099e
Showing 1 changed file with 44 additions and 102 deletions.
146 changes: 44 additions & 102 deletions rio_cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ struct cm_dev {
struct rio_mport *mport;
void *rx_buf[RIOCM_RX_RING_SIZE];
int rx_slots;
spinlock_t rx_lock;
struct mutex rx_lock;

void *tx_buf[RIOCM_TX_RING_SIZE];
int tx_slot;
Expand All @@ -166,8 +166,8 @@ struct cm_dev {

struct list_head peers;
int npeers;
struct tasklet_struct rx_tasklet;
struct work_struct rx_refill_work;
struct workqueue_struct *rx_wq;
struct work_struct rx_work;
};

struct chan_rx_ring {
Expand Down Expand Up @@ -240,7 +240,6 @@ static DEFINE_IDR(ch_idr);

static LIST_HEAD(cm_dev_list);
static DECLARE_RWSEM(rdev_sem);
static struct workqueue_struct *riocm_wq;

static struct class *dev_class;
static unsigned int dev_major;
Expand Down Expand Up @@ -313,15 +312,13 @@ static void *riocm_rx_get_msg(struct cm_dev *cm)

msg = rio_get_inb_message(cm->mport, cmbox);
if (msg) {
spin_lock(&cm->rx_lock);
for (i = 0; i < RIOCM_RX_RING_SIZE; i++) {
if (cm->rx_buf[i] == msg) {
cm->rx_buf[i] = NULL;
cm->rx_slots++;
break;
}
}
spin_unlock(&cm->rx_lock);

if (i == RIOCM_RX_RING_SIZE)
riocm_warn("no record for buffer 0x%p", msg);
Expand All @@ -337,40 +334,23 @@ static void *riocm_rx_get_msg(struct cm_dev *cm)
*
* Returns: none
*/
static int riocm_rx_fill(struct cm_dev *cm, int nent)
static void riocm_rx_fill(struct cm_dev *cm, int nent)
{
int i = 0;
void *buf;

while (nent) {
buf = kmalloc(RIO_MAX_MSG_SIZE, GFP_KERNEL);
if (!buf)
break;

spin_lock_bh(&cm->rx_lock);
int i;

if (!cm->rx_slots)
i = RIOCM_RX_RING_SIZE;
if (cm->rx_slots == 0)
return;

for (; i < RIOCM_RX_RING_SIZE; i++) {
if (cm->rx_buf[i] == NULL) {
cm->rx_buf[i] = buf;
rio_add_inb_buffer(cm->mport, cmbox, buf);
cm->rx_slots--;
nent--;
for (i = 0; i < RIOCM_RX_RING_SIZE && cm->rx_slots && nent; i++) {
if (cm->rx_buf[i] == NULL) {
cm->rx_buf[i] = kmalloc(RIO_MAX_MSG_SIZE, GFP_KERNEL);
if (cm->rx_buf[i] == NULL)
break;
}
}

spin_unlock_bh(&cm->rx_lock);

if (i == RIOCM_RX_RING_SIZE) {
kfree(buf);
break;
rio_add_inb_buffer(cm->mport, cmbox, cm->rx_buf[i]);
cm->rx_slots--;
nent--;
}
}

return cm->rx_slots;
}

/*
Expand Down Expand Up @@ -507,40 +487,36 @@ static int riocm_close_handler(void *data)
}

/*
* rio_cm_handler - worker thread that services request (non-data) packets
* rio_cm_handler - function that services request (non-data) packets
*/
static void rio_cm_handler(struct work_struct *_work)
static void rio_cm_handler(struct cm_dev *cm, void *data)
{
struct rio_cm_work *work;
struct rio_ch_chan_hdr *hdr;

work = container_of(_work, struct rio_cm_work, work);

if (!rio_mport_is_running(work->cm->mport))
if (!rio_mport_is_running(cm->mport))
goto out;

hdr = work->data;
hdr = data;

riocm_debug(RX_CMD, "OP=%x for ch=%d from %d",
hdr->ch_op, ntohs(hdr->dst_ch), ntohs(hdr->src_ch));

switch (hdr->ch_op) {
case CM_CONN_REQ:
riocm_req_handler(work->cm, work->data);
riocm_req_handler(cm, data);
break;
case CM_CONN_ACK:
riocm_resp_handler(work->data);
riocm_resp_handler(data);
break;
case CM_CONN_CLOSE:
riocm_close_handler(work->data);
riocm_close_handler(data);
break;
default:
riocm_error("Invalid packet header");
break;
}
out:
kfree(work->data);
kfree(work);
kfree(data);
}

/*
Expand Down Expand Up @@ -604,34 +580,24 @@ static int rio_rx_data_handler(struct cm_dev *cm, void *buf)
return 0;
}

static void rio_rx_refill_work(struct work_struct *work)
{
struct cm_dev *cm = container_of(work, struct cm_dev, rx_refill_work);
int slots;

slots = riocm_rx_fill(cm, RIOCM_RX_RING_SIZE/4);

if (slots)
queue_work(riocm_wq, &cm->rx_refill_work);
}

/*
* rio_ibmsg_handler - inbound message packet handler (tasklet)
* rio_ibmsg_handler - inbound message packet handler
*/
static void rio_ibmsg_handler(unsigned long context)
static void rio_ibmsg_handler(struct work_struct *work)
{
struct cm_dev *cm = (struct cm_dev *)context;
struct cm_dev *cm = container_of(work, struct cm_dev, rx_work);
void *data;
struct rio_ch_chan_hdr *hdr;
int i;

if (!rio_mport_is_running(cm->mport))
return;

for (i = 0; i < 8; i++) {
while (1) {
mutex_lock(&cm->rx_lock);
data = riocm_rx_get_msg(cm);
// if (data)
// riocm_rx_fill(cm, 1);
if (data)
riocm_rx_fill(cm, 1);
mutex_unlock(&cm->rx_lock);

if (data == NULL)
break;
Expand All @@ -647,40 +613,20 @@ static void rio_ibmsg_handler(unsigned long context)
}

/* Process a channel message */
if (hdr->ch_op == CM_DATA_MSG) {
if (hdr->ch_op == CM_DATA_MSG)
rio_rx_data_handler(cm, data);
} else {
struct rio_cm_work *work;

work = kmalloc(sizeof(*work), GFP_ATOMIC);
if (!work) {
/* Discard a packet if we cannot process it */
riocm_error("Failed to alloc memory for work");
kfree(data);
continue;
}

INIT_WORK(&work->work, rio_cm_handler);
work->data = data;
work->cm = cm;
queue_work(riocm_wq, &work->work);
}
else
rio_cm_handler(cm, data);
}

if (i)
queue_work(riocm_wq, &cm->rx_refill_work);

if (i == 8)
tasklet_schedule(&cm->rx_tasklet);
}

static void riocm_inb_msg_event(struct rio_mport *mport, void *dev_id,
int mbox, int slot)
{
struct cm_dev *cm = dev_id;

if (rio_mport_is_running(cm->mport))
tasklet_schedule(&cm->rx_tasklet);
if (rio_mport_is_running(cm->mport) && !work_pending(&cm->rx_work))
queue_work(cm->rx_wq, &cm->rx_work);
}

/*
Expand Down Expand Up @@ -2262,16 +2208,16 @@ static int riocm_add_mport(struct device *dev,
cm->rx_buf[i] = NULL;

cm->rx_slots = RIOCM_RX_RING_SIZE;
spin_lock_init(&cm->rx_lock);
mutex_init(&cm->rx_lock);
riocm_rx_fill(cm, RIOCM_RX_RING_SIZE);
INIT_WORK(&cm->rx_refill_work, rio_rx_refill_work);
cm->rx_wq = create_workqueue(DRV_NAME "/rxq");
INIT_WORK(&cm->rx_work, rio_ibmsg_handler);

cm->tx_slot = 0;
cm->tx_cnt = 0;
cm->tx_ack_slot = 0;
spin_lock_init(&cm->tx_lock);

tasklet_init(&cm->rx_tasklet, rio_ibmsg_handler, (unsigned long)cm);
INIT_LIST_HEAD(&cm->peers);
cm->npeers = 0;
INIT_LIST_HEAD(&cm->tx_reqs);
Expand Down Expand Up @@ -2318,8 +2264,8 @@ static void riocm_remove_mport(struct device *dev,
if (!found)
return;

tasklet_kill(&cm->rx_tasklet);
flush_workqueue(riocm_wq);
flush_workqueue(cm->rx_wq);
destroy_workqueue(cm->rx_wq);

/* Release channels bound to this mport */
spin_lock_bh(&idr_lock);
Expand Down Expand Up @@ -2420,18 +2366,14 @@ static int __init riocm_init(void)
dev_minor_base = MINOR(dev_number);
riocm_debug(INIT, "Registered class with %d major", dev_major);

riocm_wq = create_singlethread_workqueue("riocm_wq");
if (!riocm_wq)
return -ENOMEM;

/*
* Register as rapidio_port class interface to get notifications about
* mport additions and removals.
*/
ret = class_interface_register(&rio_mport_interface);
if (ret) {
riocm_error("class_interface_register error: %d", ret);
goto err_wq;
goto err_reg;
}

/*
Expand Down Expand Up @@ -2462,8 +2404,9 @@ static int __init riocm_init(void)
subsys_interface_unregister(&riocm_interface);
err_cl:
class_interface_unregister(&rio_mport_interface);
err_wq:
destroy_workqueue(riocm_wq);
err_reg:
unregister_chrdev_region(dev_number, 1);
class_destroy(dev_class);
return ret;
}

Expand All @@ -2473,7 +2416,6 @@ static void __exit riocm_exit(void)
unregister_reboot_notifier(&rio_cm_notifier);
subsys_interface_unregister(&riocm_interface);
class_interface_unregister(&rio_mport_interface);
destroy_workqueue(riocm_wq);
idr_destroy(&ch_idr);

device_unregister(riocm_cdev.dev);
Expand Down

0 comments on commit 9c5099e

Please sign in to comment.