On 01.10.18 17:33, Vladimir Sementsov-Ogievskiy wrote:
> 27.09.2018 21:35, Max Reitz wrote:
>> On 07.08.18 19:43, Vladimir Sementsov-Ogievskiy wrote:
>>> Start several async requests instead of read chunk by chunk.
>>>
>>> Signed-off-by: Vladimir Sementsov-Ogievskiy <[email protected]>
>>> ---
>>> block/qcow2.c | 208
>>> ++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
>>> 1 file changed, 204 insertions(+), 4 deletions(-)
>>>
>>> diff --git a/block/qcow2.c b/block/qcow2.c
>>> index 5e7f2ee318..a0df8d4e50 100644
>>> --- a/block/qcow2.c
>>> +++ b/block/qcow2.c
>>> @@ -1869,6 +1869,197 @@ out:
>>> return ret;
>>> }
>>> +typedef struct Qcow2WorkerTask {
>>> + uint64_t file_cluster_offset;
>>> + uint64_t offset;
>>> + uint64_t bytes;
>>> + uint64_t bytes_done;
>>> +} Qcow2WorkerTask;
>> Why don't you make this a union of request-specific structs?
>
> ok, will try
>
>>
>>> +
>>> +typedef int (*Qcow2DoWorkFunc)(BlockDriverState *bs, QEMUIOVector
>>> *qiov,
>>> + Qcow2WorkerTask *task);
>>> +
>>> +typedef struct Qcow2RWState {
>>> + BlockDriverState *bs;
>>> + QEMUIOVector *qiov;
>>> + uint64_t bytes;
>> Maybe make it total_bytes so it doesn't conflict with the value in
>> Qcow2WorkerTask?
>
> ok
>
>>
>>> + int ret;
>>> + bool waiting_one;
>>> + bool waiting_all;
>>> + bool finalize;
>>> + Coroutine *co;
>>> + QSIMPLEQ_HEAD(, Qcow2Worker) free_workers;
>>> + QSIMPLEQ_HEAD(, Qcow2Worker) busy_workers;
>>> + int online_workers;
>>> + Qcow2DoWorkFunc do_work_func;
>>> +} Qcow2RWState;
>>> +
>>> +typedef struct Qcow2Worker {
>>> + Qcow2RWState *rws;
>>> + Coroutine *co;
>>> + Qcow2WorkerTask task;
>>> + bool busy;
>>> + QSIMPLEQ_ENTRY(Qcow2Worker) entry;
>>> +} Qcow2Worker;
>>> +#define QCOW2_MAX_WORKERS 64
>> That's really a bit hidden here. I think it should go into the header.
>>
>> Also I'm not quite sure about the number. In other places we've always
>> used 16.
>>
>> (With the encryption code always allocating a new bounce buffer, this
>> can mean quite a bit of memory usage.)
>
> No doubts.
>
>>
>>> +
>>> +static coroutine_fn void qcow2_rw_worker(void *opaque);
>>> +static Qcow2Worker *qcow2_new_worker(Qcow2RWState *rws)
>>> +{
>>> + Qcow2Worker *w = g_new0(Qcow2Worker, 1);
>>> + w->rws = rws;
>>> + w->co = qemu_coroutine_create(qcow2_rw_worker, w);
>>> +
>>> + return w;
>>> +}
>>> +
>>> +static void qcow2_free_worker(Qcow2Worker *w)
>>> +{
>>> + g_free(w);
>>> +}
>>> +
>>> +static coroutine_fn void qcow2_rw_worker(void *opaque)
>>> +{
>>> + Qcow2Worker *w = opaque;
>>> + Qcow2RWState *rws = w->rws;
>>> +
>>> + rws->online_workers++;
>>> +
>>> + while (!rws->finalize) {
>>> + int ret = rws->do_work_func(rws->bs, rws->qiov, &w->task);
>>> + if (ret < 0 && rws->ret == 0) {
>>> + rws->ret = ret;
>>> + }
>>> +
>>> + if (rws->waiting_all || rws->ret < 0) {
>>> + break;
>>> + }
>>> +
>>> + w->busy = false;
>>> + QSIMPLEQ_REMOVE(&rws->busy_workers, w, Qcow2Worker, entry);
>>> + QSIMPLEQ_INSERT_TAIL(&rws->free_workers, w, entry);
>>> + if (rws->waiting_one) {
>>> + rws->waiting_one = false;
>>> + /* we must unset it here, to prevent queuing rws->co in
>>> several
>>> + * workers (it may happen if other worker already waits
>>> us on mutex,
>>> + * so it will be entered after our yield and before
>>> rws->co enter)
>>> + *
>>> + * TODO: rethink this comment, as here (and in other
>>> places in the
>>> + * file) we moved from qemu_coroutine_add_next to
>>> aio_co_wake.
>>> + */
>>> + aio_co_wake(rws->co);
>>> + }
>>> +
>>> + qemu_coroutine_yield();
>>> + }
>>> +
>>> + if (w->busy) {
>>> + w->busy = false;
>>> + QSIMPLEQ_REMOVE(&rws->busy_workers, w, Qcow2Worker, entry);
>>> + }
>>> + qcow2_free_worker(w);
>>> + rws->online_workers--;
>>> +
>>> + if (rws->waiting_all && rws->online_workers == 0) {
>>> + aio_co_wake(rws->co);
>>> + }
>>> +}
>>> +
>>> +static coroutine_fn void qcow2_rws_add_task(Qcow2RWState *rws,
>>> + uint64_t
>>> file_cluster_offset,
>>> + uint64_t offset,
>>> + uint64_t bytes,
>>> + uint64_t bytes_done)
>> I'd propose just taking a const Qcow2WorkerTask * here. (Makes even
>> more sense if you make it a union.)
>
> ok, I'll try this way
>
>>
>>> +{
>>> + Qcow2Worker *w;
>>> +
>>> + assert(rws->co == qemu_coroutine_self());
>>> +
>>> + if (bytes_done == 0 && bytes == rws->bytes) {
>>> + Qcow2WorkerTask task = {
>>> + .file_cluster_offset = file_cluster_offset,
>>> + .offset = offset,
>>> + .bytes = bytes,
>>> + .bytes_done = bytes_done
>>> + };
>>> + rws->ret = rws->do_work_func(rws->bs, rws->qiov, &task);
>> (If so, you'd just pass the pointer along here)
>>
>>> + return;
>>> + }
>> I like this fast path, but I think it deserves a small comment. (That
>> is a fast path and bypasses the whole worker infrastructure.)
>>
>>> +
>>> + if (!QSIMPLEQ_EMPTY(&rws->free_workers)) {
>>> + w = QSIMPLEQ_FIRST(&rws->free_workers);
>>> + QSIMPLEQ_REMOVE_HEAD(&rws->free_workers, entry);
>>> + } else if (rws->online_workers < QCOW2_MAX_WORKERS) {
>>> + w = qcow2_new_worker(rws);
>>> + } else {
>>> + rws->waiting_one = true;
>>> + qemu_coroutine_yield();
>>> + assert(!rws->waiting_one); /* already unset by worker */
>> Sometimes I hate coroutines. OK. So, how does the yield ensure that
>> any worker is scheduled? Doesn't yield just give control to the parent?
>
> hm. I don't follow. All workers are busy - we sure, because there no
> free workers,
> and we can't create one more, second condition isn't satisfied too.
> So, we give control to the parent. And only worker can wake us up.

Ah, I see. And then something at the bottom just continues to ppoll()
or whatever.

>> Right now I think it would be clearer to me if you'd just wake all busy
>> coroutines (looping over them) until one has settled.
>
> but all workers are busy, we should not touch them (they may be yielded
> in io operation)..

I would have assumed that if they are yielded in an I/O operation, they
could handle spurious wakeups. But I'm very likely wrong.

>> This would also save you the aio_co_wake() in the worker itself, as
>> they'd just have to yield in all cases.
>>
>>> +
>>> + w = QSIMPLEQ_FIRST(&rws->free_workers);
>>> + QSIMPLEQ_REMOVE_HEAD(&rws->free_workers, entry);
>>> + }
>>> + w->busy = true;
>>> + QSIMPLEQ_INSERT_TAIL(&rws->busy_workers, w, entry);
>>> +
>>> + w->task.file_cluster_offset = file_cluster_offset;
>>> + w->task.offset = offset;
>>> + w->task.bytes = bytes;
>>> + w->task.bytes_done = bytes_done;
>> (And you'd copy it with w->task = *task here)
>>
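
To illustrate the two suggestions above (take a const task pointer, then copy it with a plain struct assignment), here is a minimal sketch. WorkerTask, Worker and worker_assign_task() are hypothetical stand-ins for illustration only, not the real Qcow2WorkerTask, Qcow2Worker or any existing QEMU function.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for Qcow2WorkerTask and Qcow2Worker. */
typedef struct WorkerTask {
    uint64_t file_cluster_offset;
    uint64_t offset;
    uint64_t bytes;
    uint64_t bytes_done;
} WorkerTask;

typedef struct Worker {
    WorkerTask task;
    bool busy;
} Worker;

/*
 * The caller describes the request once and passes it by const pointer.
 * The worker keeps its own copy, so the caller's task may live on the
 * stack and go out of scope afterwards.
 */
static void worker_assign_task(Worker *w, const WorkerTask *task)
{
    assert(w != NULL && task != NULL);
    w->busy = true;
    w->task = *task; /* struct assignment copies all four fields */
}
```

With this shape, the four field-by-field assignments collapse into one line, and adding a field to the task struct would not require touching the function signature.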
>>> +
>>> + qemu_coroutine_enter(w->co);
>>> +}
>>> +
>>> +static void qcow2_init_rws(Qcow2RWState *rws, BlockDriverState *bs,
>>> + QEMUIOVector *qiov, uint64_t bytes,
>>> + Qcow2DoWorkFunc do_work_func)
>>> +{
>>> + memset(rws, 0, sizeof(*rws));
>>> + rws->bs = bs;
>>> + rws->qiov = qiov;
>>> + rws->bytes = bytes;
>>> + rws->co = qemu_coroutine_self();
>>> + rws->do_work_func = do_work_func;
>> Maybe you'd like to use
>>
>> *rws = (Qcow2RWState) {
>> .bs = bs,
>> ...
>> };
>>
>> Then you could save yourself the memset().
>
> ok
>
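
For reference, a minimal sketch of the compound-literal form suggested above, next to the memset() form from the patch. RWState is a hypothetical, trimmed-down stand-in for Qcow2RWState (the queue heads and the callback are left out), and both function names are made up for this example.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical, trimmed-down stand-in for Qcow2RWState (not the real type). */
typedef struct RWState {
    void *bs;             /* stands in for BlockDriverState * */
    void *qiov;           /* stands in for QEMUIOVector * */
    uint64_t total_bytes;
    int ret;
    bool waiting_one;
    bool waiting_all;
    bool finalize;
    int online_workers;
} RWState;

/* Variant as in the patch: zero everything, then set the known fields. */
static void init_rws_memset(RWState *rws, void *bs, void *qiov,
                            uint64_t total_bytes)
{
    assert(rws != NULL);
    memset(rws, 0, sizeof(*rws));
    rws->bs = bs;
    rws->qiov = qiov;
    rws->total_bytes = total_bytes;
}

/*
 * Suggested variant: members not named in the initializer are
 * zero-initialized by the compound literal, so no memset() is needed.
 */
static void init_rws_literal(RWState *rws, void *bs, void *qiov,
                             uint64_t total_bytes)
{
    assert(rws != NULL);
    *rws = (RWState) {
        .bs = bs,
        .qiov = qiov,
        .total_bytes = total_bytes,
    };
}
```

Both variants leave every unnamed member at zero. One difference worth keeping in mind: the compound literal does not promise anything about padding bytes, so the results should be compared field by field rather than with memcmp().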
>>
>>> + QSIMPLEQ_INIT(&rws->free_workers);
>>> + QSIMPLEQ_INIT(&rws->busy_workers);
>>> +}
>>> +
>>> +static void qcow2_finalize_rws(Qcow2RWState *rws)
>>> +{
>>> + assert(rws->co == qemu_coroutine_self());
>>> +
>>> + /* kill waiting workers */
>>> + rws->finalize = true;
>>> + while (!QSIMPLEQ_EMPTY(&rws->free_workers)) {
>>> + Qcow2Worker *w = QSIMPLEQ_FIRST(&rws->free_workers);
>>> + QSIMPLEQ_REMOVE_HEAD(&rws->free_workers, entry);
>>> + qemu_coroutine_enter(w->co);
>>> + }
>>> +
>>> + /* wait others */
>>> + if (rws->online_workers > 0) {
>>> + rws->waiting_all = true;
>>> + qemu_coroutine_yield();
>>> + rws->waiting_all = false;
>> Why don't you enter the busy workers here? (And keep doing so until
>> online_workers is 0.) That way, you could save yourself the other
>> aio_co_wake() in qcow2_rw_worker().
>
> We shouldn't enter busy workers, as they may have yielded in an I/O
> operation. The operation should complete.

Yes.

I think my misunderstanding was that I like to assume that everything
that yields checks whether I/O is done by itself, whereas in reality
that's probably usually done with some central polling and those
yielding coroutines assume they only wake up when that polling assures
them the I/O is done.
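
As a rough model of that understanding (a toy simulation in plain C, not QEMU coroutine code), the sketch below has a central poll step that resumes only workers whose I/O has completed. The first worker to finish clears waiting_one before waking the parent, so the parent is woken exactly once, which mirrors the waiting_one handling in the patch. All names here (SimWorker, SimState, sim_init, sim_worker_resume, sim_poll_once) are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define SIM_WORKERS 3

typedef struct SimWorker {
    bool busy;    /* has a request in flight, i.e. "yielded in I/O" */
    bool io_done; /* set by the simulated completion source */
} SimWorker;

typedef struct SimState {
    SimWorker workers[SIM_WORKERS];
    bool waiting_one;   /* parent yielded, waiting for any free worker */
    int parent_wakeups; /* how often the parent was woken */
    int free_workers;
} SimState;

/* Start with every worker busy, as in the "all workers are busy" case. */
static void sim_init(SimState *s)
{
    *s = (SimState) { .waiting_one = false };
    for (size_t i = 0; i < SIM_WORKERS; i++) {
        s->workers[i].busy = true;
    }
}

/* Only ever called by the poll step, after the worker's I/O completed. */
static void sim_worker_resume(SimState *s, SimWorker *w)
{
    assert(w->busy && w->io_done); /* the model has no spurious wakeups */
    w->busy = false;
    w->io_done = false;
    s->free_workers++;
    if (s->waiting_one) {
        s->waiting_one = false; /* cleared first: only one worker wakes the parent */
        s->parent_wakeups++;
    }
}

/* One round of central polling; returns how many workers were resumed. */
static int sim_poll_once(SimState *s)
{
    int resumed = 0;
    for (size_t i = 0; i < SIM_WORKERS; i++) {
        SimWorker *w = &s->workers[i];
        if (w->busy && w->io_done) {
            sim_worker_resume(s, w);
            resumed++;
        }
    }
    return resumed;
}
```

In this model a busy worker is never entered from outside the poll step, which is why waking all busy workers in a loop would break the assumption asserted in sim_worker_resume().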

So I have no objections to the control flow now.

Max
