On Mon, Nov 27, 2023 at 05:25:53PM -0300, Fabiano Rosas wrote:
> For the upcoming support to the new 'fixed-ram' migration stream
> format, we cannot use multifd packets because each write into the
> ramblock section in the migration file is expected to contain only the
> guest pages. They are written at their respective offsets relative to
> the ramblock section header.
> 
> There is no space for the packet information and the expected gains
> from the new approach come partly from being able to write the pages
> sequentially without extraneous data in between.
> 
> The new format also doesn't need the packets and all necessary
> information can be taken from the standard migration headers with some
> (future) changes to multifd code.
> 
> Use the presence of the fixed-ram capability to decide whether to send
> packets. For now this has no effect as fixed-ram cannot yet be enabled
> with multifd.
> 
> Signed-off-by: Fabiano Rosas <[email protected]>
> ---
> - moved more of the packet code under use_packets
> ---
>  migration/multifd.c | 138 +++++++++++++++++++++++++++-----------------
>  migration/options.c |   5 ++
>  migration/options.h |   1 +
>  3 files changed, 91 insertions(+), 53 deletions(-)
> 
> diff --git a/migration/multifd.c b/migration/multifd.c
> index ec58c58082..9625640d61 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -654,18 +654,22 @@ static void *multifd_send_thread(void *opaque)
>      Error *local_err = NULL;
>      int ret = 0;
>      bool use_zero_copy_send = migrate_zero_copy_send();
> +    bool use_packets = migrate_multifd_packets();
>  
>      thread = migration_threads_add(p->name, qemu_get_thread_id());
>  
>      trace_multifd_send_thread_start(p->id);
>      rcu_register_thread();
>  
> -    if (multifd_send_initial_packet(p, &local_err) < 0) {
> -        ret = -1;
> -        goto out;
> +    if (use_packets) {
> +        if (multifd_send_initial_packet(p, &local_err) < 0) {
> +            ret = -1;
> +            goto out;
> +        }
> +
> +        /* initial packet */
> +        p->num_packets = 1;
>      }
> -    /* initial packet */
> -    p->num_packets = 1;
>  
>      while (true) {
>          qemu_sem_post(&multifd_send_state->channels_ready);
> @@ -677,11 +681,10 @@ static void *multifd_send_thread(void *opaque)
>          qemu_mutex_lock(&p->mutex);
>  
>          if (p->pending_job) {
> -            uint64_t packet_num = p->packet_num;
>              uint32_t flags;
>              p->normal_num = 0;
>  
> -            if (use_zero_copy_send) {
> +            if (!use_packets || use_zero_copy_send) {
>                  p->iovs_num = 0;
>              } else {
>                  p->iovs_num = 1;
> @@ -699,16 +702,20 @@ static void *multifd_send_thread(void *opaque)
>                      break;
>                  }
>              }
> -            multifd_send_fill_packet(p);
> +
> +            if (use_packets) {
> +                multifd_send_fill_packet(p);
> +                p->num_packets++;
> +            }
> +
>              flags = p->flags;
>              p->flags = 0;
> -            p->num_packets++;
>              p->total_normal_pages += p->normal_num;
>              p->pages->num = 0;
>              p->pages->block = NULL;
>              qemu_mutex_unlock(&p->mutex);
>  
> -            trace_multifd_send(p->id, packet_num, p->normal_num, flags,
> +            trace_multifd_send(p->id, p->packet_num, p->normal_num, flags,
>                                 p->next_packet_size);
>  
>              if (use_zero_copy_send) {
> @@ -718,7 +725,7 @@ static void *multifd_send_thread(void *opaque)
>                  if (ret != 0) {
>                      break;
>                  }
> -            } else {
> +            } else if (use_packets) {
>                  /* Send header using the same writev call */
>                  p->iov[0].iov_len = p->packet_len;
>                  p->iov[0].iov_base = p->packet;
> @@ -904,6 +911,7 @@ int multifd_save_setup(Error **errp)
>  {
>      int thread_count;
>      uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
> +    bool use_packets = migrate_multifd_packets();
>      uint8_t i;
>  
>      if (!migrate_multifd()) {
> @@ -928,14 +936,20 @@ int multifd_save_setup(Error **errp)
>          p->pending_job = 0;
>          p->id = i;
>          p->pages = multifd_pages_init(page_count);
> -        p->packet_len = sizeof(MultiFDPacket_t)
> -                      + sizeof(uint64_t) * page_count;
> -        p->packet = g_malloc0(p->packet_len);
> -        p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
> -        p->packet->version = cpu_to_be32(MULTIFD_VERSION);
> +
> +        if (use_packets) {
> +            p->packet_len = sizeof(MultiFDPacket_t)
> +                          + sizeof(uint64_t) * page_count;
> +            p->packet = g_malloc0(p->packet_len);
> +            p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
> +            p->packet->version = cpu_to_be32(MULTIFD_VERSION);
> +
> +            /* We need one extra place for the packet header */
> +            p->iov = g_new0(struct iovec, page_count + 1);
> +        } else {
> +            p->iov = g_new0(struct iovec, page_count);
> +        }
>          p->name = g_strdup_printf("multifdsend_%d", i);
> -        /* We need one extra place for the packet header */
> -        p->iov = g_new0(struct iovec, page_count + 1);
>          p->normal = g_new0(ram_addr_t, page_count);
>          p->page_size = qemu_target_page_size();
>          p->page_count = page_count;
> @@ -1067,7 +1081,7 @@ void multifd_recv_sync_main(void)
>  {
>      int i;
>  
> -    if (!migrate_multifd()) {
> +    if (!migrate_multifd() || !migrate_multifd_packets()) {
>          return;
>      }
>      for (i = 0; i < migrate_multifd_channels(); i++) {

This noops the recv sync when use_packets=1, makes sense.

How about multifd_send_sync_main()?  Should we do the same?

> @@ -1094,38 +1108,44 @@ static void *multifd_recv_thread(void *opaque)
>  {
>      MultiFDRecvParams *p = opaque;
>      Error *local_err = NULL;
> +    bool use_packets = migrate_multifd_packets();
>      int ret;
>  
>      trace_multifd_recv_thread_start(p->id);
>      rcu_register_thread();
>  
>      while (true) {
> -        uint32_t flags;
> +        uint32_t flags = 0;
> +        p->normal_num = 0;
>  
>          if (p->quit) {
>              break;
>          }
>  
> -        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
> -                                       p->packet_len, &local_err);
> -        if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
> -            break;
> -        }
> +        if (use_packets) {
> +            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
> +                                           p->packet_len, &local_err);
> +            if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
> +                break;
> +            }
> +
> +            qemu_mutex_lock(&p->mutex);
> +            ret = multifd_recv_unfill_packet(p, &local_err);
> +            if (ret) {
> +                qemu_mutex_unlock(&p->mutex);
> +                break;
> +            }
> +            p->num_packets++;
> +
> +            flags = p->flags;
> +            /* recv methods don't know how to handle the SYNC flag */
> +            p->flags &= ~MULTIFD_FLAG_SYNC;
> +            trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags,
> +                               p->next_packet_size);
>  
> -        qemu_mutex_lock(&p->mutex);
> -        ret = multifd_recv_unfill_packet(p, &local_err);
> -        if (ret) {
> -            qemu_mutex_unlock(&p->mutex);
> -            break;
> +            p->total_normal_pages += p->normal_num;
>          }
>  
> -        flags = p->flags;
> -        /* recv methods don't know how to handle the SYNC flag */
> -        p->flags &= ~MULTIFD_FLAG_SYNC;
> -        trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags,
> -                           p->next_packet_size);
> -        p->num_packets++;
> -        p->total_normal_pages += p->normal_num;
>          qemu_mutex_unlock(&p->mutex);
>  
>          if (p->normal_num) {
> @@ -1135,7 +1155,7 @@ static void *multifd_recv_thread(void *opaque)
>              }
>          }
>  
> -        if (flags & MULTIFD_FLAG_SYNC) {
> +        if (use_packets && (flags & MULTIFD_FLAG_SYNC)) {
>              qemu_sem_post(&multifd_recv_state->sem_sync);
>              qemu_sem_wait(&p->sem_sync);
>          }
> @@ -1159,6 +1179,7 @@ int multifd_load_setup(Error **errp)
>  {
>      int thread_count;
>      uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
> +    bool use_packets = migrate_multifd_packets();
>      uint8_t i;
>  
>      /*
> @@ -1183,9 +1204,12 @@ int multifd_load_setup(Error **errp)
>          qemu_sem_init(&p->sem_sync, 0);
>          p->quit = false;
>          p->id = i;
> -        p->packet_len = sizeof(MultiFDPacket_t)
> -                      + sizeof(uint64_t) * page_count;
> -        p->packet = g_malloc0(p->packet_len);
> +
> +        if (use_packets) {
> +            p->packet_len = sizeof(MultiFDPacket_t)
> +                + sizeof(uint64_t) * page_count;
> +            p->packet = g_malloc0(p->packet_len);
> +        }
>          p->name = g_strdup_printf("multifdrecv_%d", i);
>          p->iov = g_new0(struct iovec, page_count);
>          p->normal = g_new0(ram_addr_t, page_count);
> @@ -1231,18 +1255,27 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error 
> **errp)
>  {
>      MultiFDRecvParams *p;
>      Error *local_err = NULL;
> -    int id;
> +    bool use_packets = migrate_multifd_packets();
> +    int id, num_packets = 0;
>  
> -    id = multifd_recv_initial_packet(ioc, &local_err);
> -    if (id < 0) {
> -        multifd_recv_terminate_threads(local_err);
> -        error_propagate_prepend(errp, local_err,
> -                                "failed to receive packet"
> -                                " via multifd channel %d: ",
> -                                qatomic_read(&multifd_recv_state->count));
> -        return;
> +    if (use_packets) {
> +        id = multifd_recv_initial_packet(ioc, &local_err);
> +        if (id < 0) {
> +            multifd_recv_terminate_threads(local_err);
> +            error_propagate_prepend(errp, local_err,
> +                                    "failed to receive packet"
> +                                    " via multifd channel %d: ",
> +                                    
> qatomic_read(&multifd_recv_state->count));
> +            return;
> +        }
> +        trace_multifd_recv_new_channel(id);
> +
> +        /* initial packet */
> +        num_packets = 1;
> +    } else {
> +        /* next patch gives this a meaningful value */
> +        id = 0;
>      }
> -    trace_multifd_recv_new_channel(id);
>  
>      p = &multifd_recv_state->params[id];
>      if (p->c != NULL) {
> @@ -1253,9 +1286,8 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error 
> **errp)
>          return;
>      }
>      p->c = ioc;
> +    p->num_packets = num_packets;
>      object_ref(OBJECT(ioc));
> -    /* initial packet */
> -    p->num_packets = 1;
>  
>      p->running = true;
>      qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
> diff --git a/migration/options.c b/migration/options.c
> index 775428a8a5..10730b13ba 100644
> --- a/migration/options.c
> +++ b/migration/options.c
> @@ -385,6 +385,11 @@ bool migrate_multifd_flush_after_each_section(void)
>      return s->multifd_flush_after_each_section;
>  }
>  
> +bool migrate_multifd_packets(void)

Maybe multifd_use_packets()?  Dropping the migrate_ prefix as this is not a
global API but multifd-only.  Then if multifd_packets() reads too weird and
unclear, "add" makes it clear.

> +{
> +    return !migrate_fixed_ram();
> +}
> +
>  bool migrate_postcopy(void)
>  {
>      return migrate_postcopy_ram() || migrate_dirty_bitmaps();
> diff --git a/migration/options.h b/migration/options.h
> index 8680a10b79..8a19d6939c 100644
> --- a/migration/options.h
> +++ b/migration/options.h
> @@ -56,6 +56,7 @@ bool migrate_zero_copy_send(void);
>   */
>  
>  bool migrate_multifd_flush_after_each_section(void);
> +bool migrate_multifd_packets(void);
>  bool migrate_postcopy(void);
>  bool migrate_rdma(void);
>  bool migrate_tls(void);
> -- 
> 2.35.3
> 

-- 
Peter Xu


Reply via email to