Skip to content

Commit

Permalink
job: Switch transactions to JobTxn
Browse files Browse the repository at this point in the history
This doesn't actually move any transaction code to Job yet, but it
renames the type for transactions from BlockJobTxn to JobTxn and makes
them contain Jobs rather than BlockJobs

Signed-off-by: Kevin Wolf <[email protected]>
Reviewed-by: Max Reitz <[email protected]>
  • Loading branch information
kevmw committed May 23, 2018
1 parent 6a74c07 commit 62c9e41
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 48 deletions.
2 changes: 1 addition & 1 deletion block/backup.c
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ BlockJob *backup_job_create(const char *job_id, BlockDriverState *bs,
BlockdevOnError on_target_error,
int creation_flags,
BlockCompletionFunc *cb, void *opaque,
BlockJobTxn *txn, Error **errp)
JobTxn *txn, Error **errp)
{
int64_t len;
BlockDriverInfo bdi;
Expand Down
14 changes: 7 additions & 7 deletions blockdev.c
Original file line number Diff line number Diff line change
Expand Up @@ -1446,7 +1446,7 @@ typedef struct BlkActionOps {
struct BlkActionState {
TransactionAction *action;
const BlkActionOps *ops;
BlockJobTxn *block_job_txn;
JobTxn *block_job_txn;
TransactionProperties *txn_props;
QSIMPLEQ_ENTRY(BlkActionState) entry;
};
Expand Down Expand Up @@ -1864,7 +1864,7 @@ typedef struct DriveBackupState {
BlockJob *job;
} DriveBackupState;

static BlockJob *do_drive_backup(DriveBackup *backup, BlockJobTxn *txn,
static BlockJob *do_drive_backup(DriveBackup *backup, JobTxn *txn,
Error **errp);

static void drive_backup_prepare(BlkActionState *common, Error **errp)
Expand Down Expand Up @@ -1954,7 +1954,7 @@ typedef struct BlockdevBackupState {
BlockJob *job;
} BlockdevBackupState;

static BlockJob *do_blockdev_backup(BlockdevBackup *backup, BlockJobTxn *txn,
static BlockJob *do_blockdev_backup(BlockdevBackup *backup, JobTxn *txn,
Error **errp);

static void blockdev_backup_prepare(BlkActionState *common, Error **errp)
Expand Down Expand Up @@ -2243,15 +2243,15 @@ void qmp_transaction(TransactionActionList *dev_list,
Error **errp)
{
TransactionActionList *dev_entry = dev_list;
BlockJobTxn *block_job_txn = NULL;
JobTxn *block_job_txn = NULL;
BlkActionState *state, *next;
Error *local_err = NULL;

QSIMPLEQ_HEAD(snap_bdrv_states, BlkActionState) snap_bdrv_states;
QSIMPLEQ_INIT(&snap_bdrv_states);

/* Does this transaction get canceled as a group on failure?
* If not, we don't really need to make a BlockJobTxn.
* If not, we don't really need to make a JobTxn.
*/
props = get_transaction_properties(props);
if (props->completion_mode != ACTION_COMPLETION_MODE_INDIVIDUAL) {
Expand Down Expand Up @@ -3264,7 +3264,7 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device,
aio_context_release(aio_context);
}

static BlockJob *do_drive_backup(DriveBackup *backup, BlockJobTxn *txn,
static BlockJob *do_drive_backup(DriveBackup *backup, JobTxn *txn,
Error **errp)
{
BlockDriverState *bs;
Expand Down Expand Up @@ -3434,7 +3434,7 @@ BlockDeviceInfoList *qmp_query_named_block_nodes(Error **errp)
return bdrv_named_nodes_list(errp);
}

BlockJob *do_blockdev_backup(BlockdevBackup *backup, BlockJobTxn *txn,
BlockJob *do_blockdev_backup(BlockdevBackup *backup, JobTxn *txn,
Error **errp)
{
BlockDriverState *bs;
Expand Down
60 changes: 32 additions & 28 deletions blockjob.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@
#include "qemu/timer.h"

/* Transactional group of block jobs */
struct BlockJobTxn {
struct JobTxn {

/* Is this txn being cancelled? */
bool aborting;

/* List of jobs */
QLIST_HEAD(, BlockJob) jobs;
QLIST_HEAD(, Job) jobs;

/* Reference count */
int refcnt;
Expand Down Expand Up @@ -94,27 +94,27 @@ BlockJob *block_job_get(const char *id)
}
}

BlockJobTxn *block_job_txn_new(void)
JobTxn *block_job_txn_new(void)
{
BlockJobTxn *txn = g_new0(BlockJobTxn, 1);
JobTxn *txn = g_new0(JobTxn, 1);
QLIST_INIT(&txn->jobs);
txn->refcnt = 1;
return txn;
}

static void block_job_txn_ref(BlockJobTxn *txn)
static void block_job_txn_ref(JobTxn *txn)
{
txn->refcnt++;
}

void block_job_txn_unref(BlockJobTxn *txn)
void block_job_txn_unref(JobTxn *txn)
{
if (txn && --txn->refcnt == 0) {
g_free(txn);
}
}

void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
void block_job_txn_add_job(JobTxn *txn, BlockJob *job)
{
if (!txn) {
return;
Expand All @@ -123,14 +123,14 @@ void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
assert(!job->txn);
job->txn = txn;

QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
QLIST_INSERT_HEAD(&txn->jobs, &job->job, txn_list);
block_job_txn_ref(txn);
}

void block_job_txn_del_job(BlockJob *job)
{
if (job->txn) {
QLIST_REMOVE(job, txn_list);
QLIST_REMOVE(&job->job, txn_list);
block_job_txn_unref(job->txn);
job->txn = NULL;
}
Expand Down Expand Up @@ -285,18 +285,22 @@ static void job_cancel_async(Job *job, bool force)
job->force_cancel |= force;
}

static int block_job_txn_apply(BlockJobTxn *txn, int fn(BlockJob *), bool lock)
static int block_job_txn_apply(JobTxn *txn, int fn(BlockJob *), bool lock)
{
AioContext *ctx;
BlockJob *job, *next;
Job *job, *next;
BlockJob *bjob;
int rc = 0;

QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) {
assert(is_block_job(job));
bjob = container_of(job, BlockJob, job);

if (lock) {
ctx = blk_get_aio_context(job->blk);
ctx = job->aio_context;
aio_context_acquire(ctx);
}
rc = fn(job);
rc = fn(bjob);
if (lock) {
aio_context_release(ctx);
}
Expand All @@ -310,8 +314,8 @@ static int block_job_txn_apply(BlockJobTxn *txn, int fn(BlockJob *), bool lock)
static void block_job_completed_txn_abort(BlockJob *job)
{
AioContext *ctx;
BlockJobTxn *txn = job->txn;
BlockJob *other_job;
JobTxn *txn = job->txn;
Job *other_job;

if (txn->aborting) {
/*
Expand All @@ -324,26 +328,26 @@ static void block_job_completed_txn_abort(BlockJob *job)

/* We are the first failed job. Cancel other jobs. */
QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
ctx = blk_get_aio_context(other_job->blk);
ctx = other_job->aio_context;
aio_context_acquire(ctx);
}

/* Other jobs are effectively cancelled by us, set the status for
* them; this job, however, may or may not be cancelled, depending
* on the caller, so leave it. */
QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
if (other_job != job) {
job_cancel_async(&other_job->job, false);
if (other_job != &job->job) {
job_cancel_async(other_job, false);
}
}
while (!QLIST_EMPTY(&txn->jobs)) {
other_job = QLIST_FIRST(&txn->jobs);
ctx = blk_get_aio_context(other_job->blk);
if (!job_is_completed(&other_job->job)) {
assert(job_is_cancelled(&other_job->job));
job_finish_sync(&other_job->job, NULL, NULL);
ctx = other_job->aio_context;
if (!job_is_completed(other_job)) {
assert(job_is_cancelled(other_job));
job_finish_sync(other_job, NULL, NULL);
}
job_finalize_single(&other_job->job);
job_finalize_single(other_job);
aio_context_release(ctx);
}

Expand Down Expand Up @@ -385,8 +389,8 @@ static int block_job_transition_to_pending(BlockJob *job)

static void block_job_completed_txn_success(BlockJob *job)
{
BlockJobTxn *txn = job->txn;
BlockJob *other_job;
JobTxn *txn = job->txn;
Job *other_job;

job_state_transition(&job->job, JOB_STATUS_WAITING);

Expand All @@ -395,10 +399,10 @@ static void block_job_completed_txn_success(BlockJob *job)
* txn.
*/
QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
if (!job_is_completed(&other_job->job)) {
if (!job_is_completed(other_job)) {
return;
}
assert(other_job->job.ret == 0);
assert(other_job->ret == 0);
}

block_job_txn_apply(txn, block_job_transition_to_pending, false);
Expand Down Expand Up @@ -628,7 +632,7 @@ static void block_job_event_pending(Notifier *n, void *opaque)
*/

void *block_job_create(const char *job_id, const BlockJobDriver *driver,
BlockJobTxn *txn, BlockDriverState *bs, uint64_t perm,
JobTxn *txn, BlockDriverState *bs, uint64_t perm,
uint64_t shared_perm, int64_t speed, int flags,
BlockCompletionFunc *cb, void *opaque, Error **errp)
{
Expand Down
2 changes: 1 addition & 1 deletion include/block/block_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,7 @@ BlockJob *backup_job_create(const char *job_id, BlockDriverState *bs,
BlockdevOnError on_target_error,
int creation_flags,
BlockCompletionFunc *cb, void *opaque,
BlockJobTxn *txn, Error **errp);
JobTxn *txn, Error **errp);

void hmp_drive_add_node(Monitor *mon, const char *optstr);

Expand Down
11 changes: 5 additions & 6 deletions include/block/blockjob.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
#define BLOCK_JOB_SLICE_TIME 100000000ULL /* ns */

typedef struct BlockJobDriver BlockJobDriver;
typedef struct BlockJobTxn BlockJobTxn;
typedef struct JobTxn JobTxn;

/**
* BlockJob:
Expand Down Expand Up @@ -85,8 +85,7 @@ typedef struct BlockJob {
/** BlockDriverStates that are involved in this block job */
GSList *nodes;

BlockJobTxn *txn;
QLIST_ENTRY(BlockJob) txn_list;
JobTxn *txn;
} BlockJob;

/**
Expand Down Expand Up @@ -273,7 +272,7 @@ void block_job_iostatus_reset(BlockJob *job);
* group. Jobs wait for each other before completing. Cancelling one job
* cancels all jobs in the transaction.
*/
BlockJobTxn *block_job_txn_new(void);
JobTxn *block_job_txn_new(void);

/**
* block_job_txn_unref:
Expand All @@ -282,7 +281,7 @@ BlockJobTxn *block_job_txn_new(void);
* or block_job_txn_new. If it's the last reference to the object, it will be
* freed.
*/
void block_job_txn_unref(BlockJobTxn *txn);
void block_job_txn_unref(JobTxn *txn);

/**
* block_job_txn_add_job:
Expand All @@ -293,7 +292,7 @@ void block_job_txn_unref(BlockJobTxn *txn);
* The caller must call either block_job_txn_unref() or block_job_completed()
* to release the reference that is automatically grabbed here.
*/
void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job);
void block_job_txn_add_job(JobTxn *txn, BlockJob *job);

/**
* block_job_is_internal:
Expand Down
2 changes: 1 addition & 1 deletion include/block/blockjob_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ struct BlockJobDriver {
* called from a wrapper that is specific to the job type.
*/
void *block_job_create(const char *job_id, const BlockJobDriver *driver,
BlockJobTxn *txn, BlockDriverState *bs, uint64_t perm,
JobTxn *txn, BlockDriverState *bs, uint64_t perm,
uint64_t shared_perm, int64_t speed, int flags,
BlockCompletionFunc *cb, void *opaque, Error **errp);

Expand Down
3 changes: 3 additions & 0 deletions include/qemu/job.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ typedef struct Job {

/** Element of the list of jobs */
QLIST_ENTRY(Job) job_list;

/** Element of the list of jobs in a job transaction */
QLIST_ENTRY(Job) txn_list;
} Job;

/**
Expand Down
8 changes: 4 additions & 4 deletions tests/test-blockjob-txn.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ static const BlockJobDriver test_block_job_driver = {
*/
static BlockJob *test_block_job_start(unsigned int iterations,
bool use_timer,
int rc, int *result, BlockJobTxn *txn)
int rc, int *result, JobTxn *txn)
{
BlockDriverState *bs;
TestBlockJob *s;
Expand Down Expand Up @@ -122,7 +122,7 @@ static BlockJob *test_block_job_start(unsigned int iterations,
static void test_single_job(int expected)
{
BlockJob *job;
BlockJobTxn *txn;
JobTxn *txn;
int result = -EINPROGRESS;

txn = block_job_txn_new();
Expand Down Expand Up @@ -160,7 +160,7 @@ static void test_pair_jobs(int expected1, int expected2)
{
BlockJob *job1;
BlockJob *job2;
BlockJobTxn *txn;
JobTxn *txn;
int result1 = -EINPROGRESS;
int result2 = -EINPROGRESS;

Expand Down Expand Up @@ -222,7 +222,7 @@ static void test_pair_jobs_fail_cancel_race(void)
{
BlockJob *job1;
BlockJob *job2;
BlockJobTxn *txn;
JobTxn *txn;
int result1 = -EINPROGRESS;
int result2 = -EINPROGRESS;

Expand Down

0 comments on commit 62c9e41

Please sign in to comment.