From 10817bf09d5f8cb22711fb0ee8d8da49f6f05f89 Mon Sep 17 00:00:00 2001 From: "Daniel P. Berrange" Date: Tue, 1 Sep 2015 14:48:02 +0100 Subject: coroutine: move into libqemuutil.a library The coroutine files are currently referenced by the block-obj-y variable. The coroutine functionality though is already used by more than just the block code. eg migration code uses coroutine yield. In the future the I/O channel code will also use the coroutine yield functionality. Since the coroutine code is nicely self-contained it can be easily built as part of the libqemuutil.a library, making it widely available. The headers are also moved into include/qemu, instead of the include/block directory, since they are now part of the util codebase, and the impl was never in the block/ directory either. Signed-off-by: Daniel P. Berrange --- MAINTAINERS | 7 + Makefile.objs | 4 - block.c | 2 +- block/qcow2.h | 2 +- block/vdi.c | 2 +- block/write-threshold.c | 2 +- blockjob.c | 2 +- coroutine-gthread.c | 198 ---------------------------- coroutine-sigaltstack.c | 293 ------------------------------------------ coroutine-ucontext.c | 194 ---------------------------- coroutine-win32.c | 101 --------------- hw/9pfs/codir.c | 2 +- hw/9pfs/cofile.c | 2 +- hw/9pfs/cofs.c | 2 +- hw/9pfs/coxattr.c | 2 +- hw/9pfs/virtio-9p-coth.c | 2 +- hw/9pfs/virtio-9p-coth.h | 2 +- hw/9pfs/virtio-9p.h | 2 +- include/block/block.h | 2 +- include/block/block_int.h | 2 +- include/block/coroutine.h | 219 ------------------------------- include/block/coroutine_int.h | 54 -------- include/qemu/coroutine.h | 219 +++++++++++++++++++++++++++++++ include/qemu/coroutine_int.h | 54 ++++++++ migration/qemu-file-buf.c | 2 +- migration/qemu-file-stdio.c | 2 +- migration/qemu-file-unix.c | 2 +- migration/qemu-file.c | 2 +- migration/rdma.c | 2 +- nbd.c | 2 +- qemu-coroutine-io.c | 91 ------------- qemu-coroutine-lock.c | 186 --------------------------- qemu-coroutine-sleep.c | 41 ------ qemu-coroutine.c | 146 --------------------- tests/test-coroutine.c | 4 +- tests/test-vmstate.c | 2 +- thread-pool.c | 2 +- util/Makefile.objs | 3 + util/coroutine-gthread.c | 198 ++++++++++++++++++++++++++++ util/coroutine-sigaltstack.c | 293 ++++++++++++++++++++++++++++++++++++++++++ util/coroutine-ucontext.c | 194 ++++++++++++++++++++++++++++ util/coroutine-win32.c | 101 +++++++++++++++ util/qemu-coroutine-io.c | 91 +++++++++++++ util/qemu-coroutine-lock.c | 186 +++++++++++++++++++++++++++ util/qemu-coroutine-sleep.c | 41 ++++++ util/qemu-coroutine.c | 146 +++++++++++++++++++++ 46 files changed, 1557 insertions(+), 1551 deletions(-) delete mode 100644 coroutine-gthread.c delete mode 100644 coroutine-sigaltstack.c delete mode 100644 coroutine-ucontext.c delete mode 100644 coroutine-win32.c delete mode 100644 include/block/coroutine.h delete mode 100644 include/block/coroutine_int.h create mode 100644 include/qemu/coroutine.h create mode 100644 include/qemu/coroutine_int.h delete mode 100644 qemu-coroutine-io.c delete mode 100644 qemu-coroutine-lock.c delete mode 100644 qemu-coroutine-sleep.c delete mode 100644 qemu-coroutine.c create mode 100644 util/coroutine-gthread.c create mode 100644 util/coroutine-sigaltstack.c create mode 100644 util/coroutine-ucontext.c create mode 100644 util/coroutine-win32.c create mode 100644 util/qemu-coroutine-io.c create mode 100644 util/qemu-coroutine-lock.c create mode 100644 util/qemu-coroutine-sleep.c create mode 100644 util/qemu-coroutine.c diff --git a/MAINTAINERS b/MAINTAINERS index 01fb6e26e2..78e14fc132 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -1193,6 +1193,13 @@ F: crypto/ F: include/crypto/ F: tests/test-crypto-* +Coroutines +M: Stefan Hajnoczi +M: Kevin Wolf +F: util/*coroutine* +F: include/qemu/coroutine* +F: tests/test-coroutine.c + Usermode Emulation ------------------ Overall diff --git a/Makefile.objs b/Makefile.objs index bc43e5c1dd..ecfe03c195 100644 --- a/Makefile.objs +++ b/Makefile.objs @@ -15,10 +15,6 @@ block-obj-$(CONFIG_WIN32) += aio-win32.o block-obj-y += block/ block-obj-y += qemu-io-cmds.o -block-obj-y += qemu-coroutine.o qemu-coroutine-lock.o qemu-coroutine-io.o -block-obj-y += qemu-coroutine-sleep.o -block-obj-y += coroutine-$(CONFIG_COROUTINE_BACKEND).o - block-obj-m = block/ ####################################################################### diff --git a/block.c b/block.c index 09f2a754f1..6771c3a1a1 100644 --- a/block.c +++ b/block.c @@ -33,7 +33,7 @@ #include "sysemu/block-backend.h" #include "sysemu/sysemu.h" #include "qemu/notify.h" -#include "block/coroutine.h" +#include "qemu/coroutine.h" #include "block/qapi.h" #include "qmp-commands.h" #include "qemu/timer.h" diff --git a/block/qcow2.h b/block/qcow2.h index 351226302f..b8c500b9dc 100644 --- a/block/qcow2.h +++ b/block/qcow2.h @@ -26,7 +26,7 @@ #define BLOCK_QCOW2_H #include "crypto/cipher.h" -#include "block/coroutine.h" +#include "qemu/coroutine.h" //#define DEBUG_ALLOC //#define DEBUG_ALLOC2 diff --git a/block/vdi.c b/block/vdi.c index 17626d4f4e..17f435fad6 100644 --- a/block/vdi.c +++ b/block/vdi.c @@ -53,7 +53,7 @@ #include "block/block_int.h" #include "qemu/module.h" #include "migration/migration.h" -#include "block/coroutine.h" +#include "qemu/coroutine.h" #if defined(CONFIG_UUID) #include diff --git a/block/write-threshold.c b/block/write-threshold.c index a53c1f5e65..0fe38917c5 100644 --- a/block/write-threshold.c +++ b/block/write-threshold.c @@ -11,7 +11,7 @@ */ #include "block/block_int.h" -#include "block/coroutine.h" +#include "qemu/coroutine.h" #include "block/write-threshold.h" #include "qemu/notify.h" #include "qapi-event.h" diff --git a/blockjob.c b/blockjob.c index d87869c24a..1da5491228 100644 --- a/blockjob.c +++ b/blockjob.c @@ -31,7 +31,7 @@ #include "block/block_int.h" #include "qapi/qmp/qerror.h" #include "qapi/qmp/qjson.h" -#include "block/coroutine.h" +#include "qemu/coroutine.h" #include "qmp-commands.h" #include "qemu/timer.h" #include "qapi-event.h" diff --git a/coroutine-gthread.c b/coroutine-gthread.c deleted file mode 100644 index 6bd6d6b22f..0000000000 --- a/coroutine-gthread.c +++ /dev/null @@ -1,198 +0,0 @@ -/* - * GThread coroutine initialization code - * - * Copyright (C) 2006 Anthony Liguori - * Copyright (C) 2011 Aneesh Kumar K.V - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.0 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, see . - */ - -#include -#include "qemu-common.h" -#include "block/coroutine_int.h" - -typedef struct { - Coroutine base; - GThread *thread; - bool runnable; - bool free_on_thread_exit; - CoroutineAction action; -} CoroutineGThread; - -static CompatGMutex coroutine_lock; -static CompatGCond coroutine_cond; - -/* GLib 2.31 and beyond deprecated various parts of the thread API, - * but the new interfaces are not available in older GLib versions - * so we have to cope with both. - */ -#if GLIB_CHECK_VERSION(2, 31, 0) -/* Awkwardly, the GPrivate API doesn't provide a way to update the - * GDestroyNotify handler for the coroutine key dynamically. So instead - * we track whether or not the CoroutineGThread should be freed on - * thread exit / coroutine key update using the free_on_thread_exit - * field. - */ -static void coroutine_destroy_notify(gpointer data) -{ - CoroutineGThread *co = data; - if (co && co->free_on_thread_exit) { - g_free(co); - } -} - -static GPrivate coroutine_key = G_PRIVATE_INIT(coroutine_destroy_notify); - -static inline CoroutineGThread *get_coroutine_key(void) -{ - return g_private_get(&coroutine_key); -} - -static inline void set_coroutine_key(CoroutineGThread *co, - bool free_on_thread_exit) -{ - /* Unlike g_static_private_set() this does not call the GDestroyNotify - * if the previous value of the key was NULL. Fortunately we only need - * the GDestroyNotify in the non-NULL key case. - */ - co->free_on_thread_exit = free_on_thread_exit; - g_private_replace(&coroutine_key, co); -} - -static inline GThread *create_thread(GThreadFunc func, gpointer data) -{ - return g_thread_new("coroutine", func, data); -} - -#else - -/* Handle older GLib versions */ - -static GStaticPrivate coroutine_key = G_STATIC_PRIVATE_INIT; - -static inline CoroutineGThread *get_coroutine_key(void) -{ - return g_static_private_get(&coroutine_key); -} - -static inline void set_coroutine_key(CoroutineGThread *co, - bool free_on_thread_exit) -{ - g_static_private_set(&coroutine_key, co, - free_on_thread_exit ? (GDestroyNotify)g_free : NULL); -} - -static inline GThread *create_thread(GThreadFunc func, gpointer data) -{ - return g_thread_create_full(func, data, 0, TRUE, TRUE, - G_THREAD_PRIORITY_NORMAL, NULL); -} - -#endif - - -static void __attribute__((constructor)) coroutine_init(void) -{ -#if !GLIB_CHECK_VERSION(2, 31, 0) - if (!g_thread_supported()) { - g_thread_init(NULL); - } -#endif -} - -static void coroutine_wait_runnable_locked(CoroutineGThread *co) -{ - while (!co->runnable) { - g_cond_wait(&coroutine_cond, &coroutine_lock); - } -} - -static void coroutine_wait_runnable(CoroutineGThread *co) -{ - g_mutex_lock(&coroutine_lock); - coroutine_wait_runnable_locked(co); - g_mutex_unlock(&coroutine_lock); -} - -static gpointer coroutine_thread(gpointer opaque) -{ - CoroutineGThread *co = opaque; - - set_coroutine_key(co, false); - coroutine_wait_runnable(co); - co->base.entry(co->base.entry_arg); - qemu_coroutine_switch(&co->base, co->base.caller, COROUTINE_TERMINATE); - return NULL; -} - -Coroutine *qemu_coroutine_new(void) -{ - CoroutineGThread *co; - - co = g_malloc0(sizeof(*co)); - co->thread = create_thread(coroutine_thread, co); - if (!co->thread) { - g_free(co); - return NULL; - } - return &co->base; -} - -void qemu_coroutine_delete(Coroutine *co_) -{ - CoroutineGThread *co = DO_UPCAST(CoroutineGThread, base, co_); - - g_thread_join(co->thread); - g_free(co); -} - -CoroutineAction qemu_coroutine_switch(Coroutine *from_, - Coroutine *to_, - CoroutineAction action) -{ - CoroutineGThread *from = DO_UPCAST(CoroutineGThread, base, from_); - CoroutineGThread *to = DO_UPCAST(CoroutineGThread, base, to_); - - g_mutex_lock(&coroutine_lock); - from->runnable = false; - from->action = action; - to->runnable = true; - to->action = action; - g_cond_broadcast(&coroutine_cond); - - if (action != COROUTINE_TERMINATE) { - coroutine_wait_runnable_locked(from); - } - g_mutex_unlock(&coroutine_lock); - return from->action; -} - -Coroutine *qemu_coroutine_self(void) -{ - CoroutineGThread *co = get_coroutine_key(); - if (!co) { - co = g_malloc0(sizeof(*co)); - co->runnable = true; - set_coroutine_key(co, true); - } - - return &co->base; -} - -bool qemu_in_coroutine(void) -{ - CoroutineGThread *co = get_coroutine_key(); - - return co && co->base.caller; -} diff --git a/coroutine-sigaltstack.c b/coroutine-sigaltstack.c deleted file mode 100644 index 63519fffc7..0000000000 --- a/coroutine-sigaltstack.c +++ /dev/null @@ -1,293 +0,0 @@ -/* - * sigaltstack coroutine initialization code - * - * Copyright (C) 2006 Anthony Liguori - * Copyright (C) 2011 Kevin Wolf - * Copyright (C) 2012 Alex Barcelo -** This file is partly based on pth_mctx.c, from the GNU Portable Threads -** Copyright (c) 1999-2006 Ralf S. Engelschall - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, see . - */ - -/* XXX Is there a nicer way to disable glibc's stack check for longjmp? */ -#ifdef _FORTIFY_SOURCE -#undef _FORTIFY_SOURCE -#endif -#include -#include -#include -#include -#include -#include "qemu-common.h" -#include "block/coroutine_int.h" - -typedef struct { - Coroutine base; - void *stack; - sigjmp_buf env; -} CoroutineUContext; - -/** - * Per-thread coroutine bookkeeping - */ -typedef struct { - /** Currently executing coroutine */ - Coroutine *current; - - /** The default coroutine */ - CoroutineUContext leader; - - /** Information for the signal handler (trampoline) */ - sigjmp_buf tr_reenter; - volatile sig_atomic_t tr_called; - void *tr_handler; -} CoroutineThreadState; - -static pthread_key_t thread_state_key; - -static CoroutineThreadState *coroutine_get_thread_state(void) -{ - CoroutineThreadState *s = pthread_getspecific(thread_state_key); - - if (!s) { - s = g_malloc0(sizeof(*s)); - s->current = &s->leader.base; - pthread_setspecific(thread_state_key, s); - } - return s; -} - -static void qemu_coroutine_thread_cleanup(void *opaque) -{ - CoroutineThreadState *s = opaque; - - g_free(s); -} - -static void __attribute__((constructor)) coroutine_init(void) -{ - int ret; - - ret = pthread_key_create(&thread_state_key, qemu_coroutine_thread_cleanup); - if (ret != 0) { - fprintf(stderr, "unable to create leader key: %s\n", strerror(errno)); - abort(); - } -} - -/* "boot" function - * This is what starts the coroutine, is called from the trampoline - * (from the signal handler when it is not signal handling, read ahead - * for more information). - */ -static void coroutine_bootstrap(CoroutineUContext *self, Coroutine *co) -{ - /* Initialize longjmp environment and switch back the caller */ - if (!sigsetjmp(self->env, 0)) { - siglongjmp(*(sigjmp_buf *)co->entry_arg, 1); - } - - while (true) { - co->entry(co->entry_arg); - qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE); - } -} - -/* - * This is used as the signal handler. This is called with the brand new stack - * (thanks to sigaltstack). We have to return, given that this is a signal - * handler and the sigmask and some other things are changed. - */ -static void coroutine_trampoline(int signal) -{ - CoroutineUContext *self; - Coroutine *co; - CoroutineThreadState *coTS; - - /* Get the thread specific information */ - coTS = coroutine_get_thread_state(); - self = coTS->tr_handler; - coTS->tr_called = 1; - co = &self->base; - - /* - * Here we have to do a bit of a ping pong between the caller, given that - * this is a signal handler and we have to do a return "soon". Then the - * caller can reestablish everything and do a siglongjmp here again. - */ - if (!sigsetjmp(coTS->tr_reenter, 0)) { - return; - } - - /* - * Ok, the caller has siglongjmp'ed back to us, so now prepare - * us for the real machine state switching. We have to jump - * into another function here to get a new stack context for - * the auto variables (which have to be auto-variables - * because the start of the thread happens later). Else with - * PIC (i.e. Position Independent Code which is used when PTH - * is built as a shared library) most platforms would - * horrible core dump as experience showed. - */ - coroutine_bootstrap(self, co); -} - -Coroutine *qemu_coroutine_new(void) -{ - const size_t stack_size = 1 << 20; - CoroutineUContext *co; - CoroutineThreadState *coTS; - struct sigaction sa; - struct sigaction osa; - stack_t ss; - stack_t oss; - sigset_t sigs; - sigset_t osigs; - sigjmp_buf old_env; - - /* The way to manipulate stack is with the sigaltstack function. We - * prepare a stack, with it delivering a signal to ourselves and then - * put sigsetjmp/siglongjmp where needed. - * This has been done keeping coroutine-ucontext as a model and with the - * pth ideas (GNU Portable Threads). See coroutine-ucontext for the basics - * of the coroutines and see pth_mctx.c (from the pth project) for the - * sigaltstack way of manipulating stacks. - */ - - co = g_malloc0(sizeof(*co)); - co->stack = g_malloc(stack_size); - co->base.entry_arg = &old_env; /* stash away our jmp_buf */ - - coTS = coroutine_get_thread_state(); - coTS->tr_handler = co; - - /* - * Preserve the SIGUSR2 signal state, block SIGUSR2, - * and establish our signal handler. The signal will - * later transfer control onto the signal stack. - */ - sigemptyset(&sigs); - sigaddset(&sigs, SIGUSR2); - pthread_sigmask(SIG_BLOCK, &sigs, &osigs); - sa.sa_handler = coroutine_trampoline; - sigfillset(&sa.sa_mask); - sa.sa_flags = SA_ONSTACK; - if (sigaction(SIGUSR2, &sa, &osa) != 0) { - abort(); - } - - /* - * Set the new stack. - */ - ss.ss_sp = co->stack; - ss.ss_size = stack_size; - ss.ss_flags = 0; - if (sigaltstack(&ss, &oss) < 0) { - abort(); - } - - /* - * Now transfer control onto the signal stack and set it up. - * It will return immediately via "return" after the sigsetjmp() - * was performed. Be careful here with race conditions. The - * signal can be delivered the first time sigsuspend() is - * called. - */ - coTS->tr_called = 0; - pthread_kill(pthread_self(), SIGUSR2); - sigfillset(&sigs); - sigdelset(&sigs, SIGUSR2); - while (!coTS->tr_called) { - sigsuspend(&sigs); - } - - /* - * Inform the system that we are back off the signal stack by - * removing the alternative signal stack. Be careful here: It - * first has to be disabled, before it can be removed. - */ - sigaltstack(NULL, &ss); - ss.ss_flags = SS_DISABLE; - if (sigaltstack(&ss, NULL) < 0) { - abort(); - } - sigaltstack(NULL, &ss); - if (!(oss.ss_flags & SS_DISABLE)) { - sigaltstack(&oss, NULL); - } - - /* - * Restore the old SIGUSR2 signal handler and mask - */ - sigaction(SIGUSR2, &osa, NULL); - pthread_sigmask(SIG_SETMASK, &osigs, NULL); - - /* - * Now enter the trampoline again, but this time not as a signal - * handler. Instead we jump into it directly. The functionally - * redundant ping-pong pointer arithmetic is necessary to avoid - * type-conversion warnings related to the `volatile' qualifier and - * the fact that `jmp_buf' usually is an array type. - */ - if (!sigsetjmp(old_env, 0)) { - siglongjmp(coTS->tr_reenter, 1); - } - - /* - * Ok, we returned again, so now we're finished - */ - - return &co->base; -} - -void qemu_coroutine_delete(Coroutine *co_) -{ - CoroutineUContext *co = DO_UPCAST(CoroutineUContext, base, co_); - - g_free(co->stack); - g_free(co); -} - -CoroutineAction qemu_coroutine_switch(Coroutine *from_, Coroutine *to_, - CoroutineAction action) -{ - CoroutineUContext *from = DO_UPCAST(CoroutineUContext, base, from_); - CoroutineUContext *to = DO_UPCAST(CoroutineUContext, base, to_); - CoroutineThreadState *s = coroutine_get_thread_state(); - int ret; - - s->current = to_; - - ret = sigsetjmp(from->env, 0); - if (ret == 0) { - siglongjmp(to->env, action); - } - return ret; -} - -Coroutine *qemu_coroutine_self(void) -{ - CoroutineThreadState *s = coroutine_get_thread_state(); - - return s->current; -} - -bool qemu_in_coroutine(void) -{ - CoroutineThreadState *s = pthread_getspecific(thread_state_key); - - return s && s->current->caller; -} - diff --git a/coroutine-ucontext.c b/coroutine-ucontext.c deleted file mode 100644 index 259fcb48a4..0000000000 --- a/coroutine-ucontext.c +++ /dev/null @@ -1,194 +0,0 @@ -/* - * ucontext coroutine initialization code - * - * Copyright (C) 2006 Anthony Liguori - * Copyright (C) 2011 Kevin Wolf - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.0 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, see . - */ - -/* XXX Is there a nicer way to disable glibc's stack check for longjmp? */ -#ifdef _FORTIFY_SOURCE -#undef _FORTIFY_SOURCE -#endif -#include -#include -#include -#include -#include "qemu-common.h" -#include "block/coroutine_int.h" - -#ifdef CONFIG_VALGRIND_H -#include -#endif - -typedef struct { - Coroutine base; - void *stack; - sigjmp_buf env; - -#ifdef CONFIG_VALGRIND_H - unsigned int valgrind_stack_id; -#endif - -} CoroutineUContext; - -/** - * Per-thread coroutine bookkeeping - */ -static __thread CoroutineUContext leader; -static __thread Coroutine *current; - -/* - * va_args to makecontext() must be type 'int', so passing - * the pointer we need may require several int args. This - * union is a quick hack to let us do that - */ -union cc_arg { - void *p; - int i[2]; -}; - -static void coroutine_trampoline(int i0, int i1) -{ - union cc_arg arg; - CoroutineUContext *self; - Coroutine *co; - - arg.i[0] = i0; - arg.i[1] = i1; - self = arg.p; - co = &self->base; - - /* Initialize longjmp environment and switch back the caller */ - if (!sigsetjmp(self->env, 0)) { - siglongjmp(*(sigjmp_buf *)co->entry_arg, 1); - } - - while (true) { - co->entry(co->entry_arg); - qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE); - } -} - -Coroutine *qemu_coroutine_new(void) -{ - const size_t stack_size = 1 << 20; - CoroutineUContext *co; - ucontext_t old_uc, uc; - sigjmp_buf old_env; - union cc_arg arg = {0}; - - /* The ucontext functions preserve signal masks which incurs a - * system call overhead. sigsetjmp(buf, 0)/siglongjmp() does not - * preserve signal masks but only works on the current stack. - * Since we need a way to create and switch to a new stack, use - * the ucontext functions for that but sigsetjmp()/siglongjmp() for - * everything else. - */ - - if (getcontext(&uc) == -1) { - abort(); - } - - co = g_malloc0(sizeof(*co)); - co->stack = g_malloc(stack_size); - co->base.entry_arg = &old_env; /* stash away our jmp_buf */ - - uc.uc_link = &old_uc; - uc.uc_stack.ss_sp = co->stack; - uc.uc_stack.ss_size = stack_size; - uc.uc_stack.ss_flags = 0; - -#ifdef CONFIG_VALGRIND_H - co->valgrind_stack_id = - VALGRIND_STACK_REGISTER(co->stack, co->stack + stack_size); -#endif - - arg.p = co; - - makecontext(&uc, (void (*)(void))coroutine_trampoline, - 2, arg.i[0], arg.i[1]); - - /* swapcontext() in, siglongjmp() back out */ - if (!sigsetjmp(old_env, 0)) { - swapcontext(&old_uc, &uc); - } - return &co->base; -} - -#ifdef CONFIG_VALGRIND_H -#ifdef CONFIG_PRAGMA_DIAGNOSTIC_AVAILABLE -/* Work around an unused variable in the valgrind.h macro... */ -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wunused-but-set-variable" -#endif -static inline void valgrind_stack_deregister(CoroutineUContext *co) -{ - VALGRIND_STACK_DEREGISTER(co->valgrind_stack_id); -} -#ifdef CONFIG_PRAGMA_DIAGNOSTIC_AVAILABLE -#pragma GCC diagnostic pop -#endif -#endif - -void qemu_coroutine_delete(Coroutine *co_) -{ - CoroutineUContext *co = DO_UPCAST(CoroutineUContext, base, co_); - -#ifdef CONFIG_VALGRIND_H - valgrind_stack_deregister(co); -#endif - - g_free(co->stack); - g_free(co); -} - -/* This function is marked noinline to prevent GCC from inlining it - * into coroutine_trampoline(). If we allow it to do that then it - * hoists the code to get the address of the TLS variable "current" - * out of the while() loop. This is an invalid transformation because - * the sigsetjmp() call may be called when running thread A but - * return in thread B, and so we might be in a different thread - * context each time round the loop. - */ -CoroutineAction __attribute__((noinline)) -qemu_coroutine_switch(Coroutine *from_, Coroutine *to_, - CoroutineAction action) -{ - CoroutineUContext *from = DO_UPCAST(CoroutineUContext, base, from_); - CoroutineUContext *to = DO_UPCAST(CoroutineUContext, base, to_); - int ret; - - current = to_; - - ret = sigsetjmp(from->env, 0); - if (ret == 0) { - siglongjmp(to->env, action); - } - return ret; -} - -Coroutine *qemu_coroutine_self(void) -{ - if (!current) { - current = &leader.base; - } - return current; -} - -bool qemu_in_coroutine(void) -{ - return current && current->caller; -} diff --git a/coroutine-win32.c b/coroutine-win32.c deleted file mode 100644 index 17ace37dee..0000000000 --- a/coroutine-win32.c +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Win32 coroutine initialization code - * - * Copyright (c) 2011 Kevin Wolf - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL - * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -#include "qemu-common.h" -#include "block/coroutine_int.h" - -typedef struct -{ - Coroutine base; - - LPVOID fiber; - CoroutineAction action; -} CoroutineWin32; - -static __thread CoroutineWin32 leader; -static __thread Coroutine *current; - -/* This function is marked noinline to prevent GCC from inlining it - * into coroutine_trampoline(). If we allow it to do that then it - * hoists the code to get the address of the TLS variable "current" - * out of the while() loop. This is an invalid transformation because - * the SwitchToFiber() call may be called when running thread A but - * return in thread B, and so we might be in a different thread - * context each time round the loop. - */ -CoroutineAction __attribute__((noinline)) -qemu_coroutine_switch(Coroutine *from_, Coroutine *to_, - CoroutineAction action) -{ - CoroutineWin32 *from = DO_UPCAST(CoroutineWin32, base, from_); - CoroutineWin32 *to = DO_UPCAST(CoroutineWin32, base, to_); - - current = to_; - - to->action = action; - SwitchToFiber(to->fiber); - return from->action; -} - -static void CALLBACK coroutine_trampoline(void *co_) -{ - Coroutine *co = co_; - - while (true) { - co->entry(co->entry_arg); - qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE); - } -} - -Coroutine *qemu_coroutine_new(void) -{ - const size_t stack_size = 1 << 20; - CoroutineWin32 *co; - - co = g_malloc0(sizeof(*co)); - co->fiber = CreateFiber(stack_size, coroutine_trampoline, &co->base); - return &co->base; -} - -void qemu_coroutine_delete(Coroutine *co_) -{ - CoroutineWin32 *co = DO_UPCAST(CoroutineWin32, base, co_); - - DeleteFiber(co->fiber); - g_free(co); -} - -Coroutine *qemu_coroutine_self(void) -{ - if (!current) { - current = &leader.base; - leader.fiber = ConvertThreadToFiber(NULL); - } - return current; -} - -bool qemu_in_coroutine(void) -{ - return current && current->caller; -} diff --git a/hw/9pfs/codir.c b/hw/9pfs/codir.c index 65ad3298be..ec9cc7fb27 100644 --- a/hw/9pfs/codir.c +++ b/hw/9pfs/codir.c @@ -14,7 +14,7 @@ #include "fsdev/qemu-fsdev.h" #include "qemu/thread.h" -#include "block/coroutine.h" +#include "qemu/coroutine.h" #include "virtio-9p-coth.h" int v9fs_co_readdir_r(V9fsPDU *pdu, V9fsFidState *fidp, struct dirent *dent, diff --git a/hw/9pfs/cofile.c b/hw/9pfs/cofile.c index 2efebf3571..7cb55ee93a 100644 --- a/hw/9pfs/cofile.c +++ b/hw/9pfs/cofile.c @@ -14,7 +14,7 @@ #include "fsdev/qemu-fsdev.h" #include "qemu/thread.h" -#include "block/coroutine.h" +#include "qemu/coroutine.h" #include "virtio-9p-coth.h" int v9fs_co_st_gen(V9fsPDU *pdu, V9fsPath *path, mode_t st_mode, diff --git a/hw/9pfs/cofs.c b/hw/9pfs/cofs.c index 42ee614e27..e1953a9aa1 100644 --- a/hw/9pfs/cofs.c +++ b/hw/9pfs/cofs.c @@ -14,7 +14,7 @@ #include "fsdev/qemu-fsdev.h" #include "qemu/thread.h" -#include "block/coroutine.h" +#include "qemu/coroutine.h" #include "virtio-9p-coth.h" static ssize_t __readlink(V9fsState *s, V9fsPath *path, V9fsString *buf) diff --git a/hw/9pfs/coxattr.c b/hw/9pfs/coxattr.c index 18ee08df0f..55c0d231cb 100644 --- a/hw/9pfs/coxattr.c +++ b/hw/9pfs/coxattr.c @@ -14,7 +14,7 @@ #include "fsdev/qemu-fsdev.h" #include "qemu/thread.h" -#include "block/coroutine.h" +#include "qemu/coroutine.h" #include "virtio-9p-coth.h" int v9fs_co_llistxattr(V9fsPDU *pdu, V9fsPath *path, void *value, size_t size) diff --git a/hw/9pfs/virtio-9p-coth.c b/hw/9pfs/virtio-9p-coth.c index 8185c533c0..5057f8d220 100644 --- a/hw/9pfs/virtio-9p-coth.c +++ b/hw/9pfs/virtio-9p-coth.c @@ -15,7 +15,7 @@ #include "fsdev/qemu-fsdev.h" #include "qemu/thread.h" #include "qemu/event_notifier.h" -#include "block/coroutine.h" +#include "qemu/coroutine.h" #include "virtio-9p-coth.h" /* v9fs glib thread pool */ diff --git a/hw/9pfs/virtio-9p-coth.h b/hw/9pfs/virtio-9p-coth.h index 4f51b250d1..0fbe49a946 100644 --- a/hw/9pfs/virtio-9p-coth.h +++ b/hw/9pfs/virtio-9p-coth.h @@ -16,7 +16,7 @@ #define _QEMU_VIRTIO_9P_COTH_H #include "qemu/thread.h" -#include "block/coroutine.h" +#include "qemu/coroutine.h" #include "virtio-9p.h" #include diff --git a/hw/9pfs/virtio-9p.h b/hw/9pfs/virtio-9p.h index 2e7d488570..d7a4dc1e9a 100644 --- a/hw/9pfs/virtio-9p.h +++ b/hw/9pfs/virtio-9p.h @@ -13,7 +13,7 @@ #include "fsdev/file-op-9p.h" #include "fsdev/virtio-9p-marshal.h" #include "qemu/thread.h" -#include "block/coroutine.h" +#include "qemu/coroutine.h" enum { P9_TLERROR = 6, diff --git a/include/block/block.h b/include/block/block.h index 6d70eb42fe..84f05ad408 100644 --- a/include/block/block.h +++ b/include/block/block.h @@ -4,7 +4,7 @@ #include "block/aio.h" #include "qemu-common.h" #include "qemu/option.h" -#include "block/coroutine.h" +#include "qemu/coroutine.h" #include "block/accounting.h" #include "qapi/qmp/qobject.h" #include "qapi-types.h" diff --git a/include/block/block_int.h b/include/block/block_int.h index c0e65138b1..a480f944cf 100644 --- a/include/block/block_int.h +++ b/include/block/block_int.h @@ -28,7 +28,7 @@ #include "block/block.h" #include "qemu/option.h" #include "qemu/queue.h" -#include "block/coroutine.h" +#include "qemu/coroutine.h" #include "qemu/timer.h" #include "qapi-types.h" #include "qemu/hbitmap.h" diff --git a/include/block/coroutine.h b/include/block/coroutine.h deleted file mode 100644 index 20c027a7fd..0000000000 --- a/include/block/coroutine.h +++ /dev/null @@ -1,219 +0,0 @@ -/* - * QEMU coroutine implementation - * - * Copyright IBM, Corp. 2011 - * - * Authors: - * Stefan Hajnoczi - * Kevin Wolf - * - * This work is licensed under the terms of the GNU LGPL, version 2 or later. - * See the COPYING.LIB file in the top-level directory. - * - */ - -#ifndef QEMU_COROUTINE_H -#define QEMU_COROUTINE_H - -#include -#include "qemu/typedefs.h" -#include "qemu/queue.h" -#include "qemu/timer.h" - -/** - * Coroutines are a mechanism for stack switching and can be used for - * cooperative userspace threading. These functions provide a simple but - * useful flavor of coroutines that is suitable for writing sequential code, - * rather than callbacks, for operations that need to give up control while - * waiting for events to complete. - * - * These functions are re-entrant and may be used outside the global mutex. - */ - -/** - * Mark a function that executes in coroutine context - * - * Functions that execute in coroutine context cannot be called directly from - * normal functions. In the future it would be nice to enable compiler or - * static checker support for catching such errors. This annotation might make - * it possible and in the meantime it serves as documentation. - * - * For example: - * - * static void coroutine_fn foo(void) { - * .... - * } - */ -#define coroutine_fn - -typedef struct Coroutine Coroutine; - -/** - * Coroutine entry point - * - * When the coroutine is entered for the first time, opaque is passed in as an - * argument. - * - * When this function returns, the coroutine is destroyed automatically and - * execution continues in the caller who last entered the coroutine. - */ -typedef void coroutine_fn CoroutineEntry(void *opaque); - -/** - * Create a new coroutine - * - * Use qemu_coroutine_enter() to actually transfer control to the coroutine. - */ -Coroutine *qemu_coroutine_create(CoroutineEntry *entry); - -/** - * Transfer control to a coroutine - * - * The opaque argument is passed as the argument to the entry point when - * entering the coroutine for the first time. It is subsequently ignored. - */ -void qemu_coroutine_enter(Coroutine *coroutine, void *opaque); - -/** - * Transfer control back to a coroutine's caller - * - * This function does not return until the coroutine is re-entered using - * qemu_coroutine_enter(). - */ -void coroutine_fn qemu_coroutine_yield(void); - -/** - * Get the currently executing coroutine - */ -Coroutine *coroutine_fn qemu_coroutine_self(void); - -/** - * Return whether or not currently inside a coroutine - * - * This can be used to write functions that work both when in coroutine context - * and when not in coroutine context. Note that such functions cannot use the - * coroutine_fn annotation since they work outside coroutine context. - */ -bool qemu_in_coroutine(void); - - - -/** - * CoQueues are a mechanism to queue coroutines in order to continue executing - * them later. They provide the fundamental primitives on which coroutine locks - * are built. - */ -typedef struct CoQueue { - QTAILQ_HEAD(, Coroutine) entries; -} CoQueue; - -/** - * Initialise a CoQueue. This must be called before any other operation is used - * on the CoQueue. - */ -void qemu_co_queue_init(CoQueue *queue); - -/** - * Adds the current coroutine to the CoQueue and transfers control to the - * caller of the coroutine. - */ -void coroutine_fn qemu_co_queue_wait(CoQueue *queue); - -/** - * Restarts the next coroutine in the CoQueue and removes it from the queue. - * - * Returns true if a coroutine was restarted, false if the queue is empty. - */ -bool coroutine_fn qemu_co_queue_next(CoQueue *queue); - -/** - * Restarts all coroutines in the CoQueue and leaves the queue empty. - */ -void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue); - -/** - * Enter the next coroutine in the queue - */ -bool qemu_co_enter_next(CoQueue *queue); - -/** - * Checks if the CoQueue is empty. - */ -bool qemu_co_queue_empty(CoQueue *queue); - - -/** - * Provides a mutex that can be used to synchronise coroutines - */ -typedef struct CoMutex { - bool locked; - CoQueue queue; -} CoMutex; - -/** - * Initialises a CoMutex. This must be called before any other operation is used - * on the CoMutex. - */ -void qemu_co_mutex_init(CoMutex *mutex); - -/** - * Locks the mutex. If the lock cannot be taken immediately, control is - * transferred to the caller of the current coroutine. - */ -void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex); - -/** - * Unlocks the mutex and schedules the next coroutine that was waiting for this - * lock to be run. - */ -void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex); - -typedef struct CoRwlock { - bool writer; - int reader; - CoQueue queue; -} CoRwlock; - -/** - * Initialises a CoRwlock. This must be called before any other operation - * is used on the CoRwlock - */ -void qemu_co_rwlock_init(CoRwlock *lock); - -/** - * Read locks the CoRwlock. If the lock cannot be taken immediately because - * of a parallel writer, control is transferred to the caller of the current - * coroutine. - */ -void qemu_co_rwlock_rdlock(CoRwlock *lock); - -/** - * Write Locks the mutex. If the lock cannot be taken immediately because - * of a parallel reader, control is transferred to the caller of the current - * coroutine. - */ -void qemu_co_rwlock_wrlock(CoRwlock *lock); - -/** - * Unlocks the read/write lock and schedules the next coroutine that was - * waiting for this lock to be run. - */ -void qemu_co_rwlock_unlock(CoRwlock *lock); - -/** - * Yield the coroutine for a given duration - * - * Behaves similarly to co_sleep_ns(), but the sleeping coroutine will be - * resumed when using aio_poll(). - */ -void coroutine_fn co_aio_sleep_ns(AioContext *ctx, QEMUClockType type, - int64_t ns); - -/** - * Yield until a file descriptor becomes readable - * - * Note that this function clobbers the handlers for the file descriptor. - */ -void coroutine_fn yield_until_fd_readable(int fd); - -#endif /* QEMU_COROUTINE_H */ diff --git a/include/block/coroutine_int.h b/include/block/coroutine_int.h deleted file mode 100644 index 9aa1aae5d5..0000000000 --- a/include/block/coroutine_int.h +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Coroutine internals - * - * Copyright (c) 2011 Kevin Wolf - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL - * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -#ifndef QEMU_COROUTINE_INT_H -#define QEMU_COROUTINE_INT_H - -#include "qemu/queue.h" -#include "block/coroutine.h" - -typedef enum { - COROUTINE_YIELD = 1, - COROUTINE_TERMINATE = 2, - COROUTINE_ENTER = 3, -} CoroutineAction; - -struct Coroutine { - CoroutineEntry *entry; - void *entry_arg; - Coroutine *caller; - QSLIST_ENTRY(Coroutine) pool_next; - - /* Coroutines that should be woken up when we yield or terminate */ - QTAILQ_HEAD(, Coroutine) co_queue_wakeup; - QTAILQ_ENTRY(Coroutine) co_queue_next; -}; - -Coroutine *qemu_coroutine_new(void); -void qemu_coroutine_delete(Coroutine *co); -CoroutineAction qemu_coroutine_switch(Coroutine *from, Coroutine *to, - CoroutineAction action); -void coroutine_fn qemu_co_queue_run_restart(Coroutine *co); - -#endif diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h new file mode 100644 index 0000000000..20c027a7fd --- /dev/null +++ b/include/qemu/coroutine.h @@ -0,0 +1,219 @@ +/* + * QEMU coroutine implementation + * + * Copyright IBM, Corp. 2011 + * + * Authors: + * Stefan Hajnoczi + * Kevin Wolf + * + * This work is licensed under the terms of the GNU LGPL, version 2 or later. + * See the COPYING.LIB file in the top-level directory. + * + */ + +#ifndef QEMU_COROUTINE_H +#define QEMU_COROUTINE_H + +#include +#include "qemu/typedefs.h" +#include "qemu/queue.h" +#include "qemu/timer.h" + +/** + * Coroutines are a mechanism for stack switching and can be used for + * cooperative userspace threading. These functions provide a simple but + * useful flavor of coroutines that is suitable for writing sequential code, + * rather than callbacks, for operations that need to give up control while + * waiting for events to complete. + * + * These functions are re-entrant and may be used outside the global mutex. + */ + +/** + * Mark a function that executes in coroutine context + * + * Functions that execute in coroutine context cannot be called directly from + * normal functions. In the future it would be nice to enable compiler or + * static checker support for catching such errors. This annotation might make + * it possible and in the meantime it serves as documentation. + * + * For example: + * + * static void coroutine_fn foo(void) { + * .... + * } + */ +#define coroutine_fn + +typedef struct Coroutine Coroutine; + +/** + * Coroutine entry point + * + * When the coroutine is entered for the first time, opaque is passed in as an + * argument. + * + * When this function returns, the coroutine is destroyed automatically and + * execution continues in the caller who last entered the coroutine. + */ +typedef void coroutine_fn CoroutineEntry(void *opaque); + +/** + * Create a new coroutine + * + * Use qemu_coroutine_enter() to actually transfer control to the coroutine. + */ +Coroutine *qemu_coroutine_create(CoroutineEntry *entry); + +/** + * Transfer control to a coroutine + * + * The opaque argument is passed as the argument to the entry point when + * entering the coroutine for the first time. It is subsequently ignored. + */ +void qemu_coroutine_enter(Coroutine *coroutine, void *opaque); + +/** + * Transfer control back to a coroutine's caller + * + * This function does not return until the coroutine is re-entered using + * qemu_coroutine_enter(). + */ +void coroutine_fn qemu_coroutine_yield(void); + +/** + * Get the currently executing coroutine + */ +Coroutine *coroutine_fn qemu_coroutine_self(void); + +/** + * Return whether or not currently inside a coroutine + * + * This can be used to write functions that work both when in coroutine context + * and when not in coroutine context. Note that such functions cannot use the + * coroutine_fn annotation since they work outside coroutine context. + */ +bool qemu_in_coroutine(void); + + + +/** + * CoQueues are a mechanism to queue coroutines in order to continue executing + * them later. They provide the fundamental primitives on which coroutine locks + * are built. + */ +typedef struct CoQueue { + QTAILQ_HEAD(, Coroutine) entries; +} CoQueue; + +/** + * Initialise a CoQueue. This must be called before any other operation is used + * on the CoQueue. + */ +void qemu_co_queue_init(CoQueue *queue); + +/** + * Adds the current coroutine to the CoQueue and transfers control to the + * caller of the coroutine. + */ +void coroutine_fn qemu_co_queue_wait(CoQueue *queue); + +/** + * Restarts the next coroutine in the CoQueue and removes it from the queue. + * + * Returns true if a coroutine was restarted, false if the queue is empty. + */ +bool coroutine_fn qemu_co_queue_next(CoQueue *queue); + +/** + * Restarts all coroutines in the CoQueue and leaves the queue empty. + */ +void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue); + +/** + * Enter the next coroutine in the queue + */ +bool qemu_co_enter_next(CoQueue *queue); + +/** + * Checks if the CoQueue is empty. + */ +bool qemu_co_queue_empty(CoQueue *queue); + + +/** + * Provides a mutex that can be used to synchronise coroutines + */ +typedef struct CoMutex { + bool locked; + CoQueue queue; +} CoMutex; + +/** + * Initialises a CoMutex. This must be called before any other operation is used + * on the CoMutex. + */ +void qemu_co_mutex_init(CoMutex *mutex); + +/** + * Locks the mutex. If the lock cannot be taken immediately, control is + * transferred to the caller of the current coroutine. + */ +void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex); + +/** + * Unlocks the mutex and schedules the next coroutine that was waiting for this + * lock to be run. + */ +void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex); + +typedef struct CoRwlock { + bool writer; + int reader; + CoQueue queue; +} CoRwlock; + +/** + * Initialises a CoRwlock. This must be called before any other operation + * is used on the CoRwlock + */ +void qemu_co_rwlock_init(CoRwlock *lock); + +/** + * Read locks the CoRwlock. If the lock cannot be taken immediately because + * of a parallel writer, control is transferred to the caller of the current + * coroutine. + */ +void qemu_co_rwlock_rdlock(CoRwlock *lock); + +/** + * Write Locks the mutex. If the lock cannot be taken immediately because + * of a parallel reader, control is transferred to the caller of the current + * coroutine. + */ +void qemu_co_rwlock_wrlock(CoRwlock *lock); + +/** + * Unlocks the read/write lock and schedules the next coroutine that was + * waiting for this lock to be run. + */ +void qemu_co_rwlock_unlock(CoRwlock *lock); + +/** + * Yield the coroutine for a given duration + * + * Behaves similarly to co_sleep_ns(), but the sleeping coroutine will be + * resumed when using aio_poll(). + */ +void coroutine_fn co_aio_sleep_ns(AioContext *ctx, QEMUClockType type, + int64_t ns); + +/** + * Yield until a file descriptor becomes readable + * + * Note that this function clobbers the handlers for the file descriptor. + */ +void coroutine_fn yield_until_fd_readable(int fd); + +#endif /* QEMU_COROUTINE_H */ diff --git a/include/qemu/coroutine_int.h b/include/qemu/coroutine_int.h new file mode 100644 index 0000000000..42d6838401 --- /dev/null +++ b/include/qemu/coroutine_int.h @@ -0,0 +1,54 @@ +/* + * Coroutine internals + * + * Copyright (c) 2011 Kevin Wolf + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#ifndef QEMU_COROUTINE_INT_H +#define QEMU_COROUTINE_INT_H + +#include "qemu/queue.h" +#include "qemu/coroutine.h" + +typedef enum { + COROUTINE_YIELD = 1, + COROUTINE_TERMINATE = 2, + COROUTINE_ENTER = 3, +} CoroutineAction; + +struct Coroutine { + CoroutineEntry *entry; + void *entry_arg; + Coroutine *caller; + QSLIST_ENTRY(Coroutine) pool_next; + + /* Coroutines that should be woken up when we yield or terminate */ + QTAILQ_HEAD(, Coroutine) co_queue_wakeup; + QTAILQ_ENTRY(Coroutine) co_queue_next; +}; + +Coroutine *qemu_coroutine_new(void); +void qemu_coroutine_delete(Coroutine *co); +CoroutineAction qemu_coroutine_switch(Coroutine *from, Coroutine *to, + CoroutineAction action); +void coroutine_fn qemu_co_queue_run_restart(Coroutine *co); + +#endif diff --git a/migration/qemu-file-buf.c b/migration/qemu-file-buf.c index e3fd0859d6..49516b8643 100644 --- a/migration/qemu-file-buf.c +++ b/migration/qemu-file-buf.c @@ -29,7 +29,7 @@ #include "qemu/error-report.h" #include "qemu/iov.h" #include "qemu/sockets.h" -#include "block/coroutine.h" +#include "qemu/coroutine.h" #include "migration/migration.h" #include "migration/qemu-file.h" #include "migration/qemu-file-internal.h" diff --git a/migration/qemu-file-stdio.c b/migration/qemu-file-stdio.c index 889ffb302c..9bde9db566 100644 --- a/migration/qemu-file-stdio.c +++ b/migration/qemu-file-stdio.c @@ -22,7 +22,7 @@ * THE SOFTWARE. */ #include "qemu-common.h" -#include "block/coroutine.h" +#include "qemu/coroutine.h" #include "migration/qemu-file.h" typedef struct QEMUFileStdio { diff --git a/migration/qemu-file-unix.c b/migration/qemu-file-unix.c index bf7a0e4a2b..809bf070d7 100644 --- a/migration/qemu-file-unix.c +++ b/migration/qemu-file-unix.c @@ -24,7 +24,7 @@ #include "qemu-common.h" #include "qemu/iov.h" #include "qemu/sockets.h" -#include "block/coroutine.h" +#include "qemu/coroutine.h" #include "migration/qemu-file.h" #include "migration/qemu-file-internal.h" diff --git a/migration/qemu-file.c b/migration/qemu-file.c index 49addf6d06..df49023ed8 100644 --- a/migration/qemu-file.c +++ b/migration/qemu-file.c @@ -26,7 +26,7 @@ #include "qemu/error-report.h" #include "qemu/iov.h" #include "qemu/sockets.h" -#include "block/coroutine.h" +#include "qemu/coroutine.h" #include "migration/migration.h" #include "migration/qemu-file.h" #include "migration/qemu-file-internal.h" diff --git a/migration/rdma.c b/migration/rdma.c index 7a7176f7c9..553fbd7503 100644 --- a/migration/rdma.c +++ b/migration/rdma.c @@ -19,7 +19,7 @@ #include "qemu/main-loop.h" #include "qemu/sockets.h" #include "qemu/bitmap.h" -#include "block/coroutine.h" +#include "qemu/coroutine.h" #include #include #include diff --git a/nbd.c b/nbd.c index 74859cbe09..fc34c449c8 100644 --- a/nbd.c +++ b/nbd.c @@ -19,7 +19,7 @@ #include "block/nbd.h" #include "sysemu/block-backend.h" -#include "block/coroutine.h" +#include "qemu/coroutine.h" #include #include diff --git a/qemu-coroutine-io.c b/qemu-coroutine-io.c deleted file mode 100644 index 28dc7351ac..0000000000 --- a/qemu-coroutine-io.c +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Coroutine-aware I/O functions - * - * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation. - * Copyright (c) 2011, Red Hat, Inc. - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL - * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ -#include "qemu-common.h" -#include "qemu/sockets.h" -#include "block/coroutine.h" -#include "qemu/iov.h" -#include "qemu/main-loop.h" - -ssize_t coroutine_fn -qemu_co_sendv_recvv(int sockfd, struct iovec *iov, unsigned iov_cnt, - size_t offset, size_t bytes, bool do_send) -{ - size_t done = 0; - ssize_t ret; - int err; - while (done < bytes) { - ret = iov_send_recv(sockfd, iov, iov_cnt, - offset + done, bytes - done, do_send); - if (ret > 0) { - done += ret; - } else if (ret < 0) { - err = socket_error(); - if (err == EAGAIN || err == EWOULDBLOCK) { - qemu_coroutine_yield(); - } else if (done == 0) { - return -err; - } else { - break; - } - } else if (ret == 0 && !do_send) { - /* write (send) should never return 0. - * read (recv) returns 0 for end-of-file (-data). - * In both cases there's little point retrying, - * but we do for write anyway, just in case */ - break; - } - } - return done; -} - -ssize_t coroutine_fn -qemu_co_send_recv(int sockfd, void *buf, size_t bytes, bool do_send) -{ - struct iovec iov = { .iov_base = buf, .iov_len = bytes }; - return qemu_co_sendv_recvv(sockfd, &iov, 1, 0, bytes, do_send); -} - -typedef struct { - Coroutine *co; - int fd; -} FDYieldUntilData; - -static void fd_coroutine_enter(void *opaque) -{ - FDYieldUntilData *data = opaque; - qemu_set_fd_handler(data->fd, NULL, NULL, NULL); - qemu_coroutine_enter(data->co, NULL); -} - -void coroutine_fn yield_until_fd_readable(int fd) -{ - FDYieldUntilData data; - - assert(qemu_in_coroutine()); - data.co = qemu_coroutine_self(); - data.fd = fd; - qemu_set_fd_handler(fd, fd_coroutine_enter, NULL, &data); - qemu_coroutine_yield(); -} diff --git a/qemu-coroutine-lock.c b/qemu-coroutine-lock.c deleted file mode 100644 index 6b4903334b..0000000000 --- a/qemu-coroutine-lock.c +++ /dev/null @@ -1,186 +0,0 @@ -/* - * coroutine queues and locks - * - * Copyright (c) 2011 Kevin Wolf - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL - * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -#include "qemu-common.h" -#include "block/coroutine.h" -#include "block/coroutine_int.h" -#include "qemu/queue.h" -#include "trace.h" - -void qemu_co_queue_init(CoQueue *queue) -{ - QTAILQ_INIT(&queue->entries); -} - -void coroutine_fn qemu_co_queue_wait(CoQueue *queue) -{ - Coroutine *self = qemu_coroutine_self(); - QTAILQ_INSERT_TAIL(&queue->entries, self, co_queue_next); - qemu_coroutine_yield(); - assert(qemu_in_coroutine()); -} - -/** - * qemu_co_queue_run_restart: - * - * Enter each coroutine that was previously marked for restart by - * qemu_co_queue_next() or qemu_co_queue_restart_all(). This function is - * invoked by the core coroutine code when the current coroutine yields or - * terminates. - */ -void qemu_co_queue_run_restart(Coroutine *co) -{ - Coroutine *next; - - trace_qemu_co_queue_run_restart(co); - while ((next = QTAILQ_FIRST(&co->co_queue_wakeup))) { - QTAILQ_REMOVE(&co->co_queue_wakeup, next, co_queue_next); - qemu_coroutine_enter(next, NULL); - } -} - -static bool qemu_co_queue_do_restart(CoQueue *queue, bool single) -{ - Coroutine *self = qemu_coroutine_self(); - Coroutine *next; - - if (QTAILQ_EMPTY(&queue->entries)) { - return false; - } - - while ((next = QTAILQ_FIRST(&queue->entries)) != NULL) { - QTAILQ_REMOVE(&queue->entries, next, co_queue_next); - QTAILQ_INSERT_TAIL(&self->co_queue_wakeup, next, co_queue_next); - trace_qemu_co_queue_next(next); - if (single) { - break; - } - } - return true; -} - -bool coroutine_fn qemu_co_queue_next(CoQueue *queue) -{ - assert(qemu_in_coroutine()); - return qemu_co_queue_do_restart(queue, true); -} - -void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue) -{ - assert(qemu_in_coroutine()); - qemu_co_queue_do_restart(queue, false); -} - -bool qemu_co_enter_next(CoQueue *queue) -{ - Coroutine *next; - - next = QTAILQ_FIRST(&queue->entries); - if (!next) { - return false; - } - - QTAILQ_REMOVE(&queue->entries, next, co_queue_next); - qemu_coroutine_enter(next, NULL); - return true; -} - -bool qemu_co_queue_empty(CoQueue *queue) -{ - return QTAILQ_FIRST(&queue->entries) == NULL; -} - -void qemu_co_mutex_init(CoMutex *mutex) -{ - memset(mutex, 0, sizeof(*mutex)); - qemu_co_queue_init(&mutex->queue); -} - -void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex) -{ - Coroutine *self = qemu_coroutine_self(); - - trace_qemu_co_mutex_lock_entry(mutex, self); - - while (mutex->locked) { - qemu_co_queue_wait(&mutex->queue); - } - - mutex->locked = true; - - trace_qemu_co_mutex_lock_return(mutex, self); -} - -void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex) -{ - Coroutine *self = qemu_coroutine_self(); - - trace_qemu_co_mutex_unlock_entry(mutex, self); - - assert(mutex->locked == true); - assert(qemu_in_coroutine()); - - mutex->locked = false; - qemu_co_queue_next(&mutex->queue); - - trace_qemu_co_mutex_unlock_return(mutex, self); -} - -void qemu_co_rwlock_init(CoRwlock *lock) -{ - memset(lock, 0, sizeof(*lock)); - qemu_co_queue_init(&lock->queue); -} - -void qemu_co_rwlock_rdlock(CoRwlock *lock) -{ - while (lock->writer) { - qemu_co_queue_wait(&lock->queue); - } - lock->reader++; -} - -void qemu_co_rwlock_unlock(CoRwlock *lock) -{ - assert(qemu_in_coroutine()); - if (lock->writer) { - lock->writer = false; - qemu_co_queue_restart_all(&lock->queue); - } else { - lock->reader--; - assert(lock->reader >= 0); - /* Wakeup only one waiting writer */ - if (!lock->reader) { - qemu_co_queue_next(&lock->queue); - } - } -} - -void qemu_co_rwlock_wrlock(CoRwlock *lock) -{ - while (lock->writer || lock->reader) { - qemu_co_queue_wait(&lock->queue); - } - lock->writer = true; -} diff --git a/qemu-coroutine-sleep.c b/qemu-coroutine-sleep.c deleted file mode 100644 index 9abb7fdf31..0000000000 --- a/qemu-coroutine-sleep.c +++ /dev/null @@ -1,41 +0,0 @@ -/* - * QEMU coroutine sleep - * - * Copyright IBM, Corp. 2011 - * - * Authors: - * Stefan Hajnoczi - * - * This work is licensed under the terms of the GNU LGPL, version 2 or later. - * See the COPYING.LIB file in the top-level directory. - * - */ - -#include "block/coroutine.h" -#include "qemu/timer.h" -#include "block/aio.h" - -typedef struct CoSleepCB { - QEMUTimer *ts; - Coroutine *co; -} CoSleepCB; - -static void co_sleep_cb(void *opaque) -{ - CoSleepCB *sleep_cb = opaque; - - qemu_coroutine_enter(sleep_cb->co, NULL); -} - -void coroutine_fn co_aio_sleep_ns(AioContext *ctx, QEMUClockType type, - int64_t ns) -{ - CoSleepCB sleep_cb = { - .co = qemu_coroutine_self(), - }; - sleep_cb.ts = aio_timer_new(ctx, type, SCALE_NS, co_sleep_cb, &sleep_cb); - timer_mod(sleep_cb.ts, qemu_clock_get_ns(type) + ns); - qemu_coroutine_yield(); - timer_del(sleep_cb.ts); - timer_free(sleep_cb.ts); -} diff --git a/qemu-coroutine.c b/qemu-coroutine.c deleted file mode 100644 index c17a92b107..0000000000 --- a/qemu-coroutine.c +++ /dev/null @@ -1,146 +0,0 @@ -/* - * QEMU coroutines - * - * Copyright IBM, Corp. 2011 - * - * Authors: - * Stefan Hajnoczi - * Kevin Wolf - * - * This work is licensed under the terms of the GNU LGPL, version 2 or later. - * See the COPYING.LIB file in the top-level directory. - * - */ - -#include "trace.h" -#include "qemu-common.h" -#include "qemu/thread.h" -#include "qemu/atomic.h" -#include "block/coroutine.h" -#include "block/coroutine_int.h" - -enum { - POOL_BATCH_SIZE = 64, -}; - -/** Free list to speed up creation */ -static QSLIST_HEAD(, Coroutine) release_pool = QSLIST_HEAD_INITIALIZER(pool); -static unsigned int release_pool_size; -static __thread QSLIST_HEAD(, Coroutine) alloc_pool = QSLIST_HEAD_INITIALIZER(pool); -static __thread unsigned int alloc_pool_size; -static __thread Notifier coroutine_pool_cleanup_notifier; - -static void coroutine_pool_cleanup(Notifier *n, void *value) -{ - Coroutine *co; - Coroutine *tmp; - - QSLIST_FOREACH_SAFE(co, &alloc_pool, pool_next, tmp) { - QSLIST_REMOVE_HEAD(&alloc_pool, pool_next); - qemu_coroutine_delete(co); - } -} - -Coroutine *qemu_coroutine_create(CoroutineEntry *entry) -{ - Coroutine *co = NULL; - - if (CONFIG_COROUTINE_POOL) { - co = QSLIST_FIRST(&alloc_pool); - if (!co) { - if (release_pool_size > POOL_BATCH_SIZE) { - /* Slow path; a good place to register the destructor, too. */ - if (!coroutine_pool_cleanup_notifier.notify) { - coroutine_pool_cleanup_notifier.notify = coroutine_pool_cleanup; - qemu_thread_atexit_add(&coroutine_pool_cleanup_notifier); - } - - /* This is not exact; there could be a little skew between - * release_pool_size and the actual size of release_pool. But - * it is just a heuristic, it does not need to be perfect. - */ - alloc_pool_size = atomic_xchg(&release_pool_size, 0); - QSLIST_MOVE_ATOMIC(&alloc_pool, &release_pool); - co = QSLIST_FIRST(&alloc_pool); - } - } - if (co) { - QSLIST_REMOVE_HEAD(&alloc_pool, pool_next); - alloc_pool_size--; - } - } - - if (!co) { - co = qemu_coroutine_new(); - } - - co->entry = entry; - QTAILQ_INIT(&co->co_queue_wakeup); - return co; -} - -static void coroutine_delete(Coroutine *co) -{ - co->caller = NULL; - - if (CONFIG_COROUTINE_POOL) { - if (release_pool_size < POOL_BATCH_SIZE * 2) { - QSLIST_INSERT_HEAD_ATOMIC(&release_pool, co, pool_next); - atomic_inc(&release_pool_size); - return; - } - if (alloc_pool_size < POOL_BATCH_SIZE) { - QSLIST_INSERT_HEAD(&alloc_pool, co, pool_next); - alloc_pool_size++; - return; - } - } - - qemu_coroutine_delete(co); -} - -void qemu_coroutine_enter(Coroutine *co, void *opaque) -{ - Coroutine *self = qemu_coroutine_self(); - CoroutineAction ret; - - trace_qemu_coroutine_enter(self, co, opaque); - - if (co->caller) { - fprintf(stderr, "Co-routine re-entered recursively\n"); - abort(); - } - - co->caller = self; - co->entry_arg = opaque; - ret = qemu_coroutine_switch(self, co, COROUTINE_ENTER); - - qemu_co_queue_run_restart(co); - - switch (ret) { - case COROUTINE_YIELD: - return; - case COROUTINE_TERMINATE: - trace_qemu_coroutine_terminate(co); - coroutine_delete(co); - return; - default: - abort(); - } -} - -void coroutine_fn qemu_coroutine_yield(void) -{ - Coroutine *self = qemu_coroutine_self(); - Coroutine *to = self->caller; - - trace_qemu_coroutine_yield(self, to); - - if (!to) { - fprintf(stderr, "Co-routine is yielding to no one\n"); - abort(); - } - - self->caller = NULL; - qemu_coroutine_switch(self, to, COROUTINE_YIELD); -} diff --git a/tests/test-coroutine.c b/tests/test-coroutine.c index b552d9f5e9..f5951cb1f1 100644 --- a/tests/test-coroutine.c +++ b/tests/test-coroutine.c @@ -12,8 +12,8 @@ */ #include -#include "block/coroutine.h" -#include "block/coroutine_int.h" +#include "qemu/coroutine.h" +#include "qemu/coroutine_int.h" /* * Check that qemu_in_coroutine() works diff --git a/tests/test-vmstate.c b/tests/test-vmstate.c index 1d620e04fb..4d13bd09b3 100644 --- a/tests/test-vmstate.c +++ b/tests/test-vmstate.c @@ -27,7 +27,7 @@ #include "qemu-common.h" #include "migration/migration.h" #include "migration/vmstate.h" -#include "block/coroutine.h" +#include "qemu/coroutine.h" static char temp_file[] = "/tmp/vmst.test.XXXXXX"; static int temp_fd; diff --git a/thread-pool.c b/thread-pool.c index ac909f4986..402c778b47 100644 --- a/thread-pool.c +++ b/thread-pool.c @@ -18,7 +18,7 @@ #include "qemu/queue.h" #include "qemu/thread.h" #include "qemu/osdep.h" -#include "block/coroutine.h" +#include "qemu/coroutine.h" #include "trace.h" #include "block/thread-pool.h" #include "qemu/main-loop.h" diff --git a/util/Makefile.objs b/util/Makefile.objs index 114d6578c4..d8d7e7a919 100644 --- a/util/Makefile.objs +++ b/util/Makefile.objs @@ -18,3 +18,6 @@ util-obj-y += getauxval.o util-obj-y += readline.o util-obj-y += rfifolock.o util-obj-y += rcu.o +util-obj-y += qemu-coroutine.o qemu-coroutine-lock.o qemu-coroutine-io.o +util-obj-y += qemu-coroutine-sleep.o +util-obj-y += coroutine-$(CONFIG_COROUTINE_BACKEND).o diff --git a/util/coroutine-gthread.c b/util/coroutine-gthread.c new file mode 100644 index 0000000000..0bcd77867d --- /dev/null +++ b/util/coroutine-gthread.c @@ -0,0 +1,198 @@ +/* + * GThread coroutine initialization code + * + * Copyright (C) 2006 Anthony Liguori + * Copyright (C) 2011 Aneesh Kumar K.V + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.0 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see . + */ + +#include +#include "qemu-common.h" +#include "qemu/coroutine_int.h" + +typedef struct { + Coroutine base; + GThread *thread; + bool runnable; + bool free_on_thread_exit; + CoroutineAction action; +} CoroutineGThread; + +static CompatGMutex coroutine_lock; +static CompatGCond coroutine_cond; + +/* GLib 2.31 and beyond deprecated various parts of the thread API, + * but the new interfaces are not available in older GLib versions + * so we have to cope with both. + */ +#if GLIB_CHECK_VERSION(2, 31, 0) +/* Awkwardly, the GPrivate API doesn't provide a way to update the + * GDestroyNotify handler for the coroutine key dynamically. So instead + * we track whether or not the CoroutineGThread should be freed on + * thread exit / coroutine key update using the free_on_thread_exit + * field. + */ +static void coroutine_destroy_notify(gpointer data) +{ + CoroutineGThread *co = data; + if (co && co->free_on_thread_exit) { + g_free(co); + } +} + +static GPrivate coroutine_key = G_PRIVATE_INIT(coroutine_destroy_notify); + +static inline CoroutineGThread *get_coroutine_key(void) +{ + return g_private_get(&coroutine_key); +} + +static inline void set_coroutine_key(CoroutineGThread *co, + bool free_on_thread_exit) +{ + /* Unlike g_static_private_set() this does not call the GDestroyNotify + * if the previous value of the key was NULL. Fortunately we only need + * the GDestroyNotify in the non-NULL key case. + */ + co->free_on_thread_exit = free_on_thread_exit; + g_private_replace(&coroutine_key, co); +} + +static inline GThread *create_thread(GThreadFunc func, gpointer data) +{ + return g_thread_new("coroutine", func, data); +} + +#else + +/* Handle older GLib versions */ + +static GStaticPrivate coroutine_key = G_STATIC_PRIVATE_INIT; + +static inline CoroutineGThread *get_coroutine_key(void) +{ + return g_static_private_get(&coroutine_key); +} + +static inline void set_coroutine_key(CoroutineGThread *co, + bool free_on_thread_exit) +{ + g_static_private_set(&coroutine_key, co, + free_on_thread_exit ? (GDestroyNotify)g_free : NULL); +} + +static inline GThread *create_thread(GThreadFunc func, gpointer data) +{ + return g_thread_create_full(func, data, 0, TRUE, TRUE, + G_THREAD_PRIORITY_NORMAL, NULL); +} + +#endif + + +static void __attribute__((constructor)) coroutine_init(void) +{ +#if !GLIB_CHECK_VERSION(2, 31, 0) + if (!g_thread_supported()) { + g_thread_init(NULL); + } +#endif +} + +static void coroutine_wait_runnable_locked(CoroutineGThread *co) +{ + while (!co->runnable) { + g_cond_wait(&coroutine_cond, &coroutine_lock); + } +} + +static void coroutine_wait_runnable(CoroutineGThread *co) +{ + g_mutex_lock(&coroutine_lock); + coroutine_wait_runnable_locked(co); + g_mutex_unlock(&coroutine_lock); +} + +static gpointer coroutine_thread(gpointer opaque) +{ + CoroutineGThread *co = opaque; + + set_coroutine_key(co, false); + coroutine_wait_runnable(co); + co->base.entry(co->base.entry_arg); + qemu_coroutine_switch(&co->base, co->base.caller, COROUTINE_TERMINATE); + return NULL; +} + +Coroutine *qemu_coroutine_new(void) +{ + CoroutineGThread *co; + + co = g_malloc0(sizeof(*co)); + co->thread = create_thread(coroutine_thread, co); + if (!co->thread) { + g_free(co); + return NULL; + } + return &co->base; +} + +void qemu_coroutine_delete(Coroutine *co_) +{ + CoroutineGThread *co = DO_UPCAST(CoroutineGThread, base, co_); + + g_thread_join(co->thread); + g_free(co); +} + +CoroutineAction qemu_coroutine_switch(Coroutine *from_, + Coroutine *to_, + CoroutineAction action) +{ + CoroutineGThread *from = DO_UPCAST(CoroutineGThread, base, from_); + CoroutineGThread *to = DO_UPCAST(CoroutineGThread, base, to_); + + g_mutex_lock(&coroutine_lock); + from->runnable = false; + from->action = action; + to->runnable = true; + to->action = action; + g_cond_broadcast(&coroutine_cond); + + if (action != COROUTINE_TERMINATE) { + coroutine_wait_runnable_locked(from); + } + g_mutex_unlock(&coroutine_lock); + return from->action; +} + +Coroutine *qemu_coroutine_self(void) +{ + CoroutineGThread *co = get_coroutine_key(); + if (!co) { + co = g_malloc0(sizeof(*co)); + co->runnable = true; + set_coroutine_key(co, true); + } + + return &co->base; +} + +bool qemu_in_coroutine(void) +{ + CoroutineGThread *co = get_coroutine_key(); + + return co && co->base.caller; +} diff --git a/util/coroutine-sigaltstack.c b/util/coroutine-sigaltstack.c new file mode 100644 index 0000000000..39842a4a90 --- /dev/null +++ b/util/coroutine-sigaltstack.c @@ -0,0 +1,293 @@ +/* + * sigaltstack coroutine initialization code + * + * Copyright (C) 2006 Anthony Liguori + * Copyright (C) 2011 Kevin Wolf + * Copyright (C) 2012 Alex Barcelo +** This file is partly based on pth_mctx.c, from the GNU Portable Threads +** Copyright (c) 1999-2006 Ralf S. Engelschall + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see . + */ + +/* XXX Is there a nicer way to disable glibc's stack check for longjmp? */ +#ifdef _FORTIFY_SOURCE +#undef _FORTIFY_SOURCE +#endif +#include +#include +#include +#include +#include +#include "qemu-common.h" +#include "qemu/coroutine_int.h" + +typedef struct { + Coroutine base; + void *stack; + sigjmp_buf env; +} CoroutineUContext; + +/** + * Per-thread coroutine bookkeeping + */ +typedef struct { + /** Currently executing coroutine */ + Coroutine *current; + + /** The default coroutine */ + CoroutineUContext leader; + + /** Information for the signal handler (trampoline) */ + sigjmp_buf tr_reenter; + volatile sig_atomic_t tr_called; + void *tr_handler; +} CoroutineThreadState; + +static pthread_key_t thread_state_key; + +static CoroutineThreadState *coroutine_get_thread_state(void) +{ + CoroutineThreadState *s = pthread_getspecific(thread_state_key); + + if (!s) { + s = g_malloc0(sizeof(*s)); + s->current = &s->leader.base; + pthread_setspecific(thread_state_key, s); + } + return s; +} + +static void qemu_coroutine_thread_cleanup(void *opaque) +{ + CoroutineThreadState *s = opaque; + + g_free(s); +} + +static void __attribute__((constructor)) coroutine_init(void) +{ + int ret; + + ret = pthread_key_create(&thread_state_key, qemu_coroutine_thread_cleanup); + if (ret != 0) { + fprintf(stderr, "unable to create leader key: %s\n", strerror(errno)); + abort(); + } +} + +/* "boot" function + * This is what starts the coroutine, is called from the trampoline + * (from the signal handler when it is not signal handling, read ahead + * for more information). + */ +static void coroutine_bootstrap(CoroutineUContext *self, Coroutine *co) +{ + /* Initialize longjmp environment and switch back the caller */ + if (!sigsetjmp(self->env, 0)) { + siglongjmp(*(sigjmp_buf *)co->entry_arg, 1); + } + + while (true) { + co->entry(co->entry_arg); + qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE); + } +} + +/* + * This is used as the signal handler. This is called with the brand new stack + * (thanks to sigaltstack). We have to return, given that this is a signal + * handler and the sigmask and some other things are changed. + */ +static void coroutine_trampoline(int signal) +{ + CoroutineUContext *self; + Coroutine *co; + CoroutineThreadState *coTS; + + /* Get the thread specific information */ + coTS = coroutine_get_thread_state(); + self = coTS->tr_handler; + coTS->tr_called = 1; + co = &self->base; + + /* + * Here we have to do a bit of a ping pong between the caller, given that + * this is a signal handler and we have to do a return "soon". Then the + * caller can reestablish everything and do a siglongjmp here again. + */ + if (!sigsetjmp(coTS->tr_reenter, 0)) { + return; + } + + /* + * Ok, the caller has siglongjmp'ed back to us, so now prepare + * us for the real machine state switching. We have to jump + * into another function here to get a new stack context for + * the auto variables (which have to be auto-variables + * because the start of the thread happens later). Else with + * PIC (i.e. Position Independent Code which is used when PTH + * is built as a shared library) most platforms would + * horrible core dump as experience showed. + */ + coroutine_bootstrap(self, co); +} + +Coroutine *qemu_coroutine_new(void) +{ + const size_t stack_size = 1 << 20; + CoroutineUContext *co; + CoroutineThreadState *coTS; + struct sigaction sa; + struct sigaction osa; + stack_t ss; + stack_t oss; + sigset_t sigs; + sigset_t osigs; + sigjmp_buf old_env; + + /* The way to manipulate stack is with the sigaltstack function. We + * prepare a stack, with it delivering a signal to ourselves and then + * put sigsetjmp/siglongjmp where needed. + * This has been done keeping coroutine-ucontext as a model and with the + * pth ideas (GNU Portable Threads). See coroutine-ucontext for the basics + * of the coroutines and see pth_mctx.c (from the pth project) for the + * sigaltstack way of manipulating stacks. + */ + + co = g_malloc0(sizeof(*co)); + co->stack = g_malloc(stack_size); + co->base.entry_arg = &old_env; /* stash away our jmp_buf */ + + coTS = coroutine_get_thread_state(); + coTS->tr_handler = co; + + /* + * Preserve the SIGUSR2 signal state, block SIGUSR2, + * and establish our signal handler. The signal will + * later transfer control onto the signal stack. + */ + sigemptyset(&sigs); + sigaddset(&sigs, SIGUSR2); + pthread_sigmask(SIG_BLOCK, &sigs, &osigs); + sa.sa_handler = coroutine_trampoline; + sigfillset(&sa.sa_mask); + sa.sa_flags = SA_ONSTACK; + if (sigaction(SIGUSR2, &sa, &osa) != 0) { + abort(); + } + + /* + * Set the new stack. + */ + ss.ss_sp = co->stack; + ss.ss_size = stack_size; + ss.ss_flags = 0; + if (sigaltstack(&ss, &oss) < 0) { + abort(); + } + + /* + * Now transfer control onto the signal stack and set it up. + * It will return immediately via "return" after the sigsetjmp() + * was performed. Be careful here with race conditions. The + * signal can be delivered the first time sigsuspend() is + * called. + */ + coTS->tr_called = 0; + pthread_kill(pthread_self(), SIGUSR2); + sigfillset(&sigs); + sigdelset(&sigs, SIGUSR2); + while (!coTS->tr_called) { + sigsuspend(&sigs); + } + + /* + * Inform the system that we are back off the signal stack by + * removing the alternative signal stack. Be careful here: It + * first has to be disabled, before it can be removed. + */ + sigaltstack(NULL, &ss); + ss.ss_flags = SS_DISABLE; + if (sigaltstack(&ss, NULL) < 0) { + abort(); + } + sigaltstack(NULL, &ss); + if (!(oss.ss_flags & SS_DISABLE)) { + sigaltstack(&oss, NULL); + } + + /* + * Restore the old SIGUSR2 signal handler and mask + */ + sigaction(SIGUSR2, &osa, NULL); + pthread_sigmask(SIG_SETMASK, &osigs, NULL); + + /* + * Now enter the trampoline again, but this time not as a signal + * handler. Instead we jump into it directly. The functionally + * redundant ping-pong pointer arithmetic is necessary to avoid + * type-conversion warnings related to the `volatile' qualifier and + * the fact that `jmp_buf' usually is an array type. + */ + if (!sigsetjmp(old_env, 0)) { + siglongjmp(coTS->tr_reenter, 1); + } + + /* + * Ok, we returned again, so now we're finished + */ + + return &co->base; +} + +void qemu_coroutine_delete(Coroutine *co_) +{ + CoroutineUContext *co = DO_UPCAST(CoroutineUContext, base, co_); + + g_free(co->stack); + g_free(co); +} + +CoroutineAction qemu_coroutine_switch(Coroutine *from_, Coroutine *to_, + CoroutineAction action) +{ + CoroutineUContext *from = DO_UPCAST(CoroutineUContext, base, from_); + CoroutineUContext *to = DO_UPCAST(CoroutineUContext, base, to_); + CoroutineThreadState *s = coroutine_get_thread_state(); + int ret; + + s->current = to_; + + ret = sigsetjmp(from->env, 0); + if (ret == 0) { + siglongjmp(to->env, action); + } + return ret; +} + +Coroutine *qemu_coroutine_self(void) +{ + CoroutineThreadState *s = coroutine_get_thread_state(); + + return s->current; +} + +bool qemu_in_coroutine(void) +{ + CoroutineThreadState *s = pthread_getspecific(thread_state_key); + + return s && s->current->caller; +} + diff --git a/util/coroutine-ucontext.c b/util/coroutine-ucontext.c new file mode 100644 index 0000000000..26cbebb7a7 --- /dev/null +++ b/util/coroutine-ucontext.c @@ -0,0 +1,194 @@ +/* + * ucontext coroutine initialization code + * + * Copyright (C) 2006 Anthony Liguori + * Copyright (C) 2011 Kevin Wolf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.0 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see . + */ + +/* XXX Is there a nicer way to disable glibc's stack check for longjmp? */ +#ifdef _FORTIFY_SOURCE +#undef _FORTIFY_SOURCE +#endif +#include +#include +#include +#include +#include "qemu-common.h" +#include "qemu/coroutine_int.h" + +#ifdef CONFIG_VALGRIND_H +#include +#endif + +typedef struct { + Coroutine base; + void *stack; + sigjmp_buf env; + +#ifdef CONFIG_VALGRIND_H + unsigned int valgrind_stack_id; +#endif + +} CoroutineUContext; + +/** + * Per-thread coroutine bookkeeping + */ +static __thread CoroutineUContext leader; +static __thread Coroutine *current; + +/* + * va_args to makecontext() must be type 'int', so passing + * the pointer we need may require several int args. This + * union is a quick hack to let us do that + */ +union cc_arg { + void *p; + int i[2]; +}; + +static void coroutine_trampoline(int i0, int i1) +{ + union cc_arg arg; + CoroutineUContext *self; + Coroutine *co; + + arg.i[0] = i0; + arg.i[1] = i1; + self = arg.p; + co = &self->base; + + /* Initialize longjmp environment and switch back the caller */ + if (!sigsetjmp(self->env, 0)) { + siglongjmp(*(sigjmp_buf *)co->entry_arg, 1); + } + + while (true) { + co->entry(co->entry_arg); + qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE); + } +} + +Coroutine *qemu_coroutine_new(void) +{ + const size_t stack_size = 1 << 20; + CoroutineUContext *co; + ucontext_t old_uc, uc; + sigjmp_buf old_env; + union cc_arg arg = {0}; + + /* The ucontext functions preserve signal masks which incurs a + * system call overhead. sigsetjmp(buf, 0)/siglongjmp() does not + * preserve signal masks but only works on the current stack. + * Since we need a way to create and switch to a new stack, use + * the ucontext functions for that but sigsetjmp()/siglongjmp() for + * everything else. + */ + + if (getcontext(&uc) == -1) { + abort(); + } + + co = g_malloc0(sizeof(*co)); + co->stack = g_malloc(stack_size); + co->base.entry_arg = &old_env; /* stash away our jmp_buf */ + + uc.uc_link = &old_uc; + uc.uc_stack.ss_sp = co->stack; + uc.uc_stack.ss_size = stack_size; + uc.uc_stack.ss_flags = 0; + +#ifdef CONFIG_VALGRIND_H + co->valgrind_stack_id = + VALGRIND_STACK_REGISTER(co->stack, co->stack + stack_size); +#endif + + arg.p = co; + + makecontext(&uc, (void (*)(void))coroutine_trampoline, + 2, arg.i[0], arg.i[1]); + + /* swapcontext() in, siglongjmp() back out */ + if (!sigsetjmp(old_env, 0)) { + swapcontext(&old_uc, &uc); + } + return &co->base; +} + +#ifdef CONFIG_VALGRIND_H +#ifdef CONFIG_PRAGMA_DIAGNOSTIC_AVAILABLE +/* Work around an unused variable in the valgrind.h macro... */ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-but-set-variable" +#endif +static inline void valgrind_stack_deregister(CoroutineUContext *co) +{ + VALGRIND_STACK_DEREGISTER(co->valgrind_stack_id); +} +#ifdef CONFIG_PRAGMA_DIAGNOSTIC_AVAILABLE +#pragma GCC diagnostic pop +#endif +#endif + +void qemu_coroutine_delete(Coroutine *co_) +{ + CoroutineUContext *co = DO_UPCAST(CoroutineUContext, base, co_); + +#ifdef CONFIG_VALGRIND_H + valgrind_stack_deregister(co); +#endif + + g_free(co->stack); + g_free(co); +} + +/* This function is marked noinline to prevent GCC from inlining it + * into coroutine_trampoline(). If we allow it to do that then it + * hoists the code to get the address of the TLS variable "current" + * out of the while() loop. This is an invalid transformation because + * the sigsetjmp() call may be called when running thread A but + * return in thread B, and so we might be in a different thread + * context each time round the loop. + */ +CoroutineAction __attribute__((noinline)) +qemu_coroutine_switch(Coroutine *from_, Coroutine *to_, + CoroutineAction action) +{ + CoroutineUContext *from = DO_UPCAST(CoroutineUContext, base, from_); + CoroutineUContext *to = DO_UPCAST(CoroutineUContext, base, to_); + int ret; + + current = to_; + + ret = sigsetjmp(from->env, 0); + if (ret == 0) { + siglongjmp(to->env, action); + } + return ret; +} + +Coroutine *qemu_coroutine_self(void) +{ + if (!current) { + current = &leader.base; + } + return current; +} + +bool qemu_in_coroutine(void) +{ + return current && current->caller; +} diff --git a/util/coroutine-win32.c b/util/coroutine-win32.c new file mode 100644 index 0000000000..4f922c53af --- /dev/null +++ b/util/coroutine-win32.c @@ -0,0 +1,101 @@ +/* + * Win32 coroutine initialization code + * + * Copyright (c) 2011 Kevin Wolf + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include "qemu-common.h" +#include "qemu/coroutine_int.h" + +typedef struct +{ + Coroutine base; + + LPVOID fiber; + CoroutineAction action; +} CoroutineWin32; + +static __thread CoroutineWin32 leader; +static __thread Coroutine *current; + +/* This function is marked noinline to prevent GCC from inlining it + * into coroutine_trampoline(). If we allow it to do that then it + * hoists the code to get the address of the TLS variable "current" + * out of the while() loop. This is an invalid transformation because + * the SwitchToFiber() call may be called when running thread A but + * return in thread B, and so we might be in a different thread + * context each time round the loop. + */ +CoroutineAction __attribute__((noinline)) +qemu_coroutine_switch(Coroutine *from_, Coroutine *to_, + CoroutineAction action) +{ + CoroutineWin32 *from = DO_UPCAST(CoroutineWin32, base, from_); + CoroutineWin32 *to = DO_UPCAST(CoroutineWin32, base, to_); + + current = to_; + + to->action = action; + SwitchToFiber(to->fiber); + return from->action; +} + +static void CALLBACK coroutine_trampoline(void *co_) +{ + Coroutine *co = co_; + + while (true) { + co->entry(co->entry_arg); + qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE); + } +} + +Coroutine *qemu_coroutine_new(void) +{ + const size_t stack_size = 1 << 20; + CoroutineWin32 *co; + + co = g_malloc0(sizeof(*co)); + co->fiber = CreateFiber(stack_size, coroutine_trampoline, &co->base); + return &co->base; +} + +void qemu_coroutine_delete(Coroutine *co_) +{ + CoroutineWin32 *co = DO_UPCAST(CoroutineWin32, base, co_); + + DeleteFiber(co->fiber); + g_free(co); +} + +Coroutine *qemu_coroutine_self(void) +{ + if (!current) { + current = &leader.base; + leader.fiber = ConvertThreadToFiber(NULL); + } + return current; +} + +bool qemu_in_coroutine(void) +{ + return current && current->caller; +} diff --git a/util/qemu-coroutine-io.c b/util/qemu-coroutine-io.c new file mode 100644 index 0000000000..e1eae7331e --- /dev/null +++ b/util/qemu-coroutine-io.c @@ -0,0 +1,91 @@ +/* + * Coroutine-aware I/O functions + * + * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation. + * Copyright (c) 2011, Red Hat, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +#include "qemu-common.h" +#include "qemu/sockets.h" +#include "qemu/coroutine.h" +#include "qemu/iov.h" +#include "qemu/main-loop.h" + +ssize_t coroutine_fn +qemu_co_sendv_recvv(int sockfd, struct iovec *iov, unsigned iov_cnt, + size_t offset, size_t bytes, bool do_send) +{ + size_t done = 0; + ssize_t ret; + int err; + while (done < bytes) { + ret = iov_send_recv(sockfd, iov, iov_cnt, + offset + done, bytes - done, do_send); + if (ret > 0) { + done += ret; + } else if (ret < 0) { + err = socket_error(); + if (err == EAGAIN || err == EWOULDBLOCK) { + qemu_coroutine_yield(); + } else if (done == 0) { + return -err; + } else { + break; + } + } else if (ret == 0 && !do_send) { + /* write (send) should never return 0. + * read (recv) returns 0 for end-of-file (-data). + * In both cases there's little point retrying, + * but we do for write anyway, just in case */ + break; + } + } + return done; +} + +ssize_t coroutine_fn +qemu_co_send_recv(int sockfd, void *buf, size_t bytes, bool do_send) +{ + struct iovec iov = { .iov_base = buf, .iov_len = bytes }; + return qemu_co_sendv_recvv(sockfd, &iov, 1, 0, bytes, do_send); +} + +typedef struct { + Coroutine *co; + int fd; +} FDYieldUntilData; + +static void fd_coroutine_enter(void *opaque) +{ + FDYieldUntilData *data = opaque; + qemu_set_fd_handler(data->fd, NULL, NULL, NULL); + qemu_coroutine_enter(data->co, NULL); +} + +void coroutine_fn yield_until_fd_readable(int fd) +{ + FDYieldUntilData data; + + assert(qemu_in_coroutine()); + data.co = qemu_coroutine_self(); + data.fd = fd; + qemu_set_fd_handler(fd, fd_coroutine_enter, NULL, &data); + qemu_coroutine_yield(); +} diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c new file mode 100644 index 0000000000..130ee19d17 --- /dev/null +++ b/util/qemu-coroutine-lock.c @@ -0,0 +1,186 @@ +/* + * coroutine queues and locks + * + * Copyright (c) 2011 Kevin Wolf + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include "qemu-common.h" +#include "qemu/coroutine.h" +#include "qemu/coroutine_int.h" +#include "qemu/queue.h" +#include "trace.h" + +void qemu_co_queue_init(CoQueue *queue) +{ + QTAILQ_INIT(&queue->entries); +} + +void coroutine_fn qemu_co_queue_wait(CoQueue *queue) +{ + Coroutine *self = qemu_coroutine_self(); + QTAILQ_INSERT_TAIL(&queue->entries, self, co_queue_next); + qemu_coroutine_yield(); + assert(qemu_in_coroutine()); +} + +/** + * qemu_co_queue_run_restart: + * + * Enter each coroutine that was previously marked for restart by + * qemu_co_queue_next() or qemu_co_queue_restart_all(). This function is + * invoked by the core coroutine code when the current coroutine yields or + * terminates. + */ +void qemu_co_queue_run_restart(Coroutine *co) +{ + Coroutine *next; + + trace_qemu_co_queue_run_restart(co); + while ((next = QTAILQ_FIRST(&co->co_queue_wakeup))) { + QTAILQ_REMOVE(&co->co_queue_wakeup, next, co_queue_next); + qemu_coroutine_enter(next, NULL); + } +} + +static bool qemu_co_queue_do_restart(CoQueue *queue, bool single) +{ + Coroutine *self = qemu_coroutine_self(); + Coroutine *next; + + if (QTAILQ_EMPTY(&queue->entries)) { + return false; + } + + while ((next = QTAILQ_FIRST(&queue->entries)) != NULL) { + QTAILQ_REMOVE(&queue->entries, next, co_queue_next); + QTAILQ_INSERT_TAIL(&self->co_queue_wakeup, next, co_queue_next); + trace_qemu_co_queue_next(next); + if (single) { + break; + } + } + return true; +} + +bool coroutine_fn qemu_co_queue_next(CoQueue *queue) +{ + assert(qemu_in_coroutine()); + return qemu_co_queue_do_restart(queue, true); +} + +void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue) +{ + assert(qemu_in_coroutine()); + qemu_co_queue_do_restart(queue, false); +} + +bool qemu_co_enter_next(CoQueue *queue) +{ + Coroutine *next; + + next = QTAILQ_FIRST(&queue->entries); + if (!next) { + return false; + } + + QTAILQ_REMOVE(&queue->entries, next, co_queue_next); + qemu_coroutine_enter(next, NULL); + return true; +} + +bool qemu_co_queue_empty(CoQueue *queue) +{ + return QTAILQ_FIRST(&queue->entries) == NULL; +} + +void qemu_co_mutex_init(CoMutex *mutex) +{ + memset(mutex, 0, sizeof(*mutex)); + qemu_co_queue_init(&mutex->queue); +} + +void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex) +{ + Coroutine *self = qemu_coroutine_self(); + + trace_qemu_co_mutex_lock_entry(mutex, self); + + while (mutex->locked) { + qemu_co_queue_wait(&mutex->queue); + } + + mutex->locked = true; + + trace_qemu_co_mutex_lock_return(mutex, self); +} + +void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex) +{ + Coroutine *self = qemu_coroutine_self(); + + trace_qemu_co_mutex_unlock_entry(mutex, self); + + assert(mutex->locked == true); + assert(qemu_in_coroutine()); + + mutex->locked = false; + qemu_co_queue_next(&mutex->queue); + + trace_qemu_co_mutex_unlock_return(mutex, self); +} + +void qemu_co_rwlock_init(CoRwlock *lock) +{ + memset(lock, 0, sizeof(*lock)); + qemu_co_queue_init(&lock->queue); +} + +void qemu_co_rwlock_rdlock(CoRwlock *lock) +{ + while (lock->writer) { + qemu_co_queue_wait(&lock->queue); + } + lock->reader++; +} + +void qemu_co_rwlock_unlock(CoRwlock *lock) +{ + assert(qemu_in_coroutine()); + if (lock->writer) { + lock->writer = false; + qemu_co_queue_restart_all(&lock->queue); + } else { + lock->reader--; + assert(lock->reader >= 0); + /* Wakeup only one waiting writer */ + if (!lock->reader) { + qemu_co_queue_next(&lock->queue); + } + } +} + +void qemu_co_rwlock_wrlock(CoRwlock *lock) +{ + while (lock->writer || lock->reader) { + qemu_co_queue_wait(&lock->queue); + } + lock->writer = true; +} diff --git a/util/qemu-coroutine-sleep.c b/util/qemu-coroutine-sleep.c new file mode 100644 index 0000000000..b35db56356 --- /dev/null +++ b/util/qemu-coroutine-sleep.c @@ -0,0 +1,41 @@ +/* + * QEMU coroutine sleep + * + * Copyright IBM, Corp. 2011 + * + * Authors: + * Stefan Hajnoczi + * + * This work is licensed under the terms of the GNU LGPL, version 2 or later. + * See the COPYING.LIB file in the top-level directory. + * + */ + +#include "qemu/coroutine.h" +#include "qemu/timer.h" +#include "block/aio.h" + +typedef struct CoSleepCB { + QEMUTimer *ts; + Coroutine *co; +} CoSleepCB; + +static void co_sleep_cb(void *opaque) +{ + CoSleepCB *sleep_cb = opaque; + + qemu_coroutine_enter(sleep_cb->co, NULL); +} + +void coroutine_fn co_aio_sleep_ns(AioContext *ctx, QEMUClockType type, + int64_t ns) +{ + CoSleepCB sleep_cb = { + .co = qemu_coroutine_self(), + }; + sleep_cb.ts = aio_timer_new(ctx, type, SCALE_NS, co_sleep_cb, &sleep_cb); + timer_mod(sleep_cb.ts, qemu_clock_get_ns(type) + ns); + qemu_coroutine_yield(); + timer_del(sleep_cb.ts); + timer_free(sleep_cb.ts); +} diff --git a/util/qemu-coroutine.c b/util/qemu-coroutine.c new file mode 100644 index 0000000000..8953560223 --- /dev/null +++ b/util/qemu-coroutine.c @@ -0,0 +1,146 @@ +/* + * QEMU coroutines + * + * Copyright IBM, Corp. 2011 + * + * Authors: + * Stefan Hajnoczi + * Kevin Wolf + * + * This work is licensed under the terms of the GNU LGPL, version 2 or later. + * See the COPYING.LIB file in the top-level directory. + * + */ + +#include "trace.h" +#include "qemu-common.h" +#include "qemu/thread.h" +#include "qemu/atomic.h" +#include "qemu/coroutine.h" +#include "qemu/coroutine_int.h" + +enum { + POOL_BATCH_SIZE = 64, +}; + +/** Free list to speed up creation */ +static QSLIST_HEAD(, Coroutine) release_pool = QSLIST_HEAD_INITIALIZER(pool); +static unsigned int release_pool_size; +static __thread QSLIST_HEAD(, Coroutine) alloc_pool = QSLIST_HEAD_INITIALIZER(pool); +static __thread unsigned int alloc_pool_size; +static __thread Notifier coroutine_pool_cleanup_notifier; + +static void coroutine_pool_cleanup(Notifier *n, void *value) +{ + Coroutine *co; + Coroutine *tmp; + + QSLIST_FOREACH_SAFE(co, &alloc_pool, pool_next, tmp) { + QSLIST_REMOVE_HEAD(&alloc_pool, pool_next); + qemu_coroutine_delete(co); + } +} + +Coroutine *qemu_coroutine_create(CoroutineEntry *entry) +{ + Coroutine *co = NULL; + + if (CONFIG_COROUTINE_POOL) { + co = QSLIST_FIRST(&alloc_pool); + if (!co) { + if (release_pool_size > POOL_BATCH_SIZE) { + /* Slow path; a good place to register the destructor, too. */ + if (!coroutine_pool_cleanup_notifier.notify) { + coroutine_pool_cleanup_notifier.notify = coroutine_pool_cleanup; + qemu_thread_atexit_add(&coroutine_pool_cleanup_notifier); + } + + /* This is not exact; there could be a little skew between + * release_pool_size and the actual size of release_pool. But + * it is just a heuristic, it does not need to be perfect. + */ + alloc_pool_size = atomic_xchg(&release_pool_size, 0); + QSLIST_MOVE_ATOMIC(&alloc_pool, &release_pool); + co = QSLIST_FIRST(&alloc_pool); + } + } + if (co) { + QSLIST_REMOVE_HEAD(&alloc_pool, pool_next); + alloc_pool_size--; + } + } + + if (!co) { + co = qemu_coroutine_new(); + } + + co->entry = entry; + QTAILQ_INIT(&co->co_queue_wakeup); + return co; +} + +static void coroutine_delete(Coroutine *co) +{ + co->caller = NULL; + + if (CONFIG_COROUTINE_POOL) { + if (release_pool_size < POOL_BATCH_SIZE * 2) { + QSLIST_INSERT_HEAD_ATOMIC(&release_pool, co, pool_next); + atomic_inc(&release_pool_size); + return; + } + if (alloc_pool_size < POOL_BATCH_SIZE) { + QSLIST_INSERT_HEAD(&alloc_pool, co, pool_next); + alloc_pool_size++; + return; + } + } + + qemu_coroutine_delete(co); +} + +void qemu_coroutine_enter(Coroutine *co, void *opaque) +{ + Coroutine *self = qemu_coroutine_self(); + CoroutineAction ret; + + trace_qemu_coroutine_enter(self, co, opaque); + + if (co->caller) { + fprintf(stderr, "Co-routine re-entered recursively\n"); + abort(); + } + + co->caller = self; + co->entry_arg = opaque; + ret = qemu_coroutine_switch(self, co, COROUTINE_ENTER); + + qemu_co_queue_run_restart(co); + + switch (ret) { + case COROUTINE_YIELD: + return; + case COROUTINE_TERMINATE: + trace_qemu_coroutine_terminate(co); + coroutine_delete(co); + return; + default: + abort(); + } +} + +void coroutine_fn qemu_coroutine_yield(void) +{ + Coroutine *self = qemu_coroutine_self(); + Coroutine *to = self->caller; + + trace_qemu_coroutine_yield(self, to); + + if (!to) { + fprintf(stderr, "Co-routine is yielding to no one\n"); + abort(); + } + + self->caller = NULL; + qemu_coroutine_switch(self, to, COROUTINE_YIELD); +} -- cgit v1.2.1