Skip to content

Commit

Permalink
aio: add aio_context_acquire() and aio_context_release()
Browse files Browse the repository at this point in the history
It can be useful to run an AioContext from a thread which normally does
not "own" the AioContext.  For example, request draining can be
implemented by acquiring the AioContext and looping aio_poll() until all
requests have been completed.

The following pattern should work:

  /* Event loop thread */
  while (running) {
      aio_context_acquire(ctx);
      aio_poll(ctx, true);
      aio_context_release(ctx);
  }

  /* Another thread */
  aio_context_acquire(ctx);
  bdrv_read(bs, 0x1000, buf, 1);
  aio_context_release(ctx);

This patch implements aio_context_acquire() and aio_context_release().

Note that existing aio_poll() callers do not need to worry about
acquiring and releasing - it is only needed when multiple threads will
call aio_poll() on the same AioContext.

Signed-off-by: Stefan Hajnoczi <[email protected]>
  • Loading branch information
stefanhaRH committed Mar 13, 2014
1 parent 2da61b6 commit 98563fc
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 0 deletions.
18 changes: 18 additions & 0 deletions async.c
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ aio_ctx_finalize(GSource *source)
thread_pool_free(ctx->thread_pool);
aio_set_event_notifier(ctx, &ctx->notifier, NULL);
event_notifier_cleanup(&ctx->notifier);
rfifolock_destroy(&ctx->lock);
qemu_mutex_destroy(&ctx->bh_lock);
g_array_free(ctx->pollfds, TRUE);
timerlistgroup_deinit(&ctx->tlg);
Expand Down Expand Up @@ -250,13 +251,20 @@ static void aio_timerlist_notify(void *opaque)
aio_notify(opaque);
}

static void aio_rfifolock_cb(void *opaque)
{
/* Kick owner thread in case they are blocked in aio_poll() */
aio_notify(opaque);
}

AioContext *aio_context_new(void)
{
AioContext *ctx;
ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
ctx->thread_pool = NULL;
qemu_mutex_init(&ctx->bh_lock);
rfifolock_init(&ctx->lock, aio_rfifolock_cb, ctx);
event_notifier_init(&ctx->notifier, false);
aio_set_event_notifier(ctx, &ctx->notifier,
(EventNotifierHandler *)
Expand All @@ -275,3 +283,13 @@ void aio_context_unref(AioContext *ctx)
{
g_source_unref(&ctx->source);
}

void aio_context_acquire(AioContext *ctx)
{
rfifolock_lock(&ctx->lock);
}

void aio_context_release(AioContext *ctx)
{
rfifolock_unlock(&ctx->lock);
}
18 changes: 18 additions & 0 deletions include/block/aio.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "qemu/queue.h"
#include "qemu/event_notifier.h"
#include "qemu/thread.h"
#include "qemu/rfifolock.h"
#include "qemu/timer.h"

typedef struct BlockDriverAIOCB BlockDriverAIOCB;
Expand Down Expand Up @@ -47,6 +48,9 @@ typedef void IOHandler(void *opaque);
struct AioContext {
GSource source;

/* Protects all fields from multi-threaded access */
RFifoLock lock;

/* The list of registered AIO handlers */
QLIST_HEAD(, AioHandler) aio_handlers;

Expand Down Expand Up @@ -104,6 +108,20 @@ void aio_context_ref(AioContext *ctx);
*/
void aio_context_unref(AioContext *ctx);

/* Take ownership of the AioContext. If the AioContext will be shared between
* threads, a thread must have ownership when calling aio_poll().
*
* Note that multiple threads calling aio_poll() means timers, BHs, and
* callbacks may be invoked from a different thread than they were registered
* from. Therefore, code must use AioContext acquire/release or use
* fine-grained synchronization to protect shared state if other threads will
* be accessing it simultaneously.
*/
void aio_context_acquire(AioContext *ctx);

/* Relinquish ownership of the AioContext. */
void aio_context_release(AioContext *ctx);

/**
* aio_bh_new: Allocate a new bottom half structure.
*
Expand Down
59 changes: 59 additions & 0 deletions tests/test-aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,64 @@ static void test_notify(void)
g_assert(!aio_poll(ctx, false));
}

typedef struct {
QemuMutex start_lock;
bool thread_acquired;
} AcquireTestData;

static void *test_acquire_thread(void *opaque)
{
AcquireTestData *data = opaque;

/* Wait for other thread to let us start */
qemu_mutex_lock(&data->start_lock);
qemu_mutex_unlock(&data->start_lock);

aio_context_acquire(ctx);
aio_context_release(ctx);

data->thread_acquired = true; /* success, we got here */

return NULL;
}

static void dummy_notifier_read(EventNotifier *unused)
{
g_assert(false); /* should never be invoked */
}

static void test_acquire(void)
{
QemuThread thread;
EventNotifier notifier;
AcquireTestData data;

/* Dummy event notifier ensures aio_poll() will block */
event_notifier_init(&notifier, false);
aio_set_event_notifier(ctx, &notifier, dummy_notifier_read);
g_assert(!aio_poll(ctx, false)); /* consume aio_notify() */

qemu_mutex_init(&data.start_lock);
qemu_mutex_lock(&data.start_lock);
data.thread_acquired = false;

qemu_thread_create(&thread, "test_acquire_thread",
test_acquire_thread,
&data, QEMU_THREAD_JOINABLE);

/* Block in aio_poll(), let other thread kick us and acquire context */
aio_context_acquire(ctx);
qemu_mutex_unlock(&data.start_lock); /* let the thread run */
g_assert(!aio_poll(ctx, true));
aio_context_release(ctx);

qemu_thread_join(&thread);
aio_set_event_notifier(ctx, &notifier, NULL);
event_notifier_cleanup(&notifier);

g_assert(data.thread_acquired);
}

static void test_bh_schedule(void)
{
BHTestData data = { .n = 0 };
Expand Down Expand Up @@ -775,6 +833,7 @@ int main(int argc, char **argv)

g_test_init(&argc, &argv, NULL);
g_test_add_func("/aio/notify", test_notify);
g_test_add_func("/aio/acquire", test_acquire);
g_test_add_func("/aio/bh/schedule", test_bh_schedule);
g_test_add_func("/aio/bh/schedule10", test_bh_schedule10);
g_test_add_func("/aio/bh/cancel", test_bh_cancel);
Expand Down

0 comments on commit 98563fc

Please sign in to comment.