Skip to content

Commit

Permalink
block: Do not poll in bdrv_do_drained_end()
Browse files Browse the repository at this point in the history
We should never poll anywhere in bdrv_do_drained_end() (including its
recursive callees like bdrv_drain_invoke()), because it does not cope
well with graph changes.  In fact, it has been written based on the
postulation that no graph changes will happen in it.

Instead, the callers that want to poll must poll, i.e. all currently
globally available wrappers: bdrv_drained_end(),
bdrv_subtree_drained_end(), bdrv_unapply_subtree_drain(), and
bdrv_drain_all_end().  Graph changes there do not matter.

They can poll simply by passing a pointer to a drained_end_counter and
wait until it reaches 0.

This patch also adds a non-polling global wrapper for
bdrv_do_drained_end() that takes a drained_end_counter pointer.  We need
such a variant because now no function called anywhere from
bdrv_do_drained_end() must poll.  This includes
BdrvChildRole.drained_end(), which already must not poll according to
its interface documentation, but bdrv_child_cb_drained_end() just
violates that by invoking bdrv_drained_end() (which does poll).
Therefore, BdrvChildRole.drained_end() must take a *drained_end_counter
parameter, which bdrv_child_cb_drained_end() can pass on to the new
bdrv_drained_end_no_poll() function.

Note that we now have a pattern of all drained_end-related functions
either polling or receiving a *drained_end_counter to let the caller
poll based on that.

A problem with a single poll loop is that when the drained section in
bdrv_set_aio_context_ignore() ends, some nodes in the subgraph may be in
the old contexts, while others are in the new context already.  To let
the collective poll in bdrv_drained_end() work correctly, we must not
hold a lock to the old context, so that the old context can make
progress in case it is different from the current context.

(In the process, remove the comment saying that the current context is
always the old context, because it is wrong.)

In all other places, all nodes in a subtree must be in the same context,
so we can just poll that.  The exception of course is
bdrv_drain_all_end(), but that always runs in the main context, so we
can just poll NULL (like bdrv_drain_all_begin() does).

Signed-off-by: Max Reitz <[email protected]>
Signed-off-by: Kevin Wolf <[email protected]>
  • Loading branch information
XanClic authored and kevmw committed Jul 19, 2019
1 parent 1b28565 commit e037c09
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 36 deletions.
37 changes: 29 additions & 8 deletions block.c
Original file line number Diff line number Diff line change
Expand Up @@ -911,10 +911,11 @@ static bool bdrv_child_cb_drained_poll(BdrvChild *child)
return bdrv_drain_poll(bs, false, NULL, false);
}

static void bdrv_child_cb_drained_end(BdrvChild *child)
static void bdrv_child_cb_drained_end(BdrvChild *child,
int *drained_end_counter)
{
BlockDriverState *bs = child->opaque;
bdrv_drained_end(bs);
bdrv_drained_end_no_poll(bs, drained_end_counter);
}

