* Juan Quintela (quint...@redhat.com) wrote:
> "Dr. David Alan Gilbert" <dgilb...@redhat.com> wrote:
> > * Juan Quintela (quint...@redhat.com) wrote:
> >> We synchronize all threads at each RAM_SAVE_FLAG_EOS. Bitmap
> >> synchronizations don't happen inside a ram section, so we are safe
> >> about two channels trying to overwrite the same memory.
> >
> > OK, that's quite neat - so you don't need any extra flags in the stream
> > to do the sync; it probably needs a comment in the code somewhere so we
> > don't forget!
>
> Thanks.
>
> >> Signed-off-by: Juan Quintela <quint...@redhat.com>
> >> ---
> >>  migration/ram.c        | 118 +++++++++++++++++++++++++++++++++++++----
> >>  migration/trace-events |   6 +++
> >>  2 files changed, 113 insertions(+), 11 deletions(-)
> >>
> >> diff --git a/migration/ram.c b/migration/ram.c
> >> index c4c185cc4c..398cb0af3b 100644
> >> --- a/migration/ram.c
> >> +++ b/migration/ram.c
> >> @@ -405,6 +405,8 @@ static void compress_threads_save_setup(void)
> >>  #define MULTIFD_MAGIC 0x11223344U
> >>  #define MULTIFD_VERSION 1
> >>
> >> +#define MULTIFD_FLAG_SYNC (1 << 0)
> >> +
> >>  typedef struct {
> >>      uint32_t magic;
> >>      uint32_t version;
> >> @@ -471,6 +473,8 @@ typedef struct {
> >>      uint32_t num_packets;
> >>      /* pages sent through this channel */
> >>      uint32_t num_pages;
> >> +    /* syncs main thread and channels */
> >> +    QemuSemaphore sem_sync;
> >>  } MultiFDSendParams;
> >>
> >>  typedef struct {
> >> @@ -507,6 +511,8 @@ typedef struct {
> >>      uint32_t num_packets;
> >>      /* pages sent through this channel */
> >>      uint32_t num_pages;
> >> +    /* syncs main thread and channels */
> >> +    QemuSemaphore sem_sync;
> >>  } MultiFDRecvParams;
> >>
> >>  static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
> >> @@ -682,6 +688,10 @@ struct {
> >>      int count;
> >>      /* array of pages to sent */
> >>      MultiFDPages_t *pages;
> >> +    /* syncs main thread and channels */
> >> +    QemuSemaphore sem_sync;
> >> +    /* global number of generated multifd packets */
> >> +    uint32_t seq;
> >
> > It's interesting you use the same comment for 'seq' in
> > MultiFDSendParams - but I guess that means only this one is the global
> > version and the others aren't really a global number - they're just
> > local to that thread?
>
> The only place that "increases/generates" seq is multifd_send_pages();
> that is what creates a new packet to be sent. So, if we see _any_ packet
> on the wire, we know the real global ordering. They are only used for
> traces: to see that packet 42 was sent through channel 3, and on
> reception to check that packet 42 is what you received through channel
> 3. They only appear in traces, but I find them useful for debugging
> synchronization errors.
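A minimal sketch of the numbering Juan describes, assuming simplified stand-in types: `SendState`, `SendChannel`, `Packet` and `send_pages_to()` are hypothetical, not QEMU's. Only the main thread bumps the global counter, stamps it into the packet and records it in the channel that will send it, so a per-channel `seq` is just "the last global number this channel was handed". In the real code the channel fields are shared with the worker thread and updated under `p->mutex`; this single-threaded sketch leaves the locking out.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-ins for the multifd structures in the patch. */
typedef struct {
    uint32_t seq;            /* packet number as written to the wire */
} Packet;

typedef struct {
    int id;
    uint32_t seq;            /* last global number handed to this channel */
    Packet packet;
} SendChannel;

typedef struct {
    uint32_t seq;            /* global number of generated packets */
} SendState;

/*
 * Main thread only: it is the single writer of s->seq, so every packet
 * gets a unique number and the numbers give one order across channels.
 */
static void send_pages_to(SendState *s, SendChannel *c)
{
    s->seq++;
    c->seq = s->seq;
    c->packet.seq = s->seq;
}
```

For example, three packets dispatched to channels 0, 1, 0 carry numbers 1, 2, 3; channel 0 ends with seq 3 and channel 1 with seq 2, which is what the send and receive traces can be matched on.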
Ah, and multifd_send_pages() runs in the main thread; it always operates
on multifd_send_state->seq and then passes it to the SendParams. OK.
I'm not sure how to explain that better, but it's a little confusing.

> >> +    for (i = 0; i < migrate_multifd_channels(); i++) {
> >> +        MultiFDSendParams *p = &multifd_send_state->params[i];
> >> +
> >> +        trace_multifd_send_sync_main_signal(p->id);
> >> +
> >> +        qemu_mutex_lock(&p->mutex);
> >> +        p->flags |= MULTIFD_FLAG_SYNC;
> >> +        p->pending_job++;
> >> +        qemu_mutex_unlock(&p->mutex);
> >> +        qemu_sem_post(&p->sem);
> >> +    }
>
> [1]
>
> >> +    for (i = 0; i < migrate_multifd_channels(); i++) {
> >> +        MultiFDSendParams *p = &multifd_send_state->params[i];
> >> +
> >> +        trace_multifd_send_sync_main_wait(p->id);
> >> +        qemu_sem_wait(&multifd_send_state->sem_sync);
> >> +    }
>
> [2]
>
> >> +    trace_multifd_send_sync_main(multifd_send_state->seq);
> >> +}
> >> +
> >
> > OK, so this just makes each of the sending threads ack, so that seems
> > OK.
> > But what happens with an error? multifd_send_sync_main exits its
> > loop with a 'break' if the writes fail, and that could mean they never
> > come and post the flag-sync sem.
>
> Let's see.
>
> [1]: we are just doing mutex_lock()/sem_post(); if we are not able to do
> that, we have a big race that needs to be fixed. So that bit is OK.
>
> [2]: We do an unconditional sem_wait(). Looking at the worker code: at
> this patch level we are OK, but I agree with you that in later patches
> we also need to do the post in the error case. Changing.

K.

> >> +
> >> +        trace_multifd_recv_sync_main_wait(p->id);
> >> +        qemu_sem_wait(&multifd_recv_state->sem_sync);
> >> +        qemu_mutex_lock(&p->mutex);
> >> +        if (multifd_recv_state->seq < p->seq) {
> >> +            multifd_recv_state->seq = p->seq;
> >> +        }
> >
> > Can you explain what this is for?
> > Something like the latest received block?
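Stepping back to the send-side handshake ([1] and [2]) and the error-path question: the sketch below uses POSIX threads and semaphores in place of QemuMutex/QemuSemaphore, and every name in it (`SendChannel`, `send_worker()`, `send_sync_main()`, the `fail_write` knob, the `quit` check) is hypothetical. It illustrates the fix agreed above: a worker that fails must still post the shared semaphore, otherwise the unconditional wait in [2] blocks forever. As an extra assumption, the main thread here also skips channels that have already quit, so a later sync does not wait on them.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stdint.h>

#define NCHANNELS 4
#define FLAG_SYNC (1u << 0)      /* plays the role of MULTIFD_FLAG_SYNC */

typedef struct {
    pthread_t thread;
    pthread_mutex_t mutex;
    sem_t sem;                   /* main -> worker: a job is pending */
    uint32_t flags;
    bool quit;
    bool fail_write;             /* hypothetical: simulate a failed write */
    int syncs_sent;              /* SYNC packets "written" successfully */
} SendChannel;

static SendChannel chans[NCHANNELS];
static sem_t sem_sync;           /* worker -> main: SYNC handled, or I died */

static void *send_worker(void *opaque)
{
    SendChannel *p = opaque;

    for (;;) {
        sem_wait(&p->sem);
        pthread_mutex_lock(&p->mutex);
        if (p->quit) {                   /* asked to shut down */
            pthread_mutex_unlock(&p->mutex);
            return NULL;
        }
        uint32_t flags = p->flags;
        p->flags = 0;
        if (p->fail_write) {             /* the write "failed" */
            p->quit = true;
            pthread_mutex_unlock(&p->mutex);
            sem_post(&sem_sync);         /* error path: post anyway */
            return NULL;
        }
        if (flags & FLAG_SYNC) {
            p->syncs_sent++;             /* "write" the SYNC packet */
            sem_post(&sem_sync);         /* ack to the main thread */
        }
        pthread_mutex_unlock(&p->mutex);
    }
}

/* 0 if every channel acked the SYNC, -1 if any channel has quit. */
static int send_sync_main(void)
{
    int signalled = 0, ret = 0;

    for (int i = 0; i < NCHANNELS; i++) {    /* [1] flag and wake */
        SendChannel *p = &chans[i];
        pthread_mutex_lock(&p->mutex);
        if (!p->quit) {                      /* skip dead channels */
            p->flags |= FLAG_SYNC;
            signalled++;
            sem_post(&p->sem);
        }
        pthread_mutex_unlock(&p->mutex);
    }
    for (int i = 0; i < signalled; i++) {    /* [2] one ack per channel */
        sem_wait(&sem_sync);
    }
    for (int i = 0; i < NCHANNELS; i++) {
        pthread_mutex_lock(&chans[i].mutex);
        ret = chans[i].quit ? -1 : ret;
        pthread_mutex_unlock(&chans[i].mutex);
    }
    return ret;
}

static void channels_start(void)
{
    sem_init(&sem_sync, 0, 0);
    for (int i = 0; i < NCHANNELS; i++) {
        pthread_mutex_init(&chans[i].mutex, NULL);
        sem_init(&chans[i].sem, 0, 0);
        pthread_create(&chans[i].thread, NULL, send_worker, &chans[i]);
    }
}

static void channels_stop(void)
{
    for (int i = 0; i < NCHANNELS; i++) {
        pthread_mutex_lock(&chans[i].mutex);
        chans[i].quit = true;
        pthread_mutex_unlock(&chans[i].mutex);
        sem_post(&chans[i].sem);
        pthread_join(chans[i].thread, NULL);
    }
}
```

With four healthy channels `send_sync_main()` returns 0; once channel 2's write fails it returns -1 instead of hanging. Because the ack semaphore is shared, the main thread cannot tell which channel acked, only that as many acks arrived as channels it signalled; that is enough as long as each live channel posts exactly once per sync.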
>
> When we are at a synchronization point, we don't know on the main
> thread when that synchronization happened (at what packet, considering
> the packets as a logical list). So, we choose 'seq' from the channel
> with the highest number. That is the one that we want. We only use this
> for tracing, so we can "match" that we did a synchronization on the send
> side at packet N and see the trace on the reception side showing that
> we did it at packet N too.

OK, I think I see; again, this code runs in the main thread and goes
around all the subthreads, so it's updating the central copy from what
each of them has received - OK.

> Remember that in a previous patch you asked me what happened if this
> wraps around? At that point, nothing. But now I need to change this
> code to be:
>
>     multifd_recv_state->seq = 0;
>     for (i = 0; i < migrate_multifd_channels(); i++) {
>         MultiFDRecvParams *p = &multifd_recv_state->params[i];
>         ...
>         if (multifd_recv_state->seq < p->seq) {
>             multifd_recv_state->seq = p->seq;
>         }
>
> And I have fixed the wraparound problem, no?

Yes. Adding a note somewhere saying it's just for debugging would help as
well.

> >> @@ -933,9 +1019,8 @@ static void *multifd_recv_thread(void *opaque)
> >>      trace_multifd_recv_thread_start(p->id);
> >>
> >>      while (true) {
> >> -        qemu_sem_wait(&p->sem);
> >>          qemu_mutex_lock(&p->mutex);
> >> -        if (p->pending_job) {
> >> +        if (true || p->pending_job) {
> >
> > A TODO I guess???
>
> Oops, that should be out.
>
> Fixed in the next version.
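A sketch of that debug-only bookkeeping, assuming simplified hypothetical types (`RecvState`, `RecvChannel` and `recv_sync_seq()` are not QEMU names): once every channel has acked its SYNC, the main thread takes the highest per-channel `seq` as the packet number of the sync point. Resetting the central copy to 0 first is what lets it follow the channels back down after the `uint32_t` counter wraps; without the reset it would stay stuck at the pre-wrap maximum.

```c
#include <assert.h>
#include <stdint.h>

#define NCHANNELS 3

typedef struct {
    uint32_t seq;   /* seq of the last packet this channel received */
} RecvChannel;

typedef struct {
    RecvChannel params[NCHANNELS];
    uint32_t seq;   /* tracing only: packet number of the last sync */
} RecvState;

/*
 * Main thread, after all channels have acked their SYNC packet.
 * Recompute from scratch each time instead of only ever growing.
 */
static uint32_t recv_sync_seq(RecvState *s)
{
    s->seq = 0;
    for (int i = 0; i < NCHANNELS; i++) {
        if (s->seq < s->params[i].seq) {
            s->seq = s->params[i].seq;
        }
    }
    return s->seq;
}
```

With channel values 40, 42 and 41 the sync point is traced as packet 42; if the counters later wrap to 1, 3 and 2, the reset version reports 3, whereas dropping the `s->seq = 0` line would keep reporting 42.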
>
> >>              uint32_t used;
> >>              uint32_t flags;
> >>              qemu_mutex_unlock(&p->mutex);
> >> @@ -956,14 +1041,18 @@ static void *multifd_recv_thread(void *opaque)
> >>              p->num_packets++;
> >>              p->num_pages += used;
> >>              qemu_mutex_unlock(&p->mutex);
> >> +
> >> +            if (flags & MULTIFD_FLAG_SYNC) {
> >> +                qemu_sem_post(&multifd_recv_state->sem_sync);
> >> +                qemu_sem_wait(&p->sem_sync);
> >> +            }
> >
> > Can you explain the receive side logic - I think this is waiting for all
> > receive threads to 'ack' - but how do we know that they've finished
> > receiving all data that was sent?
>
> Because they need to receive a packet with MULTIFD_FLAG_SYNC set. And
> if they receive that flag, we know that it is the last one of the
> sequence.
>
> Synchronization works like this (2 channels to make things easy):
>
> main thread:
>     we finish a RAM_SECTION;
>     flush pending packets to one of the channels
>     send packet with MULTIFD_FLAG_SYNC for all the channels
>     wait until all the channels have processed the FLAG_SYNC
>     at this point send the RAM_SECTION_EOS footer.
>
> worker1                                   worker2
>
> if there is a pending packet, send it     if there is a pending packet, send it
> (notice that there can't ever be more than one)
> send a packet with SYNC flag set          send a packet with SYNC flag set
>
> On the reception side:
>
> main thread
>     receives RAM_SECTION_EOS footer
>     waits for workers to receive a sync
>
> worker1                                   worker2
> process any pending packet (no sync)      process any pending packet (no sync)
> process packet with SYNC                  process packet with SYNC
> post main thread                          post main thread
>
> now main thread can continue
>
> Notice that we don't care what happens first, receiving the packet with
> SYNC in the workers or RAM_SECTION_EOS on the main thread; everything
> works as expected.
>
> Noticing how long it took to explain this, I think that I am going to
> add this to the migration documentation. Will wait for any questions you
> have before adding it.

Thanks; I think that makes sense.

Dave

> Later, Juan.
--
Dr. David Alan Gilbert / dgilb...@redhat.com / Manchester, UK
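The reception-side half of the handshake laid out above can be sketched with POSIX threads and semaphores standing in for QemuSemaphore; `RecvChannel`, `recv_worker()`, `recv_sync_main()`, `recv_start()`, `recv_join()` and the per-channel packet arrays are hypothetical. Each worker posts the main thread's semaphore when it processes its SYNC packet and then parks on its own semaphore; the main thread, on reading the end-of-section footer, waits for one post per channel before releasing them. Because the posts are counted, it does not matter whether the workers reach SYNC before or after the main thread reaches the footer.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>

#define NCHANNELS 2
#define NPACKETS 4
#define FLAG_SYNC (1u << 0)          /* plays the role of MULTIFD_FLAG_SYNC */

typedef struct {
    pthread_t thread;
    sem_t sem_sync;                  /* main -> worker: sync is over */
    uint32_t packets[NPACKETS];      /* flags of the packets this channel gets */
    int processed;                   /* packets processed so far */
    int processed_at_sync;           /* snapshot taken by the main thread */
} RecvChannel;

static RecvChannel chans[NCHANNELS];
static sem_t main_sem_sync;          /* worker -> main: reached my SYNC */

static void *recv_worker(void *opaque)
{
    RecvChannel *p = opaque;

    for (int i = 0; i < NPACKETS; i++) {
        p->processed++;              /* stands in for loading the pages */
        if (p->packets[i] & FLAG_SYNC) {
            sem_post(&main_sem_sync); /* all earlier packets are done */
            sem_wait(&p->sem_sync);   /* park until the main thread is done */
        }
    }
    return NULL;
}

/* Main thread, after reading the end-of-section footer. */
static void recv_sync_main(void)
{
    for (int i = 0; i < NCHANNELS; i++) {
        sem_wait(&main_sem_sync);    /* one post per channel */
    }
    /* Every worker has posted and won't touch 'processed' until released. */
    for (int i = 0; i < NCHANNELS; i++) {
        chans[i].processed_at_sync = chans[i].processed;
    }
    for (int i = 0; i < NCHANNELS; i++) {
        sem_post(&chans[i].sem_sync);
    }
}

static void recv_start(const uint32_t pkts[NCHANNELS][NPACKETS])
{
    sem_init(&main_sem_sync, 0, 0);
    for (int i = 0; i < NCHANNELS; i++) {
        for (int j = 0; j < NPACKETS; j++) {
            chans[i].packets[j] = pkts[i][j];
        }
        chans[i].processed = 0;
        sem_init(&chans[i].sem_sync, 0, 0);
        pthread_create(&chans[i].thread, NULL, recv_worker, &chans[i]);
    }
}

static void recv_join(void)
{
    for (int i = 0; i < NCHANNELS; i++) {
        pthread_join(chans[i].thread, NULL);
        sem_destroy(&chans[i].sem_sync);
    }
    sem_destroy(&main_sem_sync);
}
```

In this sketch the SYNC packet itself counts as processed before the post, so a channel whose SYNC is its third packet is snapshotted at 3 however the threads happen to be scheduled.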