Skip to content

Commit

Permalink
rcu: Fix rcu_barrier() race that could result in too-short wait
Browse files Browse the repository at this point in the history
The rcu_barrier() no-callbacks check for no-CBs CPUs has race conditions.
It checks a given CPU's lists of callbacks, and if all three no-CBs lists
are empty, ignores that CPU.  However, these three lists could potentially
be empty even when callbacks are present if the check executed just as
the callbacks were being moved from one list to another.  It turns out
that recent versions of rcutorture can spot this race.

This commit plugs this hole by consolidating the per-list counts of
no-CBs callbacks into a single count, which is incremented before
the corresponding callback is posted and after it is invoked.  Then
rcu_barrier() checks this single count to reliably determine whether
the corresponding CPU has no-CBs callbacks.

Signed-off-by: Paul E. McKenney <[email protected]>
  • Loading branch information
paulmck committed Jan 6, 2015
1 parent 87af9e7 commit 41050a0
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 39 deletions.
1 change: 1 addition & 0 deletions kernel/rcu/tree.c
Original file line number Diff line number Diff line change
Expand Up @@ -3344,6 +3344,7 @@ static void _rcu_barrier(struct rcu_state *rsp)
} else {
_rcu_barrier_trace(rsp, "OnlineNoCB", cpu,
rsp->n_barrier_done);
smp_mb__before_atomic();
atomic_inc(&rsp->barrier_cpu_count);
__call_rcu(&rdp->barrier_head,
rcu_barrier_callback, rsp, cpu, 0);
Expand Down
29 changes: 7 additions & 22 deletions kernel/rcu/tree.h
Original file line number Diff line number Diff line change
Expand Up @@ -340,14 +340,10 @@ struct rcu_data {
#ifdef CONFIG_RCU_NOCB_CPU
struct rcu_head *nocb_head; /* CBs waiting for kthread. */
struct rcu_head **nocb_tail;
atomic_long_t nocb_q_count; /* # CBs waiting for kthread */
atomic_long_t nocb_q_count_lazy; /* (approximate). */
atomic_long_t nocb_q_count; /* # CBs waiting for nocb */
atomic_long_t nocb_q_count_lazy; /* invocation (all stages). */
struct rcu_head *nocb_follower_head; /* CBs ready to invoke. */
struct rcu_head **nocb_follower_tail;
atomic_long_t nocb_follower_count; /* # CBs ready to invoke. */
atomic_long_t nocb_follower_count_lazy; /* (approximate). */
int nocb_p_count; /* # CBs being invoked by kthread */
int nocb_p_count_lazy; /* (approximate). */
wait_queue_head_t nocb_wq; /* For nocb kthreads to sleep on. */
struct task_struct *nocb_kthread;
int nocb_defer_wakeup; /* Defer wakeup of nocb_kthread. */
Expand All @@ -356,8 +352,6 @@ struct rcu_data {
struct rcu_head *nocb_gp_head ____cacheline_internodealigned_in_smp;
/* CBs waiting for GP. */
struct rcu_head **nocb_gp_tail;
long nocb_gp_count;
long nocb_gp_count_lazy;
bool nocb_leader_sleep; /* Is the nocb leader thread asleep? */
struct rcu_data *nocb_next_follower;
/* Next follower in wakeup chain. */
Expand Down Expand Up @@ -622,24 +616,15 @@ static void rcu_dynticks_task_exit(void);
#endif /* #ifndef RCU_TREE_NONCORE */

#ifdef CONFIG_RCU_TRACE
#ifdef CONFIG_RCU_NOCB_CPU
/* Sum up queue lengths for tracing. */
/* Read out queue lengths for tracing. */
static inline void rcu_nocb_q_lengths(struct rcu_data *rdp, long *ql, long *qll)
{
*ql = atomic_long_read(&rdp->nocb_q_count) +
rdp->nocb_p_count +
atomic_long_read(&rdp->nocb_follower_count) +
rdp->nocb_p_count + rdp->nocb_gp_count;
*qll = atomic_long_read(&rdp->nocb_q_count_lazy) +
rdp->nocb_p_count_lazy +
atomic_long_read(&rdp->nocb_follower_count_lazy) +
rdp->nocb_p_count_lazy + rdp->nocb_gp_count_lazy;
}
#ifdef CONFIG_RCU_NOCB_CPU
*ql = atomic_long_read(&rdp->nocb_q_count);
*qll = atomic_long_read(&rdp->nocb_q_count_lazy);
#else /* #ifdef CONFIG_RCU_NOCB_CPU */
static inline void rcu_nocb_q_lengths(struct rcu_data *rdp, long *ql, long *qll)
{
*ql = 0;
*qll = 0;
}
#endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */
}
#endif /* #ifdef CONFIG_RCU_TRACE */
45 changes: 28 additions & 17 deletions kernel/rcu/tree_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -2056,9 +2056,26 @@ static void wake_nocb_leader(struct rcu_data *rdp, bool force)
static bool rcu_nocb_cpu_needs_barrier(struct rcu_state *rsp, int cpu)
{
struct rcu_data *rdp = per_cpu_ptr(rsp->rda, cpu);
unsigned long ret;
#ifdef CONFIG_PROVE_RCU
struct rcu_head *rhp;
#endif /* #ifdef CONFIG_PROVE_RCU */

/*
* Check count of all no-CBs callbacks awaiting invocation.
* There needs to be a barrier before this function is called,
* but associated with a prior determination that no more
* callbacks would be posted. In the worst case, the first
* barrier in _rcu_barrier() suffices (but the caller cannot
* necessarily rely on this, not a substitute for the caller
* getting the concurrency design right!). There must also be
* a barrier between the following load an posting of a callback
* (if a callback is in fact needed). This is associated with an
* atomic_inc() in the caller.
*/
ret = atomic_long_read(&rdp->nocb_q_count);

/* No-CBs CPUs might have callbacks on any of three lists. */
#ifdef CONFIG_PROVE_RCU
rhp = ACCESS_ONCE(rdp->nocb_head);
if (!rhp)
rhp = ACCESS_ONCE(rdp->nocb_gp_head);
Expand All @@ -2072,8 +2089,9 @@ static bool rcu_nocb_cpu_needs_barrier(struct rcu_state *rsp, int cpu)
cpu, rhp->func);
WARN_ON_ONCE(1);
}
#endif /* #ifdef CONFIG_PROVE_RCU */

return !!rhp;
return !!ret;
}

