On Tue, Sep 14, 2021 at 05:54:36AM +0000, Jiang Wang wrote:
> This patch supports dgram on vhost side, including
> tx and rx. The vhost send packets asynchronously.

How exactly is fairness ensured? What prevents one socket
from monopolizing the device? The spec proposal seems to
leave it to the implementation.

> Also, ignore vq errors when vq number is larger than 2,
> so it will be comptaible with old versions.

this needs more explanation.

> Signed-off-by: Jiang Wang <[email protected]>



> ---
>  drivers/vhost/vsock.c | 217 ++++++++++++++++++++++++++++++++++++------
>  1 file changed, 189 insertions(+), 28 deletions(-)
> 
> diff --git a/drivers/vhost/vsock.c b/drivers/vhost/vsock.c
> index c79789af0365..a8755cbebd40 100644
> --- a/drivers/vhost/vsock.c
> +++ b/drivers/vhost/vsock.c
> @@ -28,7 +28,10 @@
>   * small pkts.
>   */
>  #define VHOST_VSOCK_PKT_WEIGHT 256
> +#define VHOST_VSOCK_DGRM_MAX_PENDING_PKT 128
>  
> +/* Max wait time in busy poll in microseconds */
> +#define VHOST_VSOCK_BUSY_POLL_TIMEOUT 20

While adding busy polling isn't out of the question, I would think
it should be kept separate from initial dgram support.


