diff options
Diffstat (limited to 'monitor.c')
-rw-r--r-- | monitor.c | 706 |
1 files changed, 625 insertions, 81 deletions
@@ -35,6 +35,8 @@ #include "net/net.h" #include "net/slirp.h" #include "chardev/char-fe.h" +#include "chardev/char-io.h" +#include "chardev/char-mux.h" #include "ui/qemu-spice.h" #include "sysemu/numa.h" #include "monitor/monitor.h" @@ -58,6 +60,7 @@ #include "qapi/qmp/qjson.h" #include "qapi/qmp/json-streamer.h" #include "qapi/qmp/json-parser.h" +#include "qapi/qmp/qlist.h" #include "qom/object_interfaces.h" #include "trace-root.h" #include "trace/control.h" @@ -79,6 +82,7 @@ #include "qapi/qapi-introspect.h" #include "sysemu/qtest.h" #include "sysemu/cpus.h" +#include "sysemu/iothread.h" #include "qemu/cutils.h" #if defined(TARGET_S390X) @@ -168,6 +172,16 @@ typedef struct { * mode. */ QmpCommandList *commands; + bool qmp_caps[QMP_CAPABILITY__MAX]; + /* + * Protects qmp request/response queue. Please take monitor_lock + * first when used together. + */ + QemuMutex qmp_queue_lock; + /* Input queue that holds all the parsed QMP requests */ + GQueue *qmp_requests; + /* Output queue contains all the QMP responses in order */ + GQueue *qmp_responses; } MonitorQMP; /* @@ -190,9 +204,11 @@ struct Monitor { CharBackend chr; int reset_seen; int flags; - int suspend_cnt; + int suspend_cnt; /* Needs to be accessed atomically */ bool skip_flush; + bool use_io_thr; + /* We can't access guest memory when holding the lock */ QemuMutex out_lock; QString *outbuf; guint out_watch; @@ -207,16 +223,25 @@ struct Monitor { void *password_opaque; mon_cmd_t *cmd_table; QLIST_HEAD(,mon_fd_t) fds; - QLIST_ENTRY(Monitor) entry; + QTAILQ_ENTRY(Monitor) entry; }; +/* Let's add monitor global variables to this struct. */ +static struct { + IOThread *mon_iothread; + /* Bottom half to dispatch the requests received from IO thread */ + QEMUBH *qmp_dispatcher_bh; + /* Bottom half to deliver the responses back to clients */ + QEMUBH *qmp_respond_bh; +} mon_global; + /* QMP checker flags */ #define QMP_ACCEPT_UNKNOWNS 1 /* Protects mon_list, monitor_event_state. */ static QemuMutex monitor_lock; -static QLIST_HEAD(mon_list, Monitor) mon_list; +static QTAILQ_HEAD(mon_list, Monitor) mon_list; static QLIST_HEAD(mon_fdsets, MonFdset) mon_fdsets; static int mon_refcount; @@ -241,6 +266,21 @@ static inline bool monitor_is_qmp(const Monitor *mon) } /** + * Whether @mon is using readline? Note: not all HMP monitors use + * readline, e.g., gdbserver has a non-interactive HMP monitor, so + * readline is not used there. + */ +static inline bool monitor_uses_readline(const Monitor *mon) +{ + return mon->flags & MONITOR_USE_READLINE; +} + +static inline bool monitor_is_hmp_non_interactive(const Monitor *mon) +{ + return !monitor_is_qmp(mon) && !monitor_uses_readline(mon); +} + +/** * Is the current monitor, if any, a QMP monitor? */ bool monitor_cur_is_qmp(void) @@ -382,7 +422,8 @@ int monitor_fprintf(FILE *stream, const char *fmt, ...) return 0; } -static void monitor_json_emitter(Monitor *mon, const QObject *data) +static void monitor_json_emitter_raw(Monitor *mon, + QObject *data) { QString *json; @@ -396,6 +437,71 @@ static void monitor_json_emitter(Monitor *mon, const QObject *data) QDECREF(json); } +static void monitor_json_emitter(Monitor *mon, QObject *data) +{ + if (mon->use_io_thr) { + /* + * If using IO thread, we need to queue the item so that IO + * thread will do the rest for us. Take refcount so that + * caller won't free the data (which will be finally freed in + * responder thread). + */ + qobject_incref(data); + qemu_mutex_lock(&mon->qmp.qmp_queue_lock); + g_queue_push_tail(mon->qmp.qmp_responses, (void *)data); + qemu_mutex_unlock(&mon->qmp.qmp_queue_lock); + qemu_bh_schedule(mon_global.qmp_respond_bh); + } else { + /* + * If not using monitor IO thread, then we are in main thread. + * Do the emission right away. + */ + monitor_json_emitter_raw(mon, data); + } +} + +struct QMPResponse { + Monitor *mon; + QObject *data; +}; +typedef struct QMPResponse QMPResponse; + +/* + * Return one QMPResponse. The response is only valid if + * response.data is not NULL. + */ +static QMPResponse monitor_qmp_response_pop_one(void) +{ + Monitor *mon; + QObject *data = NULL; + + qemu_mutex_lock(&monitor_lock); + QTAILQ_FOREACH(mon, &mon_list, entry) { + qemu_mutex_lock(&mon->qmp.qmp_queue_lock); + data = g_queue_pop_head(mon->qmp.qmp_responses); + qemu_mutex_unlock(&mon->qmp.qmp_queue_lock); + if (data) { + break; + } + } + qemu_mutex_unlock(&monitor_lock); + return (QMPResponse) { .mon = mon, .data = data }; +} + +static void monitor_qmp_bh_responder(void *opaque) +{ + QMPResponse response; + + while (true) { + response = monitor_qmp_response_pop_one(); + if (!response.data) { + break; + } + monitor_json_emitter_raw(response.mon, response.data); + qobject_decref(response.data); + } +} + static MonitorQAPIEventConf monitor_qapi_event_conf[QAPI_EVENT__MAX] = { /* Limit guest-triggerable events to 1 per second */ [QAPI_EVENT_RTC_CHANGE] = { 1000 * SCALE_MS }, @@ -417,7 +523,7 @@ static void monitor_qapi_event_emit(QAPIEvent event, QDict *qdict) Monitor *mon; trace_monitor_protocol_event_emit(event, qdict); - QLIST_FOREACH(mon, &mon_list, entry) { + QTAILQ_FOREACH(mon, &mon_list, entry) { if (monitor_is_qmp(mon) && mon->qmp.commands != &qmp_cap_negotiation_commands) { monitor_json_emitter(mon, QOBJECT(qdict)); @@ -447,7 +553,7 @@ monitor_qapi_event_queue(QAPIEvent event, QDict *qdict, Error **errp) /* Unthrottled event */ monitor_qapi_event_emit(event, qdict); } else { - QDict *data = qobject_to_qdict(qdict_get(qdict, "data")); + QDict *data = qobject_to(QDict, qdict_get(qdict, "data")); MonitorQAPIEventState key = { .event = event, .data = data }; evstate = g_hash_table_lookup(monitor_qapi_event_state, &key); @@ -570,13 +676,19 @@ static void monitor_qapi_event_init(void) static void handle_hmp_command(Monitor *mon, const char *cmdline); -static void monitor_data_init(Monitor *mon) +static void monitor_data_init(Monitor *mon, bool skip_flush, + bool use_io_thr) { memset(mon, 0, sizeof(Monitor)); qemu_mutex_init(&mon->out_lock); + qemu_mutex_init(&mon->qmp.qmp_queue_lock); mon->outbuf = qstring_new(); /* Use *mon_cmds by default. */ mon->cmd_table = mon_cmds; + mon->skip_flush = skip_flush; + mon->use_io_thr = use_io_thr; + mon->qmp.qmp_requests = g_queue_new(); + mon->qmp.qmp_responses = g_queue_new(); } static void monitor_data_destroy(Monitor *mon) @@ -589,6 +701,9 @@ static void monitor_data_destroy(Monitor *mon) readline_free(mon->rs); QDECREF(mon->outbuf); qemu_mutex_destroy(&mon->out_lock); + qemu_mutex_destroy(&mon->qmp.qmp_queue_lock); + g_queue_free(mon->qmp.qmp_requests); + g_queue_free(mon->qmp.qmp_responses); } char *qmp_human_monitor_command(const char *command_line, bool has_cpu_index, @@ -597,8 +712,7 @@ char *qmp_human_monitor_command(const char *command_line, bool has_cpu_index, char *output = NULL; Monitor *old_mon, hmp; - monitor_data_init(&hmp); - hmp.skip_flush = true; + monitor_data_init(&hmp, true, false); old_mon = cur_mon; cur_mon = &hmp; @@ -956,7 +1070,7 @@ EventInfoList *qmp_query_events(Error **errp) static void qmp_query_qmp_schema(QDict *qdict, QObject **ret_data, Error **errp) { - *ret_data = qobject_from_json(qmp_schema_json, &error_abort); + *ret_data = qobject_from_qlit(&qmp_schema_qlit); } /* @@ -1006,7 +1120,7 @@ static void qmp_unregister_commands_hack(void) #endif } -void monitor_init_qmp_commands(void) +static void monitor_init_qmp_commands(void) { /* * Two command lists: @@ -1032,8 +1146,90 @@ void monitor_init_qmp_commands(void) qmp_marshal_qmp_capabilities, QCO_NO_OPTIONS); } -void qmp_qmp_capabilities(Error **errp) +static bool qmp_cap_enabled(Monitor *mon, QMPCapability cap) +{ + return mon->qmp.qmp_caps[cap]; +} + +static bool qmp_oob_enabled(Monitor *mon) +{ + return qmp_cap_enabled(mon, QMP_CAPABILITY_OOB); +} + +static void qmp_caps_check(Monitor *mon, QMPCapabilityList *list, + Error **errp) +{ + for (; list; list = list->next) { + assert(list->value < QMP_CAPABILITY__MAX); + switch (list->value) { + case QMP_CAPABILITY_OOB: + if (!mon->use_io_thr) { + /* + * Out-Of-Band only works with monitors that are + * running on dedicated IOThread. + */ + error_setg(errp, "This monitor does not support " + "Out-Of-Band (OOB)"); + return; + } + break; + default: + break; + } + } +} + +/* This function should only be called after capabilities are checked. */ +static void qmp_caps_apply(Monitor *mon, QMPCapabilityList *list) +{ + for (; list; list = list->next) { + mon->qmp.qmp_caps[list->value] = true; + } +} + +/* + * Return true if check successful, or false otherwise. When false is + * returned, detailed error will be in errp if provided. + */ +static bool qmp_cmd_oob_check(Monitor *mon, QDict *req, Error **errp) +{ + const char *command; + QmpCommand *cmd; + + command = qdict_get_try_str(req, "execute"); + if (!command) { + error_setg(errp, "Command field 'execute' missing"); + return false; + } + + cmd = qmp_find_command(mon->qmp.commands, command); + if (!cmd) { + error_set(errp, ERROR_CLASS_COMMAND_NOT_FOUND, + "The command %s has not been found", command); + return false; + } + + if (qmp_is_oob(req)) { + if (!qmp_oob_enabled(mon)) { + error_setg(errp, "Please enable Out-Of-Band first " + "for the session during capabilities negotiation"); + return false; + } + if (!(cmd->options & QCO_ALLOW_OOB)) { + error_setg(errp, "The command %s does not support OOB", + command); + return false; + } + } + + return true; +} + +void qmp_qmp_capabilities(bool has_enable, QMPCapabilityList *enable, + Error **errp) { + Error *local_err = NULL; + if (cur_mon->qmp.commands == &qmp_commands) { error_set(errp, ERROR_CLASS_COMMAND_NOT_FOUND, "Capabilities negotiation is already complete, command " @@ -1041,6 +1237,21 @@ void qmp_qmp_capabilities(Error **errp) return; } + /* Enable QMP capabilities provided by the client if applicable. */ + if (has_enable) { + qmp_caps_check(cur_mon, enable, &local_err); + if (local_err) { + /* + * Failed check on any of the capabilities will fail the + * entire command (and thus not apply any of the other + * capabilities that were also requested). + */ + error_propagate(errp, local_err); + return; + } + qmp_caps_apply(cur_mon, enable); + } + cur_mon->qmp.commands = &qmp_commands; } @@ -3758,31 +3969,74 @@ static int monitor_can_read(void *opaque) { Monitor *mon = opaque; - return (mon->suspend_cnt == 0) ? 1 : 0; + return !atomic_mb_read(&mon->suspend_cnt); } -static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens) +/* + * 1. This function takes ownership of rsp, err, and id. + * 2. rsp, err, and id may be NULL. + * 3. If err != NULL then rsp must be NULL. + */ +static void monitor_qmp_respond(Monitor *mon, QObject *rsp, + Error *err, QObject *id) { - QObject *req, *rsp = NULL, *id = NULL; QDict *qdict = NULL; - Monitor *mon = cur_mon; - Error *err = NULL; - req = json_parser_parse_err(tokens, NULL, &err); - if (!req && !err) { - /* json_parser_parse_err() sucks: can fail without setting @err */ - error_setg(&err, QERR_JSON_PARSING); - } if (err) { - goto err_out; + assert(!rsp); + qdict = qdict_new(); + qdict_put_obj(qdict, "error", qmp_build_error_object(err)); + error_free(err); + rsp = QOBJECT(qdict); + } + + if (rsp) { + if (id) { + /* This is for the qdict below. */ + qobject_incref(id); + qdict_put_obj(qobject_to(QDict, rsp), "id", id); + } + + monitor_json_emitter(mon, rsp); } - qdict = qobject_to_qdict(req); - if (qdict) { - id = qdict_get(qdict, "id"); - qobject_incref(id); - qdict_del(qdict, "id"); - } /* else will fail qmp_dispatch() */ + qobject_decref(id); + qobject_decref(rsp); +} + +struct QMPRequest { + /* Owner of the request */ + Monitor *mon; + /* "id" field of the request */ + QObject *id; + /* Request object to be handled */ + QObject *req; + /* + * Whether we need to resume the monitor afterward. This flag is + * used to emulate the old QMP server behavior that the current + * command must be completed before execution of the next one. + */ + bool need_resume; +}; +typedef struct QMPRequest QMPRequest; + +/* + * Dispatch one single QMP request. The function will free the req_obj + * and objects inside it before return. + */ +static void monitor_qmp_dispatch_one(QMPRequest *req_obj) +{ + Monitor *mon, *old_mon; + QObject *req, *rsp = NULL, *id; + QDict *qdict = NULL; + bool need_resume; + + req = req_obj->req; + mon = req_obj->mon; + id = req_obj->id; + need_resume = req_obj->need_resume; + + g_free(req_obj); if (trace_event_get_state_backends(TRACE_HANDLE_QMP_COMMAND)) { QString *req_json = qobject_to_json(req); @@ -3790,10 +4044,15 @@ static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens) QDECREF(req_json); } - rsp = qmp_dispatch(cur_mon->qmp.commands, req); + old_mon = cur_mon; + cur_mon = mon; + + rsp = qmp_dispatch(mon->qmp.commands, req); + + cur_mon = old_mon; if (mon->qmp.commands == &qmp_cap_negotiation_commands) { - qdict = qdict_get_qdict(qobject_to_qdict(rsp), "error"); + qdict = qdict_get_qdict(qobject_to(QDict, rsp), "error"); if (qdict && !g_strcmp0(qdict_get_try_str(qdict, "class"), QapiErrorClass_str(ERROR_CLASS_COMMAND_NOT_FOUND))) { @@ -3804,37 +4063,171 @@ static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens) } } -err_out: - if (err) { - qdict = qdict_new(); - qdict_put_obj(qdict, "error", qmp_build_error_object(err)); - error_free(err); - rsp = QOBJECT(qdict); + /* Respond if necessary */ + monitor_qmp_respond(mon, rsp, NULL, id); + + /* This pairs with the monitor_suspend() in handle_qmp_command(). */ + if (need_resume) { + monitor_resume(mon); } - if (rsp) { - if (id) { - qdict_put_obj(qobject_to_qdict(rsp), "id", id); - id = NULL; + qobject_decref(req); +} + +/* + * Pop one QMP request from monitor queues, return NULL if not found. + * We are using round-robin fashion to pop the request, to avoid + * processing commands only on a very busy monitor. To achieve that, + * when we process one request on a specific monitor, we put that + * monitor to the end of mon_list queue. + */ +static QMPRequest *monitor_qmp_requests_pop_one(void) +{ + QMPRequest *req_obj = NULL; + Monitor *mon; + + qemu_mutex_lock(&monitor_lock); + + QTAILQ_FOREACH(mon, &mon_list, entry) { + qemu_mutex_lock(&mon->qmp.qmp_queue_lock); + req_obj = g_queue_pop_head(mon->qmp.qmp_requests); + qemu_mutex_unlock(&mon->qmp.qmp_queue_lock); + if (req_obj) { + break; } + } - monitor_json_emitter(mon, rsp); + if (req_obj) { + /* + * We found one request on the monitor. Degrade this monitor's + * priority to lowest by re-inserting it to end of queue. + */ + QTAILQ_REMOVE(&mon_list, mon, entry); + QTAILQ_INSERT_TAIL(&mon_list, mon, entry); } - qobject_decref(id); - qobject_decref(rsp); - qobject_decref(req); + qemu_mutex_unlock(&monitor_lock); + + return req_obj; } -static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size) +static void monitor_qmp_bh_dispatcher(void *data) { - Monitor *old_mon = cur_mon; + QMPRequest *req_obj = monitor_qmp_requests_pop_one(); - cur_mon = opaque; + if (req_obj) { + trace_monitor_qmp_cmd_in_band(qobject_get_try_str(req_obj->id) ?: ""); + monitor_qmp_dispatch_one(req_obj); + /* Reschedule instead of looping so the main loop stays responsive */ + qemu_bh_schedule(mon_global.qmp_dispatcher_bh); + } +} - json_message_parser_feed(&cur_mon->qmp.parser, (const char *) buf, size); +#define QMP_REQ_QUEUE_LEN_MAX (8) - cur_mon = old_mon; +static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens) +{ + QObject *req, *id = NULL; + QDict *qdict = NULL; + MonitorQMP *mon_qmp = container_of(parser, MonitorQMP, parser); + Monitor *mon = container_of(mon_qmp, Monitor, qmp); + Error *err = NULL; + QMPRequest *req_obj; + + req = json_parser_parse_err(tokens, NULL, &err); + if (!req && !err) { + /* json_parser_parse_err() sucks: can fail without setting @err */ + error_setg(&err, QERR_JSON_PARSING); + } + if (err) { + goto err; + } + + /* Check against the request in general layout */ + qdict = qmp_dispatch_check_obj(req, &err); + if (!qdict) { + goto err; + } + + /* Check against OOB specific */ + if (!qmp_cmd_oob_check(mon, qdict, &err)) { + goto err; + } + + id = qdict_get(qdict, "id"); + + /* When OOB is enabled, the "id" field is mandatory. */ + if (qmp_oob_enabled(mon) && !id) { + error_setg(&err, "Out-Of-Band capability requires that " + "every command contains an 'id' field"); + goto err; + } + + qobject_incref(id); + qdict_del(qdict, "id"); + + req_obj = g_new0(QMPRequest, 1); + req_obj->mon = mon; + req_obj->id = id; + req_obj->req = req; + req_obj->need_resume = false; + + if (qmp_is_oob(qdict)) { + /* Out-Of-Band (OOB) requests are executed directly in parser. */ + trace_monitor_qmp_cmd_out_of_band(qobject_get_try_str(req_obj->id) + ?: ""); + monitor_qmp_dispatch_one(req_obj); + return; + } + + /* Protect qmp_requests and fetching its length. */ + qemu_mutex_lock(&mon->qmp.qmp_queue_lock); + + /* + * If OOB is not enabled on the current monitor, we'll emulate the + * old behavior that we won't process the current monitor any more + * until it has responded. This helps make sure that as long as + * OOB is not enabled, the server will never drop any command. + */ + if (!qmp_oob_enabled(mon)) { + monitor_suspend(mon); + req_obj->need_resume = true; + } else { + /* Drop the request if queue is full. */ + if (mon->qmp.qmp_requests->length >= QMP_REQ_QUEUE_LEN_MAX) { + qemu_mutex_unlock(&mon->qmp.qmp_queue_lock); + qapi_event_send_command_dropped(id, + COMMAND_DROP_REASON_QUEUE_FULL, + &error_abort); + qobject_decref(id); + qobject_decref(req); + g_free(req_obj); + return; + } + } + + /* + * Put the request to the end of queue so that requests will be + * handled in time order. Ownership for req_obj, req, id, + * etc. will be delivered to the handler side. + */ + g_queue_push_tail(mon->qmp.qmp_requests, req_obj); + qemu_mutex_unlock(&mon->qmp.qmp_queue_lock); + + /* Kick the dispatcher routine */ + qemu_bh_schedule(mon_global.qmp_dispatcher_bh); + return; + +err: + monitor_qmp_respond(mon, NULL, err, NULL); + qobject_decref(req); +} + +static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size) +{ + Monitor *mon = opaque; + + json_message_parser_feed(&mon->qmp.parser, (const char *) buf, size); } static void monitor_read(void *opaque, const uint8_t *buf, int size) @@ -3869,28 +4262,70 @@ static void monitor_command_cb(void *opaque, const char *cmdline, int monitor_suspend(Monitor *mon) { - if (!mon->rs) + if (monitor_is_hmp_non_interactive(mon)) { return -ENOTTY; - mon->suspend_cnt++; + } + + atomic_inc(&mon->suspend_cnt); + + if (monitor_is_qmp(mon)) { + /* + * Kick iothread to make sure this takes effect. It'll be + * evaluated again in prepare() of the watch object. + */ + aio_notify(iothread_get_aio_context(mon_global.mon_iothread)); + } + + trace_monitor_suspend(mon, 1); return 0; } void monitor_resume(Monitor *mon) { - if (!mon->rs) + if (monitor_is_hmp_non_interactive(mon)) { return; - if (--mon->suspend_cnt == 0) - readline_show_prompt(mon->rs); + } + + if (atomic_dec_fetch(&mon->suspend_cnt) == 0) { + if (monitor_is_qmp(mon)) { + /* + * For QMP monitors that are running in IOThread, let's + * kick the thread in case it's sleeping. + */ + if (mon->use_io_thr) { + aio_notify(iothread_get_aio_context(mon_global.mon_iothread)); + } + } else { + assert(mon->rs); + readline_show_prompt(mon->rs); + } + } + trace_monitor_suspend(mon, -1); } -static QObject *get_qmp_greeting(void) +static QObject *get_qmp_greeting(Monitor *mon) { + QList *cap_list = qlist_new(); QObject *ver = NULL; + QMPCapability cap; qmp_marshal_query_version(NULL, &ver, NULL); - return qobject_from_jsonf("{'QMP': {'version': %p, 'capabilities': []}}", - ver); + for (cap = 0; cap < QMP_CAPABILITY__MAX; cap++) { + if (!mon->use_io_thr && cap == QMP_CAPABILITY_OOB) { + /* Monitors that are not using IOThread won't support OOB */ + continue; + } + qlist_append(cap_list, qstring_from_str(QMPCapability_str(cap))); + } + + return qobject_from_jsonf("{'QMP': {'version': %p, 'capabilities': %p}}", + ver, cap_list); +} + +static void monitor_qmp_caps_reset(Monitor *mon) +{ + memset(mon->qmp.qmp_caps, 0, sizeof(mon->qmp.qmp_caps)); } static void monitor_qmp_event(void *opaque, int event) @@ -3901,7 +4336,8 @@ static void monitor_qmp_event(void *opaque, int event) switch (event) { case CHR_EVENT_OPENED: mon->qmp.commands = &qmp_cap_negotiation_commands; - data = get_qmp_greeting(); + monitor_qmp_caps_reset(mon); + data = get_qmp_greeting(mon); monitor_json_emitter(mon, data); qobject_decref(data); mon_refcount++; @@ -3929,19 +4365,19 @@ static void monitor_event(void *opaque, int event) monitor_resume(mon); monitor_flush(mon); } else { - mon->suspend_cnt = 0; + atomic_mb_set(&mon->suspend_cnt, 0); } break; case CHR_EVENT_MUX_OUT: if (mon->reset_seen) { - if (mon->suspend_cnt == 0) { + if (atomic_mb_read(&mon->suspend_cnt) == 0) { monitor_printf(mon, "\n"); } monitor_flush(mon); monitor_suspend(mon); } else { - mon->suspend_cnt++; + atomic_inc(&mon->suspend_cnt); } qemu_mutex_lock(&mon->out_lock); mon->mux_out = 1; @@ -3985,6 +4421,49 @@ static void sortcmdlist(void) qsort((void *)info_cmds, array_num, elem_size, compare_mon_cmd); } +static GMainContext *monitor_get_io_context(void) +{ + return iothread_get_g_main_context(mon_global.mon_iothread); +} + +static AioContext *monitor_get_aio_context(void) +{ + return iothread_get_aio_context(mon_global.mon_iothread); +} + +static void monitor_iothread_init(void) +{ + mon_global.mon_iothread = iothread_create("mon_iothread", + &error_abort); + + /* + * This MUST be on main loop thread since we have commands that + * have assumption to be run on main loop thread. It would be + * nice that one day we can remove this assumption in the future. + */ + mon_global.qmp_dispatcher_bh = aio_bh_new(qemu_get_aio_context(), + monitor_qmp_bh_dispatcher, + NULL); + + /* + * Unlike the dispatcher BH, this must be run on the monitor IO + * thread, so that monitors that are using IO thread will make + * sure read/write operations are all done on the IO thread. + */ + mon_global.qmp_respond_bh = aio_bh_new(monitor_get_aio_context(), + monitor_qmp_bh_responder, + NULL); +} + +void monitor_init_globals(void) +{ + monitor_init_qmp_commands(); + monitor_qapi_event_init(); + sortcmdlist(); + qemu_mutex_init(&monitor_lock); + monitor_iothread_init(); +} + /* These functions just adapt the readline interface in a typesafe way. We * could cast function pointers but that discards compiler checks. */ @@ -4025,24 +4504,43 @@ void error_vprintf_unless_qmp(const char *fmt, va_list ap) } } -static void __attribute__((constructor)) monitor_lock_init(void) +static void monitor_list_append(Monitor *mon) { - qemu_mutex_init(&monitor_lock); + qemu_mutex_lock(&monitor_lock); + QTAILQ_INSERT_HEAD(&mon_list, mon, entry); + qemu_mutex_unlock(&monitor_lock); } -void monitor_init(Chardev *chr, int flags) +static void monitor_qmp_setup_handlers_bh(void *opaque) { - static int is_first_init = 1; - Monitor *mon; - - if (is_first_init) { - monitor_qapi_event_init(); - sortcmdlist(); - is_first_init = 0; + Monitor *mon = opaque; + GMainContext *context; + + if (mon->use_io_thr) { + /* + * When use_io_thr is set, we use the global shared dedicated + * IO thread for this monitor to handle input/output. + */ + context = monitor_get_io_context(); + /* We should have inited globals before reaching here. */ + assert(context); + } else { + /* The default main loop, which is the main thread */ + context = NULL; } - mon = g_malloc(sizeof(*mon)); - monitor_data_init(mon); + qemu_chr_fe_set_handlers(&mon->chr, monitor_can_read, monitor_qmp_read, + monitor_qmp_event, NULL, mon, context, true); + monitor_list_append(mon); +} + +void monitor_init(Chardev *chr, int flags) +{ + Monitor *mon = g_malloc(sizeof(*mon)); + /* Enable IOThread for QMPs that are not using MUX chardev backends. */ + bool use_io_thr = (!CHARDEV_IS_MUX(chr)) && (flags & MONITOR_USE_CONTROL); + + monitor_data_init(mon, false, use_io_thr); qemu_chr_fe_init(&mon->chr, chr, &error_abort); mon->flags = flags; @@ -4055,31 +4553,77 @@ void monitor_init(Chardev *chr, int flags) } if (monitor_is_qmp(mon)) { - qemu_chr_fe_set_handlers(&mon->chr, monitor_can_read, monitor_qmp_read, - monitor_qmp_event, NULL, mon, NULL, true); qemu_chr_fe_set_echo(&mon->chr, true); json_message_parser_init(&mon->qmp.parser, handle_qmp_command); + if (mon->use_io_thr) { + /* + * Make sure the old iowatch is gone. It's possible when + * e.g. the chardev is in client mode, with wait=on. + */ + remove_fd_in_watch(chr); + /* + * We can't call qemu_chr_fe_set_handlers() directly here + * since during the procedure the chardev will be active + * and running in monitor iothread, while we'll still do + * something before returning from it, which is a possible + * race too. To avoid that, we just create a BH to setup + * the handlers. + */ + aio_bh_schedule_oneshot(monitor_get_aio_context(), + monitor_qmp_setup_handlers_bh, mon); + /* We'll add this to mon_list in the BH when setup done */ + return; + } else { + qemu_chr_fe_set_handlers(&mon->chr, monitor_can_read, + monitor_qmp_read, monitor_qmp_event, + NULL, mon, NULL, true); + } } else { qemu_chr_fe_set_handlers(&mon->chr, monitor_can_read, monitor_read, monitor_event, NULL, mon, NULL, true); } - qemu_mutex_lock(&monitor_lock); - QLIST_INSERT_HEAD(&mon_list, mon, entry); - qemu_mutex_unlock(&monitor_lock); + monitor_list_append(mon); } void monitor_cleanup(void) { Monitor *mon, *next; + /* + * We need to explicitly stop the iothread (but not destroy it), + * cleanup the monitor resources, then destroy the iothread since + * we need to unregister from chardev below in + * monitor_data_destroy(), and chardev is not thread-safe yet + */ + iothread_stop(mon_global.mon_iothread); + + /* + * After we have IOThread to send responses, it's possible that + * when we stop the IOThread there are still replies queued in the + * responder queue. Flush all of them. Note that even after this + * flush it's still possible that out buffer is not flushed. + * It'll be done in below monitor_flush() as the last resort. + */ + monitor_qmp_bh_responder(NULL); + qemu_mutex_lock(&monitor_lock); - QLIST_FOREACH_SAFE(mon, &mon_list, entry, next) { - QLIST_REMOVE(mon, entry); + QTAILQ_FOREACH_SAFE(mon, &mon_list, entry, next) { + QTAILQ_REMOVE(&mon_list, mon, entry); + monitor_flush(mon); monitor_data_destroy(mon); g_free(mon); } qemu_mutex_unlock(&monitor_lock); + + /* QEMUBHs needs to be deleted before destroying the IOThread. */ + qemu_bh_delete(mon_global.qmp_dispatcher_bh); + mon_global.qmp_dispatcher_bh = NULL; + qemu_bh_delete(mon_global.qmp_respond_bh); + mon_global.qmp_respond_bh = NULL; + + iothread_destroy(mon_global.mon_iothread); + mon_global.mon_iothread = NULL; } QemuOptsList qemu_mon_opts = { |