From ae255e523c256cf0708f1c16cb946ff96340a800 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Thu, 8 Sep 2011 14:28:59 +0200 Subject: nbd: switch to asynchronous operation Signed-off-by: Paolo Bonzini --- block/nbd.c | 188 +++++++++++++++++++++++++++++++++++++++--------------------- nbd.c | 8 +++ 2 files changed, 131 insertions(+), 65 deletions(-) diff --git a/block/nbd.c b/block/nbd.c index 95212dac64..bea7acd213 100644 --- a/block/nbd.c +++ b/block/nbd.c @@ -47,13 +47,17 @@ #endif typedef struct BDRVNBDState { - CoMutex lock; int sock; uint32_t nbdflags; off_t size; size_t blocksize; char *export_name; /* An NBD server may export several devices */ + CoMutex mutex; + Coroutine *coroutine; + + struct nbd_reply reply; + /* If it begins with '/', this is a UNIX domain socket. Otherwise, * it's a string of the form :port */ @@ -106,6 +110,95 @@ out: return err; } +static void nbd_coroutine_start(BDRVNBDState *s, struct nbd_request *request) +{ + qemu_co_mutex_lock(&s->mutex); + s->coroutine = qemu_coroutine_self(); + request->handle = (uint64_t)(intptr_t)s; +} + +static int nbd_have_request(void *opaque) +{ + BDRVNBDState *s = opaque; + + return !!s->coroutine; +} + +static void nbd_reply_ready(void *opaque) +{ + BDRVNBDState *s = opaque; + + if (s->reply.handle == 0) { + /* No reply already in flight. Fetch a header. */ + if (nbd_receive_reply(s->sock, &s->reply) < 0) { + s->reply.handle = 0; + } + } + + /* There's no need for a mutex on the receive side, because the + * handler acts as a synchronization point and ensures that only + * one coroutine is called until the reply finishes. */ + if (s->coroutine) { + qemu_coroutine_enter(s->coroutine, NULL); + } +} + +static void nbd_restart_write(void *opaque) +{ + BDRVNBDState *s = opaque; + qemu_coroutine_enter(s->coroutine, NULL); +} + +static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request, + struct iovec *iov, int offset) +{ + int rc, ret; + + qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, nbd_restart_write, + nbd_have_request, NULL, s); + rc = nbd_send_request(s->sock, request); + if (rc != -1 && iov) { + ret = qemu_co_sendv(s->sock, iov, request->len, offset); + if (ret != request->len) { + errno = -EIO; + rc = -1; + } + } + qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL, + nbd_have_request, NULL, s); + return rc; +} + +static void nbd_co_receive_reply(BDRVNBDState *s, struct nbd_request *request, + struct nbd_reply *reply, + struct iovec *iov, int offset) +{ + int ret; + + /* Wait until we're woken up by the read handler. */ + qemu_coroutine_yield(); + *reply = s->reply; + if (reply->handle != request->handle) { + reply->error = EIO; + } else { + if (iov && reply->error == 0) { + ret = qemu_co_recvv(s->sock, iov, request->len, offset); + if (ret != request->len) { + reply->error = EIO; + } + } + + /* Tell the read handler to read another header. */ + s->reply.handle = 0; + } +} + +static void nbd_coroutine_end(BDRVNBDState *s, struct nbd_request *request) +{ + s->coroutine = NULL; + qemu_co_mutex_unlock(&s->mutex); +} + static int nbd_establish_connection(BlockDriverState *bs) { BDRVNBDState *s = bs->opaque; @@ -135,8 +228,11 @@ static int nbd_establish_connection(BlockDriverState *bs) return -errno; } - /* Now that we're connected, set the socket to be non-blocking */ + /* Now that we're connected, set the socket to be non-blocking and + * kick the reply mechanism. */ socket_set_nonblock(sock); + qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL, + nbd_have_request, NULL, s); s->sock = sock; s->size = size; @@ -152,11 +248,11 @@ static void nbd_teardown_connection(BlockDriverState *bs) struct nbd_request request; request.type = NBD_CMD_DISC; - request.handle = (uint64_t)(intptr_t)bs; request.from = 0; request.len = 0; nbd_send_request(s->sock, &request); + qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, NULL); closesocket(s->sock); } @@ -165,6 +261,8 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags) BDRVNBDState *s = bs->opaque; int result; + qemu_co_mutex_init(&s->mutex); + /* Pop the config into our state object. Exit if invalid. */ result = nbd_config(s, filename, flags); if (result != 0) { @@ -176,90 +274,50 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags) */ result = nbd_establish_connection(bs); - qemu_co_mutex_init(&s->lock); return result; } -static int nbd_read(BlockDriverState *bs, int64_t sector_num, - uint8_t *buf, int nb_sectors) +static int nbd_co_readv(BlockDriverState *bs, int64_t sector_num, + int nb_sectors, QEMUIOVector *qiov) { BDRVNBDState *s = bs->opaque; struct nbd_request request; struct nbd_reply reply; request.type = NBD_CMD_READ; - request.handle = (uint64_t)(intptr_t)bs; request.from = sector_num * 512; request.len = nb_sectors * 512; - if (nbd_send_request(s->sock, &request) == -1) - return -errno; - - if (nbd_receive_reply(s->sock, &reply) == -1) - return -errno; - - if (reply.error !=0) - return -reply.error; - - if (reply.handle != request.handle) - return -EIO; - - if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len) - return -EIO; + nbd_coroutine_start(s, &request); + if (nbd_co_send_request(s, &request, NULL, 0) == -1) { + reply.error = errno; + } else { + nbd_co_receive_reply(s, &request, &reply, qiov->iov, 0); + } + nbd_coroutine_end(s, &request); + return -reply.error; - return 0; } -static int nbd_write(BlockDriverState *bs, int64_t sector_num, - const uint8_t *buf, int nb_sectors) +static int nbd_co_writev(BlockDriverState *bs, int64_t sector_num, + int nb_sectors, QEMUIOVector *qiov) { BDRVNBDState *s = bs->opaque; struct nbd_request request; struct nbd_reply reply; request.type = NBD_CMD_WRITE; - request.handle = (uint64_t)(intptr_t)bs; request.from = sector_num * 512; request.len = nb_sectors * 512; - if (nbd_send_request(s->sock, &request) == -1) - return -errno; - - if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len) - return -EIO; - - if (nbd_receive_reply(s->sock, &reply) == -1) - return -errno; - - if (reply.error !=0) - return -reply.error; - - if (reply.handle != request.handle) - return -EIO; - - return 0; -} - -static coroutine_fn int nbd_co_read(BlockDriverState *bs, int64_t sector_num, - uint8_t *buf, int nb_sectors) -{ - int ret; - BDRVNBDState *s = bs->opaque; - qemu_co_mutex_lock(&s->lock); - ret = nbd_read(bs, sector_num, buf, nb_sectors); - qemu_co_mutex_unlock(&s->lock); - return ret; -} - -static coroutine_fn int nbd_co_write(BlockDriverState *bs, int64_t sector_num, - const uint8_t *buf, int nb_sectors) -{ - int ret; - BDRVNBDState *s = bs->opaque; - qemu_co_mutex_lock(&s->lock); - ret = nbd_write(bs, sector_num, buf, nb_sectors); - qemu_co_mutex_unlock(&s->lock); - return ret; + nbd_coroutine_start(s, &request); + if (nbd_co_send_request(s, &request, qiov->iov, 0) == -1) { + reply.error = errno; + } else { + nbd_co_receive_reply(s, &request, &reply, NULL, 0); + } + nbd_coroutine_end(s, &request); + return -reply.error; } static void nbd_close(BlockDriverState *bs) @@ -282,8 +340,8 @@ static BlockDriver bdrv_nbd = { .format_name = "nbd", .instance_size = sizeof(BDRVNBDState), .bdrv_file_open = nbd_open, - .bdrv_read = nbd_co_read, - .bdrv_write = nbd_co_write, + .bdrv_co_readv = nbd_co_readv, + .bdrv_co_writev = nbd_co_writev, .bdrv_close = nbd_close, .bdrv_getlength = nbd_getlength, .protocol_name = "nbd", diff --git a/nbd.c b/nbd.c index de880fe3c6..ff701d3dc8 100644 --- a/nbd.c +++ b/nbd.c @@ -81,6 +81,14 @@ size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read) { size_t offset = 0; + if (qemu_in_coroutine()) { + if (do_read) { + return qemu_co_recv(fd, buffer, size); + } else { + return qemu_co_send(fd, buffer, size); + } + } + while (offset < size) { ssize_t len; -- cgit v1.2.1