>  enum {
>       VHOST_VSOCK_FEATURES = VHOST_FEATURES |
>                              (1ULL << VIRTIO_F_ACCESS_PLATFORM) |
> @@ -46,7 +49,7 @@ static DEFINE_READ_MOSTLY_HASHTABLE(vhost_vsock_hash, 8);
>  
>  struct vhost_vsock {
>       struct vhost_dev dev;
> -     struct vhost_virtqueue vqs[2];
> +     struct vhost_virtqueue vqs[4];
>  
>       /* Link to global vhost_vsock_hash, writes use vhost_vsock_mutex */
>       struct hlist_node hash;
> @@ -55,6 +58,11 @@ struct vhost_vsock {
>       spinlock_t send_pkt_list_lock;
>       struct list_head send_pkt_list; /* host->guest pending packets */
>  
> +     spinlock_t dgram_send_pkt_list_lock;
> +     struct list_head dgram_send_pkt_list;   /* host->guest pending packets 
> */
> +     struct vhost_work dgram_send_pkt_work;
> +     int  dgram_used; /*pending packets to be send */

Typo: "to be sent"?

> +
>       atomic_t queued_replies;
>  
>       u32 guest_cid;
> @@ -92,10 +100,22 @@ static void
>  vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
>                           struct vhost_virtqueue *vq)
>  {
> -     struct vhost_virtqueue *tx_vq = &vsock->vqs[VSOCK_VQ_TX];
> +     struct vhost_virtqueue *tx_vq;
>       int pkts = 0, total_len = 0;
>       bool added = false;
>       bool restart_tx = false;
> +     spinlock_t *lock;
> +     struct list_head *send_pkt_list;
> +
> +     if (vq == &vsock->vqs[VSOCK_VQ_RX]) {
> +             tx_vq = &vsock->vqs[VSOCK_VQ_TX];
> +             lock = &vsock->send_pkt_list_lock;
> +             send_pkt_list = &vsock->send_pkt_list;
> +     } else {
> +             tx_vq = &vsock->vqs[VSOCK_VQ_DGRAM_TX];
> +             lock = &vsock->dgram_send_pkt_list_lock;
> +             send_pkt_list = &vsock->dgram_send_pkt_list;
> +     }
>  
>       mutex_lock(&vq->mutex);
>  
> @@ -116,36 +136,48 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
>               size_t iov_len, payload_len;
>               int head;
>               bool restore_flag = false;
> +             bool is_dgram = false;
>  
> -             spin_lock_bh(&vsock->send_pkt_list_lock);
> -             if (list_empty(&vsock->send_pkt_list)) {
> -                     spin_unlock_bh(&vsock->send_pkt_list_lock);
> +             spin_lock_bh(lock);
> +             if (list_empty(send_pkt_list)) {
> +                     spin_unlock_bh(lock);
>                       vhost_enable_notify(&vsock->dev, vq);
>                       break;
>               }
>  
> -             pkt = list_first_entry(&vsock->send_pkt_list,
> +             pkt = list_first_entry(send_pkt_list,
>                                      struct virtio_vsock_pkt, list);
>               list_del_init(&pkt->list);
> -             spin_unlock_bh(&vsock->send_pkt_list_lock);
> +             spin_unlock_bh(lock);
> +
> +             if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_DGRAM)
> +                     is_dgram = true;
>  
>               head = vhost_get_vq_desc(vq, vq->iov, ARRAY_SIZE(vq->iov),
>                                        &out, &in, NULL, NULL);
>               if (head < 0) {
> -                     spin_lock_bh(&vsock->send_pkt_list_lock);
> -                     list_add(&pkt->list, &vsock->send_pkt_list);
> -                     spin_unlock_bh(&vsock->send_pkt_list_lock);
> +                     spin_lock_bh(lock);
> +                     list_add(&pkt->list, send_pkt_list);
> +                     spin_unlock_bh(lock);
>                       break;
>               }
>  
>               if (head == vq->num) {
> -                     spin_lock_bh(&vsock->send_pkt_list_lock);
> -                     list_add(&pkt->list, &vsock->send_pkt_list);
> -                     spin_unlock_bh(&vsock->send_pkt_list_lock);
> +                     if (is_dgram) {
> +                             virtio_transport_free_pkt(pkt);
> +                             vq_err(vq, "Dgram virtqueue is full!");
> +                             spin_lock_bh(lock);
> +                             vsock->dgram_used--;
> +                             spin_unlock_bh(lock);
> +                             break;
> +                     }
> +                     spin_lock_bh(lock);
> +                     list_add(&pkt->list, send_pkt_list);
> +                     spin_unlock_bh(lock);
>  
>                       /* We cannot finish yet if more buffers snuck in while
> -                      * re-enabling notify.
> -                      */
> +                     * re-enabling notify.
> +                     */

Looks like you are breaking the alignment of this comment; please keep the original indentation.

>                       if (unlikely(vhost_enable_notify(&vsock->dev, vq))) {
>                               vhost_disable_notify(&vsock->dev, vq);
>                               continue;
> @@ -156,6 +188,12 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
>               if (out) {
>                       virtio_transport_free_pkt(pkt);
>                       vq_err(vq, "Expected 0 output buffers, got %u\n", out);
> +                     if (is_dgram) {
> +                             spin_lock_bh(lock);
> +                             vsock->dgram_used--;
> +                             spin_unlock_bh(lock);
> +                     }
> +
>                       break;
>               }
>  
> @@ -163,6 +201,18 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
>               if (iov_len < sizeof(pkt->hdr)) {
>                       virtio_transport_free_pkt(pkt);
>                       vq_err(vq, "Buffer len [%zu] too small\n", iov_len);
> +                     if (is_dgram) {
> +                             spin_lock_bh(lock);
> +                             vsock->dgram_used--;
> +                             spin_unlock_bh(lock);
> +                     }
> +                     break;
> +             }
> +
> +             if (iov_len < pkt->len - pkt->off &&
> +                     vq == &vsock->vqs[VSOCK_VQ_DGRAM_RX]) {
> +                     virtio_transport_free_pkt(pkt);
> +                     vq_err(vq, "Buffer len [%zu] too small for dgram\n", 
> iov_len);
>                       break;
>               }
>  
> @@ -199,6 +249,11 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
>               if (nbytes != sizeof(pkt->hdr)) {
>                       virtio_transport_free_pkt(pkt);
>                       vq_err(vq, "Faulted on copying pkt hdr\n");
> +                     if (is_dgram) {
> +                             spin_lock_bh(lock);
> +                             vsock->dgram_used--;
> +                             spin_unlock_bh(lock);
> +                     }
>                       break;
>               }
>  
> @@ -224,19 +279,19 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
>               /* If we didn't send all the payload we can requeue the packet
>                * to send it with the next available buffer.
>                */
> -             if (pkt->off < pkt->len) {
> +             if ((pkt->off < pkt->len)
> +                     && (vq == &vsock->vqs[VSOCK_VQ_RX])) {

Put the && at the end of the previous line; there is no need for the extra parentheses.

>                       if (restore_flag)
>                               pkt->hdr.flags |= 
> cpu_to_le32(VIRTIO_VSOCK_SEQ_EOR);
> -

No need to drop this blank line, IMHO.

>                       /* We are queueing the same virtio_vsock_pkt to handle
>                        * the remaining bytes, and we want to deliver it
>                        * to monitoring devices in the next iteration.
>                        */
>                       pkt->tap_delivered = false;
>  
> -                     spin_lock_bh(&vsock->send_pkt_list_lock);
> -                     list_add(&pkt->list, &vsock->send_pkt_list);
> -                     spin_unlock_bh(&vsock->send_pkt_list_lock);
> +                     spin_lock_bh(lock);
> +                     list_add(&pkt->list, send_pkt_list);
> +                     spin_unlock_bh(lock);
>               } else {
>                       if (pkt->reply) {
>                               int val;
> @@ -251,6 +306,11 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
>                       }
>  
>                       virtio_transport_free_pkt(pkt);
> +                     if (is_dgram) {
> +                             spin_lock_bh(lock);
> +                             vsock->dgram_used--;
> +                             spin_unlock_bh(lock);
> +                     }
>               }
>       } while(likely(!vhost_exceeds_weight(vq, ++pkts, total_len)));
>       if (added)
> @@ -274,11 +334,25 @@ static void vhost_transport_send_pkt_work(struct 
> vhost_work *work)
>       vhost_transport_do_send_pkt(vsock, vq);
>  }
>  
> +static void vhost_transport_dgram_send_pkt_work(struct vhost_work *work)
> +{
> +     struct vhost_virtqueue *vq;
> +     struct vhost_vsock *vsock;
> +
> +     vsock = container_of(work, struct vhost_vsock, dgram_send_pkt_work);
> +     vq = &vsock->vqs[VSOCK_VQ_DGRAM_RX];
> +
> +     vhost_transport_do_send_pkt(vsock, vq);
> +}
> +
>  static int
>  vhost_transport_send_pkt(struct virtio_vsock_pkt *pkt)
>  {
>       struct vhost_vsock *vsock;
>       int len = pkt->len;
> +     spinlock_t *lock;
> +     struct list_head *send_pkt_list;
> +     struct vhost_work *work;
>  
>       rcu_read_lock();
>  
> @@ -290,14 +364,39 @@ vhost_transport_send_pkt(struct virtio_vsock_pkt *pkt)
>               return -ENODEV;
>       }
>  
> +     if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_STREAM ||
> +          le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_SEQPACKET) {
> +             lock = &vsock->send_pkt_list_lock;
> +             send_pkt_list = &vsock->send_pkt_list;
> +             work = &vsock->send_pkt_work;
> +     } else if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_DGRAM) {
> +             lock = &vsock->dgram_send_pkt_list_lock;
> +             send_pkt_list = &vsock->dgram_send_pkt_list;
> +             work = &vsock->dgram_send_pkt_work;
> +     } else {
> +             rcu_read_unlock();
> +             virtio_transport_free_pkt(pkt);
> +             return -EINVAL;
> +     }
> +
> +
>       if (pkt->reply)
>               atomic_inc(&vsock->queued_replies);
>  
> -     spin_lock_bh(&vsock->send_pkt_list_lock);
> -     list_add_tail(&pkt->list, &vsock->send_pkt_list);
> -     spin_unlock_bh(&vsock->send_pkt_list_lock);
> +     spin_lock_bh(lock);
> +     if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_DGRAM) {
> +             if (vsock->dgram_used  == VHOST_VSOCK_DGRM_MAX_PENDING_PKT)
> +                     len = -ENOMEM;
> +             else {
> +                     vsock->dgram_used++;
> +                     list_add_tail(&pkt->list, send_pkt_list);
> +             }
> +     } else
> +             list_add_tail(&pkt->list, send_pkt_list);
>  
> -     vhost_work_queue(&vsock->dev, &vsock->send_pkt_work);
> +     spin_unlock_bh(lock);
> +
> +     vhost_work_queue(&vsock->dev, work);
>  
>       rcu_read_unlock();
>       return len;
> @@ -487,6 +586,18 @@ static bool vhost_transport_seqpacket_allow(u32 
> remote_cid)
>       return seqpacket_allow;
>  }
>  
> +static inline unsigned long busy_clock(void)
> +{
> +     return local_clock() >> 10;
> +}
> +
> +static bool vhost_can_busy_poll(unsigned long endtime)
> +{
> +     return likely(!need_resched() && !time_after(busy_clock(), endtime) &&
> +                   !signal_pending(current));
> +}
> +
> +
>  static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
>  {
>       struct vhost_virtqueue *vq = container_of(work, struct vhost_virtqueue,
> @@ -497,6 +608,8 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work 
> *work)
>       int head, pkts = 0, total_len = 0;
>       unsigned int out, in;
>       bool added = false;
> +     unsigned long busyloop_timeout = VHOST_VSOCK_BUSY_POLL_TIMEOUT;
> +     unsigned long endtime;
>  
>       mutex_lock(&vq->mutex);
>  
> @@ -506,11 +619,14 @@ static void vhost_vsock_handle_tx_kick(struct 
> vhost_work *work)
>       if (!vq_meta_prefetch(vq))
>               goto out;
>  
> +     endtime = busy_clock() + busyloop_timeout;
>       vhost_disable_notify(&vsock->dev, vq);
> +     preempt_disable();

That's quite a bit of work done with preempt disabled. Why?
What's going on here?

>       do {
>               u32 len;
>  
> -             if (!vhost_vsock_more_replies(vsock)) {
> +             if (vq == &vsock->vqs[VSOCK_VQ_TX]
> +                     && !vhost_vsock_more_replies(vsock)) {
>                       /* Stop tx until the device processes already
>                        * pending replies.  Leave tx virtqueue
>                        * callbacks disabled.
> @@ -524,6 +640,11 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work 
> *work)
>                       break;
>  
>               if (head == vq->num) {
> +                     if (vhost_can_busy_poll(endtime)) {
> +                             cpu_relax();
> +                             continue;
> +                     }
> +
>                       if (unlikely(vhost_enable_notify(&vsock->dev, vq))) {
>                               vhost_disable_notify(&vsock->dev, vq);
>                               continue;
> @@ -555,6 +676,7 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work 
> *work)
>               total_len += len;
>               added = true;
>       } while(likely(!vhost_exceeds_weight(vq, ++pkts, total_len)));
> +     preempt_enable();
>  
>  no_more_replies:
>       if (added)
> @@ -593,14 +715,30 @@ static int vhost_vsock_start(struct vhost_vsock *vsock)
>  
>               if (!vhost_vq_access_ok(vq)) {
>                       ret = -EFAULT;
> +                     /* when running with old guest and qemu, vq 2 may
> +                      * not exist, so return 0 in this case.

Can't the new stuff be gated by a feature bit as usual?

> +                      */
> +                     if (i == 2) {
> +                             mutex_unlock(&vq->mutex);
> +                             break;
> +                     }
>                       goto err_vq;
>               }
>  
>               if (!vhost_vq_get_backend(vq)) {
>                       vhost_vq_set_backend(vq, vsock);
>                       ret = vhost_vq_init_access(vq);
> -                     if (ret)
> -                             goto err_vq;
> +                     if (ret) {
> +                             mutex_unlock(&vq->mutex);
> +                             /* when running with old guest and qemu, vq 2 
> may
> +                              * not exist, so return 0 in this case.
> +                              */
> +                             if (i == 2) {
> +                                     mutex_unlock(&vq->mutex);
> +                                     break;
> +                             }
> +                             goto err;
> +                     }
>               }
>  
>               mutex_unlock(&vq->mutex);
> @@ -610,6 +748,7 @@ static int vhost_vsock_start(struct vhost_vsock *vsock)
>        * let's kick the send worker to send them.
>        */
>       vhost_work_queue(&vsock->dev, &vsock->send_pkt_work);
> +     vhost_work_queue(&vsock->dev, &vsock->dgram_send_pkt_work);
>  
>       mutex_unlock(&vsock->dev.mutex);
>       return 0;
> @@ -684,8 +823,14 @@ static int vhost_vsock_dev_open(struct inode *inode, 
> struct file *file)
>  
>       vqs[VSOCK_VQ_TX] = &vsock->vqs[VSOCK_VQ_TX];
>       vqs[VSOCK_VQ_RX] = &vsock->vqs[VSOCK_VQ_RX];
> +     vqs[VSOCK_VQ_DGRAM_TX] = &vsock->vqs[VSOCK_VQ_DGRAM_TX];
> +     vqs[VSOCK_VQ_DGRAM_RX] = &vsock->vqs[VSOCK_VQ_DGRAM_RX];
>       vsock->vqs[VSOCK_VQ_TX].handle_kick = vhost_vsock_handle_tx_kick;
>       vsock->vqs[VSOCK_VQ_RX].handle_kick = vhost_vsock_handle_rx_kick;
> +     vsock->vqs[VSOCK_VQ_DGRAM_TX].handle_kick =
> +                                             vhost_vsock_handle_tx_kick;
> +     vsock->vqs[VSOCK_VQ_DGRAM_RX].handle_kick =
> +                                             vhost_vsock_handle_rx_kick;
>  
>       vhost_dev_init(&vsock->dev, vqs, ARRAY_SIZE(vsock->vqs),
>                      UIO_MAXIOV, VHOST_VSOCK_PKT_WEIGHT,
> @@ -695,6 +840,11 @@ static int vhost_vsock_dev_open(struct inode *inode, 
> struct file *file)
>       spin_lock_init(&vsock->send_pkt_list_lock);
>       INIT_LIST_HEAD(&vsock->send_pkt_list);
>       vhost_work_init(&vsock->send_pkt_work, vhost_transport_send_pkt_work);
> +     spin_lock_init(&vsock->dgram_send_pkt_list_lock);
> +     INIT_LIST_HEAD(&vsock->dgram_send_pkt_list);
> +     vhost_work_init(&vsock->dgram_send_pkt_work,
> +                     vhost_transport_dgram_send_pkt_work);
> +
>       return 0;
>  
>  out:
> @@ -769,6 +919,17 @@ static int vhost_vsock_dev_release(struct inode *inode, 
> struct file *file)
>       }
>       spin_unlock_bh(&vsock->send_pkt_list_lock);
>  
> +     spin_lock_bh(&vsock->dgram_send_pkt_list_lock);
> +     while (!list_empty(&vsock->dgram_send_pkt_list)) {
> +             struct virtio_vsock_pkt *pkt;
> +
> +             pkt = list_first_entry(&vsock->dgram_send_pkt_list,
> +                             struct virtio_vsock_pkt, list);
> +             list_del_init(&pkt->list);
> +             virtio_transport_free_pkt(pkt);
> +     }
> +     spin_unlock_bh(&vsock->dgram_send_pkt_list_lock);
> +
>       vhost_dev_cleanup(&vsock->dev);
>       kfree(vsock->dev.vqs);
>       vhost_vsock_free(vsock);
> @@ -954,7 +1115,7 @@ static int __init vhost_vsock_init(void)
>       int ret;
>  
>       ret = vsock_core_register(&vhost_transport.transport,
> -                               VSOCK_TRANSPORT_F_H2G);
> +                               VSOCK_TRANSPORT_F_H2G | 
> VSOCK_TRANSPORT_F_DGRAM);
>       if (ret < 0)
>               return ret;
>       return misc_register(&vhost_vsock_misc);
> -- 
> 2.20.1

_______________________________________________
Virtualization mailing list
[email protected]
https://lists.linuxfoundation.org/mailman/listinfo/virtualization

Reply via email to