static void bdrv_child_cb_attach(BdrvChild *child)
Expand Down Expand Up @@ -5923,9 +5924,11 @@ static void bdrv_attach_aio_context(BlockDriverState *bs,
void bdrv_set_aio_context_ignore(BlockDriverState *bs,
AioContext *new_context, GSList **ignore)
{
AioContext *old_context = bdrv_get_aio_context(bs);
AioContext *current_context = qemu_get_current_aio_context();
BdrvChild *child;

if (bdrv_get_aio_context(bs) == new_context) {
if (old_context == new_context) {
return;
}

Expand All @@ -5949,13 +5952,31 @@ void bdrv_set_aio_context_ignore(BlockDriverState *bs,

bdrv_detach_aio_context(bs);

/* This function executes in the old AioContext so acquire the new one in
* case it runs in a different thread.
*/
aio_context_acquire(new_context);
/* Acquire the new context, if necessary */
if (current_context != new_context) {
aio_context_acquire(new_context);
}

bdrv_attach_aio_context(bs, new_context);

/*
* If this function was recursively called from
* bdrv_set_aio_context_ignore(), there may be nodes in the
* subtree that have not yet been moved to the new AioContext.
* Release the old one so bdrv_drained_end() can poll them.
*/
if (current_context != old_context) {
aio_context_release(old_context);
}

bdrv_drained_end(bs);
aio_context_release(new_context);

if (current_context != old_context) {
aio_context_acquire(old_context);
}
if (current_context != new_context) {
aio_context_release(new_context);
}
}

static bool bdrv_parent_can_set_aio_context(BdrvChild *c, AioContext *ctx,
Expand Down
6 changes: 3 additions & 3 deletions block/block-backend.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ static void blk_root_inherit_options(int *child_flags, QDict *child_options,
}
static void blk_root_drained_begin(BdrvChild *child);
static bool blk_root_drained_poll(BdrvChild *child);
static void blk_root_drained_end(BdrvChild *child);
static void blk_root_drained_end(BdrvChild *child, int *drained_end_counter);

static void blk_root_change_media(BdrvChild *child, bool load);
static void blk_root_resize(BdrvChild *child);
Expand Down Expand Up @@ -1249,7 +1249,7 @@ int blk_pread_unthrottled(BlockBackend *blk, int64_t offset, uint8_t *buf,

blk_root_drained_begin(blk->root);
ret = blk_pread(blk, offset, buf, count);
blk_root_drained_end(blk->root);
blk_root_drained_end(blk->root, NULL);
return ret;
}

Expand Down Expand Up @@ -2236,7 +2236,7 @@ static bool blk_root_drained_poll(BdrvChild *child)
return !!blk->in_flight;
}

static void blk_root_drained_end(BdrvChild *child)
static void blk_root_drained_end(BdrvChild *child, int *drained_end_counter)
{
BlockBackend *blk = child->opaque;
assert(blk->quiesce_counter);
Expand Down
80 changes: 57 additions & 23 deletions block/io.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,34 @@ static void bdrv_parent_drained_begin(BlockDriverState *bs, BdrvChild *ignore,
}
}

void bdrv_parent_drained_end_single(BdrvChild *c)
static void bdrv_parent_drained_end_single_no_poll(BdrvChild *c,
int *drained_end_counter)
{
assert(c->parent_quiesce_counter > 0);
c->parent_quiesce_counter--;
if (c->role->drained_end) {
c->role->drained_end(c);
c->role->drained_end(c, drained_end_counter);
}
}

void bdrv_parent_drained_end_single(BdrvChild *c)
{
int drained_end_counter = 0;
bdrv_parent_drained_end_single_no_poll(c, &drained_end_counter);
BDRV_POLL_WHILE(c->bs, atomic_read(&drained_end_counter) > 0);
}

static void bdrv_parent_drained_end(BlockDriverState *bs, BdrvChild *ignore,
bool ignore_bds_parents)
bool ignore_bds_parents,
int *drained_end_counter)
{
BdrvChild *c, *next;

QLIST_FOREACH_SAFE(c, &bs->parents, next_parent, next) {
if (c == ignore || (ignore_bds_parents && c->role->parent_is_bds)) {
continue;
}
bdrv_parent_drained_end_single(c);
bdrv_parent_drained_end_single_no_poll(c, drained_end_counter);
}
}

Expand Down Expand Up @@ -212,13 +221,11 @@ static void coroutine_fn bdrv_drain_invoke_entry(void *opaque)
atomic_mb_set(&data->done, true);
bdrv_dec_in_flight(bs);

if (data->drained_end_counter) {
if (!data->begin) {
atomic_dec(data->drained_end_counter);
}

if (data->begin || data->drained_end_counter) {
g_free(data);
}
g_free(data);
}

/* Recursively call BlockDriver.bdrv_co_drain_begin/end callbacks */
Expand All @@ -240,7 +247,7 @@ static void bdrv_drain_invoke(BlockDriverState *bs, bool begin,
.drained_end_counter = drained_end_counter,
};

if (!begin && drained_end_counter) {
if (!begin) {
atomic_inc(drained_end_counter);
}

Expand All @@ -249,15 +256,6 @@ static void bdrv_drain_invoke(BlockDriverState *bs, bool begin,
bdrv_inc_in_flight(bs);
data->co = qemu_coroutine_create(bdrv_drain_invoke_entry, data);
aio_co_schedule(bdrv_get_aio_context(bs), data->co);

/*
* TODO: Drop this and make callers pass @drained_end_counter and poll
* themselves
*/
if (!begin && !drained_end_counter) {
BDRV_POLL_WHILE(bs, !data->done);
g_free(data);
}
}

/* Returns true if BDRV_POLL_WHILE() should go into a blocking aio_poll() */
Expand Down Expand Up @@ -320,9 +318,11 @@ static void bdrv_co_drain_bh_cb(void *opaque)
}
bdrv_dec_in_flight(bs);
if (data->begin) {
assert(!data->drained_end_counter);
bdrv_do_drained_begin(bs, data->recursive, data->parent,
data->ignore_bds_parents, data->poll);
} else {
assert(!data->poll);
bdrv_do_drained_end(bs, data->recursive, data->parent,
data->ignore_bds_parents,
data->drained_end_counter);
Expand Down Expand Up @@ -438,13 +438,29 @@ void bdrv_subtree_drained_begin(BlockDriverState *bs)
bdrv_do_drained_begin(bs, true, NULL, false, true);
}

/**
* This function does not poll, nor must any of its recursively called
* functions. The *drained_end_counter pointee will be incremented
* once for every background operation scheduled, and decremented once
* the operation settles. Therefore, the pointer must remain valid
* until the pointee reaches 0. That implies that whoever sets up the
* pointee has to poll until it is 0.
*
* We use atomic operations to access *drained_end_counter, because
* (1) when called from bdrv_set_aio_context_ignore(), the subgraph of
* @bs may contain nodes in different AioContexts,
* (2) bdrv_drain_all_end() uses the same counter for all nodes,
* regardless of which AioContext they are in.
*/
static void bdrv_do_drained_end(BlockDriverState *bs, bool recursive,
BdrvChild *parent, bool ignore_bds_parents,
int *drained_end_counter)
{
BdrvChild *child, *next;
int old_quiesce_counter;

assert(drained_end_counter != NULL);

if (qemu_in_coroutine()) {
bdrv_co_yield_to_drain(bs, false, recursive, parent, ignore_bds_parents,
false, drained_end_counter);
Expand All @@ -454,7 +470,8 @@ static void bdrv_do_drained_end(BlockDriverState *bs, bool recursive,

/* Re-enable things in child-to-parent order */
bdrv_drain_invoke(bs, false, drained_end_counter);
bdrv_parent_drained_end(bs, parent, ignore_bds_parents);
bdrv_parent_drained_end(bs, parent, ignore_bds_parents,
drained_end_counter);

old_quiesce_counter = atomic_fetch_dec(&bs->quiesce_counter);
if (old_quiesce_counter == 1) {
Expand All @@ -473,12 +490,21 @@ static void bdrv_do_drained_end(BlockDriverState *bs, bool recursive,

void bdrv_drained_end(BlockDriverState *bs)
{
bdrv_do_drained_end(bs, false, NULL, false, NULL);
int drained_end_counter = 0;
bdrv_do_drained_end(bs, false, NULL, false, &drained_end_counter);
BDRV_POLL_WHILE(bs, atomic_read(&drained_end_counter) > 0);
}

void bdrv_drained_end_no_poll(BlockDriverState *bs, int *drained_end_counter)
{
bdrv_do_drained_end(bs, false, NULL, false, drained_end_counter);
}

void bdrv_subtree_drained_end(BlockDriverState *bs)
{
bdrv_do_drained_end(bs, true, NULL, false, NULL);
int drained_end_counter = 0;
bdrv_do_drained_end(bs, true, NULL, false, &drained_end_counter);
BDRV_POLL_WHILE(bs, atomic_read(&drained_end_counter) > 0);
}

void bdrv_apply_subtree_drain(BdrvChild *child, BlockDriverState *new_parent)
Expand All @@ -492,11 +518,15 @@ void bdrv_apply_subtree_drain(BdrvChild *child, BlockDriverState *new_parent)

void bdrv_unapply_subtree_drain(BdrvChild *child, BlockDriverState *old_parent)
{
int drained_end_counter = 0;
int i;

for (i = 0; i < old_parent->recursive_quiesce_counter; i++) {
bdrv_do_drained_end(child->bs, true, child, false, NULL);
bdrv_do_drained_end(child->bs, true, child, false,
&drained_end_counter);
}

BDRV_POLL_WHILE(child->bs, atomic_read(&drained_end_counter) > 0);
}

/*
Expand Down Expand Up @@ -596,15 +626,19 @@ void bdrv_drain_all_begin(void)
void bdrv_drain_all_end(void)
{
BlockDriverState *bs = NULL;
int drained_end_counter = 0;

while ((bs = bdrv_next_all_states(bs))) {
AioContext *aio_context = bdrv_get_aio_context(bs);

aio_context_acquire(aio_context);
bdrv_do_drained_end(bs, false, NULL, true, NULL);
bdrv_do_drained_end(bs, false, NULL, true, &drained_end_counter);
aio_context_release(aio_context);
}

assert(qemu_get_current_aio_context() == qemu_get_aio_context());
AIO_WAIT_WHILE(NULL, atomic_read(&drained_end_counter) > 0);

assert(bdrv_drain_all_count > 0);
bdrv_drain_all_count--;
}
Expand Down
2 changes: 1 addition & 1 deletion blockjob.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ static bool child_job_drained_poll(BdrvChild *c)
}
}

static void child_job_drained_end(BdrvChild *c)
static void child_job_drained_end(BdrvChild *c, int *drained_end_counter)
{
BlockJob *job = c->opaque;
job_resume(&job->job);
Expand Down
25 changes: 25 additions & 0 deletions include/block/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,9 @@ void bdrv_parent_drained_begin_single(BdrvChild *c, bool poll);
* bdrv_parent_drained_end_single:
*
* End a quiesced section for the parent of @c.
*
* This polls @bs's AioContext until all scheduled sub-drained_ends
* have settled, which may result in graph changes.
*/
void bdrv_parent_drained_end_single(BdrvChild *c);

Expand Down Expand Up @@ -661,9 +664,31 @@ void bdrv_subtree_drained_begin(BlockDriverState *bs);
* bdrv_drained_end:
*
* End a quiescent section started by bdrv_drained_begin().
*
* This polls @bs's AioContext until all scheduled sub-drained_ends
* have settled. On one hand, that may result in graph changes. On
* the other, this requires that all involved nodes (@bs and all of
* its parents) are in the same AioContext, and that the caller has
* acquired it.
* If there are any nodes that are in different contexts from @bs,
* these contexts must not be acquired.
*/
void bdrv_drained_end(BlockDriverState *bs);

/**
* bdrv_drained_end_no_poll:
*
* Same as bdrv_drained_end(), but do not poll for the subgraph to
* actually become unquiesced. Therefore, no graph changes will occur
* with this function.
*
* *drained_end_counter is incremented for every background operation
* that is scheduled, and will be decremented for every operation once
* it settles. The caller must poll until it reaches 0. The counter
* should be accessed using atomic operations only.
*/
void bdrv_drained_end_no_poll(BlockDriverState *bs, int *drained_end_counter);

/**
* End a quiescent section started by bdrv_subtree_drained_begin().
*/
Expand Down
6 changes: 5 additions & 1 deletion include/block/block_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -664,11 +664,15 @@ struct BdrvChildRole {
* These functions must not change the graph (and therefore also must not
* call aio_poll(), which could change the graph indirectly).
*
* If drained_end() schedules background operations, it must atomically
* increment *drained_end_counter for each such operation and atomically
* decrement it once the operation has settled.
*
* Note that this can be nested. If drained_begin() was called twice, new
* I/O is allowed only after drained_end() was called twice, too.
*/
void (*drained_begin)(BdrvChild *child);
void (*drained_end)(BdrvChild *child);
void (*drained_end)(BdrvChild *child, int *drained_end_counter);

/*
* Returns whether the parent has pending requests for the child. This
Expand Down

0 comments on commit e037c09

Please sign in to comment.