commit:     389429d798a186bdbeb11354d7f1299f628851fd
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Thu Apr  9 04:45:16 2020 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Thu Apr  9 06:01:27 2020 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=389429d7

Scheduler: wakeup for empty merge queue (bug 711322)

Add a wakeup callback to schedule a new merge when the merge queue
becomes empty. This prevents the scheduler from hanging in cases
where the order of _merge_exit callback invocation may cause the
merge queue to appear non-empty when it is about to become
empty.

Bug: https://bugs.gentoo.org/711322
Bug: https://bugs.gentoo.org/716636
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/_emerge/Scheduler.py           | 23 +++++++++++++++++++++++
 lib/_emerge/SequentialTaskQueue.py | 22 ++++++++++++----------
 2 files changed, 35 insertions(+), 10 deletions(-)

diff --git a/lib/_emerge/Scheduler.py b/lib/_emerge/Scheduler.py
index ee8f3dd5e..2c0483230 100644
--- a/lib/_emerge/Scheduler.py
+++ b/lib/_emerge/Scheduler.py
@@ -27,6 +27,7 @@ bad = create_color_func("BAD")
 from portage._sets import SETPREFIX
 from portage._sets.base import InternalPackageSet
 from portage.util import ensure_dirs, writemsg, writemsg_level
+from portage.util.futures import asyncio
 from portage.util.SlotObject import SlotObject
 from portage.util._async.SchedulerInterface import SchedulerInterface
 from portage.util._eventloop.EventLoop import EventLoop
@@ -241,6 +242,7 @@ class Scheduler(PollScheduler):
                self._completed_tasks = set()
                self._main_exit = None
                self._main_loadavg_handle = None
+               self._schedule_merge_wakeup_task = None
 
                self._failed_pkgs = []
                self._failed_pkgs_all = []
@@ -1440,6 +1442,9 @@ class Scheduler(PollScheduler):
                if self._job_delay_timeout_id is not None:
                        self._job_delay_timeout_id.cancel()
                        self._job_delay_timeout_id = None
+               if self._schedule_merge_wakeup_task is not None:
+                       self._schedule_merge_wakeup_task.cancel()
+                       self._schedule_merge_wakeup_task = None
 
        def _choose_pkg(self):
                """
@@ -1614,6 +1619,24 @@ class Scheduler(PollScheduler):
                        self._main_loadavg_handle = self._event_loop.call_later(
                                self._loadavg_latency, self._schedule)
 
+               # Failure to schedule *after* self._task_queues.merge becomes
+               # empty will cause the scheduler to hang as in bug 711322.
+               # Do not rely on scheduling which occurs via the _merge_exit
+               # method, since the order of callback invocation may cause
+               # self._task_queues.merge to appear non-empty when it is
+               # about to become empty.
+               if (self._task_queues.merge and (self._schedule_merge_wakeup_task is None
+                       or self._schedule_merge_wakeup_task.done())):
+                       self._schedule_merge_wakeup_task = asyncio.ensure_future(
+                               self._task_queues.merge.wait(), loop=self._event_loop)
+                       self._schedule_merge_wakeup_task.add_done_callback(
+                               self._schedule_merge_wakeup)
+
+       def _schedule_merge_wakeup(self, future):
+               if not future.cancelled():
+                       future.result()
+                       self._schedule()
+
        def _sigcont_handler(self, signum, frame):
                self._sigcont_time = time.time()
 

diff --git a/lib/_emerge/SequentialTaskQueue.py b/lib/_emerge/SequentialTaskQueue.py
index 656e5cf7c..d2551b1c6 100644
--- a/lib/_emerge/SequentialTaskQueue.py
+++ b/lib/_emerge/SequentialTaskQueue.py
@@ -1,9 +1,11 @@
-# Copyright 1999-2012 Gentoo Foundation
+# Copyright 1999-2020 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 from collections import deque
 import sys
 
+from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import coroutine
 from portage.util.SlotObject import SlotObject
 
 class SequentialTaskQueue(SlotObject):
@@ -41,12 +43,6 @@ class SequentialTaskQueue(SlotObject):
                                cancelled = getattr(task, "cancelled", None)
                                if not cancelled:
                                        self.running_tasks.add(task)
-                                       # This callback will be invoked as soon as the task
-                                       # exits (before the future's done callback is called),
-                                       # and this is required in order for bool(self) to have
-                                       # an updated value for Scheduler._schedule to base
-                                       # assumptions upon. Delayed updates to bool(self) is
-                                       # what caused Scheduler to hang as in bug 711322.
                                        task.addExitListener(self._task_exit)
                                        task.start()
                finally:
@@ -73,12 +69,18 @@ class SequentialTaskQueue(SlotObject):
                for task in list(self.running_tasks):
                        task.cancel()
 
+       @coroutine
        def wait(self):
                """
-               Synchronously wait for all running tasks to exit.
+               Wait for the queue to become empty. This method is a coroutine.
                """
-               while self.running_tasks:
-                       next(iter(self.running_tasks)).wait()
+               while self:
+                       task = next(iter(self.running_tasks), None)
+                       if task is None:
+                               # Wait for self.running_tasks to populate.
+                               yield asyncio.sleep(0)
+                       else:
+                               yield task.async_wait()
 
        def __bool__(self):
                return bool(self._task_queue or self.running_tasks)

Reply via email to