Signed-off-by: Chuan Zheng <[email protected]>
---
migration/multifd.c | 4 ++--
migration/multifd.h | 2 ++
migration/rdma.c | 57 +++++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 61 insertions(+), 2 deletions(-)
diff --git a/migration/multifd.c b/migration/multifd.c
index ae0b7f0..919a414 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -176,7 +176,7 @@ void multifd_register_ops(int method, MultiFDMethods *ops)
multifd_ops[method] = ops;
}
-static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
+int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
MultiFDInit_t msg = {};
int ret;
@@ -503,7 +503,7 @@ int multifd_queue_page(QEMUFile *f, RAMBlock *block,
ram_addr_t offset)
return 1;
}
-static void multifd_send_terminate_threads(Error *err)
+void multifd_send_terminate_threads(Error *err)
{
int i;
diff --git a/migration/multifd.h b/migration/multifd.h
index b17a2c1..26d4489 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -184,6 +184,8 @@ typedef struct {
#ifdef CONFIG_RDMA
extern MultiFDSetup multifd_rdma_ops;
#endif
+void multifd_send_terminate_threads(Error *err);
+int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp);
int get_multifd_send_param(int id, MultiFDSendParams **param);
int get_multifd_recv_param(int id, MultiFDRecvParams **param);
MultiFDSetup *multifd_setup_ops_init(void);
diff --git a/migration/rdma.c b/migration/rdma.c
index 9654b87..cff9446 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -4261,9 +4261,54 @@ err:
g_free(rdma_return_path);
}
+/*
+ * Establish the RDMA connection backing one multifd send channel.
+ *
+ * @opaque: the channel's MultiFDSendParams.
+ *
+ * Creates an RDMA context for the current migration's host_port,
+ * initialises the source side, connects, and opens a QEMUFile on top of
+ * the connection.  On success p->rdma, p->file and p->c are populated.
+ *
+ * Returns 0 on success, non-zero on failure.  Any error detail collected
+ * from the callees is reported and released here; it is not handed back
+ * to the caller.
+ */
+static int multifd_channel_rdma_connect(void *opaque)
+{
+    MultiFDSendParams *p = opaque;
+    Error *local_err = NULL;
+    /* Pessimistic default so the NULL-return paths below report failure. */
+    int ret = -1;
+    MigrationState *s = migrate_get_current();
+
+    p->rdma = qemu_rdma_data_init(s->host_port, &local_err);
+    if (p->rdma == NULL) {
+        goto out;
+    }
+
+    ret = qemu_rdma_source_init(p->rdma,
+                                migrate_rdma_pin_all(),
+                                &local_err);
+    if (ret) {
+        goto out;
+    }
+
+    ret = qemu_rdma_connect(p->rdma, &local_err);
+    if (ret) {
+        goto out;
+    }
+
+    p->file = qemu_fopen_rdma(p->rdma, "wb");
+    if (p->file == NULL) {
+        /* ret is 0 from the successful connect above; flag the failure. */
+        ret = -1;
+        goto out;
+    }
+
+    p->c = QIO_CHANNEL(getQIOChannel(p->file));
+
+out:
+    if (local_err) {
+        trace_multifd_send_error(p->id);
+        /* Report the root cause and free it so the Error is not leaked. */
+        error_report_err(local_err);
+    }
+
+    return ret;
+}
+
static void *multifd_rdma_send_thread(void *opaque)
{
MultiFDSendParams *p = opaque;
+ Error *local_err = NULL;
+
+ trace_multifd_send_thread_start(p->id);
+ if (multifd_send_initial_packet(p, &local_err) < 0) {
+ goto out;
+ }
while (true) {
WITH_QEMU_LOCK_GUARD(&p->mutex) {
@@ -4274,6 +4319,12 @@ static void *multifd_rdma_send_thread(void *opaque)
qemu_sem_wait(&p->sem);
}
+out:
+ if (local_err) {
+ trace_multifd_send_error(p->id);
+ multifd_send_terminate_threads(local_err);
+ }
+
WITH_QEMU_LOCK_GUARD(&p->mutex) {
p->running = false;
}
@@ -4285,6 +4336,12 @@ static void
multifd_rdma_send_channel_setup(MultiFDSendParams *p)
{
Error *local_err = NULL;
+ if (multifd_channel_rdma_connect(p)) {
+ error_setg(&local_err, "multifd: rdma channel %d not established",
+ p->id);
+ return ;
+ }
+
if (p->quit) {
error_setg(&local_err, "multifd: send id %d already quit", p->id);
return ;
--
1.8.3.1