Skip to content

Commit

Permalink
Merge branch 'pipe-exclusive-wakeup'
Browse files Browse the repository at this point in the history
Merge thundering herd avoidance on pipe IO.

This would have been applied for 5.5 already, but got delayed because of
a user-space race condition in the GNU make jobserver code.  Now that
there's a new GNU make 4.3 release, and most distributions seem to have
at least applied the (almost three year old) fix for the problem, let's
see if people notice.

And it might have been just bad random timing luck on my machine.

If you do hit the race condition, things will still work, but the
symptom is that you don't get nearly the expected parallelism when using
"make -j<N>".

The jobserver bug can definitely happen without this patch too, but
seems to be easier to trigger when we no longer wake up pipe waiters
unnecessarily.

* pipe-exclusive-wakeup:
  pipe: use exclusive waits when reading or writing
  • Loading branch information
torvalds committed Feb 8, 2020
2 parents f757165 + 0ddad21 commit 9959333
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 30 deletions.
4 changes: 2 additions & 2 deletions fs/coredump.c
Original file line number Diff line number Diff line change
Expand Up @@ -517,15 +517,15 @@ static void wait_for_dump_helpers(struct file *file)
pipe_lock(pipe);
pipe->readers++;
pipe->writers--;
wake_up_interruptible_sync(&pipe->wait);
wake_up_interruptible_sync(&pipe->rd_wait);
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
pipe_unlock(pipe);

/*
* We actually want wait_event_freezable() but then we need
* to clear TIF_SIGPENDING and improve dump_interrupted().
*/
wait_event_interruptible(pipe->wait, pipe->readers == 1);
wait_event_interruptible(pipe->rd_wait, pipe->readers == 1);

pipe_lock(pipe);
pipe->readers--;
Expand Down
67 changes: 44 additions & 23 deletions fs/pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,19 @@ void pipe_double_lock(struct pipe_inode_info *pipe1,
/* Drop the inode semaphore and wait for a pipe event, atomically */
void pipe_wait(struct pipe_inode_info *pipe)
{
DEFINE_WAIT(wait);
DEFINE_WAIT(rdwait);
DEFINE_WAIT(wrwait);

/*
* Pipes are system-local resources, so sleeping on them
* is considered a noninteractive wait:
*/
prepare_to_wait(&pipe->wait, &wait, TASK_INTERRUPTIBLE);
prepare_to_wait(&pipe->rd_wait, &rdwait, TASK_INTERRUPTIBLE);
prepare_to_wait(&pipe->wr_wait, &wrwait, TASK_INTERRUPTIBLE);
pipe_unlock(pipe);
schedule();
finish_wait(&pipe->wait, &wait);
finish_wait(&pipe->rd_wait, &rdwait);
finish_wait(&pipe->wr_wait, &wrwait);
pipe_lock(pipe);
}

Expand Down Expand Up @@ -286,7 +289,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
size_t total_len = iov_iter_count(to);
struct file *filp = iocb->ki_filp;
struct pipe_inode_info *pipe = filp->private_data;
bool was_full;
bool was_full, wake_next_reader = false;
ssize_t ret;

/* Null read succeeds. */
Expand Down Expand Up @@ -344,10 +347,10 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)

if (!buf->len) {
pipe_buf_release(pipe, buf);
spin_lock_irq(&pipe->wait.lock);
spin_lock_irq(&pipe->rd_wait.lock);
tail++;
pipe->tail = tail;
spin_unlock_irq(&pipe->wait.lock);
spin_unlock_irq(&pipe->rd_wait.lock);
}
total_len -= chars;
if (!total_len)
Expand Down Expand Up @@ -384,7 +387,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
* no data.
*/
if (unlikely(was_full)) {
wake_up_interruptible_sync_poll(&pipe->wait, EPOLLOUT | EPOLLWRNORM);
wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
}

Expand All @@ -394,18 +397,23 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
* since we've done any required wakeups and there's no need
* to mark anything accessed. And we've dropped the lock.
*/
if (wait_event_interruptible(pipe->wait, pipe_readable(pipe)) < 0)
if (wait_event_interruptible_exclusive(pipe->rd_wait, pipe_readable(pipe)) < 0)
return -ERESTARTSYS;

__pipe_lock(pipe);
was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
wake_next_reader = true;
}
if (pipe_empty(pipe->head, pipe->tail))
wake_next_reader = false;
__pipe_unlock(pipe);

if (was_full) {
wake_up_interruptible_sync_poll(&pipe->wait, EPOLLOUT | EPOLLWRNORM);
wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
}
if (wake_next_reader)
wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
if (ret > 0)
file_accessed(filp);
return ret;
Expand Down Expand Up @@ -437,6 +445,7 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
size_t total_len = iov_iter_count(from);
ssize_t chars;
bool was_empty = false;
bool wake_next_writer = false;

/* Null write succeeds. */
if (unlikely(total_len == 0))
Expand Down Expand Up @@ -515,16 +524,16 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
* it, either the reader will consume it or it'll still
* be there for the next write.
*/
spin_lock_irq(&pipe->wait.lock);
spin_lock_irq(&pipe->rd_wait.lock);

head = pipe->head;
if (pipe_full(head, pipe->tail, pipe->max_usage)) {
spin_unlock_irq(&pipe->wait.lock);
spin_unlock_irq(&pipe->rd_wait.lock);
continue;
}

pipe->head = head + 1;
spin_unlock_irq(&pipe->wait.lock);
spin_unlock_irq(&pipe->rd_wait.lock);

