summaryrefslogtreecommitdiff
path: root/monitor.c
diff options
context:
space:
mode:
Diffstat (limited to 'monitor.c')
-rw-r--r--monitor.c706
1 files changed, 625 insertions, 81 deletions
diff --git a/monitor.c b/monitor.c
index 3117a3e689..6ccd2fc089 100644
--- a/monitor.c
+++ b/monitor.c
@@ -35,6 +35,8 @@
#include "net/net.h"
#include "net/slirp.h"
#include "chardev/char-fe.h"
+#include "chardev/char-io.h"
+#include "chardev/char-mux.h"
#include "ui/qemu-spice.h"
#include "sysemu/numa.h"
#include "monitor/monitor.h"
@@ -58,6 +60,7 @@
#include "qapi/qmp/qjson.h"
#include "qapi/qmp/json-streamer.h"
#include "qapi/qmp/json-parser.h"
+#include "qapi/qmp/qlist.h"
#include "qom/object_interfaces.h"
#include "trace-root.h"
#include "trace/control.h"
@@ -79,6 +82,7 @@
#include "qapi/qapi-introspect.h"
#include "sysemu/qtest.h"
#include "sysemu/cpus.h"
+#include "sysemu/iothread.h"
#include "qemu/cutils.h"
#if defined(TARGET_S390X)
@@ -168,6 +172,16 @@ typedef struct {
* mode.
*/
QmpCommandList *commands;
+ bool qmp_caps[QMP_CAPABILITY__MAX];
+ /*
+ * Protects qmp request/response queue. Please take monitor_lock
+ * first when used together.
+ */
+ QemuMutex qmp_queue_lock;
+ /* Input queue that holds all the parsed QMP requests */
+ GQueue *qmp_requests;
+ /* Output queue contains all the QMP responses in order */
+ GQueue *qmp_responses;
} MonitorQMP;
/*
@@ -190,9 +204,11 @@ struct Monitor {
CharBackend chr;
int reset_seen;
int flags;
- int suspend_cnt;
+ int suspend_cnt; /* Needs to be accessed atomically */
bool skip_flush;
+ bool use_io_thr;
+ /* We can't access guest memory when holding the lock */
QemuMutex out_lock;
QString *outbuf;
guint out_watch;
@@ -207,16 +223,25 @@ struct Monitor {
void *password_opaque;
mon_cmd_t *cmd_table;
QLIST_HEAD(,mon_fd_t) fds;
- QLIST_ENTRY(Monitor) entry;
+ QTAILQ_ENTRY(Monitor) entry;
};
+/* Let's add monitor global variables to this struct. */
+static struct {
+ IOThread *mon_iothread;
+ /* Bottom half to dispatch the requests received from IO thread */
+ QEMUBH *qmp_dispatcher_bh;
+ /* Bottom half to deliver the responses back to clients */
+ QEMUBH *qmp_respond_bh;
+} mon_global;
+
/* QMP checker flags */
#define QMP_ACCEPT_UNKNOWNS 1
/* Protects mon_list, monitor_event_state. */
static QemuMutex monitor_lock;
-static QLIST_HEAD(mon_list, Monitor) mon_list;
+static QTAILQ_HEAD(mon_list, Monitor) mon_list;
static QLIST_HEAD(mon_fdsets, MonFdset) mon_fdsets;
static int mon_refcount;
@@ -241,6 +266,21 @@ static inline bool monitor_is_qmp(const Monitor *mon)
}
/**
+ * Whether @mon is using readline? Note: not all HMP monitors use
+ * readline, e.g., gdbserver has a non-interactive HMP monitor, so
+ * readline is not used there.
+ */
+static inline bool monitor_uses_readline(const Monitor *mon)
+{
+ return mon->flags & MONITOR_USE_READLINE;
+}
+
+static inline bool monitor_is_hmp_non_interactive(const Monitor *mon)
+{
+ return !monitor_is_qmp(mon) && !monitor_uses_readline(mon);
+}
+
+/**
* Is the current monitor, if any, a QMP monitor?
*/
bool monitor_cur_is_qmp(void)
@@ -382,7 +422,8 @@ int monitor_fprintf(FILE *stream, const char *fmt, ...)
return 0;
}
-static void monitor_json_emitter(Monitor *mon, const QObject *data)
+static void monitor_json_emitter_raw(Monitor *mon,
+ QObject *data)
{
QString *json;
@@ -396,6 +437,71 @@ static void monitor_json_emitter(Monitor *mon, const QObject *data)
QDECREF(json);
}
+static void monitor_json_emitter(Monitor *mon, QObject *data)
+{
+ if (mon->use_io_thr) {
+ /*
+ * If using IO thread, we need to queue the item so that IO
+ * thread will do the rest for us. Take refcount so that
+ * caller won't free the data (which will be finally freed in
+ * responder thread).
+ */
+ qobject_incref(data);
+ qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
+ g_queue_push_tail(mon->qmp.qmp_responses, (void *)data);
+ qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
+ qemu_bh_schedule(mon_global.qmp_respond_bh);
+ } else {
+ /*
+ * If not using monitor IO thread, then we are in main thread.
+ * Do the emission right away.
+ */
+ monitor_json_emitter_raw(mon, data);
+ }
+}
+
+struct QMPResponse {
+ Monitor *mon;
+ QObject *data;
+};
+typedef struct QMPResponse QMPResponse;
+
+/*
+ * Return one QMPResponse. The response is only valid if
+ * response.data is not NULL.
+ */
+static QMPResponse monitor_qmp_response_pop_one(void)
+{
+ Monitor *mon;
+ QObject *data = NULL;
+
+ qemu_mutex_lock(&monitor_lock);
+ QTAILQ_FOREACH(mon, &mon_list, entry) {
+ qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
+ data = g_queue_pop_head(mon->qmp.qmp_responses);
+ qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
+ if (data) {
+ break;
+ }
+ }
+ qemu_mutex_unlock(&monitor_lock);
+ return (QMPResponse) { .mon = mon, .data = data };
+}
+
+static void monitor_qmp_bh_responder(void *opaque)
+{
+ QMPResponse response;
+
+ while (true) {
+ response = monitor_qmp_response_pop_one();
+ if (!response.data) {
+ break;
+ }
+ monitor_json_emitter_raw(response.mon, response.data);
+ qobject_decref(response.data);
+ }
+}
+
static MonitorQAPIEventConf monitor_qapi_event_conf[QAPI_EVENT__MAX] = {
/* Limit guest-triggerable events to 1 per second */
[QAPI_EVENT_RTC_CHANGE] = { 1000 * SCALE_MS },
@@ -417,7 +523,7 @@ static void monitor_qapi_event_emit(QAPIEvent event, QDict *qdict)
Monitor *mon;
trace_monitor_protocol_event_emit(event, qdict);
- QLIST_FOREACH(mon, &mon_list, entry) {
+ QTAILQ_FOREACH(mon, &mon_list, entry) {
if (monitor_is_qmp(mon)
&& mon->qmp.commands != &qmp_cap_negotiation_commands) {
monitor_json_emitter(mon, QOBJECT(qdict));
@@ -447,7 +553,7 @@ monitor_qapi_event_queue(QAPIEvent event, QDict *qdict, Error **errp)
/* Unthrottled event */
monitor_qapi_event_emit(event, qdict);
} else {
- QDict *data = qobject_to_qdict(qdict_get(qdict, "data"));
+ QDict *data = qobject_to(QDict, qdict_get(qdict, "data"));
MonitorQAPIEventState key = { .event = event, .data = data };
evstate = g_hash_table_lookup(monitor_qapi_event_state, &key);
@@ -570,13 +676,19 @@ static void monitor_qapi_event_init(void)
static void handle_hmp_command(Monitor *mon, const char *cmdline);
-static void monitor_data_init(Monitor *mon)
+static void monitor_data_init(Monitor *mon, bool skip_flush,
+ bool use_io_thr)
{
memset(mon, 0, sizeof(Monitor));
qemu_mutex_init(&mon->out_lock);
+ qemu_mutex_init(&mon->qmp.qmp_queue_lock);
mon->outbuf = qstring_new();
/* Use *mon_cmds by default. */
mon->cmd_table = mon_cmds;
+ mon->skip_flush = skip_flush;
+ mon->use_io_thr = use_io_thr;
+ mon->qmp.qmp_requests = g_queue_new();
+ mon->qmp.qmp_responses = g_queue_new();
}
static void monitor_data_destroy(Monitor *mon)
@@ -589,6 +701,9 @@ static void monitor_data_destroy(Monitor *mon)
readline_free(mon->rs);
QDECREF(mon->outbuf);
qemu_mutex_destroy(&mon->out_lock);
+ qemu_mutex_destroy(&mon->qmp.qmp_queue_lock);
+ g_queue_free(mon->qmp.qmp_requests);
+ g_queue_free(mon->qmp.qmp_responses);
}
char *qmp_human_monitor_command(const char *command_line, bool has_cpu_index,
@@ -597,8 +712,7 @@ char *qmp_human_monitor_command(const char *command_line, bool has_cpu_index,
char *output = NULL;
Monitor *old_mon, hmp;
- monitor_data_init(&hmp);
- hmp.skip_flush = true;
+ monitor_data_init(&hmp, true, false);
old_mon = cur_mon;
cur_mon = &hmp;
@@ -956,7 +1070,7 @@ EventInfoList *qmp_query_events(Error **errp)
static void qmp_query_qmp_schema(QDict *qdict, QObject **ret_data,
Error **errp)
{
- *ret_data = qobject_from_json(qmp_schema_json, &error_abort);
+ *ret_data = qobject_from_qlit(&qmp_schema_qlit);
}
/*
@@ -1006,7 +1120,7 @@ static void qmp_unregister_commands_hack(void)
#endif
}
-void monitor_init_qmp_commands(void)
+static void monitor_init_qmp_commands(void)
{
/*
* Two command lists:
@@ -1032,8 +1146,90 @@ void monitor_init_qmp_commands(void)
qmp_marshal_qmp_capabilities, QCO_NO_OPTIONS);
}
-void qmp_qmp_capabilities(Error **errp)
+static bool qmp_cap_enabled(Monitor *mon, QMPCapability cap)
+{
+ return mon->qmp.qmp_caps[cap];
+}
+
+static bool qmp_oob_enabled(Monitor *mon)
+{
+ return qmp_cap_enabled(mon, QMP_CAPABILITY_OOB);
+}
+
+static void qmp_caps_check(Monitor *mon, QMPCapabilityList *list,
+ Error **errp)
+{
+ for (; list; list = list->next) {
+ assert(list->value < QMP_CAPABILITY__MAX);
+ switch (list->value) {
+ case QMP_CAPABILITY_OOB:
+ if (!mon->use_io_thr) {
+ /*
+ * Out-Of-Band only works with monitors that are
+ * running on dedicated IOThread.
+ */
+ error_setg(errp, "This monitor does not support "
+ "Out-Of-Band (OOB)");
+ return;
+ }
+ break;
+ default:
+ break;
+ }
+ }
+}
+
+/* This function should only be called after capabilities are checked. */
+static void qmp_caps_apply(Monitor *mon, QMPCapabilityList *list)
+{
+ for (; list; list = list->next) {
+ mon->qmp.qmp_caps[list->value] = true;
+ }
+}
+
+/*
+ * Return true if check successful, or false otherwise. When false is
+ * returned, detailed error will be in errp if provided.
+ */
+static bool qmp_cmd_oob_check(Monitor *mon, QDict *req, Error **errp)
+{
+ const char *command;
+ QmpCommand *cmd;
+
+ command = qdict_get_try_str(req, "execute");
+ if (!command) {
+ error_setg(errp, "Command field 'execute' missing");
+ return false;
+ }
+
+ cmd = qmp_find_command(mon->qmp.commands, command);
+ if (!cmd) {
+ error_set(errp, ERROR_CLASS_COMMAND_NOT_FOUND,
+ "The command %s has not been found", command);
+ return false;
+ }
+
+ if (qmp_is_oob(req)) {
+ if (!qmp_oob_enabled(mon)) {
+ error_setg(errp, "Please enable Out-Of-Band first "
+ "for the session during capabilities negotiation");
+ return false;
+ }
+ if (!(cmd->options & QCO_ALLOW_OOB)) {
+ error_setg(errp, "The command %s does not support OOB",
+ command);
+ return false;
+ }
+ }
+
+ return true;
+}
+
+void qmp_qmp_capabilities(bool has_enable, QMPCapabilityList *enable,
+ Error **errp)
{
+ Error *local_err = NULL;
+
if (cur_mon->qmp.commands == &qmp_commands) {
error_set(errp, ERROR_CLASS_COMMAND_NOT_FOUND,
"Capabilities negotiation is already complete, command "
@@ -1041,6 +1237,21 @@ void qmp_qmp_capabilities(Error **errp)
return;
}
+ /* Enable QMP capabilities provided by the client if applicable. */
+ if (has_enable) {
+ qmp_caps_check(cur_mon, enable, &local_err);
+ if (local_err) {
+ /*
+ * Failed check on any of the capabilities will fail the
+ * entire command (and thus not apply any of the other
+ * capabilities that were also requested).
+ */
+ error_propagate(errp, local_err);
+ return;
+ }
+ qmp_caps_apply(cur_mon, enable);
+ }
+
cur_mon->qmp.commands = &qmp_commands;
}
@@ -3758,31 +3969,74 @@ static int monitor_can_read(void *opaque)
{
Monitor *mon = opaque;
- return (mon->suspend_cnt == 0) ? 1 : 0;
+ return !atomic_mb_read(&mon->suspend_cnt);
}
-static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens)
+/*
+ * 1. This function takes ownership of rsp, err, and id.
+ * 2. rsp, err, and id may be NULL.
+ * 3. If err != NULL then rsp must be NULL.
+ */
+static void monitor_qmp_respond(Monitor *mon, QObject *rsp,
+ Error *err, QObject *id)
{
- QObject *req, *rsp = NULL, *id = NULL;
QDict *qdict = NULL;
- Monitor *mon = cur_mon;
- Error *err = NULL;
- req = json_parser_parse_err(tokens, NULL, &err);
- if (!req && !err) {
- /* json_parser_parse_err() sucks: can fail without setting @err */
- error_setg(&err, QERR_JSON_PARSING);
- }
if (err) {
- goto err_out;
+ assert(!rsp);
+ qdict = qdict_new();
+ qdict_put_obj(qdict, "error", qmp_build_error_object(err));
+ error_free(err);
+ rsp = QOBJECT(qdict);
+ }
+
+ if (rsp) {
+ if (id) {
+ /* This is for the qdict below. */
+ qobject_incref(id);
+ qdict_put_obj(qobject_to(QDict, rsp), "id", id);
+ }
+
+ monitor_json_emitter(mon, rsp);
}
- qdict = qobject_to_qdict(req);
- if (qdict) {
- id = qdict_get(qdict, "id");
- qobject_incref(id);
- qdict_del(qdict, "id");
- } /* else will fail qmp_dispatch() */
+ qobject_decref(id);
+ qobject_decref(rsp);
+}
+
+struct QMPRequest {
+ /* Owner of the request */
+ Monitor *mon;
+ /* "id" field of the request */
+ QObject *id;
+ /* Request object to be handled */
+ QObject *req;
+ /*
+ * Whether we need to resume the monitor afterward. This flag is
+ * used to emulate the old QMP server behavior that the current
+ * command must be completed before execution of the next one.
+ */
+ bool need_resume;
+};
+typedef struct QMPRequest QMPRequest;
+
+/*
+ * Dispatch one single QMP request. The function will free the req_obj
+ * and objects inside it before return.
+ */
+static void monitor_qmp_dispatch_one(QMPRequest *req_obj)
+{
+ Monitor *mon, *old_mon;
+ QObject *req, *rsp = NULL, *id;
+ QDict *qdict = NULL;
+ bool need_resume;
+
+ req = req_obj->req;
+ mon = req_obj->mon;
+ id = req_obj->id;
+ need_resume = req_obj->need_resume;
+
+ g_free(req_obj);
if (trace_event_get_state_backends(TRACE_HANDLE_QMP_COMMAND)) {
QString *req_json = qobject_to_json(req);
@@ -3790,10 +4044,15 @@ static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens)
QDECREF(req_json);
}
- rsp = qmp_dispatch(cur_mon->qmp.commands, req);
+ old_mon = cur_mon;
+ cur_mon = mon;
+
+ rsp = qmp_dispatch(mon->qmp.commands, req);
+
+ cur_mon = old_mon;
if (mon->qmp.commands == &qmp_cap_negotiation_commands) {
- qdict = qdict_get_qdict(qobject_to_qdict(rsp), "error");
+ qdict = qdict_get_qdict(qobject_to(QDict, rsp), "error");
if (qdict
&& !g_strcmp0(qdict_get_try_str(qdict, "class"),
QapiErrorClass_str(ERROR_CLASS_COMMAND_NOT_FOUND))) {
@@ -3804,37 +4063,171 @@ static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens)
}
}
-err_out:
- if (err) {
- qdict = qdict_new();
- qdict_put_obj(qdict, "error", qmp_build_error_object(err));
- error_free(err);
- rsp = QOBJECT(qdict);
+ /* Respond if necessary */
+ monitor_qmp_respond(mon, rsp, NULL, id);
+
+ /* This pairs with the monitor_suspend() in handle_qmp_command(). */
+ if (need_resume) {
+ monitor_resume(mon);
}
- if (rsp) {
- if (id) {
- qdict_put_obj(qobject_to_qdict(rsp), "id", id);
- id = NULL;
+ qobject_decref(req);
+}
+
+/*
+ * Pop one QMP request from monitor queues, return NULL if not found.
+ * We are using round-robin fashion to pop the request, to avoid
+ * processing commands only on a very busy monitor. To achieve that,
+ * when we process one request on a specific monitor, we put that
+ * monitor to the end of mon_list queue.
+ */
+static QMPRequest *monitor_qmp_requests_pop_one(void)
+{
+ QMPRequest *req_obj = NULL;
+ Monitor *mon;
+
+ qemu_mutex_lock(&monitor_lock);
+
+ QTAILQ_FOREACH(mon, &mon_list, entry) {
+ qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
+ req_obj = g_queue_pop_head(mon->qmp.qmp_requests);
+ qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
+ if (req_obj) {
+ break;
}
+ }
- monitor_json_emitter(mon, rsp);
+ if (req_obj) {
+ /*
+ * We found one request on the monitor. Degrade this monitor's
+ * priority to lowest by re-inserting it to end of queue.
+ */
+ QTAILQ_REMOVE(&mon_list, mon, entry);
+ QTAILQ_INSERT_TAIL(&mon_list, mon, entry);
}
- qobject_decref(id);
- qobject_decref(rsp);
- qobject_decref(req);
+ qemu_mutex_unlock(&monitor_lock);
+
+ return req_obj;
}
-static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size)
+static void monitor_qmp_bh_dispatcher(void *data)
{
- Monitor *old_mon = cur_mon;
+ QMPRequest *req_obj = monitor_qmp_requests_pop_one();
- cur_mon = opaque;
+ if (req_obj) {
+ trace_monitor_qmp_cmd_in_band(qobject_get_try_str(req_obj->id) ?: "");
+ monitor_qmp_dispatch_one(req_obj);
+ /* Reschedule instead of looping so the main loop stays responsive */
+ qemu_bh_schedule(mon_global.qmp_dispatcher_bh);
+ }
+}
- json_message_parser_feed(&cur_mon->qmp.parser, (const char *) buf, size);
+#define QMP_REQ_QUEUE_LEN_MAX (8)
- cur_mon = old_mon;
+static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens)
+{
+ QObject *req, *id = NULL;
+ QDict *qdict = NULL;
+ MonitorQMP *mon_qmp = container_of(parser, MonitorQMP, parser);
+ Monitor *mon = container_of(mon_qmp, Monitor, qmp);
+ Error *err = NULL;
+ QMPRequest *req_obj;
+
+ req = json_parser_parse_err(tokens, NULL, &err);
+ if (!req && !err) {
+ /* json_parser_parse_err() sucks: can fail without setting @err */
+ error_setg(&err, QERR_JSON_PARSING);
+ }
+ if (err) {
+ goto err;
+ }
+
+ /* Check against the request in general layout */
+ qdict = qmp_dispatch_check_obj(req, &err);
+ if (!qdict) {
+ goto err;
+ }
+
+ /* Check against OOB specific */
+ if (!qmp_cmd_oob_check(mon, qdict, &err)) {
+ goto err;
+ }
+
+ id = qdict_get(qdict, "id");
+
+ /* When OOB is enabled, the "id" field is mandatory. */
+ if (qmp_oob_enabled(mon) && !id) {
+ error_setg(&err, "Out-Of-Band capability requires that "
+ "every command contains an 'id' field");
+ goto err;
+ }
+
+ qobject_incref(id);
+ qdict_del(qdict, "id");
+
+ req_obj = g_new0(QMPRequest, 1);
+ req_obj->mon = mon;
+ req_obj->id = id;
+ req_obj->req = req;
+ req_obj->need_resume = false;
+
+ if (qmp_is_oob(qdict)) {
+ /* Out-Of-Band (OOB) requests are executed directly in parser. */
+ trace_monitor_qmp_cmd_out_of_band(qobject_get_try_str(req_obj->id)
+ ?: "");
+ monitor_qmp_dispatch_one(req_obj);
+ return;
+ }
+
+ /* Protect qmp_requests and fetching its length. */
+ qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
+
+ /*
+ * If OOB is not enabled on the current monitor, we'll emulate the
+ * old behavior that we won't process the current monitor any more
+ * until it has responded. This helps make sure that as long as
+ * OOB is not enabled, the server will never drop any command.
+ */
+ if (!qmp_oob_enabled(mon)) {
+ monitor_suspend(mon);
+ req_obj->need_resume = true;
+ } else {
+ /* Drop the request if queue is full. */
+ if (mon->qmp.qmp_requests->length >= QMP_REQ_QUEUE_LEN_MAX) {
+ qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
+ qapi_event_send_command_dropped(id,
+ COMMAND_DROP_REASON_QUEUE_FULL,
+ &error_abort);
+ qobject_decref(id);
+ qobject_decref(req);
+ g_free(req_obj);
+ return;
+ }
+ }
+
+ /*
+ * Put the request to the end of queue so that requests will be
+ * handled in time order. Ownership for req_obj, req, id,
+ * etc. will be delivered to the handler side.
+ */
+ g_queue_push_tail(mon->qmp.qmp_requests, req_obj);
+ qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
+
+ /* Kick the dispatcher routine */
+ qemu_bh_schedule(mon_global.qmp_dispatcher_bh);
+ return;
+
+err:
+ monitor_qmp_respond(mon, NULL, err, NULL);
+ qobject_decref(req);
+}
+
+static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size)
+{
+ Monitor *mon = opaque;
+
+ json_message_parser_feed(&mon->qmp.parser, (const char *) buf, size);
}
static void monitor_read(void *opaque, const uint8_t *buf, int size)
@@ -3869,28 +4262,70 @@ static void monitor_command_cb(void *opaque, const char *cmdline,
int monitor_suspend(Monitor *mon)
{
- if (!mon->rs)
+ if (monitor_is_hmp_non_interactive(mon)) {
return -ENOTTY;
- mon->suspend_cnt++;
+ }
+
+ atomic_inc(&mon->suspend_cnt);
+
+ if (monitor_is_qmp(mon)) {
+ /*
+ * Kick iothread to make sure this takes effect. It'll be
+ * evaluated again in prepare() of the watch object.
+ */
+ aio_notify(iothread_get_aio_context(mon_global.mon_iothread));
+ }
+
+ trace_monitor_suspend(mon, 1);
return 0;
}
void monitor_resume(Monitor *mon)
{
- if (!mon->rs)
+ if (monitor_is_hmp_non_interactive(mon)) {
return;
- if (--mon->suspend_cnt == 0)
- readline_show_prompt(mon->rs);
+ }
+
+ if (atomic_dec_fetch(&mon->suspend_cnt) == 0) {
+ if (monitor_is_qmp(mon)) {
+ /*
+ * For QMP monitors that are running in IOThread, let's
+ * kick the thread in case it's sleeping.
+ */
+ if (mon->use_io_thr) {
+ aio_notify(iothread_get_aio_context(mon_global.mon_iothread));
+ }
+ } else {
+ assert(mon->rs);
+ readline_show_prompt(mon->rs);
+ }
+ }
+ trace_monitor_suspend(mon, -1);
}
-static QObject *get_qmp_greeting(void)
+static QObject *get_qmp_greeting(Monitor *mon)
{
+ QList *cap_list = qlist_new();
QObject *ver = NULL;
+ QMPCapability cap;
qmp_marshal_query_version(NULL, &ver, NULL);
- return qobject_from_jsonf("{'QMP': {'version': %p, 'capabilities': []}}",
- ver);
+ for (cap = 0; cap < QMP_CAPABILITY__MAX; cap++) {
+ if (!mon->use_io_thr && cap == QMP_CAPABILITY_OOB) {
+ /* Monitors that are not using IOThread won't support OOB */
+ continue;
+ }
+ qlist_append(cap_list, qstring_from_str(QMPCapability_str(cap)));
+ }
+
+ return qobject_from_jsonf("{'QMP': {'version': %p, 'capabilities': %p}}",
+ ver, cap_list);
+}
+
+static void monitor_qmp_caps_reset(Monitor *mon)
+{
+ memset(mon->qmp.qmp_caps, 0, sizeof(mon->qmp.qmp_caps));
}
static void monitor_qmp_event(void *opaque, int event)
@@ -3901,7 +4336,8 @@ static void monitor_qmp_event(void *opaque, int event)
switch (event) {
case CHR_EVENT_OPENED:
mon->qmp.commands = &qmp_cap_negotiation_commands;
- data = get_qmp_greeting();
+ monitor_qmp_caps_reset(mon);
+ data = get_qmp_greeting(mon);
monitor_json_emitter(mon, data);
qobject_decref(data);
mon_refcount++;
@@ -3929,19 +4365,19 @@ static void monitor_event(void *opaque, int event)
monitor_resume(mon);
monitor_flush(mon);
} else {
- mon->suspend_cnt = 0;
+ atomic_mb_set(&mon->suspend_cnt, 0);
}
break;
case CHR_EVENT_MUX_OUT:
if (mon->reset_seen) {
- if (mon->suspend_cnt == 0) {
+ if (atomic_mb_read(&mon->suspend_cnt) == 0) {
monitor_printf(mon, "\n");
}
monitor_flush(mon);
monitor_suspend(mon);
} else {
- mon->suspend_cnt++;
+ atomic_inc(&mon->suspend_cnt);
}
qemu_mutex_lock(&mon->out_lock);
mon->mux_out = 1;
@@ -3985,6 +4421,49 @@ static void sortcmdlist(void)
qsort((void *)info_cmds, array_num, elem_size, compare_mon_cmd);
}
+static GMainContext *monitor_get_io_context(void)
+{
+ return iothread_get_g_main_context(mon_global.mon_iothread);
+}
+
+static AioContext *monitor_get_aio_context(void)
+{
+ return iothread_get_aio_context(mon_global.mon_iothread);
+}
+
+static void monitor_iothread_init(void)
+{
+ mon_global.mon_iothread = iothread_create("mon_iothread",
+ &error_abort);
+
+ /*
+ * This MUST be on main loop thread since we have commands that
+ * have assumption to be run on main loop thread. It would be
+ * nice that one day we can remove this assumption in the future.
+ */
+ mon_global.qmp_dispatcher_bh = aio_bh_new(qemu_get_aio_context(),
+ monitor_qmp_bh_dispatcher,
+ NULL);
+
+ /*
+ * Unlike the dispatcher BH, this must be run on the monitor IO
+ * thread, so that monitors that are using IO thread will make
+ * sure read/write operations are all done on the IO thread.
+ */
+ mon_global.qmp_respond_bh = aio_bh_new(monitor_get_aio_context(),
+ monitor_qmp_bh_responder,
+ NULL);
+}
+
+void monitor_init_globals(void)
+{
+ monitor_init_qmp_commands();
+ monitor_qapi_event_init();
+ sortcmdlist();
+ qemu_mutex_init(&monitor_lock);
+ monitor_iothread_init();
+}
+
/* These functions just adapt the readline interface in a typesafe way. We
* could cast function pointers but that discards compiler checks.
*/
@@ -4025,24 +4504,43 @@ void error_vprintf_unless_qmp(const char *fmt, va_list ap)
}
}
-static void __attribute__((constructor)) monitor_lock_init(void)
+static void monitor_list_append(Monitor *mon)
{
- qemu_mutex_init(&monitor_lock);
+ qemu_mutex_lock(&monitor_lock);
+ QTAILQ_INSERT_HEAD(&mon_list, mon, entry);
+ qemu_mutex_unlock(&monitor_lock);
}
-void monitor_init(Chardev *chr, int flags)
+static void monitor_qmp_setup_handlers_bh(void *opaque)
{
- static int is_first_init = 1;
- Monitor *mon;
-
- if (is_first_init) {
- monitor_qapi_event_init();
- sortcmdlist();
- is_first_init = 0;
+ Monitor *mon = opaque;
+ GMainContext *context;
+
+ if (mon->use_io_thr) {
+ /*
+ * When use_io_thr is set, we use the global shared dedicated
+ * IO thread for this monitor to handle input/output.
+ */
+ context = monitor_get_io_context();
+ /* We should have inited globals before reaching here. */
+ assert(context);
+ } else {
+ /* The default main loop, which is the main thread */
+ context = NULL;
}
- mon = g_malloc(sizeof(*mon));
- monitor_data_init(mon);
+ qemu_chr_fe_set_handlers(&mon->chr, monitor_can_read, monitor_qmp_read,
+ monitor_qmp_event, NULL, mon, context, true);
+ monitor_list_append(mon);
+}
+
+void monitor_init(Chardev *chr, int flags)
+{
+ Monitor *mon = g_malloc(sizeof(*mon));
+ /* Enable IOThread for QMPs that are not using MUX chardev backends. */
+ bool use_io_thr = (!CHARDEV_IS_MUX(chr)) && (flags & MONITOR_USE_CONTROL);
+
+ monitor_data_init(mon, false, use_io_thr);
qemu_chr_fe_init(&mon->chr, chr, &error_abort);
mon->flags = flags;
@@ -4055,31 +4553,77 @@ void monitor_init(Chardev *chr, int flags)
}
if (monitor_is_qmp(mon)) {
- qemu_chr_fe_set_handlers(&mon->chr, monitor_can_read, monitor_qmp_read,
- monitor_qmp_event, NULL, mon, NULL, true);
qemu_chr_fe_set_echo(&mon->chr, true);
json_message_parser_init(&mon->qmp.parser, handle_qmp_command);
+ if (mon->use_io_thr) {
+ /*
+ * Make sure the old iowatch is gone. It's possible when
+ * e.g. the chardev is in client mode, with wait=on.
+ */
+ remove_fd_in_watch(chr);
+ /*
+ * We can't call qemu_chr_fe_set_handlers() directly here
+ * since during the procedure the chardev will be active
+ * and running in monitor iothread, while we'll still do
+ * something before returning from it, which is a possible
+ * race too. To avoid that, we just create a BH to setup
+ * the handlers.
+ */
+ aio_bh_schedule_oneshot(monitor_get_aio_context(),
+ monitor_qmp_setup_handlers_bh, mon);
+ /* We'll add this to mon_list in the BH when setup done */
+ return;
+ } else {
+ qemu_chr_fe_set_handlers(&mon->chr, monitor_can_read,
+ monitor_qmp_read, monitor_qmp_event,
+ NULL, mon, NULL, true);
+ }
} else {
qemu_chr_fe_set_handlers(&mon->chr, monitor_can_read, monitor_read,
monitor_event, NULL, mon, NULL, true);
}
- qemu_mutex_lock(&monitor_lock);
- QLIST_INSERT_HEAD(&mon_list, mon, entry);
- qemu_mutex_unlock(&monitor_lock);
+ monitor_list_append(mon);
}
void monitor_cleanup(void)
{
Monitor *mon, *next;
+ /*
+ * We need to explicitly stop the iothread (but not destroy it),
+ * cleanup the monitor resources, then destroy the iothread since
+ * we need to unregister from chardev below in
+ * monitor_data_destroy(), and chardev is not thread-safe yet
+ */
+ iothread_stop(mon_global.mon_iothread);
+
+ /*
+ * After we have IOThread to send responses, it's possible that
+ * when we stop the IOThread there are still replies queued in the
+ * responder queue. Flush all of them. Note that even after this
+ * flush it's still possible that out buffer is not flushed.
+ * It'll be done in below monitor_flush() as the last resort.
+ */
+ monitor_qmp_bh_responder(NULL);
+
qemu_mutex_lock(&monitor_lock);
- QLIST_FOREACH_SAFE(mon, &mon_list, entry, next) {
- QLIST_REMOVE(mon, entry);
+ QTAILQ_FOREACH_SAFE(mon, &mon_list, entry, next) {
+ QTAILQ_REMOVE(&mon_list, mon, entry);
+ monitor_flush(mon);
monitor_data_destroy(mon);
g_free(mon);
}
qemu_mutex_unlock(&monitor_lock);
+
+ /* QEMUBHs needs to be deleted before destroying the IOThread. */
+ qemu_bh_delete(mon_global.qmp_dispatcher_bh);
+ mon_global.qmp_dispatcher_bh = NULL;
+ qemu_bh_delete(mon_global.qmp_respond_bh);
+ mon_global.qmp_respond_bh = NULL;
+
+ iothread_destroy(mon_global.mon_iothread);
+ mon_global.mon_iothread = NULL;
}
QemuOptsList qemu_mon_opts = {