summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/io/task.h256
-rw-r--r--io/Makefile.objs1
-rw-r--r--io/task.c159
-rw-r--r--tests/.gitignore1
-rw-r--r--tests/Makefile3
-rw-r--r--tests/test-io-task.c268
-rw-r--r--trace-events9
7 files changed, 697 insertions, 0 deletions
diff --git a/include/io/task.h b/include/io/task.h
new file mode 100644
index 0000000000..2418714156
--- /dev/null
+++ b/include/io/task.h
@@ -0,0 +1,256 @@
+/*
+ * QEMU I/O task
+ *
+ * Copyright (c) 2015 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#ifndef QIO_TASK_H__
+#define QIO_TASK_H__
+
+#include "qemu-common.h"
+#include "qapi/error.h"
+#include "qom/object.h"
+
+typedef struct QIOTask QIOTask;
+
+typedef void (*QIOTaskFunc)(Object *source,
+ Error *err,
+ gpointer opaque);
+
+typedef int (*QIOTaskWorker)(QIOTask *task,
+ Error **errp,
+ gpointer opaque);
+
+/**
+ * QIOTask:
+ *
+ * The QIOTask object provides a simple mechanism for reporting
+ * success / failure of long running background operations.
+ *
+ * A object on which the operation is to be performed could have
+ * a public API which accepts a task callback:
+ *
+ * <example>
+ * <title>Task callback function signature</title>
+ * <programlisting>
+ * void myobject_operation(QMyObject *obj,
+ * QIOTaskFunc *func,
+ * gpointer opaque,
+ * GDestroyNotify *notify);
+ * </programlisting>
+ * </example>
+ *
+ * The 'func' parameter is the callback to be invoked, and 'opaque'
+ * is data to pass to it. The optional 'notify' function is used
+ * to free 'opaque' when no longer needed.
+ *
+ * Now, lets say the implementation of this method wants to set
+ * a timer to run once a second checking for completion of some
+ * activity. It would do something like
+ *
+ * <example>
+ * <title>Task callback function implementation</title>
+ * <programlisting>
+ * void myobject_operation(QMyObject *obj,
+ * QIOTaskFunc *func,
+ * gpointer opaque,
+ * GDestroyNotify *notify)
+ * {
+ * QIOTask *task;
+ *
+ * task = qio_task_new(OBJECT(obj), func, opaque, notify);
+ *
+ * g_timeout_add_full(G_PRIORITY_DEFAULT,
+ * 1000,
+ * myobject_operation_timer,
+ * task,
+ * NULL);
+ * }
+ * </programlisting>
+ * </example>
+ *
+ * It could equally have setup a watch on a file descriptor or
+ * created a background thread, or something else entirely.
+ * Notice that the source object is passed to the task, and
+ * QIOTask will hold a reference on that. This ensure that
+ * the QMyObject instance cannot be garbage collected while
+ * the async task is still in progress.
+ *
+ * In this case, myobject_operation_timer will fire after
+ * 3 secs and do
+ *
+ * <example>
+ * <title>Task timer function</title>
+ * <programlisting>
+ * gboolean myobject_operation_timer(gpointer opaque)
+ * {
+ * QIOTask *task = QIO_TASK(opaque);
+ * Error *err;*
+ *
+ * ...check something important...
+ * if (err) {
+ * qio_task_abort(task, err);
+ * error_free(task);
+ * return FALSE;
+ * } else if (...work is completed ...) {
+ * qio_task_complete(task);
+ * return FALSE;
+ * }
+ * ...carry on polling ...
+ * return TRUE;
+ * }
+ * </programlisting>
+ * </example>
+ *
+ * Once this function returns false, object_unref will be called
+ * automatically on the task causing it to be released and the
+ * ref on QMyObject dropped too.
+ *
+ * The QIOTask module can also be used to perform operations
+ * in a background thread context, while still reporting the
+ * results in the main event thread. This allows code which
+ * cannot easily be rewritten to be asychronous (such as DNS
+ * lookups) to be easily run non-blocking. Reporting the
+ * results in the main thread context means that the caller
+ * typically does not need to be concerned about thread
+ * safety wrt the QEMU global mutex.
+ *
+ * For example, the socket_listen() method will block the caller
+ * while DNS lookups take place if given a name, instead of IP
+ * address. The C library often do not provide a practical async
+ * DNS API, so the to get non-blocking DNS lookups in a portable
+ * manner requires use of a thread. So achieve a non-blocking
+ * socket listen using QIOTask would require:
+ *
+ * <example>
+ * static int myobject_listen_worker(QIOTask *task,
+ * Error **errp,
+ * gpointer opaque)
+ * {
+ * QMyObject obj = QMY_OBJECT(qio_task_get_source(task));
+ * SocketAddress *addr = opaque;
+ *
+ * obj->fd = socket_listen(addr, errp);
+ * if (obj->fd < 0) {
+ * return -1;
+ * }
+ * return 0;
+ * }
+ *
+ * void myobject_listen_async(QMyObject *obj,
+ * SocketAddress *addr,
+ * QIOTaskFunc *func,
+ * gpointer opaque,
+ * GDestroyNotify *notify)
+ * {
+ * QIOTask *task;
+ * SocketAddress *addrCopy;
+ *
+ * qapi_copy_SocketAddress(&addrCopy, addr);
+ * task = qio_task_new(OBJECT(obj), func, opaque, notify);
+ *
+ * qio_task_run_in_thread(task, myobject_listen_worker,
+ * addrCopy,
+ * qapi_free_SocketAddress);
+ * }
+ * </example>
+ *
+ * NB, The 'func' callback passed into myobject_listen_async
+ * will be invoked from the main event thread, despite the
+ * actual operation being performed in a different thread.
+ */
+
+/**
+ * qio_task_new:
+ * @source: the object on which the operation is invoked
+ * @func: the callback to invoke when the task completes
+ * @opaque: opaque data to pass to @func when invoked
+ * @destroy: optional callback to free @opaque
+ *
+ * Creates a new task struct to track completion of a
+ * background operation running on the object @source.
+ * When the operation completes or fails, the callback
+ * @func will be invoked. The callback can access the
+ * 'err' attribute in the task object to determine if
+ * the operation was successful or not.
+ *
+ * The returned task will be released when one of
+ * qio_task_abort() or qio_task_complete() are invoked.
+ *
+ * Returns: the task struct
+ */
+QIOTask *qio_task_new(Object *source,
+ QIOTaskFunc func,
+ gpointer opaque,
+ GDestroyNotify destroy);
+
+/**
+ * qio_task_run_in_thread:
+ * @task: the task struct
+ * @worker: the function to invoke in a thread
+ * @opaque: opaque data to pass to @worker
+ * @destroy: function to free @opaque
+ *
+ * Run a task in a background thread. If @worker
+ * returns 0 it will call qio_task_complete() in
+ * the main event thread context. If @worker
+ * returns -1 it will call qio_task_abort() in
+ * the main event thread context.
+ */
+void qio_task_run_in_thread(QIOTask *task,
+ QIOTaskWorker worker,
+ gpointer opaque,
+ GDestroyNotify destroy);
+
+/**
+ * qio_task_complete:
+ * @task: the task struct
+ *
+ * Mark the operation as succesfully completed
+ * and free the memory for @task.
+ */
+void qio_task_complete(QIOTask *task);
+
+/**
+ * qio_task_abort:
+ * @task: the task struct
+ * @err: the error to record for the operation
+ *
+ * Mark the operation as failed, with @err providing
+ * details about the failure. The @err may be freed
+ * afer the function returns, as the notification
+ * callback is invoked synchronously. The @task will
+ * be freed when this call completes.
+ */
+void qio_task_abort(QIOTask *task,
+ Error *err);
+
+
+/**
+ * qio_task_get_source:
+ * @task: the task struct
+ *
+ * Get the source object associated with the background
+ * task. This returns a new reference to the object,
+ * which the caller must released with object_unref()
+ * when no longer required.
+ *
+ * Returns: the source object
+ */
+Object *qio_task_get_source(QIOTask *task);
+
+#endif /* QIO_TASK_H__ */
diff --git a/io/Makefile.objs b/io/Makefile.objs
index b02ea908ef..503b95c30b 100644
--- a/io/Makefile.objs
+++ b/io/Makefile.objs
@@ -1,2 +1,3 @@
io-obj-y = channel.o
io-obj-y += channel-watch.o
+io-obj-y += task.o
diff --git a/io/task.c b/io/task.c
new file mode 100644
index 0000000000..3127fca771
--- /dev/null
+++ b/io/task.c
@@ -0,0 +1,159 @@
+/*
+ * QEMU I/O task
+ *
+ * Copyright (c) 2015 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include "io/task.h"
+#include "qemu/thread.h"
+#include "trace.h"
+
+struct QIOTask {
+ Object *source;
+ QIOTaskFunc func;
+ gpointer opaque;
+ GDestroyNotify destroy;
+};
+
+
+QIOTask *qio_task_new(Object *source,
+ QIOTaskFunc func,
+ gpointer opaque,
+ GDestroyNotify destroy)
+{
+ QIOTask *task;
+
+ task = g_new0(QIOTask, 1);
+
+ task->source = source;
+ object_ref(source);
+ task->func = func;
+ task->opaque = opaque;
+ task->destroy = destroy;
+
+ trace_qio_task_new(task, source, func, opaque);
+
+ return task;
+}
+
+static void qio_task_free(QIOTask *task)
+{
+ if (task->destroy) {
+ task->destroy(task->opaque);
+ }
+ object_unref(task->source);
+
+ g_free(task);
+}
+
+
+struct QIOTaskThreadData {
+ QIOTask *task;
+ QIOTaskWorker worker;
+ gpointer opaque;
+ GDestroyNotify destroy;
+ Error *err;
+ int ret;
+};
+
+
+static gboolean gio_task_thread_result(gpointer opaque)
+{
+ struct QIOTaskThreadData *data = opaque;
+
+ trace_qio_task_thread_result(data->task);
+ if (data->ret == 0) {
+ qio_task_complete(data->task);
+ } else {
+ qio_task_abort(data->task, data->err);
+ }
+
+ error_free(data->err);
+ if (data->destroy) {
+ data->destroy(data->opaque);
+ }
+
+ g_free(data);
+
+ return FALSE;
+}
+
+
+static gpointer qio_task_thread_worker(gpointer opaque)
+{
+ struct QIOTaskThreadData *data = opaque;
+
+ trace_qio_task_thread_run(data->task);
+ data->ret = data->worker(data->task, &data->err, data->opaque);
+ if (data->ret < 0 && data->err == NULL) {
+ error_setg(&data->err, "Task worker failed but did not set an error");
+ }
+
+ /* We're running in the background thread, and must only
+ * ever report the task results in the main event loop
+ * thread. So we schedule an idle callback to report
+ * the worker results
+ */
+ trace_qio_task_thread_exit(data->task);
+ g_idle_add(gio_task_thread_result, data);
+ return NULL;
+}
+
+
+void qio_task_run_in_thread(QIOTask *task,
+ QIOTaskWorker worker,
+ gpointer opaque,
+ GDestroyNotify destroy)
+{
+ struct QIOTaskThreadData *data = g_new0(struct QIOTaskThreadData, 1);
+ QemuThread thread;
+
+ data->task = task;
+ data->worker = worker;
+ data->opaque = opaque;
+ data->destroy = destroy;
+
+ trace_qio_task_thread_start(task, worker, opaque);
+ qemu_thread_create(&thread,
+ "io-task-worker",
+ qio_task_thread_worker,
+ data,
+ QEMU_THREAD_DETACHED);
+}
+
+
+void qio_task_complete(QIOTask *task)
+{
+ task->func(task->source, NULL, task->opaque);
+ trace_qio_task_complete(task);
+ qio_task_free(task);
+}
+
+void qio_task_abort(QIOTask *task,
+ Error *err)
+{
+ task->func(task->source, err, task->opaque);
+ trace_qio_task_abort(task);
+ qio_task_free(task);
+}
+
+
+Object *qio_task_get_source(QIOTask *task)
+{
+ object_ref(task->source);
+ return task->source;
+}
diff --git a/tests/.gitignore b/tests/.gitignore
index 1e55722b6a..eec12cc2db 100644
--- a/tests/.gitignore
+++ b/tests/.gitignore
@@ -24,6 +24,7 @@ test-cutils
test-hbitmap
test-int128
test-iov
+test-io-task
test-mul64
test-opts-visitor
test-qapi-event.[ch]
diff --git a/tests/Makefile b/tests/Makefile
index 053c1ae481..515a7c7c30 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -84,6 +84,7 @@ check-unit-$(CONFIG_GNUTLS) += tests/test-crypto-tlscredsx509$(EXESUF)
check-unit-$(CONFIG_GNUTLS) += tests/test-crypto-tlssession$(EXESUF)
check-unit-$(CONFIG_LINUX) += tests/test-qga$(EXESUF)
check-unit-y += tests/test-timed-average$(EXESUF)
+check-unit-y += tests/test-io-task$(EXESUF)
check-block-$(CONFIG_POSIX) += tests/qemu-iotests-quick.sh
@@ -381,6 +382,7 @@ test-qapi-obj-y = tests/test-qapi-visit.o tests/test-qapi-types.o \
$(test-qom-obj-y)
test-crypto-obj-y = $(crypto-obj-y) $(test-qom-obj-y)
test-block-obj-y = $(block-obj-y) $(test-crypto-obj-y)
+test-io-obj-y = $(io-obj-y) $(test-crypto-obj-y)
tests/check-qint$(EXESUF): tests/check-qint.o $(test-util-obj-y)
tests/check-qstring$(EXESUF): tests/check-qstring.o $(test-util-obj-y)
@@ -469,6 +471,7 @@ tests/test-crypto-tlscredsx509$(EXESUF): tests/test-crypto-tlscredsx509.o \
tests/test-crypto-tlssession.o-cflags := $(TASN1_CFLAGS)
tests/test-crypto-tlssession$(EXESUF): tests/test-crypto-tlssession.o \
tests/crypto-tls-x509-helpers.o tests/pkix_asn1_tab.o $(test-crypto-obj-y)
+tests/test-io-task$(EXESUF): tests/test-io-task.o $(test-io-obj-y)
libqos-obj-y = tests/libqos/pci.o tests/libqos/fw_cfg.o tests/libqos/malloc.o
libqos-obj-y += tests/libqos/i2c.o tests/libqos/libqos.o
diff --git a/tests/test-io-task.c b/tests/test-io-task.c
new file mode 100644
index 0000000000..3344382c7f
--- /dev/null
+++ b/tests/test-io-task.c
@@ -0,0 +1,268 @@
+/*
+ * QEMU I/O task tests
+ *
+ * Copyright (c) 2015 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include <glib.h>
+
+#include "io/task.h"
+
+#define TYPE_DUMMY "qemu:dummy"
+
+typedef struct DummyObject DummyObject;
+typedef struct DummyObjectClass DummyObjectClass;
+
+struct DummyObject {
+ Object parent;
+};
+
+struct DummyObjectClass {
+ ObjectClass parent;
+};
+
+static const TypeInfo dummy_info = {
+ .parent = TYPE_OBJECT,
+ .name = TYPE_DUMMY,
+ .instance_size = sizeof(DummyObject),
+ .class_size = sizeof(DummyObjectClass),
+};
+
+struct TestTaskData {
+ Object *source;
+ Error *err;
+ bool freed;
+};
+
+
+static void task_callback(Object *source,
+ Error *err,
+ gpointer opaque)
+{
+ struct TestTaskData *data = opaque;
+
+ data->source = source;
+ data->err = err;
+}
+
+
+static void test_task_complete(void)
+{
+ QIOTask *task;
+ Object *obj = object_new(TYPE_DUMMY);
+ Object *src;
+ struct TestTaskData data = { NULL, NULL, false };
+
+ task = qio_task_new(obj, task_callback, &data, NULL);
+ src = qio_task_get_source(task);
+
+ qio_task_complete(task);
+
+ g_assert(obj == src);
+
+ object_unref(obj);
+ object_unref(src);
+
+ g_assert(data.source == obj);
+ g_assert(data.err == NULL);
+ g_assert(data.freed == false);
+}
+
+
+static void task_data_free(gpointer opaque)
+{
+ struct TestTaskData *data = opaque;
+
+ data->freed = true;
+}
+
+
+static void test_task_data_free(void)
+{
+ QIOTask *task;
+ Object *obj = object_new(TYPE_DUMMY);
+ struct TestTaskData data = { NULL, NULL, false };
+
+ task = qio_task_new(obj, task_callback, &data, task_data_free);
+
+ qio_task_complete(task);
+
+ object_unref(obj);
+
+ g_assert(data.source == obj);
+ g_assert(data.err == NULL);
+ g_assert(data.freed == true);
+}
+
+
+static void test_task_error(void)
+{
+ QIOTask *task;
+ Object *obj = object_new(TYPE_DUMMY);
+ struct TestTaskData data = { NULL, NULL, false };
+ Error *err = NULL;
+
+ task = qio_task_new(obj, task_callback, &data, NULL);
+
+ error_setg(&err, "Some error");
+
+ qio_task_abort(task, err);
+
+ error_free(err);
+ object_unref(obj);
+
+ g_assert(data.source == obj);
+ g_assert(data.err == err);
+ g_assert(data.freed == false);
+
+}
+
+
+struct TestThreadWorkerData {
+ Object *source;
+ Error *err;
+ bool fail;
+ GThread *worker;
+ GThread *complete;
+ GMainLoop *loop;
+};
+
+static int test_task_thread_worker(QIOTask *task,
+ Error **errp,
+ gpointer opaque)
+{
+ struct TestThreadWorkerData *data = opaque;
+
+ data->worker = g_thread_self();
+
+ if (data->fail) {
+ error_setg(errp, "Testing fail");
+ return -1;
+ }
+
+ return 0;
+}
+
+
+static void test_task_thread_callback(Object *source,
+ Error *err,
+ gpointer opaque)
+{
+ struct TestThreadWorkerData *data = opaque;
+
+ data->source = source;
+ data->err = err;
+
+ data->complete = g_thread_self();
+
+ g_main_loop_quit(data->loop);
+}
+
+
+static void test_task_thread_complete(void)
+{
+ QIOTask *task;
+ Object *obj = object_new(TYPE_DUMMY);
+ struct TestThreadWorkerData data = { 0 };
+ GThread *self;
+
+ data.loop = g_main_loop_new(g_main_context_default(),
+ TRUE);
+
+ task = qio_task_new(obj,
+ test_task_thread_callback,
+ &data,
+ NULL);
+
+ qio_task_run_in_thread(task,
+ test_task_thread_worker,
+ &data,
+ NULL);
+
+ g_main_loop_run(data.loop);
+
+ g_main_loop_unref(data.loop);
+ object_unref(obj);
+
+ g_assert(data.source == obj);
+ g_assert(data.err == NULL);
+
+ self = g_thread_self();
+
+ /* Make sure the test_task_thread_worker actually got
+ * run in a different thread */
+ g_assert(data.worker != self);
+
+ /* And that the test_task_thread_callback got rnu in
+ * the main loop thread (ie this one) */
+ g_assert(data.complete == self);
+}
+
+
+static void test_task_thread_error(void)
+{
+ QIOTask *task;
+ Object *obj = object_new(TYPE_DUMMY);
+ struct TestThreadWorkerData data = { 0 };
+ GThread *self;
+
+ data.loop = g_main_loop_new(g_main_context_default(),
+ TRUE);
+ data.fail = true;
+
+ task = qio_task_new(obj,
+ test_task_thread_callback,
+ &data,
+ NULL);
+
+ qio_task_run_in_thread(task,
+ test_task_thread_worker,
+ &data,
+ NULL);
+
+ g_main_loop_run(data.loop);
+
+ g_main_loop_unref(data.loop);
+ object_unref(obj);
+
+ g_assert(data.source == obj);
+ g_assert(data.err != NULL);
+
+ self = g_thread_self();
+
+ /* Make sure the test_task_thread_worker actually got
+ * run in a different thread */
+ g_assert(data.worker != self);
+
+ /* And that the test_task_thread_callback got rnu in
+ * the main loop thread (ie this one) */
+ g_assert(data.complete == self);
+}
+
+
+int main(int argc, char **argv)
+{
+ g_test_init(&argc, &argv, NULL);
+ module_call_init(MODULE_INIT_QOM);
+ type_register_static(&dummy_info);
+ g_test_add_func("/crypto/task/complete", test_task_complete);
+ g_test_add_func("/crypto/task/datafree", test_task_data_free);
+ g_test_add_func("/crypto/task/error", test_task_error);
+ g_test_add_func("/crypto/task/thread_complete", test_task_thread_complete);
+ g_test_add_func("/crypto/task/thread_error", test_task_thread_error);
+ return g_test_run();
+}
diff --git a/trace-events b/trace-events
index fa504cf494..acf2484b7d 100644
--- a/trace-events
+++ b/trace-events
@@ -1808,3 +1808,12 @@ user_handle_signal(void *env, int target_sig) "env=%p signal %d"
user_host_signal(void *env, int host_sig, int target_sig) "env=%p signal %d (target %d("
user_queue_signal(void *env, int target_sig) "env=%p signal %d"
user_s390x_restore_sigregs(void *env, uint64_t sc_psw_addr, uint64_t env_psw_addr) "env=%p frame psw.addr "PRIx64 " current psw.addr "PRIx64""
+
+# io/task.c
+qio_task_new(void *task, void *source, void *func, void *opaque) "Task new task=%p source=%p func=%p opaque=%p"
+qio_task_complete(void *task) "Task complete task=%p"
+qio_task_abort(void *task) "Task abort task=%p"
+qio_task_thread_start(void *task, void *worker, void *opaque) "Task thread start task=%p worker=%p opaque=%p"
+qio_task_thread_run(void *task) "Task thread run task=%p"
+qio_task_thread_exit(void *task) "Task thread exit task=%p"
+qio_task_thread_result(void *task) "Task thread result task=%p"