/* Insert it into the buffer array */
buf = &pipe->bufs[head & mask];
Expand Down Expand Up @@ -576,14 +585,17 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
*/
__pipe_unlock(pipe);
if (was_empty) {
wake_up_interruptible_sync_poll(&pipe->wait, EPOLLIN | EPOLLRDNORM);
wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
}
wait_event_interruptible(pipe->wait, pipe_writable(pipe));
wait_event_interruptible_exclusive(pipe->wr_wait, pipe_writable(pipe));
__pipe_lock(pipe);
was_empty = pipe_empty(pipe->head, pipe->tail);
wake_next_writer = true;
}
out:
if (pipe_full(pipe->head, pipe->tail, pipe->max_usage))
wake_next_writer = false;
__pipe_unlock(pipe);

/*
Expand All @@ -596,9 +608,11 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
* wake up pending jobs
*/
if (was_empty) {
wake_up_interruptible_sync_poll(&pipe->wait, EPOLLIN | EPOLLRDNORM);
wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
}
if (wake_next_writer)
wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
if (ret > 0 && sb_start_write_trylock(file_inode(filp)->i_sb)) {
int err = file_update_time(filp);
if (err)
Expand Down Expand Up @@ -642,12 +656,15 @@ pipe_poll(struct file *filp, poll_table *wait)
unsigned int head, tail;

/*
* Reading only -- no need for acquiring the semaphore.
* Reading pipe state only -- no need for acquiring the semaphore.
*
* But because this is racy, the code has to add the
* entry to the poll table _first_ ..
*/
poll_wait(filp, &pipe->wait, wait);
if (filp->f_mode & FMODE_READ)
poll_wait(filp, &pipe->rd_wait, wait);
if (filp->f_mode & FMODE_WRITE)
poll_wait(filp, &pipe->wr_wait, wait);

/*
* .. and only then can you do the racy tests. That way,
Expand Down Expand Up @@ -706,7 +723,8 @@ pipe_release(struct inode *inode, struct file *file)
pipe->writers--;

if (pipe->readers || pipe->writers) {
wake_up_interruptible_sync_poll(&pipe->wait, EPOLLIN | EPOLLOUT | EPOLLRDNORM | EPOLLWRNORM | EPOLLERR | EPOLLHUP);
wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM | EPOLLERR | EPOLLHUP);
wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM | EPOLLERR | EPOLLHUP);
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
}
Expand Down Expand Up @@ -789,7 +807,8 @@ struct pipe_inode_info *alloc_pipe_info(void)
GFP_KERNEL_ACCOUNT);

if (pipe->bufs) {
init_waitqueue_head(&pipe->wait);
init_waitqueue_head(&pipe->rd_wait);
init_waitqueue_head(&pipe->wr_wait);
pipe->r_counter = pipe->w_counter = 1;
pipe->max_usage = pipe_bufs;
pipe->ring_size = pipe_bufs;
Expand Down Expand Up @@ -1007,7 +1026,8 @@ static int wait_for_partner(struct pipe_inode_info *pipe, unsigned int *cnt)

static void wake_up_partner(struct pipe_inode_info *pipe)
{
wake_up_interruptible(&pipe->wait);
wake_up_interruptible(&pipe->rd_wait);
wake_up_interruptible(&pipe->wr_wait);
}

static int fifo_open(struct inode *inode, struct file *filp)
Expand Down Expand Up @@ -1118,13 +1138,13 @@ static int fifo_open(struct inode *inode, struct file *filp)

err_rd:
if (!--pipe->readers)
wake_up_interruptible(&pipe->wait);
wake_up_interruptible(&pipe->wr_wait);
ret = -ERESTARTSYS;
goto err;

err_wr:
if (!--pipe->writers)
wake_up_interruptible(&pipe->wait);
wake_up_interruptible(&pipe->rd_wait);
ret = -ERESTARTSYS;
goto err;

Expand Down Expand Up @@ -1251,7 +1271,8 @@ static long pipe_set_size(struct pipe_inode_info *pipe, unsigned long arg)
pipe->max_usage = nr_slots;
pipe->tail = tail;
pipe->head = head;
wake_up_interruptible_all(&pipe->wait);
wake_up_interruptible_all(&pipe->rd_wait);
wake_up_interruptible_all(&pipe->wr_wait);
return pipe->max_usage * PAGE_SIZE;

out_revert_acct:
Expand Down
8 changes: 4 additions & 4 deletions fs/splice.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ static const struct pipe_buf_operations user_page_pipe_buf_ops = {
static void wakeup_pipe_readers(struct pipe_inode_info *pipe)
{
smp_mb();
if (waitqueue_active(&pipe->wait))
wake_up_interruptible(&pipe->wait);
if (waitqueue_active(&pipe->rd_wait))
wake_up_interruptible(&pipe->rd_wait);
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
}

Expand Down Expand Up @@ -462,8 +462,8 @@ static int pipe_to_sendpage(struct pipe_inode_info *pipe,
static void wakeup_pipe_writers(struct pipe_inode_info *pipe)
{
smp_mb();
if (waitqueue_active(&pipe->wait))
wake_up_interruptible(&pipe->wait);
if (waitqueue_active(&pipe->wr_wait))
wake_up_interruptible(&pipe->wr_wait);
kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
}

Expand Down
2 changes: 1 addition & 1 deletion include/linux/pipe_fs_i.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ struct pipe_buffer {
**/
struct pipe_inode_info {
struct mutex mutex;
wait_queue_head_t wait;
wait_queue_head_t rd_wait, wr_wait;
unsigned int head;
unsigned int tail;
unsigned int max_usage;
Expand Down

0 comments on commit 9959333

Please sign in to comment.