Skip to content

Commit

Permalink
ipc/sem: separate wait-for-zero and alter tasks into separate queues
Browse files Browse the repository at this point in the history
Introduce separate queues for operations that do not modify the
semaphore values.  Advantages:

 - Simpler logic in check_restart().
 - Faster update_queue(): Right now, all wait-for-zero operations are
   always tested, even if the semaphore value is not 0.
 - wait-for-zero operations get priority again, as in Linux <= 3.0.9

Signed-off-by: Manfred Spraul <[email protected]>
Cc: Rik van Riel <[email protected]>
Cc: Davidlohr Bueso <[email protected]>
Signed-off-by: Andrew Morton <[email protected]>
Signed-off-by: Linus Torvalds <[email protected]>
  • Loading branch information
manfred-colorfu authored and torvalds committed Jul 9, 2013
1 parent f5c936c commit 1a82e9e
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 61 deletions.
5 changes: 4 additions & 1 deletion include/linux/sem.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ struct sem_array {
time_t sem_otime; /* last semop time */
time_t sem_ctime; /* last change time */
struct sem *sem_base; /* ptr to first semaphore in array */
struct list_head sem_pending; /* pending operations to be processed */
struct list_head pending_alter; /* pending operations */
/* that alter the array */
struct list_head pending_const; /* pending complex operations */
/* that do not alter semvals */
struct list_head list_id; /* undo requests on this array */
int sem_nsems; /* no. of semaphores in array */
int complex_count; /* pending complex operations */
Expand Down
211 changes: 151 additions & 60 deletions ipc/sem.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ struct sem {
int semval; /* current value */
int sempid; /* pid of last operation */
spinlock_t lock; /* spinlock for fine-grained semtimedop */
struct list_head sem_pending; /* pending single-sop operations */
struct list_head pending_alter; /* pending single-sop operations */
/* that alter the semaphore */
struct list_head pending_const; /* pending single-sop operations */
/* that do not alter the semaphore */
} ____cacheline_aligned_in_smp;

/* One queue for each sleeping process in the system. */
Expand Down Expand Up @@ -152,7 +155,7 @@ static int sysvipc_sem_proc_show(struct seq_file *s, void *it);
/*
* linked list protection:
* sem_undo.id_next,
* sem_array.sem_pending{,last},
* sem_array.pending{_alter,_const},
* sem_array.sem_undo: sem_lock() for read/write
* sem_undo.proc_next: only "current" is allowed to read/write that field.
*
Expand Down Expand Up @@ -337,7 +340,7 @@ static inline void sem_rmid(struct ipc_namespace *ns, struct sem_array *s)
* Without the check/retry algorithm a lockless wakeup is possible:
* - queue.status is initialized to -EINTR before blocking.
* - wakeup is performed by
* * unlinking the queue entry from sma->sem_pending
* * unlinking the queue entry from the pending list
* * setting queue.status to IN_WAKEUP
* This is the notification for the blocked thread that a
* result value is imminent.
Expand Down Expand Up @@ -418,12 +421,14 @@ static int newary(struct ipc_namespace *ns, struct ipc_params *params)
sma->sem_base = (struct sem *) &sma[1];

for (i = 0; i < nsems; i++) {
INIT_LIST_HEAD(&sma->sem_base[i].sem_pending);
INIT_LIST_HEAD(&sma->sem_base[i].pending_alter);
INIT_LIST_HEAD(&sma->sem_base[i].pending_const);
spin_lock_init(&sma->sem_base[i].lock);
}

sma->complex_count = 0;
INIT_LIST_HEAD(&sma->sem_pending);
INIT_LIST_HEAD(&sma->pending_alter);
INIT_LIST_HEAD(&sma->pending_const);
INIT_LIST_HEAD(&sma->list_id);
sma->sem_nsems = nsems;
sma->sem_ctime = get_seconds();
Expand Down Expand Up @@ -609,60 +614,132 @@ static void unlink_queue(struct sem_array *sma, struct sem_queue *q)
* update_queue is O(N^2) when it restarts scanning the whole queue of
* waiting operations. Therefore this function checks if the restart is
* really necessary. It is called after a previously waiting operation
* was completed.
* modified the array.
* Note that wait-for-zero operations are handled without restart.
*/
static int check_restart(struct sem_array *sma, struct sem_queue *q)
{
struct sem *curr;
struct sem_queue *h;

/* if the operation didn't modify the array, then no restart */
if (q->alter == 0)
return 0;

/* pending complex operations are too difficult to analyse */
if (sma->complex_count)
/* pending complex alter operations are too difficult to analyse */
if (!list_empty(&sma->pending_alter))
return 1;

/* we were a sleeping complex operation. Too difficult */
if (q->nsops > 1)
return 1;

curr = sma->sem_base + q->sops[0].sem_num;
/* It is impossible that someone waits for the new value:
* - complex operations always restart.
* - wait-for-zero operations are handled separately.
* - q is a previously sleeping simple operation that
* altered the array. It must be a decrement, because
* simple increments never sleep.
* - If there are older (higher priority) decrements
* in the queue, then they have observed the original
* semval value and couldn't proceed. The operation
* decremented the value - thus they won't proceed either.
*/
return 0;
}

/* No-one waits on this queue */
if (list_empty(&curr->sem_pending))
return 0;
/**
* wake_const_ops(sma, semnum, pt) - Wake up non-alter tasks
* @sma: semaphore array.
* @semnum: semaphore that was modified.
* @pt: list head for the tasks that must be woken up.
*
* wake_const_ops must be called after a semaphore in a semaphore array
* was set to 0. If complex const operations are pending, wake_const_ops must
* be called with semnum = -1, as well as with the number of each modified
* semaphore.
* The tasks that must be woken up are added to @pt. The return code
* is stored in q->pid.
* The function returns 1 if at least one operation was completed successfully.
*/
static int wake_const_ops(struct sem_array *sma, int semnum,
struct list_head *pt)
{
struct sem_queue *q;
struct list_head *walk;
struct list_head *pending_list;
int semop_completed = 0;

if (semnum == -1)
pending_list = &sma->pending_const;
else
pending_list = &sma->sem_base[semnum].pending_const;

/* the new semaphore value */
if (curr->semval) {
/* It is impossible that someone waits for the new value:
* - q is a previously sleeping simple operation that
* altered the array. It must be a decrement, because
* simple increments never sleep.
* - The value is not 0, thus wait-for-zero won't proceed.
* - If there are older (higher priority) decrements
* in the queue, then they have observed the original
* semval value and couldn't proceed. The operation
* decremented to value - thus they won't proceed either.
walk = pending_list->next;
while (walk != pending_list) {
int error;

q = container_of(walk, struct sem_queue, list);
walk = walk->next;

error = try_atomic_semop(sma, q->sops, q->nsops,
q->undo, q->pid);

if (error <= 0) {
/* operation completed, remove from queue & wakeup */

unlink_queue(sma, q);

wake_up_sem_queue_prepare(pt, q, error);
if (error == 0)
semop_completed = 1;
}
}
return semop_completed;
}

