Skip to content

Commit

Permalink
ipc/msg: batch queue sender wakeups
Browse files Browse the repository at this point in the history
Currently the use of wake_qs in sysv msg queues are only for the receiver
tasks that are blocked on the queue.  But blocked sender tasks (due to
queue size constraints) still are awoken with the ipc object lock held,
which can be a problem particularly for small sized queues and far from
gracious for -rt (just like it was for the receiver side).

The paths that actually wakeup a sender are obviously related to when we
are either getting rid of the queue or after (some) space is freed-up
after a receiver takes the msg (msgrcv).  Furthermore, with the exception
of msgrcv, we can always piggy-back on expunge_all that has its own tasks
lined-up for waking.  Finally, upon unlinking the message, it should be no
problem delaying the wakeups a bit until after we've released the lock.

Link: http://lkml.kernel.org/r/[email protected]
Signed-off-by: Davidlohr Bueso <[email protected]>
Acked-by: Peter Zijlstra (Intel) <[email protected]>
Cc: Manfred Spraul <[email protected]>
Cc: Sebastian Andrzej Siewior <[email protected]>
Signed-off-by: Andrew Morton <[email protected]>
Signed-off-by: Linus Torvalds <[email protected]>
  • Loading branch information
Davidlohr Bueso authored and torvalds committed Oct 11, 2016
1 parent ee51636 commit e365853
Showing 1 changed file with 20 additions and 10 deletions.
30 changes: 20 additions & 10 deletions ipc/msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,15 @@ static inline void ss_del(struct msg_sender *mss)
list_del(&mss->list);
}

static void ss_wakeup(struct list_head *h, int kill)
static void ss_wakeup(struct list_head *h,
struct wake_q_head *wake_q, int kill)
{
struct msg_sender *mss, *t;

list_for_each_entry_safe(mss, t, h, list) {
if (kill)
mss->list.next = NULL;
wake_up_process(mss->tsk);
wake_q_add(wake_q, mss->tsk);
}
}

Expand Down Expand Up @@ -203,7 +204,7 @@ static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
WAKE_Q(wake_q);

expunge_all(msq, -EIDRM, &wake_q);
ss_wakeup(&msq->q_senders, 1);
ss_wakeup(&msq->q_senders, &wake_q, 1);
msg_rmid(ns, msq);
ipc_unlock_object(&msq->q_perm);
wake_up_q(&wake_q);
Expand Down Expand Up @@ -331,7 +332,6 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
struct kern_ipc_perm *ipcp;
struct msqid64_ds uninitialized_var(msqid64);
struct msg_queue *msq;
WAKE_Q(wake_q);
int err;

if (cmd == IPC_SET) {
Expand Down Expand Up @@ -362,6 +362,9 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
freeque(ns, ipcp);
goto out_up;
case IPC_SET:
{
WAKE_Q(wake_q);

if (msqid64.msg_qbytes > ns->msg_ctlmnb &&
!capable(CAP_SYS_RESOURCE)) {
err = -EPERM;
Expand All @@ -376,23 +379,28 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
msq->q_qbytes = msqid64.msg_qbytes;

msq->q_ctime = get_seconds();
/* sleeping receivers might be excluded by
/*
* Sleeping receivers might be excluded by
* stricter permissions.
*/
expunge_all(msq, -EAGAIN, &wake_q);
/* sleeping senders might be able to send
/*
* Sleeping senders might be able to send
* due to a larger queue size.
*/
ss_wakeup(&msq->q_senders, 0);
break;
ss_wakeup(&msq->q_senders, &wake_q, 0);
ipc_unlock_object(&msq->q_perm);
wake_up_q(&wake_q);

goto out_unlock1;
}
default:
err = -EINVAL;
goto out_unlock1;
}

out_unlock0:
ipc_unlock_object(&msq->q_perm);
wake_up_q(&wake_q);
out_unlock1:
rcu_read_unlock();
out_up:
Expand Down Expand Up @@ -809,6 +817,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
struct msg_queue *msq;
struct ipc_namespace *ns;
struct msg_msg *msg, *copy = NULL;
WAKE_Q(wake_q);

ns = current->nsproxy->ipc_ns;

Expand Down Expand Up @@ -873,7 +882,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
msq->q_cbytes -= msg->m_ts;
atomic_sub(msg->m_ts, &ns->msg_bytes);
atomic_dec(&ns->msg_hdrs);
ss_wakeup(&msq->q_senders, 0);
ss_wakeup(&msq->q_senders, &wake_q, 0);

goto out_unlock0;
}
Expand Down Expand Up @@ -945,6 +954,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl

out_unlock0:
ipc_unlock_object(&msq->q_perm);
wake_up_q(&wake_q);
out_unlock1:
rcu_read_unlock();
if (IS_ERR(msg)) {
Expand Down

0 comments on commit e365853

Please sign in to comment.