Skip to content

Commit

Permalink
job: Add reference counting
Browse files Browse the repository at this point in the history
This moves reference counting from BlockJob to Job.

In order to keep calling the BlockJob cleanup code when the job is
deleted via job_unref(), introduce a new JobDriver.free callback. Every
block job must use block_job_free() for this callback, this is asserted
in block_job_create().

Signed-off-by: Kevin Wolf <[email protected]>
Reviewed-by: Max Reitz <[email protected]>
Reviewed-by: John Snow <[email protected]>
  • Loading branch information
kevmw committed May 23, 2018
1 parent a50c2ab commit 80fa2c7
Show file tree
Hide file tree
Showing 13 changed files with 76 additions and 58 deletions.
1 change: 1 addition & 0 deletions block/backup.c
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ static const BlockJobDriver backup_job_driver = {
.job_driver = {
.instance_size = sizeof(BackupBlockJob),
.job_type = JOB_TYPE_BACKUP,
.free = block_job_free,
},
.start = backup_run,
.commit = backup_commit,
Expand Down
1 change: 1 addition & 0 deletions block/commit.c
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ static const BlockJobDriver commit_job_driver = {
.job_driver = {
.instance_size = sizeof(CommitBlockJob),
.job_type = JOB_TYPE_COMMIT,
.free = block_job_free,
},
.start = commit_run,
};
Expand Down
2 changes: 2 additions & 0 deletions block/mirror.c
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,7 @@ static const BlockJobDriver mirror_job_driver = {
.job_driver = {
.instance_size = sizeof(MirrorBlockJob),
.job_type = JOB_TYPE_MIRROR,
.free = block_job_free,
},
.start = mirror_run,
.complete = mirror_complete,
Expand All @@ -1001,6 +1002,7 @@ static const BlockJobDriver commit_active_job_driver = {
.job_driver = {
.instance_size = sizeof(MirrorBlockJob),
.job_type = JOB_TYPE_COMMIT,
.free = block_job_free,
},
.start = mirror_run,
.complete = mirror_complete,
Expand Down
1 change: 1 addition & 0 deletions block/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ static const BlockJobDriver stream_job_driver = {
.job_driver = {
.instance_size = sizeof(StreamBlockJob),
.job_type = JOB_TYPE_STREAM,
.free = block_job_free,
},
.start = stream_run,
};
Expand Down
48 changes: 21 additions & 27 deletions blockjob.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,31 +190,25 @@ static void block_job_resume(BlockJob *job)
block_job_enter_cond(job, block_job_timer_not_pending);
}

void block_job_ref(BlockJob *job)
{
++job->refcnt;
}

static void block_job_attached_aio_context(AioContext *new_context,
void *opaque);
static void block_job_detach_aio_context(void *opaque);

void block_job_unref(BlockJob *job)
void block_job_free(Job *job)
{
if (--job->refcnt == 0) {
assert(job->job.status == JOB_STATUS_NULL);
assert(!job->txn);
BlockDriverState *bs = blk_bs(job->blk);
bs->job = NULL;
block_job_remove_all_bdrv(job);
blk_remove_aio_context_notifier(job->blk,
block_job_attached_aio_context,
block_job_detach_aio_context, job);
blk_unref(job->blk);
error_free(job->blocker);
assert(!timer_pending(&job->sleep_timer));
job_delete(&job->job);
}
BlockJob *bjob = container_of(job, BlockJob, job);
BlockDriverState *bs = blk_bs(bjob->blk);

assert(!bjob->txn);

bs->job = NULL;
block_job_remove_all_bdrv(bjob);
blk_remove_aio_context_notifier(bjob->blk,
block_job_attached_aio_context,
block_job_detach_aio_context, bjob);
blk_unref(bjob->blk);
error_free(bjob->blocker);
assert(!timer_pending(&bjob->sleep_timer));
}

static void block_job_attached_aio_context(AioContext *new_context,
Expand Down Expand Up @@ -245,15 +239,15 @@ static void block_job_detach_aio_context(void *opaque)
BlockJob *job = opaque;

/* In case the job terminates during aio_poll()... */
block_job_ref(job);
job_ref(&job->job);

block_job_pause(job);

while (!job->paused && !job->completed) {
block_job_drain(job);
}

block_job_unref(job);
job_unref(&job->job);
}

static char *child_job_get_parent_desc(BdrvChild *c)
Expand Down Expand Up @@ -367,7 +361,7 @@ static void block_job_decommission(BlockJob *job)
job->deferred_to_main_loop = true;
block_job_txn_del_job(job);
job_state_transition(&job->job, JOB_STATUS_NULL);
block_job_unref(job);
job_unref(&job->job);
}

static void block_job_do_dismiss(BlockJob *job)
Expand Down Expand Up @@ -506,14 +500,14 @@ static int block_job_finish_sync(BlockJob *job,

assert(blk_bs(job->blk)->job == job);

block_job_ref(job);
job_ref(&job->job);

if (finish) {
finish(job, &local_err);
}
if (local_err) {
error_propagate(errp, local_err);
block_job_unref(job);
job_unref(&job->job);
return -EBUSY;
}
/* block_job_drain calls block_job_enter, and it should be enough to
Expand All @@ -526,7 +520,7 @@ static int block_job_finish_sync(BlockJob *job,
aio_poll(qemu_get_aio_context(), true);
}
ret = (job->cancelled && job->ret == 0) ? -ECANCELED : job->ret;
block_job_unref(job);
job_unref(&job->job);
return ret;
}

Expand Down Expand Up @@ -909,6 +903,7 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
}

assert(is_block_job(&job->job));
assert(job->job.driver->free == &block_job_free);

job->driver = driver;
job->blk = blk;
Expand All @@ -917,7 +912,6 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
job->busy = false;
job->paused = true;
job->pause_count = 1;
job->refcnt = 1;
job->auto_finalize = !(flags & BLOCK_JOB_MANUAL_FINALIZE);
job->auto_dismiss = !(flags & BLOCK_JOB_MANUAL_DISMISS);
aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
Expand Down
21 changes: 0 additions & 21 deletions include/block/blockjob.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,6 @@ typedef struct BlockJob {
/** The opaque value that is passed to the completion function. */
void *opaque;

/** Reference count of the block job */
int refcnt;

/** True when job has reported completion by calling block_job_completed. */
bool completed;

Expand Down Expand Up @@ -399,24 +396,6 @@ void block_job_iostatus_reset(BlockJob *job);
*/
BlockJobTxn *block_job_txn_new(void);

/**
* block_job_ref:
*
* Add a reference to BlockJob refcnt, it will be decreased with
* block_job_unref, and then be freed if it comes to be the last
* reference.
*/
void block_job_ref(BlockJob *job);

/**
* block_job_unref:
*
* Release a reference that was previously acquired with block_job_ref
* or block_job_create. If it's the last reference to the object, it will be
* freed.
*/
void block_job_unref(BlockJob *job);

/**
* block_job_txn_unref:
*
Expand Down
7 changes: 7 additions & 0 deletions include/block/blockjob_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
uint64_t shared_perm, int64_t speed, int flags,
BlockCompletionFunc *cb, void *opaque, Error **errp);

/**
* block_job_free:
* Callback to be used for JobDriver.free in all block jobs. Frees block job
* specific resources in @job.
*/
void block_job_free(Job *job);

/**
* block_job_sleep_ns:
* @job: The job that calls the function.
Expand Down
19 changes: 17 additions & 2 deletions include/qemu/job.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ typedef struct Job {
/** The type of this job. */
const JobDriver *driver;

/** Reference count of the block job */
int refcnt;

/** Current state; See @JobStatus for details. */
JobStatus status;

Expand All @@ -57,6 +60,9 @@ struct JobDriver {

/** Enum describing the operation */
JobType job_type;

/** Called when the job is freed */
void (*free)(Job *job);
};


Expand All @@ -69,8 +75,17 @@ struct JobDriver {
*/
void *job_create(const char *job_id, const JobDriver *driver, Error **errp);

/** Frees the @job object. */
void job_delete(Job *job);
/**
* Add a reference to Job refcnt, it will be decreased with job_unref, and then
* be freed if it comes to be the last reference.
*/
void job_ref(Job *job);

/**
* Release a reference that was previously acquired with job_ref() or
* job_create(). If it's the last reference to the object, it will be freed.
*/
void job_unref(Job *job);

/** Returns the JobType of a given Job. */
JobType job_type(const Job *job);
Expand Down
22 changes: 18 additions & 4 deletions job.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ void *job_create(const char *job_id, const JobDriver *driver, Error **errp)
job = g_malloc0(driver->instance_size);
job->driver = driver;
job->id = g_strdup(job_id);
job->refcnt = 1;

job_state_transition(job, JOB_STATUS_CREATED);

Expand All @@ -142,10 +143,23 @@ void *job_create(const char *job_id, const JobDriver *driver, Error **errp)
return job;
}

void job_delete(Job *job)
void job_ref(Job *job)
{
QLIST_REMOVE(job, job_list);
++job->refcnt;
}

void job_unref(Job *job)
{
if (--job->refcnt == 0) {
assert(job->status == JOB_STATUS_NULL);

g_free(job->id);
g_free(job);
if (job->driver->free) {
job->driver->free(job);
}

QLIST_REMOVE(job, job_list);

g_free(job->id);
g_free(job);
}
}
4 changes: 2 additions & 2 deletions qemu-img.c
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ static void run_block_job(BlockJob *job, Error **errp)
int ret = 0;

aio_context_acquire(aio_context);
block_job_ref(job);
job_ref(&job->job);
do {
aio_poll(aio_context, true);
qemu_progress_print(job->len ?
Expand All @@ -873,7 +873,7 @@ static void run_block_job(BlockJob *job, Error **errp)
} else {
ret = job->ret;
}
block_job_unref(job);
job_unref(&job->job);
aio_context_release(aio_context);

/* publish completion progress only when success */
Expand Down
1 change: 1 addition & 0 deletions tests/test-bdrv-drain.c
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@ static void test_job_complete(BlockJob *job, Error **errp)
BlockJobDriver test_job_driver = {
.job_driver = {
.instance_size = sizeof(TestBlockJob),
.free = block_job_free,
},
.start = test_job_start,
.complete = test_job_complete,
Expand Down
1 change: 1 addition & 0 deletions tests/test-blockjob-txn.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ static void test_block_job_cb(void *opaque, int ret)
static const BlockJobDriver test_block_job_driver = {
.job_driver = {
.instance_size = sizeof(TestBlockJob),
.free = block_job_free,
},
.start = test_block_job_run,
};
Expand Down
6 changes: 4 additions & 2 deletions tests/test-blockjob.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
static const BlockJobDriver test_block_job_driver = {
.job_driver = {
.instance_size = sizeof(BlockJob),
.free = block_job_free,
},
};

Expand Down Expand Up @@ -196,6 +197,7 @@ static void coroutine_fn cancel_job_start(void *opaque)
static const BlockJobDriver test_cancel_driver = {
.job_driver = {
.instance_size = sizeof(CancelJob),
.free = block_job_free,
},
.start = cancel_job_start,
.complete = cancel_job_complete,
Expand All @@ -210,7 +212,7 @@ static CancelJob *create_common(BlockJob **pjob)
blk = create_blk(NULL);
job = mk_job(blk, "Steve", &test_cancel_driver, true,
BLOCK_JOB_MANUAL_FINALIZE | BLOCK_JOB_MANUAL_DISMISS);
block_job_ref(job);
job_ref(&job->job);
assert(job->job.status == JOB_STATUS_CREATED);
s = container_of(job, CancelJob, common);
s->blk = blk;
Expand All @@ -231,7 +233,7 @@ static void cancel_common(CancelJob *s)
block_job_dismiss(&dummy, &error_abort);
}
assert(job->job.status == JOB_STATUS_NULL);
block_job_unref(job);
job_unref(&job->job);
destroy_blk(blk);
}

Expand Down

0 comments on commit 80fa2c7

Please sign in to comment.