/**
* do_smart_wakeup_zero(sma, sops, nsops, pt) - wakeup all wait for zero tasks
* @sma: semaphore array
* @sops: operations that were performed
* @nsops: number of operations
* @pt: list head of the tasks that must be woken up.
*
* do_smart_wakeup_zero() checks all required queues for wait-for-zero
* operations, based on the actual changes that were performed on the
* semaphore array.
* The function returns 1 if at least one operation was completed successfully.
*/
static int do_smart_wakeup_zero(struct sem_array *sma, struct sembuf *sops,
int nsops, struct list_head *pt)
{
int i;
int semop_completed = 0;
int got_zero = 0;

/* first: the per-semaphore queues, if known */
if (sops) {
for (i = 0; i < nsops; i++) {
int num = sops[i].sem_num;

if (sma->sem_base[num].semval == 0) {
got_zero = 1;
semop_completed |= wake_const_ops(sma, num, pt);
}
}
} else {
/*
* No sops means modified semaphores not known.
* Assume all were changed.
*/
BUG_ON(q->sops[0].sem_op >= 0);
return 0;
for (i = 0; i < sma->sem_nsems; i++) {
if (sma->sem_base[i].semval == 0) {
got_zero = 1;
semop_completed |= wake_const_ops(sma, i, pt);
}
}
}
/*
* semval is 0. Check if there are wait-for-zero semops.
* They must be the first entries in the per-semaphore queue
* If one of the modified semaphores got 0,
* then check the global queue, too.
*/
h = list_first_entry(&curr->sem_pending, struct sem_queue, list);
BUG_ON(h->nsops != 1);
BUG_ON(h->sops[0].sem_num != q->sops[0].sem_num);
if (got_zero)
semop_completed |= wake_const_ops(sma, -1, pt);

/* Yes, there is a wait-for-zero semop. Restart */
if (h->sops[0].sem_op == 0)
return 1;

/* Again - no-one is waiting for the new value. */
return 0;
return semop_completed;
}


Expand All @@ -678,6 +755,8 @@ static int check_restart(struct sem_array *sma, struct sem_queue *q)
* semaphore.
* The tasks that must be woken up are added to @pt. The return code
* is stored in q->pid.
* The function internally checks if const operations can now succeed.
*
* The function returns 1 if at least one semop was completed successfully.
*/
static int update_queue(struct sem_array *sma, int semnum, struct list_head *pt)
Expand All @@ -688,9 +767,9 @@ static int update_queue(struct sem_array *sma, int semnum, struct list_head *pt)
int semop_completed = 0;

if (semnum == -1)
pending_list = &sma->sem_pending;
pending_list = &sma->pending_alter;
else
pending_list = &sma->sem_base[semnum].sem_pending;
pending_list = &sma->sem_base[semnum].pending_alter;

again:
walk = pending_list->next;
Expand All @@ -702,13 +781,12 @@ static int update_queue(struct sem_array *sma, int semnum, struct list_head *pt)