/*
Expand All @@ -2095,9 +2113,10 @@ static void __call_rcu_nocb_enqueue(struct rcu_data *rdp,
struct task_struct *t;

/* Enqueue the callback on the nocb list and update counts. */
atomic_long_add(rhcount, &rdp->nocb_q_count);
/* rcu_barrier() relies on ->nocb_q_count add before xchg. */
old_rhpp = xchg(&rdp->nocb_tail, rhtp);
ACCESS_ONCE(*old_rhpp) = rhp;
atomic_long_add(rhcount, &rdp->nocb_q_count);
atomic_long_add(rhcount_lazy, &rdp->nocb_q_count_lazy);
smp_mb__after_atomic(); /* Store *old_rhpp before _wake test. */

Expand Down Expand Up @@ -2288,9 +2307,6 @@ static void nocb_leader_wait(struct rcu_data *my_rdp)
/* Move callbacks to wait-for-GP list, which is empty. */
ACCESS_ONCE(rdp->nocb_head) = NULL;
rdp->nocb_gp_tail = xchg(&rdp->nocb_tail, &rdp->nocb_head);
rdp->nocb_gp_count = atomic_long_xchg(&rdp->nocb_q_count, 0);
rdp->nocb_gp_count_lazy =
atomic_long_xchg(&rdp->nocb_q_count_lazy, 0);
gotcbs = true;
}

Expand Down Expand Up @@ -2338,9 +2354,6 @@ static void nocb_leader_wait(struct rcu_data *my_rdp)
/* Append callbacks to follower's "done" list. */
tail = xchg(&rdp->nocb_follower_tail, rdp->nocb_gp_tail);
*tail = rdp->nocb_gp_head;
atomic_long_add(rdp->nocb_gp_count, &rdp->nocb_follower_count);
atomic_long_add(rdp->nocb_gp_count_lazy,
&rdp->nocb_follower_count_lazy);
smp_mb__after_atomic(); /* Store *tail before wakeup. */
if (rdp != my_rdp && tail == &rdp->nocb_follower_head) {
/*
Expand Down Expand Up @@ -2415,13 +2428,11 @@ static int rcu_nocb_kthread(void *arg)
trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "WokeNonEmpty");
ACCESS_ONCE(rdp->nocb_follower_head) = NULL;
tail = xchg(&rdp->nocb_follower_tail, &rdp->nocb_follower_head);
c = atomic_long_xchg(&rdp->nocb_follower_count, 0);
cl = atomic_long_xchg(&rdp->nocb_follower_count_lazy, 0);
rdp->nocb_p_count += c;
rdp->nocb_p_count_lazy += cl;

/* Each pass through the following loop invokes a callback. */
trace_rcu_batch_start(rdp->rsp->name, cl, c, -1);
trace_rcu_batch_start(rdp->rsp->name,
atomic_long_read(&rdp->nocb_q_count_lazy),
atomic_long_read(&rdp->nocb_q_count), -1);
c = cl = 0;
while (list) {
next = list->next;
Expand All @@ -2443,9 +2454,9 @@ static int rcu_nocb_kthread(void *arg)
list = next;
}
trace_rcu_batch_end(rdp->rsp->name, c, !!list, 0, 0, 1);
ACCESS_ONCE(rdp->nocb_p_count) = rdp->nocb_p_count - c;
ACCESS_ONCE(rdp->nocb_p_count_lazy) =
rdp->nocb_p_count_lazy - cl;
smp_mb__before_atomic(); /* _add after CB invocation. */
atomic_long_add(-c, &rdp->nocb_q_count);
atomic_long_add(-cl, &rdp->nocb_q_count_lazy);
rdp->n_nocbs_invoked += c;
}
return 0;
Expand Down

0 comments on commit 41050a0

Please sign in to comment.