summaryrefslogtreecommitdiff
path: root/util
diff options
context:
space:
mode:
authorPaolo Bonzini <pbonzini@redhat.com>2017-02-13 14:52:19 +0100
committerStefan Hajnoczi <stefanha@redhat.com>2017-02-21 11:14:07 +0000
commit0c330a734b51c177ab8488932ac3b0c4d63a718a (patch)
tree1251fc380ca5313495d9a9c541460b3ac2ffb7e0 /util
parentc2b38b277a7882a592f4f2ec955084b2b756daaa (diff)
downloadqemu-0c330a734b51c177ab8488932ac3b0c4d63a718a.tar.gz
aio: introduce aio_co_schedule and aio_co_wake
aio_co_wake provides the infrastructure to start a coroutine on a "home" AioContext. It will be used by CoMutex and CoQueue, so that coroutines don't jump from one context to another when they go to sleep on a mutex or waitqueue. However, it can also be used as a more efficient alternative to one-shot bottom halves, and saves the effort of tracking which AioContext a coroutine is running on. aio_co_schedule is the part of aio_co_wake that starts a coroutine on a remove AioContext, but it is also useful to implement e.g. bdrv_set_aio_context callbacks. The implementation of aio_co_schedule is based on a lock-free multiple-producer, single-consumer queue. The multiple producers use cmpxchg to add to a LIFO stack. The consumer (a per-AioContext bottom half) grabs all items added so far, inverts the list to make it FIFO, and goes through it one item at a time until it's empty. The data structure was inspired by OSv, which uses it in the very code we'll "port" to QEMU for the thread-safe CoMutex. Most of the new code is really tests. Signed-off-by: Paolo Bonzini <pbonzini@redhat.com> Reviewed-by: Fam Zheng <famz@redhat.com> Message-id: 20170213135235.12274-3-pbonzini@redhat.com Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
Diffstat (limited to 'util')
-rw-r--r--util/async.c65
-rw-r--r--util/qemu-coroutine.c8
-rw-r--r--util/trace-events4
3 files changed, 77 insertions, 0 deletions
diff --git a/util/async.c b/util/async.c
index 1fd97e1f15..9cac702c5b 100644
--- a/util/async.c
+++ b/util/async.c
@@ -31,6 +31,8 @@
#include "qemu/main-loop.h"
#include "qemu/atomic.h"
#include "block/raw-aio.h"
+#include "qemu/coroutine_int.h"
+#include "trace.h"
/***********************************************************/
/* bottom halves (can be seen as timers which expire ASAP) */
@@ -275,6 +277,9 @@ aio_ctx_finalize(GSource *source)
}
#endif
+ assert(QSLIST_EMPTY(&ctx->scheduled_coroutines));
+ qemu_bh_delete(ctx->co_schedule_bh);
+
qemu_lockcnt_lock(&ctx->list_lock);
assert(!qemu_lockcnt_count(&ctx->list_lock));
while (ctx->first_bh) {
@@ -364,6 +369,28 @@ static bool event_notifier_poll(void *opaque)
return atomic_read(&ctx->notified);
}
+static void co_schedule_bh_cb(void *opaque)
+{
+ AioContext *ctx = opaque;
+ QSLIST_HEAD(, Coroutine) straight, reversed;
+
+ QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
+ QSLIST_INIT(&straight);
+
+ while (!QSLIST_EMPTY(&reversed)) {
+ Coroutine *co = QSLIST_FIRST(&reversed);
+ QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
+ QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
+ }
+
+ while (!QSLIST_EMPTY(&straight)) {
+ Coroutine *co = QSLIST_FIRST(&straight);
+ QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
+ trace_aio_co_schedule_bh_cb(ctx, co);
+ qemu_coroutine_enter(co);
+ }
+}
+
AioContext *aio_context_new(Error **errp)
{
int ret;
@@ -379,6 +406,10 @@ AioContext *aio_context_new(Error **errp)
}
g_source_set_can_recurse(&ctx->source, true);
qemu_lockcnt_init(&ctx->list_lock);
+
+ ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx);
+ QSLIST_INIT(&ctx->scheduled_coroutines);
+
aio_set_event_notifier(ctx, &ctx->notifier,
false,
(EventNotifierHandler *)
@@ -402,6 +433,40 @@ fail:
return NULL;
}
+void aio_co_schedule(AioContext *ctx, Coroutine *co)
+{
+ trace_aio_co_schedule(ctx, co);
+ QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines,
+ co, co_scheduled_next);
+ qemu_bh_schedule(ctx->co_schedule_bh);
+}
+
+void aio_co_wake(struct Coroutine *co)
+{
+ AioContext *ctx;
+
+ /* Read coroutine before co->ctx. Matches smp_wmb in
+ * qemu_coroutine_enter.
+ */
+ smp_read_barrier_depends();
+ ctx = atomic_read(&co->ctx);
+
+ if (ctx != qemu_get_current_aio_context()) {
+ aio_co_schedule(ctx, co);
+ return;
+ }
+
+ if (qemu_in_coroutine()) {
+ Coroutine *self = qemu_coroutine_self();
+ assert(self != co);
+ QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next);
+ } else {
+ aio_context_acquire(ctx);
+ qemu_coroutine_enter(co);
+ aio_context_release(ctx);
+ }
+}
+
void aio_context_ref(AioContext *ctx)
{
g_source_ref(&ctx->source);
diff --git a/util/qemu-coroutine.c b/util/qemu-coroutine.c
index a5d2f6c0c3..415600dc30 100644
--- a/util/qemu-coroutine.c
+++ b/util/qemu-coroutine.c
@@ -19,6 +19,7 @@
#include "qemu/atomic.h"
#include "qemu/coroutine.h"
#include "qemu/coroutine_int.h"
+#include "block/aio.h"
enum {
POOL_BATCH_SIZE = 64,
@@ -114,6 +115,13 @@ void qemu_coroutine_enter(Coroutine *co)
}
co->caller = self;
+ co->ctx = qemu_get_current_aio_context();
+
+ /* Store co->ctx before anything that stores co. Matches
+ * barrier in aio_co_wake.
+ */
+ smp_wmb();
+
ret = qemu_coroutine_switch(self, co, COROUTINE_ENTER);
qemu_co_queue_run_restart(co);
diff --git a/util/trace-events b/util/trace-events
index 1fa12f0491..53bd70c4cd 100644
--- a/util/trace-events
+++ b/util/trace-events
@@ -6,6 +6,10 @@ run_poll_handlers_end(void *ctx, bool progress) "ctx %p progress %d"
poll_shrink(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64
poll_grow(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64
+# util/async.c
+aio_co_schedule(void *ctx, void *co) "ctx %p co %p"
+aio_co_schedule_bh_cb(void *ctx, void *co) "ctx %p co %p"
+
# util/thread-pool.c
thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p"
thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d"