/* If we are scanning the single sop, per-semaphore list of
* one semaphore and that semaphore is 0, then it is not
* necessary to scan the "alter" entries: simple increments
* necessary to scan further: simple increments
* that affect only one entry succeed immediately and cannot
* be in the per semaphore pending queue, and decrements
* cannot be successful if the value is already 0.
*/
if (semnum != -1 && sma->sem_base[semnum].semval == 0 &&
q->alter)
if (semnum != -1 && sma->sem_base[semnum].semval == 0)
break;

error = try_atomic_semop(sma, q->sops, q->nsops,
Expand All @@ -724,6 +802,7 @@ static int update_queue(struct sem_array *sma, int semnum, struct list_head *pt)
restart = 0;
} else {
semop_completed = 1;
do_smart_wakeup_zero(sma, q->sops, q->nsops, pt);
restart = check_restart(sma, q);
}

Expand All @@ -742,8 +821,8 @@ static int update_queue(struct sem_array *sma, int semnum, struct list_head *pt)
* @otime: force setting otime
* @pt: list head of the tasks that must be woken up.
*
* do_smart_update() does the required called to update_queue, based on the
* actual changes that were performed on the semaphore array.
* do_smart_update() does the required calls to update_queue() and
* do_smart_wakeup_zero(), based on the actual changes that were performed
* on the semaphore array.
* Note that the function does not do the actual wake-up: the caller is
* responsible for calling wake_up_sem_queue_do(@pt).
* It is safe to perform this call after dropping all locks.
Expand All @@ -754,6 +833,8 @@ static void do_smart_update(struct sem_array *sma, struct sembuf *sops, int nsop
int i;
int progress;

otime |= do_smart_wakeup_zero(sma, sops, nsops, pt);

progress = 1;
retry_global:
if (sma->complex_count) {
Expand Down Expand Up @@ -813,14 +894,14 @@ static int count_semncnt (struct sem_array * sma, ushort semnum)
struct sem_queue * q;

semncnt = 0;
list_for_each_entry(q, &sma->sem_base[semnum].sem_pending, list) {
list_for_each_entry(q, &sma->sem_base[semnum].pending_alter, list) {
struct sembuf * sops = q->sops;
BUG_ON(sops->sem_num != semnum);
if ((sops->sem_op < 0) && !(sops->sem_flg & IPC_NOWAIT))
semncnt++;
}

list_for_each_entry(q, &sma->sem_pending, list) {
list_for_each_entry(q, &sma->pending_alter, list) {
struct sembuf * sops = q->sops;
int nsops = q->nsops;
int i;
Expand All @@ -839,14 +920,14 @@ static int count_semzcnt (struct sem_array * sma, ushort semnum)
struct sem_queue * q;

semzcnt = 0;
list_for_each_entry(q, &sma->sem_base[semnum].sem_pending, list) {
list_for_each_entry(q, &sma->sem_base[semnum].pending_const, list) {
struct sembuf * sops = q->sops;
BUG_ON(sops->sem_num != semnum);
if ((sops->sem_op == 0) && !(sops->sem_flg & IPC_NOWAIT))
semzcnt++;
}

list_for_each_entry(q, &sma->sem_pending, list) {
list_for_each_entry(q, &sma->pending_const, list) {
struct sembuf * sops = q->sops;
int nsops = q->nsops;
int i;
Expand Down Expand Up @@ -884,13 +965,22 @@ static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)

/* Wake up all pending processes and let them fail with EIDRM. */
INIT_LIST_HEAD(&tasks);
list_for_each_entry_safe(q, tq, &sma->sem_pending, list) {
list_for_each_entry_safe(q, tq, &sma->pending_const, list) {
unlink_queue(sma, q);
wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
}

list_for_each_entry_safe(q, tq, &sma->pending_alter, list) {
unlink_queue(sma, q);
wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
}
for (i = 0; i < sma->sem_nsems; i++) {
struct sem *sem = sma->sem_base + i;
list_for_each_entry_safe(q, tq, &sem->sem_pending, list) {
list_for_each_entry_safe(q, tq, &sem->pending_const, list) {
unlink_queue(sma, q);
wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
}
list_for_each_entry_safe(q, tq, &sem->pending_alter, list) {
unlink_queue(sma, q);
wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
}
Expand Down Expand Up @@ -1658,14 +1748,15 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
curr = &sma->sem_base[sops->sem_num];

if (alter)
list_add_tail(&queue.list, &curr->sem_pending);
list_add_tail(&queue.list, &curr->pending_alter);
else
list_add(&queue.list, &curr->sem_pending);
list_add_tail(&queue.list, &curr->pending_const);
} else {
if (alter)
list_add_tail(&queue.list, &sma->sem_pending);
list_add_tail(&queue.list, &sma->pending_alter);
else
list_add(&queue.list, &sma->sem_pending);
list_add_tail(&queue.list, &sma->pending_const);

sma->complex_count++;
}

Expand Down

0 comments on commit 1a82e9e

Please sign in to comment.