Skip to content

Commit

Permalink
io: change the QIOTask callback signature
Browse files Browse the repository at this point in the history
Currently the QIOTaskFunc signature takes an Object * for
the source, and an Error * for any error. We also need to
be able to provide a result pointer. Rather than continue
to add parameters to QIOTaskFunc, remove the existing
ones and simply pass the QIOTask object instead. This
has methods to access all the other data items required
in the callback impl.

Reviewed-by: Eric Blake <[email protected]>
Signed-off-by: Daniel P. Berrange <[email protected]>
  • Loading branch information
berrange committed Jan 23, 2017
1 parent 1a447e4 commit 60e705c
Show file tree
Hide file tree
Showing 15 changed files with 110 additions and 112 deletions.
73 changes: 42 additions & 31 deletions include/io/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@

typedef struct QIOTask QIOTask;

typedef void (*QIOTaskFunc)(Object *source,
Error *err,
typedef void (*QIOTaskFunc)(QIOTask *task,
gpointer opaque);

typedef int (*QIOTaskWorker)(QIOTask *task,
Expand All @@ -44,7 +43,7 @@ typedef int (*QIOTaskWorker)(QIOTask *task,
* a public API which accepts a task callback:
*
* <example>
* <title>Task callback function signature</title>
* <title>Task function signature</title>
* <programlisting>
* void myobject_operation(QMyObject *obj,
* QIOTaskFunc *func,
Expand All @@ -57,12 +56,36 @@ typedef int (*QIOTaskWorker)(QIOTask *task,
* is data to pass to it. The optional 'notify' function is used
* to free 'opaque' when no longer needed.
*
* Now, lets say the implementation of this method wants to set
* a timer to run once a second checking for completion of some
* activity. It would do something like
* When the operation completes, the 'func' callback will be
* invoked, allowing the calling code to determine the result
* of the operation. An example QIOTaskFunc implementation may
* look like
*
* <example>
* <title>Task callback function implementation</title>
* <title>Task callback implementation</title>
* <programlisting>
* static void myobject_operation_notify(QIOTask *task,
* gpointer opaque)
* {
* Error *err = NULL;
* if (qio_task_propagate_error(task, &err)) {
* ...deal with the failure...
* error_free(err);
* } else {
* QMyObject *src = QMY_OBJECT(qio_task_get_source(task));
* ...deal with the completion...
* }
* }
* </programlisting>
* </example>
*
* Now, lets say the implementation of the method using the
* task wants to set a timer to run once a second checking
* for completion of some activity. It would do something
* like
*
* <example>
* <title>Task function implementation</title>
* <programlisting>
* void myobject_operation(QMyObject *obj,
* QIOTaskFunc *func,
Expand Down Expand Up @@ -102,8 +125,8 @@ typedef int (*QIOTaskWorker)(QIOTask *task,
*
* ...check something important...
* if (err) {
* qio_task_abort(task, err);
* error_free(task);
* qio_task_set_error(task, err);
* qio_task_complete(task);
* return FALSE;
* } else if (...work is completed ...) {
* qio_task_complete(task);
Expand All @@ -115,6 +138,10 @@ typedef int (*QIOTaskWorker)(QIOTask *task,
* </programlisting>
* </example>
*
* The 'qio_task_complete' call in this method will trigger
* the callback func 'myobject_operation_notify' shown
* earlier to deal with the results.
*
* Once this function returns false, object_unref will be called
* automatically on the task causing it to be released and the
* ref on QMyObject dropped too.
Expand Down Expand Up @@ -187,8 +214,8 @@ typedef int (*QIOTaskWorker)(QIOTask *task,
* 'err' attribute in the task object to determine if
* the operation was successful or not.
*
* The returned task will be released when one of
* qio_task_abort() or qio_task_complete() are invoked.
* The returned task will be released when qio_task_complete()
* is invoked.
*
* Returns: the task struct
*/
Expand All @@ -204,10 +231,8 @@ QIOTask *qio_task_new(Object *source,
* @opaque: opaque data to pass to @worker
* @destroy: function to free @opaque
*
* Run a task in a background thread. If @worker
* returns 0 it will call qio_task_complete() in
* the main event thread context. If @worker
* returns -1 it will call qio_task_abort() in
* Run a task in a background thread. When @worker
* returns it will call qio_task_complete() in
* the main event thread context.
*/
void qio_task_run_in_thread(QIOTask *task,
Expand All @@ -219,25 +244,11 @@ void qio_task_run_in_thread(QIOTask *task,
* qio_task_complete:
* @task: the task struct
*
* Mark the operation as successfully completed
* and free the memory for @task.
* Invoke the completion callback for @task and
* then free its memory.
*/
void qio_task_complete(QIOTask *task);

/**
* qio_task_abort:
* @task: the task struct
* @err: the error to record for the operation
*
* Mark the operation as failed, with @err providing
* details about the failure. The @err may be freed
* afer the function returns, as the notification
* callback is invoked synchronously. The @task will
* be freed when this call completes.
*/
void qio_task_abort(QIOTask *task,
Error *err);


/**
* qio_task_set_error:
Expand Down
14 changes: 6 additions & 8 deletions io/channel-tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,9 @@ static void qio_channel_tls_handshake_task(QIOChannelTLS *ioc,

if (qcrypto_tls_session_handshake(ioc->session, &err) < 0) {
trace_qio_channel_tls_handshake_fail(ioc);
qio_task_abort(task, err);
goto cleanup;
qio_task_set_error(task, err);
qio_task_complete(task);
return;
}

status = qcrypto_tls_session_get_handshake_status(ioc->session);
Expand All @@ -163,10 +164,10 @@ static void qio_channel_tls_handshake_task(QIOChannelTLS *ioc,
if (qcrypto_tls_session_check_credentials(ioc->session,
&err) < 0) {
trace_qio_channel_tls_credentials_deny(ioc);
qio_task_abort(task, err);
goto cleanup;
qio_task_set_error(task, err);
} else {
trace_qio_channel_tls_credentials_allow(ioc);
}
trace_qio_channel_tls_credentials_allow(ioc);
qio_task_complete(task);
} else {
GIOCondition condition;
Expand All @@ -183,9 +184,6 @@ static void qio_channel_tls_handshake_task(QIOChannelTLS *ioc,
task,
NULL);
}

cleanup:
error_free(err);
}


Expand Down
8 changes: 4 additions & 4 deletions io/channel-websock.c
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ static gboolean qio_channel_websock_handshake_send(QIOChannel *ioc,

if (ret < 0) {
trace_qio_channel_websock_handshake_fail(ioc);
qio_task_abort(task, err);
error_free(err);
qio_task_set_error(task, err);
qio_task_complete(task);
return FALSE;
}

Expand All @@ -307,8 +307,8 @@ static gboolean qio_channel_websock_handshake_io(QIOChannel *ioc,
ret = qio_channel_websock_handshake_read(wioc, &err);
if (ret < 0) {
trace_qio_channel_websock_handshake_fail(ioc);
qio_task_abort(task, err);
error_free(err);
qio_task_set_error(task, err);
qio_task_complete(task);
return FALSE;
}
if (ret == 0) {
Expand Down
18 changes: 4 additions & 14 deletions io/task.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,11 @@ static gboolean gio_task_thread_result(gpointer opaque)
struct QIOTaskThreadData *data = opaque;

trace_qio_task_thread_result(data->task);
if (data->ret == 0) {
qio_task_complete(data->task);
} else {
qio_task_abort(data->task, data->err);
if (data->err) {
qio_task_set_error(data->task, data->err);
}
qio_task_complete(data->task);

error_free(data->err);
if (data->destroy) {
data->destroy(data->opaque);
}
Expand Down Expand Up @@ -149,19 +147,11 @@ void qio_task_run_in_thread(QIOTask *task,

void qio_task_complete(QIOTask *task)
{
task->func(task->source, NULL, task->opaque);
task->func(task, task->opaque);
trace_qio_task_complete(task);
qio_task_free(task);
}

void qio_task_abort(QIOTask *task,
Error *err)
{
task->func(task->source, err, task->opaque);
trace_qio_task_abort(task);
qio_task_free(task);
}


void qio_task_set_error(QIOTask *task,
Error *err)
Expand Down
1 change: 0 additions & 1 deletion io/trace-events
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
# io/task.c
qio_task_new(void *task, void *source, void *func, void *opaque) "Task new task=%p source=%p func=%p opaque=%p"
qio_task_complete(void *task) "Task complete task=%p"
qio_task_abort(void *task) "Task abort task=%p"
qio_task_thread_start(void *task, void *worker, void *opaque) "Task thread start task=%p worker=%p opaque=%p"
qio_task_thread_run(void *task) "Task thread run task=%p"
qio_task_thread_exit(void *task) "Task thread exit task=%p"
Expand Down
11 changes: 6 additions & 5 deletions migration/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,22 +70,23 @@ static void socket_connect_data_free(void *opaque)
g_free(data);
}

static void socket_outgoing_migration(Object *src,
Error *err,
static void socket_outgoing_migration(QIOTask *task,
gpointer opaque)
{
struct SocketConnectData *data = opaque;
QIOChannel *sioc = QIO_CHANNEL(src);
QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
Error *err = NULL;

if (err) {
if (qio_task_propagate_error(task, &err)) {
trace_migration_socket_outgoing_error(error_get_pretty(err));
data->s->to_dst_file = NULL;
migrate_fd_error(data->s, err);
error_free(err);
} else {
trace_migration_socket_outgoing_connected(data->hostname);
migration_channel_connect(data->s, sioc, data->hostname);
}
object_unref(src);
object_unref(OBJECT(sioc));
}

static void socket_start_outgoing_migration(MigrationState *s,
Expand Down
19 changes: 10 additions & 9 deletions migration/tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ migration_tls_get_creds(MigrationState *s,
}


static void migration_tls_incoming_handshake(Object *src,
Error *err,
static void migration_tls_incoming_handshake(QIOTask *task,
gpointer opaque)
{
QIOChannel *ioc = QIO_CHANNEL(src);
QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
Error *err = NULL;

if (err) {
if (qio_task_propagate_error(task, &err)) {
trace_migration_tls_incoming_handshake_error(error_get_pretty(err));
error_report("%s", error_get_pretty(err));
error_report_err(err);
} else {
trace_migration_tls_incoming_handshake_complete();
migration_channel_process_incoming(migrate_get_current(), ioc);
Expand Down Expand Up @@ -107,17 +107,18 @@ void migration_tls_channel_process_incoming(MigrationState *s,
}


static void migration_tls_outgoing_handshake(Object *src,
Error *err,
static void migration_tls_outgoing_handshake(QIOTask *task,
gpointer opaque)
{
MigrationState *s = opaque;
QIOChannel *ioc = QIO_CHANNEL(src);
QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
Error *err = NULL;

if (err) {
if (qio_task_propagate_error(task, &err)) {
trace_migration_tls_outgoing_handshake_error(error_get_pretty(err));
s->to_dst_file = NULL;
migrate_fd_error(s, err);
error_free(err);
} else {
trace_migration_tls_outgoing_handshake_complete();
migration_channel_connect(s, ioc, NULL);
Expand Down
8 changes: 3 additions & 5 deletions nbd/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,13 @@ ssize_t nbd_wr_syncv(QIOChannel *ioc,
}


void nbd_tls_handshake(Object *src,
Error *err,
void nbd_tls_handshake(QIOTask *task,
void *opaque)
{
struct NBDTLSHandshakeData *data = opaque;

if (err) {
TRACE("TLS failed %s", error_get_pretty(err));
data->error = error_copy(err);
if (qio_task_propagate_error(task, &data->error)) {
TRACE("TLS failed %s", error_get_pretty(data->error));
}
data->complete = true;
g_main_loop_quit(data->loop);
Expand Down
3 changes: 1 addition & 2 deletions nbd/nbd-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,7 @@ struct NBDTLSHandshakeData {
};


void nbd_tls_handshake(Object *src,
Error *err,
void nbd_tls_handshake(QIOTask *task,
void *opaque);

#endif
18 changes: 10 additions & 8 deletions qemu-char.c
Original file line number Diff line number Diff line change
Expand Up @@ -3277,14 +3277,13 @@ static void tcp_chr_telnet_init(CharDriverState *chr)
}


static void tcp_chr_tls_handshake(Object *source,
Error *err,
static void tcp_chr_tls_handshake(QIOTask *task,
gpointer user_data)
{
CharDriverState *chr = user_data;
TCPCharDriver *s = chr->opaque;

if (err) {
if (qio_task_propagate_error(task, NULL)) {
tcp_chr_disconnect(chr);
} else {
if (s->do_telnetopt) {
Expand Down Expand Up @@ -3492,20 +3491,23 @@ static void tcp_chr_free(CharDriverState *chr)
}


static void qemu_chr_socket_connected(Object *src, Error *err, void *opaque)
static void qemu_chr_socket_connected(QIOTask *task, void *opaque)
{
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(src);
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
CharDriverState *chr = opaque;
TCPCharDriver *s = chr->opaque;
Error *err = NULL;

if (err) {
if (qio_task_propagate_error(task, &err)) {
check_report_connect_error(chr, err);
object_unref(src);
return;
error_free(err);
goto cleanup;
}

s->connect_err_reported = false;
tcp_chr_new_client(chr, sioc);

cleanup:
object_unref(OBJECT(sioc));
}

Expand Down
Loading

0 comments on commit 60e705c

Please sign in to comment.