This patch adds a new export option to qemu-storage-daemon that enables
FUSE-over-io_uring via the io-uring=on|off switch (disabled by default).
It also implements the protocol handshake with the Linux kernel during
the FUSE-over-io_uring initialization phase.

See: https://docs.kernel.org/filesystems/fuse-io-uring.html

The kernel documentation describes in detail how FUSE-over-io_uring
works. This patch implements the "Initial SQE" stage shown in the
diagram there: it initializes one queue per IOThread, each currently
supporting a single submission queue entry (SQE). When the FUSE driver
sends the first FUSE request (FUSE_INIT), qemu-storage-daemon calls
fuse_uring_start() to complete initialization, ultimately submitting an
SQE with the FUSE_IO_URING_CMD_REGISTER command to confirm successful
initialization with the kernel.
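
For example (illustrative invocation; the mountpoint must be an existing
regular file), an export with FUSE-over-io_uring enabled could be
created with:

    qemu-storage-daemon \
        --blockdev driver=file,node-name=disk0,filename=disk.img \
        --export type=fuse,id=exp0,node-name=disk0,mountpoint=mnt.img,io-uring=on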

We also add support for multiple IOThreads. The current Linux kernel
requires registering $(nproc) queues when setting up FUSE-over-io_uring.
To let users customize the number of FUSE queues (i.e., IOThreads), we
first create the nproc ring queues required by the kernel and then
distribute them round-robin across the FUSE queues for registration. In
addition, to support multiple in-flight requests, each ring queue is
configured with FUSE_DEFAULT_RING_QUEUE_DEPTH entries/requests.
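
As an illustration of the round-robin distribution: on an 8-CPU host
with 3 IOThreads, the kernel-required ring queues map to FUSE queues as

    ring queue:  0 1 2 3 4 5 6 7
    FUSE queue:  0 1 2 0 1 2 0 1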

Suggested-by: Kevin Wolf <[email protected]>
Suggested-by: Stefan Hajnoczi <[email protected]>
Signed-off-by: Brian Song <[email protected]>
---
 block/export/fuse.c                  | 310 +++++++++++++++++++++++++--
 docs/tools/qemu-storage-daemon.rst   |  11 +-
 qapi/block-export.json               |   5 +-
 storage-daemon/qemu-storage-daemon.c |   1 +
 util/fdmon-io_uring.c                |   5 +-
 5 files changed, 309 insertions(+), 23 deletions(-)

diff --git a/block/export/fuse.c b/block/export/fuse.c
index c0ad4696ce..19bf9e5f74 100644
--- a/block/export/fuse.c
+++ b/block/export/fuse.c
@@ -48,6 +48,9 @@
 #include <linux/fs.h>
 #endif
 
+/* room needed in buffer to accommodate header */
+#define FUSE_BUFFER_HEADER_SIZE 0x1000
+
 /* Prevent overly long bounce buffer allocations */
 #define FUSE_MAX_READ_BYTES (MIN(BDRV_REQUEST_MAX_BYTES, 1 * 1024 * 1024))
 /*
@@ -63,12 +66,59 @@
     (FUSE_MAX_WRITE_BYTES - FUSE_IN_PLACE_WRITE_BYTES)
 
 typedef struct FuseExport FuseExport;
+typedef struct FuseQueue FuseQueue;
+
+#ifdef CONFIG_LINUX_IO_URING
+#define FUSE_DEFAULT_RING_QUEUE_DEPTH 64
+#define FUSE_DEFAULT_MAX_PAGES_PER_REQ 32
+
+typedef struct FuseRingQueue FuseRingQueue;
+typedef struct FuseRingEnt {
+    /* back pointer */
+    FuseRingQueue *rq;
+
+    /* commit id of a fuse request */
+    uint64_t req_commit_id;
+
+    /* fuse request header and payload */
+    struct fuse_uring_req_header req_header;
+    void *op_payload;
+    size_t req_payload_sz;
+
+    /* The vector passed to the kernel */
+    struct iovec iov[2];
+
+    CqeHandler fuse_cqe_handler;
+} FuseRingEnt;
+
+struct FuseRingQueue {
+    int rqid;
+
+    /* back pointer */
+    FuseQueue *q;
+    FuseRingEnt *ent;
+
+    /* List entry for ring_queues */
+    QLIST_ENTRY(FuseRingQueue) next;
+};
+
+/*
+ * Round-robin distribution of ring queues across FUSE queues.
+ * This structure manages the mapping between kernel ring queues and user
+ * FUSE queues.
+ */
+typedef struct FuseRingQueueManager {
+    FuseRingQueue *ring_queues;
+    int num_ring_queues;
+    int num_fuse_queues;
+} FuseRingQueueManager;
+#endif
 
 /*
  * One FUSE "queue", representing one FUSE FD from which requests are fetched
  * and processed.  Each queue is tied to an AioContext.
  */
-typedef struct FuseQueue {
+struct FuseQueue {
     FuseExport *exp;
 
     AioContext *ctx;
@@ -109,15 +159,11 @@ typedef struct FuseQueue {
      * Free this buffer with qemu_vfree().
      */
     void *spillover_buf;
-} FuseQueue;
 
-/*
- * Verify that FuseQueue.request_buf plus the spill-over buffer together
- * are big enough to be accepted by the FUSE kernel driver.
- */
-QEMU_BUILD_BUG_ON(sizeof(((FuseQueue *)0)->request_buf) +
-                  FUSE_SPILLOVER_BUF_SIZE <
-                  FUSE_MIN_READ_BUFFER);
+#ifdef CONFIG_LINUX_IO_URING
+    QLIST_HEAD(, FuseRingQueue) ring_queue_list;
+#endif
+};
 
 struct FuseExport {
     BlockExport common;
@@ -133,7 +179,7 @@ struct FuseExport {
      */
     bool halted;
 
-    int num_queues;
+    size_t num_queues;
     FuseQueue *queues;
     /*
      * True if this export should follow the generic export's AioContext.
@@ -149,6 +195,12 @@ struct FuseExport {
     /* Whether allow_other was used as a mount option or not */
     bool allow_other;
 
+#ifdef CONFIG_LINUX_IO_URING
+    bool is_uring;
+    size_t ring_queue_depth;
+    FuseRingQueueManager *ring_queue_manager;
+#endif
+
     mode_t st_mode;
     uid_t st_uid;
     gid_t st_gid;
@@ -205,7 +257,7 @@ static void fuse_attach_handlers(FuseExport *exp)
         return;
     }
 
-    for (int i = 0; i < exp->num_queues; i++) {
+    for (size_t i = 0; i < exp->num_queues; i++) {
         aio_set_fd_handler(exp->queues[i].ctx, exp->queues[i].fuse_fd,
                            read_from_fuse_fd, NULL, NULL, NULL,
                            &exp->queues[i]);
@@ -257,6 +309,189 @@ static const BlockDevOps fuse_export_blk_dev_ops = {
     .drained_poll  = fuse_export_drained_poll,
 };
 
+#ifdef CONFIG_LINUX_IO_URING
+static void fuse_uring_sqe_set_req_data(struct fuse_uring_cmd_req *req,
+                    const unsigned int rqid,
+                    const unsigned int commit_id)
+{
+    req->qid = rqid;
+    req->commit_id = commit_id;
+    req->flags = 0;
+}
+
+static void fuse_uring_sqe_prepare(struct io_uring_sqe *sqe, FuseQueue *q,
+               __u32 cmd_op)
+{
+    sqe->opcode = IORING_OP_URING_CMD;
+
+    sqe->fd = q->fuse_fd;
+    sqe->rw_flags = 0;
+    sqe->ioprio = 0;
+    sqe->off = 0;
+
+    sqe->cmd_op = cmd_op;
+    sqe->__pad1 = 0;
+}
+
+static void fuse_uring_prep_sqe_register(struct io_uring_sqe *sqe,
+                                         void *opaque)
+{
+    FuseRingEnt *ent = opaque;
+    struct fuse_uring_cmd_req *req = (void *)&sqe->cmd[0];
+
+    fuse_uring_sqe_prepare(sqe, ent->rq->q, FUSE_IO_URING_CMD_REGISTER);
+
+    sqe->addr = (uintptr_t)(ent->iov);
+    sqe->len = 2;
+
+    fuse_uring_sqe_set_req_data(req, ent->rq->rqid, 0);
+}
+
+static void fuse_uring_submit_register(void *opaque)
+{
+    FuseRingEnt *ent = opaque;
+
+    aio_add_sqe(fuse_uring_prep_sqe_register, ent, &(ent->fuse_cqe_handler));
+}
+
+/**
+ * Create the $(nproc) ring queues required by the kernel, each with
+ * ring_queue_depth entries.  The ring queues are distributed across the
+ * user-specified FUSE queues in fuse_distribute_ring_queues().
+ */
+static
+FuseRingQueueManager *fuse_ring_queue_manager_create(int num_fuse_queues,
+                                                    size_t ring_queue_depth,
+                                                    size_t bufsize)
+{
+    int num_ring_queues = get_nprocs();
+    FuseRingQueueManager *manager = g_new(FuseRingQueueManager, 1);
+
+    manager->ring_queues = g_new(FuseRingQueue, num_ring_queues);
+    manager->num_ring_queues = num_ring_queues;
+    manager->num_fuse_queues = num_fuse_queues;
+
+    for (int i = 0; i < num_ring_queues; i++) {
+        FuseRingQueue *rq = &manager->ring_queues[i];
+        rq->rqid = i;
+        rq->ent = g_new(FuseRingEnt, ring_queue_depth);
+
+        for (size_t j = 0; j < ring_queue_depth; j++) {
+            FuseRingEnt *ent = &rq->ent[j];
+            ent->rq = rq;
+            ent->req_payload_sz = bufsize - FUSE_BUFFER_HEADER_SIZE;
+            /* g_new()/g_malloc0() abort on failure, so no NULL checks */
+            ent->op_payload = g_malloc0(ent->req_payload_sz);
+
+            ent->iov[0] = (struct iovec) {
+                &(ent->req_header),
+                sizeof(struct fuse_uring_req_header)
+            };
+            ent->iov[1] = (struct iovec) {
+                ent->op_payload,
+                ent->req_payload_sz
+            };
+
+            ent->fuse_cqe_handler.cb = fuse_uring_cqe_handler;
+        }
+    }
+
+    return manager;
+}
+
+static
+void fuse_distribute_ring_queues(FuseExport *exp, FuseRingQueueManager *manager)
+{
+    int queue_index = 0;
+
+    for (int i = 0; i < manager->num_ring_queues; i++) {
+        FuseRingQueue *rq = &manager->ring_queues[i];
+
+        rq->q = &exp->queues[queue_index];
+        QLIST_INSERT_HEAD(&(rq->q->ring_queue_list), rq, next);
+
+        queue_index = (queue_index + 1) % manager->num_fuse_queues;
+    }
+}
+
+static
+void fuse_schedule_ring_queue_registrations(FuseExport *exp,
+                                            FuseRingQueueManager *manager)
+{
+    for (int i = 0; i < manager->num_fuse_queues; i++) {
+        FuseQueue *q = &exp->queues[i];
+        FuseRingQueue *rq;
+
+        QLIST_FOREACH(rq, &q->ring_queue_list, next) {
+            for (size_t j = 0; j < exp->ring_queue_depth; j++) {
+                aio_bh_schedule_oneshot(q->ctx, fuse_uring_submit_register,
+                                        &(rq->ent[j]));
+            }
+        }
+    }
+}
+
+static void fuse_uring_start(FuseExport *exp, struct fuse_init_out *out)
+{
+    /*
+     * If the kernel did not negotiate FUSE_MAX_PAGES, it uses its default
+     * fc->max_pages of FUSE_DEFAULT_MAX_PAGES_PER_REQ, and max_write is
+     * effectively capped at FUSE_DEFAULT_MAX_PAGES_PER_REQ * PAGE_SIZE.
+     */
+    size_t bufsize = out->max_write + FUSE_BUFFER_HEADER_SIZE;
+
+    if (!(out->flags & FUSE_MAX_PAGES)) {
+        bufsize = FUSE_DEFAULT_MAX_PAGES_PER_REQ * qemu_real_host_page_size()
+                         + FUSE_BUFFER_HEADER_SIZE;
+    }
+
+    exp->ring_queue_manager = fuse_ring_queue_manager_create(
+        exp->num_queues, exp->ring_queue_depth, bufsize);
+
+    /* Distribute ring queues across FUSE queues using round-robin */
+    fuse_distribute_ring_queues(exp, exp->ring_queue_manager);
+
+    fuse_schedule_ring_queue_registrations(exp, exp->ring_queue_manager);
+}
+#endif
+
 static int fuse_export_create(BlockExport *blk_exp,
                               BlockExportOptions *blk_exp_args,
                               AioContext *const *multithread,
@@ -270,6 +505,11 @@ static int fuse_export_create(BlockExport *blk_exp,
 
     assert(blk_exp_args->type == BLOCK_EXPORT_TYPE_FUSE);
 
+#ifdef CONFIG_LINUX_IO_URING
+    exp->is_uring = args->io_uring;
+    exp->ring_queue_depth = FUSE_DEFAULT_RING_QUEUE_DEPTH;
+#endif
+
     if (multithread) {
         /* Guaranteed by common export code */
         assert(mt_count >= 1);
@@ -283,6 +523,10 @@ static int fuse_export_create(BlockExport *blk_exp,
                 .exp = exp,
                 .ctx = multithread[i],
                 .fuse_fd = -1,
+#ifdef CONFIG_LINUX_IO_URING
+                .ring_queue_list =
+                    QLIST_HEAD_INITIALIZER(exp->queues[i].ring_queue_list),
+#endif
             };
         }
     } else {
@@ -296,6 +540,10 @@ static int fuse_export_create(BlockExport *blk_exp,
             .exp = exp,
             .ctx = exp->common.ctx,
             .fuse_fd = -1,
+#ifdef CONFIG_LINUX_IO_URING
+            .ring_queue_list =
+                QLIST_HEAD_INITIALIZER(exp->queues[0].ring_queue_list),
+#endif
         };
     }
 
@@ -685,17 +933,39 @@ static bool is_regular_file(const char *path, Error **errp)
  */
 static ssize_t coroutine_fn
 fuse_co_init(FuseExport *exp, struct fuse_init_out *out,
-             uint32_t max_readahead, uint32_t flags)
+             uint32_t max_readahead, const struct fuse_init_in *in)
 {
-    const uint32_t supported_flags = FUSE_ASYNC_READ | FUSE_ASYNC_DIO;
+    uint64_t supported_flags = FUSE_ASYNC_READ | FUSE_ASYNC_DIO
+                                     | FUSE_INIT_EXT;
+    uint64_t outargflags = 0;
+    uint64_t inargflags = in->flags;
+
+    ssize_t ret = 0;
+
+    if (inargflags & FUSE_INIT_EXT) {
+        inargflags = inargflags | (uint64_t) in->flags2 << 32;
+    }
+
+#ifdef CONFIG_LINUX_IO_URING
+    if (exp->is_uring) {
+        if (inargflags & FUSE_OVER_IO_URING) {
+            supported_flags |= FUSE_OVER_IO_URING;
+        } else {
+            exp->is_uring = false;
+            ret = -ENODEV;
+        }
+    }
+#endif
+
+    outargflags = inargflags & supported_flags;
 
     *out = (struct fuse_init_out) {
         .major = FUSE_KERNEL_VERSION,
         .minor = FUSE_KERNEL_MINOR_VERSION,
         .max_readahead = max_readahead,
         .max_write = FUSE_MAX_WRITE_BYTES,
-        .flags = flags & supported_flags,
-        .flags2 = 0,
+        .flags = outargflags,
+        .flags2 = outargflags >> 32,
 
         /* libfuse maximum: 2^16 - 1 */
         .max_background = UINT16_MAX,
@@ -717,7 +987,7 @@ fuse_co_init(FuseExport *exp, struct fuse_init_out *out,
         .map_alignment = 0,
     };
 
-    return sizeof(*out);
+    return ret < 0 ? ret : sizeof(*out);
 }
 
 /**
@@ -1506,6 +1776,14 @@ fuse_co_process_request(FuseQueue *q, void *spillover_buf)
         fuse_write_buf_response(q->fuse_fd, req_id, out_hdr,
                                 out_data_buffer, ret);
         qemu_vfree(out_data_buffer);
+#ifdef CONFIG_LINUX_IO_URING
+    /* Handle FUSE-over-io_uring initialization */
+    if (unlikely(opcode == FUSE_INIT && exp->is_uring)) {
+        struct fuse_init_out *out =
+            (struct fuse_init_out *)FUSE_OUT_OP_STRUCT(out_buf);
+        fuse_uring_start(exp, out);
+    }
+#endif
     } else {
         fuse_write_response(q->fuse_fd, req_id, out_hdr,
                             ret < 0 ? ret : 0,
diff --git a/docs/tools/qemu-storage-daemon.rst b/docs/tools/qemu-storage-daemon.rst
index 35ab2d7807..c5076101e0 100644
--- a/docs/tools/qemu-storage-daemon.rst
+++ b/docs/tools/qemu-storage-daemon.rst
@@ -78,7 +78,7 @@ Standard options:
 .. option:: --export [type=]nbd,id=<id>,node-name=<node-name>[,name=<export-name>][,writable=on|off][,bitmap=<name>]
   --export [type=]vhost-user-blk,id=<id>,node-name=<node-name>,addr.type=unix,addr.path=<socket-path>[,writable=on|off][,logical-block-size=<block-size>][,num-queues=<num-queues>]
   --export [type=]vhost-user-blk,id=<id>,node-name=<node-name>,addr.type=fd,addr.str=<fd>[,writable=on|off][,logical-block-size=<block-size>][,num-queues=<num-queues>]
-  --export [type=]fuse,id=<id>,node-name=<node-name>,mountpoint=<file>[,growable=on|off][,writable=on|off][,allow-other=on|off|auto]
+  --export [type=]fuse,id=<id>,node-name=<node-name>,mountpoint=<file>[,growable=on|off][,writable=on|off][,allow-other=on|off|auto][,io-uring=on|off]
   --export [type=]vduse-blk,id=<id>,node-name=<node-name>,name=<vduse-name>[,writable=on|off][,num-queues=<num-queues>][,queue-size=<queue-size>][,logical-block-size=<block-size>][,serial=<serial-number>]
 
   is a block export definition. ``node-name`` is the block node that should be
@@ -111,10 +111,12 @@ Standard options:
   that enabling this option as a non-root user requires enabling the
   user_allow_other option in the global fuse.conf configuration file.  Setting
   ``allow-other`` to auto (the default) will try enabling this option, and on
-  error fall back to disabling it.
+  error fall back to disabling it.  When ``io-uring`` is enabled (off by
+  default), FUSE requests are handled via io_uring instead of the traditional
+  read/write protocol on /dev/fuse.
 
   The ``vduse-blk`` export type takes a ``name`` (must be unique across the host)
   to create the VDUSE device.
   ``num-queues`` sets the number of virtqueues (the default is 1).
   ``queue-size`` sets the virtqueue descriptor table size (the default is 256).
 
diff --git a/qapi/block-export.json b/qapi/block-export.json
index 9ae703ad01..37f2fc47e2 100644
--- a/qapi/block-export.json
+++ b/qapi/block-export.json
@@ -184,12 +184,15 @@
 #     mount the export with allow_other, and if that fails, try again
 #     without.  (since 6.1; default: auto)
 #
+# @io-uring: Use FUSE-over-io_uring.  (since 10.2; default: false)
+#
 # Since: 6.0
 ##
 { 'struct': 'BlockExportOptionsFuse',
   'data': { 'mountpoint': 'str',
             '*growable': 'bool',
-            '*allow-other': 'FuseExportAllowOther' },
+            '*allow-other': 'FuseExportAllowOther',
+            '*io-uring': 'bool' },
   'if': 'CONFIG_FUSE' }
 
 ##
diff --git a/storage-daemon/qemu-storage-daemon.c b/storage-daemon/qemu-storage-daemon.c
index eb72561358..0cd4cd2b58 100644
--- a/storage-daemon/qemu-storage-daemon.c
+++ b/storage-daemon/qemu-storage-daemon.c
@@ -107,6 +107,7 @@ static void help(void)
 #ifdef CONFIG_FUSE
 "  --export [type=]fuse,id=<id>,node-name=<node-name>,mountpoint=<file>\n"
 "           [,growable=on|off][,writable=on|off][,allow-other=on|off|auto]\n"
+"           [,io-uring=on|off]"
 "                         export the specified block node over FUSE\n"
 "\n"
 #endif /* CONFIG_FUSE */
diff --git a/util/fdmon-io_uring.c b/util/fdmon-io_uring.c
index d2433d1d99..68d3fe8e01 100644
--- a/util/fdmon-io_uring.c
+++ b/util/fdmon-io_uring.c
@@ -452,10 +452,14 @@ static const FDMonOps fdmon_io_uring_ops = {
 void fdmon_io_uring_setup(AioContext *ctx, Error **errp)
 {
     int ret;
+    int flags;
 
     ctx->io_uring_fd_tag = NULL;
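+    /* FUSE-over-io_uring needs 128-byte SQEs for IORING_OP_URING_CMD */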
+    flags = IORING_SETUP_SQE128;
 
-    ret = io_uring_queue_init(FDMON_IO_URING_ENTRIES, &ctx->fdmon_io_uring, 0);
+    ret = io_uring_queue_init(FDMON_IO_URING_ENTRIES,
+                              &ctx->fdmon_io_uring, flags);
     if (ret != 0) {
         error_setg_errno(errp, -ret, "Failed to initialize io_uring");
         return;
-- 
2.45.2

