On Tue, Jan 16, 2024 at 12:05:57PM +0800, Peter Xu wrote:
> On Mon, Nov 27, 2023 at 05:25:55PM -0300, Fabiano Rosas wrote:
> > Allow multifd to open file-backed channels. This will be used when
> > enabling the fixed-ram migration stream format which expects a
> > seekable transport.
> >
> > The QIOChannel read and write methods will use the preadv/pwritev
> > versions which don't update the file offset at each call so we can
> > reuse the fd without re-opening for every channel.
> >
> > Note that this is just setup code and multifd cannot yet make use of
> > the file channels.
> >
> > Signed-off-by: Fabiano Rosas <[email protected]>
> > ---
> > - open multifd channels with O_WRONLY and no mode
> > - stop cancelling migration and propagate error via qio_task
> > ---
> > migration/file.c | 47 +++++++++++++++++++++++++++++++++++++++++--
> > migration/file.h | 5 +++++
> > migration/multifd.c | 14 +++++++++++--
> > migration/options.c | 7 +++++++
> > migration/options.h | 1 +
> > migration/qemu-file.h | 1 -
> > 6 files changed, 70 insertions(+), 5 deletions(-)
> >
> > diff --git a/migration/file.c b/migration/file.c
> > index 5d4975f43e..67d6f42da7 100644
> > --- a/migration/file.c
> > +++ b/migration/file.c
> > @@ -17,6 +17,10 @@
> >
> > #define OFFSET_OPTION ",offset="
> >
> > +static struct FileOutgoingArgs {
> > + char *fname;
> > +} outgoing_args;
> > +
> > /* Remove the offset option from @filespec and return it in @offsetp. */
> >
> > int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp)
> > @@ -36,6 +40,42 @@ int file_parse_offset(char *filespec, uint64_t *offsetp,
> > Error **errp)
> > return 0;
> > }
> >
> > +static void qio_channel_file_connect_worker(QIOTask *task, gpointer opaque)
> > +{
> > + /* noop */
> > +}
> > +
> > +int file_send_channel_destroy(QIOChannel *ioc)
> > +{
> > + if (ioc) {
> > + qio_channel_close(ioc, NULL);
> > + object_unref(OBJECT(ioc));
> > + }
> > + g_free(outgoing_args.fname);
> > + outgoing_args.fname = NULL;
> > +
> > + return 0;
> > +}
> > +
> > +void file_send_channel_create(QIOTaskFunc f, void *data)
> > +{
> > + QIOChannelFile *ioc;
> > + QIOTask *task;
> > + Error *err = NULL;
> > + int flags = O_WRONLY;
> > +
> > + ioc = qio_channel_file_new_path(outgoing_args.fname, flags, 0, &err);
> > +
> > + task = qio_task_new(OBJECT(ioc), f, (gpointer)data, NULL);
> > + if (!ioc) {
> > + qio_task_set_error(task, err);
> > + return;
> > + }
> > +
> > + qio_task_run_in_thread(task, qio_channel_file_connect_worker,
> > + (gpointer)data, NULL, NULL);
>
> This is pretty weird. This invokes a thread, but it'll run a noop. It
> seems meaningless to me.
>
> I assume you wanted to keep using the same async model as the socket typed
> multifd, but I don't think that works anyway, because file open blocks at
> qio_channel_file_new_path() so it's sync anyway.
>
> AFAICT we still share the code, as long as the file path properly invokes
> multifd_channel_connect() after the iochannel is setup.
>
> > +}
> > +
> > void file_start_outgoing_migration(MigrationState *s,
> > FileMigrationArgs *file_args, Error
> > **errp)
> > {
> > @@ -43,15 +83,18 @@ void file_start_outgoing_migration(MigrationState *s,
> > g_autofree char *filename = g_strdup(file_args->filename);
> > uint64_t offset = file_args->offset;
> > QIOChannel *ioc;
> > + int flags = O_CREAT | O_TRUNC | O_WRONLY;
> > + mode_t mode = 0660;
> >
> > trace_migration_file_outgoing(filename);
> >
> > - fioc = qio_channel_file_new_path(filename, O_CREAT | O_WRONLY |
> > O_TRUNC,
> > - 0600, errp);
> > + fioc = qio_channel_file_new_path(filename, flags, mode, errp);
> > if (!fioc) {
> > return;
> > }
> >
> > + outgoing_args.fname = g_strdup(filename);
> > +
> > ioc = QIO_CHANNEL(fioc);
> > if (offset && qio_channel_io_seek(ioc, offset, SEEK_SET, errp) < 0) {
> > return;
> > diff --git a/migration/file.h b/migration/file.h
> > index 37d6a08bfc..511019b319 100644
> > --- a/migration/file.h
> > +++ b/migration/file.h
> > @@ -9,10 +9,15 @@
> > #define QEMU_MIGRATION_FILE_H
> >
> > #include "qapi/qapi-types-migration.h"
> > +#include "io/task.h"
> > +#include "channel.h"
> >
> > void file_start_incoming_migration(FileMigrationArgs *file_args, Error
> > **errp);
> >
> > void file_start_outgoing_migration(MigrationState *s,
> > FileMigrationArgs *file_args, Error
> > **errp);
> > int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp);
> > +
> > +void file_send_channel_create(QIOTaskFunc f, void *data);
> > +int file_send_channel_destroy(QIOChannel *ioc);
> > #endif
> > diff --git a/migration/multifd.c b/migration/multifd.c
> > index 123ff0dec0..427740aab6 100644
> > --- a/migration/multifd.c
> > +++ b/migration/multifd.c
> > @@ -17,6 +17,7 @@
> > #include "exec/ramblock.h"
> > #include "qemu/error-report.h"
> > #include "qapi/error.h"
> > +#include "file.h"
> > #include "ram.h"
> > #include "migration.h"
> > #include "migration-stats.h"
> > @@ -28,6 +29,7 @@
> > #include "threadinfo.h"
> > #include "options.h"
> > #include "qemu/yank.h"
> > +#include "io/channel-file.h"
> > #include "io/channel-socket.h"
> > #include "yank_functions.h"
> >
> > @@ -511,7 +513,11 @@ static void multifd_send_terminate_threads(Error *err)
> >
> > static int multifd_send_channel_destroy(QIOChannel *send)
> > {
> > - return socket_send_channel_destroy(send);
> > + if (migrate_to_file()) {
> > + return file_send_channel_destroy(send);
> > + } else {
> > + return socket_send_channel_destroy(send);
> > + }
> > }
> >
> > void multifd_save_cleanup(void)
> > @@ -904,7 +910,11 @@ static void multifd_new_send_channel_async(QIOTask
> > *task, gpointer opaque)
> >
> > static void multifd_new_send_channel_create(gpointer opaque)
> > {
> > - socket_send_channel_create(multifd_new_send_channel_async, opaque);
> > + if (migrate_to_file()) {
> > + file_send_channel_create(multifd_new_send_channel_async, opaque);
> > + } else {
> > + socket_send_channel_create(multifd_new_send_channel_async, opaque);
> > + }
> > }
> >
> > int multifd_save_setup(Error **errp)
> > diff --git a/migration/options.c b/migration/options.c
> > index 10730b13ba..f671e24758 100644
> > --- a/migration/options.c
> > +++ b/migration/options.c
> > @@ -409,6 +409,13 @@ bool migrate_tls(void)
> > return s->parameters.tls_creds && *s->parameters.tls_creds;
> > }
> >
> > +bool migrate_to_file(void)
> > +{
> > + MigrationState *s = migrate_get_current();
> > +
> > + return qemu_file_is_seekable(s->to_dst_file);
> > +}
>
> Would this migrate_to_file() == migrate_multifd_packets()?
>
> Maybe we can keep using the other one and drop migrate_to_file?
Or perhaps the other way round; as migrate_to_file() is a migration global
helper, so applicable without multifd.
>
> > +
> > typedef enum WriteTrackingSupport {
> > WT_SUPPORT_UNKNOWN = 0,
> > WT_SUPPORT_ABSENT,
> > diff --git a/migration/options.h b/migration/options.h
> > index 8a19d6939c..84628a76e8 100644
> > --- a/migration/options.h
> > +++ b/migration/options.h
> > @@ -60,6 +60,7 @@ bool migrate_multifd_packets(void);
> > bool migrate_postcopy(void);
> > bool migrate_rdma(void);
> > bool migrate_tls(void);
> > +bool migrate_to_file(void);
> >
> > /* capabilities helpers */
> >
> > diff --git a/migration/qemu-file.h b/migration/qemu-file.h
> > index 32fd4a34fd..78ea21ab98 100644
> > --- a/migration/qemu-file.h
> > +++ b/migration/qemu-file.h
> > @@ -83,5 +83,4 @@ size_t qemu_get_buffer_at(QEMUFile *f, const uint8_t
> > *buf, size_t buflen,
> > off_t pos);
> >
> > QIOChannel *qemu_file_get_ioc(QEMUFile *file);
> > -
> > #endif
> > --
> > 2.35.3
> >
>
> --
> Peter Xu
--
Peter Xu